/* SPDX-License-Identifier: BSD-3-Clause * Copyright(c) 2010-2014 Intel Corporation */ /* * This sample application is a simple multi-process application which * demostrates sharing of queues and memory pools between processes, and * using those queues/pools for communication between the processes. * * Application is designed to run with two processes, a primary and a * secondary, and each accepts commands on the commandline, the most * important of which is "send", which just sends a string to the other * process. */ #include <stdio.h> #include <string.h> #include <stdint.h> #include <stdarg.h> #include <inttypes.h> #include <stdarg.h> #include <errno.h> #include <unistd.h> #include <termios.h> #include <sys/queue.h> #include <rte_common.h> #include <rte_memory.h> #include <rte_launch.h> #include <rte_eal.h> #include <rte_per_lcore.h> #include <rte_lcore.h> #include <rte_debug.h> #include <rte_atomic.h> #include <rte_branch_prediction.h> #include <rte_ring.h> #include <rte_log.h> #include <rte_mempool.h> #include <cmdline_rdline.h> #include <cmdline_parse.h> #include <cmdline_parse_string.h> #include <cmdline_socket.h> #include <cmdline.h> #include <pthread.h> #include "mp_commands.h" #define RTE_LOGTYPE_APP RTE_LOGTYPE_USER1 #define LOG1 1 // #define buff_size 10*1024*1024 #define buff_size 1024 FILE *fp0; // static const char *_MSG_POOL = "MSG_POOL"; // static const char *_SEC_2_PRI = "SEC_2_PRI"; // static const char *_PRI_2_SEC = "PRI_2_SEC"; typedef struct buffer_10mb { char *buff; size_t len; /* position to write next char to */ size_t max_len; /* == length of buff */ int is_flushing; } buffer_10mb; struct rte_ring *send_ring, *recv_ring; struct rte_mempool *message_pool; volatile int quit = 0; volatile int is_slave_ready = 0; static const char *MBUF_POOL = "MBUF_POOL"; struct buffer_10mb * log_buff; struct buffer_10mb * log_buff0; struct buffer_10mb * log_buff1; struct buffer_10mb * log_buff2; struct buffer_10mb *log_buffers[3]; uint16_t current_buf; // const char *nl_char = '\n'; // const char *rn_char = '\r\n'; // const char *r_char = '\r'; const char *ter_char = '\0'; #define TEMP_STRING_SIZE 1000 char temp_string[TEMP_STRING_SIZE]; char *ts0; #include <stdint.h> #include <inttypes.h> #include <rte_eal.h> #include <rte_ethdev.h> #include <rte_cycles.h> #include <rte_lcore.h> #include <rte_mbuf.h> #define RX_RING_SIZE 1024 #define TX_RING_SIZE 1024 struct rte_mempool *mbuf_pool; #define NUM_MBUFS 8191 #define MBUF_CACHE_SIZE 250 #define BURST_SIZE 32 static const struct rte_eth_conf port_conf_default = { .rxmode = { .max_rx_pkt_len = ETHER_MAX_LEN, }, }; static struct { uint64_t total_cycles; uint64_t total_pkts; } latency_numbers; static uint16_t add_timestamps(uint16_t port __rte_unused, uint16_t qidx __rte_unused, struct rte_mbuf **pkts, uint16_t nb_pkts, uint16_t max_pkts __rte_unused, void *_ __rte_unused) { unsigned i; uint64_t now = rte_rdtsc(); for (i = 0; i < nb_pkts; i++) pkts[i]->udata64 = now; return nb_pkts; } static uint16_t calc_latency(uint16_t port __rte_unused, uint16_t qidx __rte_unused, struct rte_mbuf **pkts, uint16_t nb_pkts, void *_ __rte_unused) { uint64_t cycles = 0; uint64_t now = rte_rdtsc(); unsigned i; for (i = 0; i < nb_pkts; i++) cycles += now - pkts[i]->udata64; latency_numbers.total_cycles += cycles; latency_numbers.total_pkts += nb_pkts; if (latency_numbers.total_pkts > (100 * 1000 * 1000ULL)) { printf("Latency = %"PRIu64" cycles\n", latency_numbers.total_cycles / latency_numbers.total_pkts); latency_numbers.total_cycles = latency_numbers.total_pkts = 0; } return nb_pkts; } /* * Initialises a given port using global settings and with the rx buffers * coming from the mbuf_pool passed as parameter */ static inline int port_init(uint16_t port, struct rte_mempool *mbuf_pool) { struct rte_eth_conf port_conf = port_conf_default; const uint16_t rx_rings = 1, tx_rings = 1; uint16_t nb_rxd = RX_RING_SIZE; uint16_t nb_txd = TX_RING_SIZE; int retval; uint16_t q; struct rte_eth_dev_info dev_info; struct rte_eth_txconf txconf; if (!rte_eth_dev_is_valid_port(port)) return -1; rte_eth_dev_info_get(port, &dev_info); if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE) port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf); if (retval != 0) return retval; retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd); if (retval != 0) return retval; for (q = 0; q < rx_rings; q++) { retval = rte_eth_rx_queue_setup(port, q, nb_rxd, rte_eth_dev_socket_id(port), NULL, mbuf_pool); if (retval < 0) return retval; } txconf = dev_info.default_txconf; txconf.offloads = port_conf.txmode.offloads; for (q = 0; q < tx_rings; q++) { retval = rte_eth_tx_queue_setup(port, q, nb_txd, rte_eth_dev_socket_id(port), &txconf); if (retval < 0) return retval; } retval = rte_eth_dev_start(port); if (retval < 0) return retval; struct ether_addr addr; rte_eth_macaddr_get(port, &addr); printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8 " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n", (unsigned)port, addr.addr_bytes[0], addr.addr_bytes[1], addr.addr_bytes[2], addr.addr_bytes[3], addr.addr_bytes[4], addr.addr_bytes[5]); rte_eth_promiscuous_enable(port); rte_eth_add_rx_callback(port, 0, add_timestamps, NULL); rte_eth_add_tx_callback(port, 0, calc_latency, NULL); return 0; } void init_log_buffers() { log_buff0 = malloc(sizeof(buffer_10mb)); log_buff0->buff = malloc(buff_size * sizeof(char)); // 10 MB log_buff0->len = 0; log_buff0->max_len = buff_size; log_buff0->is_flushing = 0; log_buff1 = malloc(sizeof(buffer_10mb)); log_buff1->buff = malloc(buff_size * sizeof(char)); // 10 MB log_buff1->len = 0; log_buff1->max_len = buff_size; log_buff1->is_flushing = 0; log_buff2 = malloc(sizeof(buffer_10mb)); log_buff2->buff = malloc(buff_size * sizeof(char)); // 10 MB log_buff2->len = 0; log_buff2->max_len = buff_size; log_buff2->is_flushing = 0; log_buffers[0] = log_buff0; log_buffers[1] = log_buff1; log_buffers[2] = log_buff2; current_buf = 0; } // static int // lcore_recv(__attribute__((unused)) void *arg) // { // unsigned lcore_id = rte_lcore_id(); // printf("Starting core %u\n", lcore_id); // while (!quit){ // void *msg; // if (rte_ring_dequeue(recv_ring, &msg) < 0){ // usleep(5); // continue; // } // printf("core %u: Received '%s'\n", lcore_id, (char *)msg); // rte_mempool_put(message_pool, msg); // } // return 0; // } int log_this_shit(const char *format, ...) { va_list arg; int done; // Prepare log string va_start (arg, format); done = vsnprintf (ts0, TEMP_STRING_SIZE, format, arg); va_end(arg); size_t log_len = strlen(ts0); if (log_len < 1) return -1; while (1) { size_t free_cell = (log_buffers[current_buf]->max_len) - log_buffers[current_buf]->len -2; if (free_cell < log_len) { printf("!!!!!!!! current_buf: %d full\n", current_buf); current_buf = (current_buf +3 +1) % 3; continue; } snprintf(log_buffers[current_buf]->buff + log_buffers[current_buf]->len, log_len +1, "%s\r", ts0); log_buffers[current_buf]->len += log_len; // exclude the \0 character of snprintf return done; } } static int lcore_main_send(__attribute__((unused)) void *arg) { int count = 0; int count2 = 0; printf("%s begin in 3 sec ...\n"); usleep(1000*1000*3); while (!quit && (count2 < 5000)) { //while (1) { //count2--; //if(!is_slave_ready) { // if (count % 1000 != 0) { // printf("count = %d\n", count); // count++; // usleep(10); // // continue; // } //} usleep(1000); struct Message { char data[10]; }; struct ether_hdr *eth_hdr; struct Message obj = {{'H', 'e', 'l', 'l', 'o', '2', '0', '1', '8'}}; struct Message *msg; struct ether_addr s_addr = {{0x14, 0x02, 0xEC, 0x89, 0x8D, 0x24}}; struct ether_addr d_addr = {{0x14, 0x02, 0xEC, 0x89, 0xED, 0x54}}; uint16_t ether_type = 0x0a00; struct rte_mbuf *pkt[BURST_SIZE]; int i; for (i = 0; i < BURST_SIZE; i++) { pkt[i] = rte_pktmbuf_alloc(mbuf_pool); eth_hdr = rte_pktmbuf_mtod(pkt[i], struct ether_hdr *); eth_hdr->d_addr = d_addr; eth_hdr->s_addr = s_addr; eth_hdr->ether_type = ether_type; msg = (struct Message *)(rte_pktmbuf_mtod(pkt[i], char *) + sizeof(struct ether_hdr)) ; *msg = obj; int pkt_size = sizeof(struct Message) + sizeof(struct ether_hdr) ; pkt[i]->data_len = pkt_size; pkt[i]->pkt_len = pkt_size; } uint16_t nb_tx = rte_eth_tx_burst(0, 0, pkt, BURST_SIZE); // printf("nb_tx: %d at interation %d\n", nb_tx, count2); #if LOG1 log_this_shit("log_this_shit: nb_tx: %d at %d\n", nb_tx, count2); #endif for (int a = 0; a < 32; a++) { rte_pktmbuf_free(pkt[a]); } count2++; } return 0; } void *log_buffers_flusher(void *vargp) { uint16_t i = 0; int j = 0; while(1) { if (!quit) { i = (current_buf + 3 -1) % 3; // get index of buffer before current buffer in array if (log_buffers[i]->len != 0) { log_buffers[i]->is_flushing = 1; // TODO: optimize here ... snprintf(log_buffers[i]->buff + log_buffers[i]->len++, 1, "%s", ter_char); fwrite(log_buffers[i]->buff, log_buffers[i]->len -1, sizeof(char), fp0); // restore size and status of buffer log_buffers[i]->len = 0; log_buffers[i]->is_flushing = 0; } continue; } // Experience ends, lets go home. Cleaning all for (int i=0; i<3; i++) { if (log_buffers[i]->len == 0) continue; snprintf(log_buffers[i]->buff + log_buffers[i]->len++, 1, "%s", ter_char); fwrite(log_buffers[i]->buff, log_buffers[i]->len -1, sizeof(char), fp0); } return; // or exit(0) if compiler complains; } } int main(int argc, char **argv) { // const unsigned flags = 0; // const unsigned ring_size = 64; // const unsigned pool_size = 1024; // const unsigned pool_cache = 32; // const unsigned priv_data_sz = 0; int ret; unsigned lcore_id; ret = rte_eal_init(argc, argv); if (ret < 0) rte_exit(EXIT_FAILURE, "Cannot init EAL\n"); /* if (rte_eal_process_type() == RTE_PROC_PRIMARY){ send_ring = rte_ring_create(_PRI_2_SEC, ring_size, rte_socket_id(), flags); recv_ring = rte_ring_create(_SEC_2_PRI, ring_size, rte_socket_id(), flags); message_pool = rte_mempool_create(_MSG_POOL, pool_size, STR_TOKEN_SIZE, pool_cache, priv_data_sz, NULL, NULL, NULL, NULL, rte_socket_id(), flags); } else { recv_ring = rte_ring_lookup(_PRI_2_SEC); send_ring = rte_ring_lookup(_SEC_2_PRI); message_pool = rte_mempool_lookup(_MSG_POOL); } if (send_ring == NULL) rte_exit(EXIT_FAILURE, "Problem getting sending ring\n"); if (recv_ring == NULL) rte_exit(EXIT_FAILURE, "Problem getting receiving ring\n"); if (message_pool == NULL) rte_exit(EXIT_FAILURE, "Problem getting message pool\n"); RTE_LOG(INFO, APP, "Finished Process Init.\n"); */ int nb_ports = 2; mbuf_pool = rte_pktmbuf_pool_create(MBUF_POOL, NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); if (mbuf_pool == NULL) rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n"); /* init log */ #if LOG1 log_buff = malloc(sizeof(buffer_10mb)); log_buff->buff = malloc(buff_size * sizeof(char)); // 10 MB log_buff->len = 0; log_buff->max_len = buff_size; ts0 = malloc(sizeof(char) * TEMP_STRING_SIZE); init_log_buffers(); fp0 = fopen("o_wr.txt", "w"); fputs("############ BEGIN LOG ############\n", fp0); fclose(fp0); fp0 = fopen("o_wr.txt", "a"); if (fp0 == NULL) rte_exit(EXIT_FAILURE, "Fail open output log fp0\n"); pthread_t tid; pthread_create(&tid, NULL, log_buffers_flusher, (void *)&tid); #endif /* initialize all ports */ uint16_t portid; RTE_ETH_FOREACH_DEV(portid) if (port_init(portid, mbuf_pool) != 0) rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8"\n", portid); /* call lcore_recv() on every slave lcore, which is core 1 in this case */ RTE_LCORE_FOREACH_SLAVE(lcore_id) { rte_eal_remote_launch(lcore_main_send, NULL, lcore_id); } /* call cmd prompt on master lcore */ struct cmdline *cl = cmdline_stdin_new(simple_mp_ctx, "\nsimple_mp > "); if (cl == NULL) rte_exit(EXIT_FAILURE, "Cannot create cmdline instance\n"); cmdline_interact(cl); cmdline_stdin_exit(cl); rte_eal_mp_wait_lcore(); pthread_join(tid, NULL); fclose(fp0); // TODO: free ... return 0; }