/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 */

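/*
 * This sample application is a simple multi-process application which
 * demonstrates sharing of queues and memory pools between processes, and
 * using those queues/pools for communication between the processes.
 *
 * The application is designed to run with two processes, a primary and a
 * secondary, and each accepts commands on the command line, the most
 * important of which is "send", which just sends a string to the other
 * process.
 *
 * Note: in this variant the ring/mempool setup in main() is commented out.
 * Instead, the slave lcores transmit bursts of raw Ethernet frames on port 0
 * and, when LOG1 is enabled, log each burst result to "o_wr.txt".
 */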

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stdarg.h>
#include <inttypes.h>
#include <stdarg.h>
#include <errno.h>
#include <unistd.h>
#include <termios.h>
#include <pthread.h>
#include <sys/queue.h>

#include <rte_common.h>
#include <rte_memory.h>
#include <rte_launch.h>
#include <rte_eal.h>
#include <rte_per_lcore.h>
#include <rte_lcore.h>
#include <rte_debug.h>
#include <rte_atomic.h>
#include <rte_branch_prediction.h>
#include <rte_ring.h>
#include <rte_log.h>
#include <rte_mempool.h>
#include <cmdline_rdline.h>
#include <cmdline_parse.h>
#include <cmdline_parse_string.h>
#include <cmdline_socket.h>
#include <cmdline.h>
#include "mp_commands.h"

/* Route RTE_LOG(..., APP, ...) messages to the first user log type. */
#define RTE_LOGTYPE_APP RTE_LOGTYPE_USER1
/* Compile-time switch: when non-zero, lcore_main_send() records each tx
 * burst via log_this_shit() and main() starts the log flusher thread. */
#define LOG1 1
/* Capacity in bytes of each in-memory log buffer. The commented-out value
 * below is presumably the 10 MB size the buffer type is named after. */
// #define buff_size 10*1024*1024
#define buff_size 1024
/* Log file stream ("o_wr.txt"), opened in main() and written by the flusher. */
FILE *fp0;

/* Object names used by the original primary/secondary ring setup; unused
 * while that setup code in main() is commented out. */
// static const char *_MSG_POOL = "MSG_POOL";
// static const char *_SEC_2_PRI = "SEC_2_PRI";
// static const char *_PRI_2_SEC = "PRI_2_SEC";

/*
 * One in-memory log buffer. log_this_shit() appends formatted text at
 * buff + len; log_buffers_flusher() writes the first len bytes to fp0 and
 * then resets len to 0.
 * NOTE(review): despite the name, buff_size currently makes each buffer
 * 1024 bytes, not 10 MB.
 */
typedef struct buffer_10mb {
	char *buff; /* heap storage of max_len bytes (malloc'd at init) */
	size_t len; /* position to write next char to */
	size_t max_len; /* == length of buff */
	int is_flushing; /* set to 1 by the flusher while writing; not read by the writer in this file */
} buffer_10mb;

/* Rings and message pool of the original simple_mp design. Nothing visible
 * in this file assigns them (their setup in main() is commented out).
 * TODO confirm whether mp_commands still dereferences them. */
struct rte_ring *send_ring, *recv_ring;
struct rte_mempool *message_pool;
/* Stop flag polled by lcore_main_send() and log_buffers_flusher().
 * NOTE(review): presumably set by the quit command in mp_commands; volatile
 * gives no inter-thread ordering guarantees, so C11 atomics would be safer. */
volatile int quit = 0;
/* Only referenced by commented-out code in this file. */
volatile int is_slave_ready = 0;
/* Name of the packet mbuf pool created in main(). */
static const char *MBUF_POOL = "MBUF_POOL";
/* Extra single log buffer allocated in main(); not otherwise used in this file. */
struct buffer_10mb * log_buff;
/* The three rotating log buffers, also reachable through log_buffers[]. */
struct buffer_10mb * log_buff0;
struct buffer_10mb * log_buff1;
struct buffer_10mb * log_buff2;
struct buffer_10mb *log_buffers[3];
/* Index into log_buffers[] of the buffer log_this_shit() currently appends to. */
uint16_t current_buf;


/*
 * Terminator string: an empty string, so formatting it with "%s" emits
 * nothing but the trailing NUL. It must point at a real string. The former
 * initializer '\0' is an integer constant 0, i.e. a null pointer, and
 * passing NULL to a "%s" conversion is undefined behaviour.
 */
const char *ter_char = "";
/* Capacity of the scratch buffer each log message is formatted into. */
#define TEMP_STRING_SIZE 1000
/* Not referenced in this file; log_this_shit() formats into ts0 instead. */
char temp_string[TEMP_STRING_SIZE];
/* Heap scratch buffer of TEMP_STRING_SIZE bytes, allocated in main(). */
char *ts0;

#include <stdint.h>
#include <inttypes.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>

/* Descriptor counts requested per RX/TX queue in port_init(); the driver may
 * adjust them via rte_eth_dev_adjust_nb_rx_tx_desc(). */
#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024
/* Packet mbuf pool used for port RX queues and by lcore_main_send();
 * created in main(). */
struct rte_mempool *mbuf_pool;

/* mbufs per port (main() multiplies by its nb_ports), mempool cache size
 * passed to rte_pktmbuf_pool_create(), and packets per tx burst. */
#define NUM_MBUFS 8191
#define MBUF_CACHE_SIZE 250
#define BURST_SIZE 32

/* Default device configuration copied by port_init(): only the maximum RX
 * packet length (ETHER_MAX_LEN) is set, all other fields are zero. */
static const struct rte_eth_conf port_conf_default = {
	.rxmode = {
		.max_rx_pkt_len = ETHER_MAX_LEN,
	},
};

/* Running totals accumulated by calc_latency() and reset after each report. */
static struct {
	uint64_t total_cycles; /* sum over packets of (tx TSC - udata64 stamp) */
	uint64_t total_pkts; /* number of packets contributing to total_cycles */
} latency_numbers;


/*
 * RX callback: stamp every received mbuf with the current TSC value in its
 * udata64 field so calc_latency() can later measure the time to transmit.
 * All packets are kept (the burst size is returned unchanged).
 */
static uint16_t
add_timestamps(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
		struct rte_mbuf **pkts, uint16_t nb_pkts,
		uint16_t max_pkts __rte_unused, void *_ __rte_unused)
{
	const uint64_t stamp = rte_rdtsc();
	uint16_t n;

	for (n = 0; n != nb_pkts; ++n)
		pkts[n]->udata64 = stamp;

	return nb_pkts;
}

/*
 * TX callback: add the age (now - udata64 stamp) of every mbuf about to be
 * sent to latency_numbers. Once more than 100 million packets have been
 * counted, print the mean latency in TSC cycles and restart the totals.
 * All packets are passed through (the burst size is returned unchanged).
 */
static uint16_t
calc_latency(uint16_t port __rte_unused, uint16_t qidx __rte_unused,
		struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused)
{
	const uint64_t report_every = 100 * 1000 * 1000ULL;
	const uint64_t now = rte_rdtsc();
	uint64_t burst_cycles = 0;
	uint16_t k;

	for (k = 0; k < nb_pkts; k++)
		burst_cycles += now - pkts[k]->udata64;

	latency_numbers.total_cycles += burst_cycles;
	latency_numbers.total_pkts += nb_pkts;

	if (latency_numbers.total_pkts <= report_every)
		return nb_pkts;

	printf("Latency = %"PRIu64" cycles\n",
		latency_numbers.total_cycles / latency_numbers.total_pkts);
	latency_numbers.total_cycles = 0;
	latency_numbers.total_pkts = 0;
	return nb_pkts;
}

/*
 * Initialises a given port using global settings and with the rx buffers
 * coming from the mbuf_pool passed as parameter.
 *
 * Configures one RX and one TX queue (descriptor counts possibly adjusted by
 * the driver), and enables the MBUF_FAST_FREE TX offload when the device
 * supports it. It then starts the port, prints its MAC address, enables
 * promiscuous mode, and installs the add_timestamps/calc_latency callbacks
 * on queue 0.
 *
 * Returns 0 on success, -1 for an invalid port id, or the error code of the
 * first failing configure/adjust/queue-setup/start call.
 */
static inline int
port_init(uint16_t port, struct rte_mempool *mbuf_pool)
{
	struct rte_eth_conf port_conf = port_conf_default;
	const uint16_t rx_rings = 1, tx_rings = 1;
	uint16_t nb_rxd = RX_RING_SIZE;
	uint16_t nb_txd = TX_RING_SIZE;
	int retval;
	uint16_t q;
	struct rte_eth_dev_info dev_info;
	struct rte_eth_txconf txconf;

	if (!rte_eth_dev_is_valid_port(port))
		return -1;

	rte_eth_dev_info_get(port, &dev_info);
	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
		port_conf.txmode.offloads |=
			DEV_TX_OFFLOAD_MBUF_FAST_FREE;

	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
	if (retval != 0)
		return retval;

	retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
	if (retval != 0)
		return retval;

	for (q = 0; q < rx_rings; q++) {
		retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
		if (retval < 0)
			return retval;
	}

	/* TX queues inherit the device defaults plus the port-level offloads. */
	txconf = dev_info.default_txconf;
	txconf.offloads = port_conf.txmode.offloads;
	for (q = 0; q < tx_rings; q++) {
		retval = rte_eth_tx_queue_setup(port, q, nb_txd,
				rte_eth_dev_socket_id(port), &txconf);
		if (retval < 0)
			return retval;
	}

	retval = rte_eth_dev_start(port);
	if (retval < 0)
		return retval;

	struct ether_addr addr;

	rte_eth_macaddr_get(port, &addr);
	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
			(unsigned)port,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);

	rte_eth_promiscuous_enable(port);

	/*
	 * The callbacks only feed the latency statistics. A failed registration
	 * (NULL return, e.g. when ethdev RX/TX callbacks are compiled out) is
	 * reported, but it does not fail port initialisation.
	 */
	if (rte_eth_add_rx_callback(port, 0, add_timestamps, NULL) == NULL)
		printf("Port %u: cannot add RX timestamp callback\n",
				(unsigned)port);
	if (rte_eth_add_tx_callback(port, 0, calc_latency, NULL) == NULL)
		printf("Port %u: cannot add TX latency callback\n",
				(unsigned)port);

	return 0;
}

void
init_log_buffers()
{
	log_buff0 = malloc(sizeof(buffer_10mb));
	log_buff0->buff = malloc(buff_size * sizeof(char)); // 10 MB
	log_buff0->len = 0;
	log_buff0->max_len = buff_size;
	log_buff0->is_flushing = 0;

	log_buff1 = malloc(sizeof(buffer_10mb));
	log_buff1->buff = malloc(buff_size * sizeof(char)); // 10 MB
	log_buff1->len = 0;
	log_buff1->max_len = buff_size;
	log_buff1->is_flushing = 0;

	log_buff2 = malloc(sizeof(buffer_10mb));
	log_buff2->buff = malloc(buff_size * sizeof(char)); // 10 MB
	log_buff2->len = 0;
	log_buff2->max_len = buff_size;
	log_buff2->is_flushing = 0;

	log_buffers[0] = log_buff0;
	log_buffers[1] = log_buff1;
	log_buffers[2] = log_buff2;
	current_buf = 0;
}

// static int
// lcore_recv(__attribute__((unused)) void *arg)
// {
// 	unsigned lcore_id = rte_lcore_id();

// 	printf("Starting core %u\n", lcore_id);
// 	while (!quit){
// 		void *msg;
// 		if (rte_ring_dequeue(recv_ring, &msg) < 0){
// 			usleep(5);
// 			continue;
// 		}
// 		printf("core %u: Received '%s'\n", lcore_id, (char *)msg);
// 		rte_mempool_put(message_pool, msg);
// 	}

// 	return 0;
// }


int
log_this_shit(const char *format, ...)
{	
	va_list arg;
	int done;
	// Prepare log string
	va_start (arg, format);
	done = vsnprintf (ts0, TEMP_STRING_SIZE, format, arg);
	va_end(arg);

	size_t log_len = strlen(ts0);
	if (log_len < 1) return -1;
	while (1) {
		size_t free_cell = (log_buffers[current_buf]->max_len) - log_buffers[current_buf]->len -2;
		if (free_cell < log_len) {
			printf("!!!!!!!!  current_buf: %d full\n", current_buf);
			current_buf = (current_buf +3 +1) % 3;
			continue;
		}

		snprintf(log_buffers[current_buf]->buff + log_buffers[current_buf]->len, log_len +1, "%s\r", ts0);
		log_buffers[current_buf]->len += log_len;	// exclude the \0 character of snprintf
		return done;
	}
}


static int
lcore_main_send(__attribute__((unused)) void *arg)
{
	int count = 0;
	int count2 = 0;
	printf("%s begin in 3 sec ...\n");
	usleep(1000*1000*3);

	while (!quit && (count2 < 5000)) {
	//while (1) {
		//count2--;
		//if(!is_slave_ready) {
			// if (count % 1000 != 0) {
			// 	printf("count = %d\n", count);
			// 	count++;
			// 	usleep(10);
			// 	// continue;
			// }
		//}
		usleep(1000);
		struct Message
		{
			char data[10];
		};
		struct ether_hdr *eth_hdr;
		struct Message obj = {{'H', 'e', 'l', 'l', 'o', '2', '0', '1', '8'}};
		struct Message *msg;
		struct ether_addr s_addr = {{0x14, 0x02, 0xEC, 0x89, 0x8D, 0x24}};
		struct ether_addr d_addr = {{0x14, 0x02, 0xEC, 0x89, 0xED, 0x54}};
		uint16_t ether_type = 0x0a00;

		struct rte_mbuf *pkt[BURST_SIZE];
		int i;
		for (i = 0; i < BURST_SIZE; i++)
		{
			pkt[i] = rte_pktmbuf_alloc(mbuf_pool);
			eth_hdr = rte_pktmbuf_mtod(pkt[i], struct ether_hdr *);
			eth_hdr->d_addr = d_addr;
			eth_hdr->s_addr = s_addr;
			eth_hdr->ether_type = ether_type; 
			msg = (struct Message *)(rte_pktmbuf_mtod(pkt[i], char *)  + sizeof(struct ether_hdr)) ;
			*msg = obj;
			int pkt_size = sizeof(struct Message)  + sizeof(struct ether_hdr) ;
			pkt[i]->data_len = pkt_size;
			pkt[i]->pkt_len = pkt_size;
		}

		uint16_t nb_tx = rte_eth_tx_burst(0, 0, pkt, BURST_SIZE);
		// printf("nb_tx: %d at interation %d\n", nb_tx, count2);
#if LOG1
		log_this_shit("log_this_shit:  nb_tx: %d at %d\n", nb_tx, count2);
#endif
		for (int a = 0; a < 32; a++) {
			rte_pktmbuf_free(pkt[a]);
		}
		count2++;
	}
	return 0;
}

void *log_buffers_flusher(void *vargp)
{
	uint16_t i = 0;
	int j = 0;
	while(1) {
		if (!quit) {
			i = (current_buf + 3 -1) % 3;	// get index of buffer before current buffer in array
			if (log_buffers[i]->len != 0) {
				log_buffers[i]->is_flushing = 1;
				// TODO: optimize here ...
				snprintf(log_buffers[i]->buff + log_buffers[i]->len++, 1, "%s", ter_char);
				fwrite(log_buffers[i]->buff, log_buffers[i]->len -1, sizeof(char), fp0);
				// restore size and status of buffer
				log_buffers[i]->len = 0;
				log_buffers[i]->is_flushing = 0;
			}
			continue;
		}
		// Experience ends, lets go home. Cleaning all
		for (int i=0; i<3; i++) {
			if (log_buffers[i]->len == 0) continue;
			snprintf(log_buffers[i]->buff + log_buffers[i]->len++, 1, "%s", ter_char);
			fwrite(log_buffers[i]->buff, log_buffers[i]->len -1, sizeof(char), fp0);
		}
		return;    // or exit(0) if compiler complains;
	}
}

/*
 * Entry point. Initialises EAL and creates the packet mbuf pool. When LOG1
 * is set, it also sets up the log buffers, the log file and the flusher
 * thread. It then initialises every ethdev port, launches lcore_main_send()
 * on all slave lcores, and runs the interactive command line on the master
 * lcore. After the command line returns, it waits for the workers and the
 * flusher before closing the log file.
 */
int
main(int argc, char **argv)
{
	int ret;
	unsigned lcore_id;
	uint16_t portid;
	struct cmdline *cl;

	ret = rte_eal_init(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Cannot init EAL\n");

	/* The primary/secondary ring and message-pool setup of the original
	 * simple_mp sample is disabled in this variant. */

	int nb_ports = 2;
	mbuf_pool = rte_pktmbuf_pool_create(MBUF_POOL,
		NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0,
		RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");

#if LOG1
	/* init log */
	pthread_t tid;

	log_buff = malloc(sizeof(buffer_10mb));
	if (log_buff == NULL)
		rte_exit(EXIT_FAILURE, "Cannot allocate log buffer\n");
	log_buff->buff = malloc(buff_size * sizeof(char));
	if (log_buff->buff == NULL)
		rte_exit(EXIT_FAILURE, "Cannot allocate log buffer\n");
	log_buff->len = 0;
	log_buff->max_len = buff_size;
	log_buff->is_flushing = 0;
	ts0 = malloc(sizeof(char) * TEMP_STRING_SIZE);
	if (ts0 == NULL)
		rte_exit(EXIT_FAILURE, "Cannot allocate log scratch string\n");
	init_log_buffers();

	/* Truncate the log and write the banner, then reopen it in append mode
	 * for the flusher. */
	fp0 = fopen("o_wr.txt", "w");
	if (fp0 == NULL)
		rte_exit(EXIT_FAILURE, "Fail open output log fp0\n");
	fputs("############ BEGIN LOG ############\n", fp0);
	fclose(fp0);
	fp0 = fopen("o_wr.txt", "a");
	if (fp0 == NULL)
		rte_exit(EXIT_FAILURE, "Fail open output log fp0\n");

	/* pthread_create returns an error number; it does not set errno. */
	ret = pthread_create(&tid, NULL, log_buffers_flusher, (void *)&tid);
	if (ret != 0)
		rte_exit(EXIT_FAILURE, "Cannot create log flusher thread: %s\n",
				strerror(ret));
#endif

	/* initialize all ports */
	RTE_ETH_FOREACH_DEV(portid) {
		if (port_init(portid, mbuf_pool) != 0)
			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16"\n",
					portid);
	}

	/* call lcore_main_send() on every slave lcore */
	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		rte_eal_remote_launch(lcore_main_send, NULL, lcore_id);
	}

	/* call cmd prompt on master lcore */
	cl = cmdline_stdin_new(simple_mp_ctx, "\nsimple_mp > ");
	if (cl == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create cmdline instance\n");
	cmdline_interact(cl);

	cmdline_stdin_exit(cl);
	rte_eal_mp_wait_lcore();

#if LOG1
	/* The flusher only exits once `quit` is set. The command line may
	 * return without it being set (e.g. on EOF), so set it here. Otherwise
	 * pthread_join() could wait forever. */
	quit = 1;
	pthread_join(tid, NULL);
	if (fclose(fp0) != 0)
		perror("fclose o_wr.txt");
#endif
	// TODO: free ...
	return 0;
}