summaryrefslogtreecommitdiff
path: root/accel-dp/distributor.c
diff options
context:
space:
mode:
Diffstat (limited to 'accel-dp/distributor.c')
-rw-r--r--accel-dp/distributor.c156
1 files changed, 156 insertions, 0 deletions
diff --git a/accel-dp/distributor.c b/accel-dp/distributor.c
new file mode 100644
index 00000000..970f322f
--- /dev/null
+++ b/accel-dp/distributor.c
@@ -0,0 +1,156 @@
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_malloc.h>
+#include <rte_memzone.h>
+#include <rte_launch.h>
+#include <rte_eal.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_ethdev.h>
+#include <rte_log.h>
+#include <rte_errno.h>
+#include <rte_distributor.h>
+#include <rte_debug.h>
+
+#include "init.h"
+#include "kni_dev.h"
+#include "event.h"
+#include "log.h"
+
+#define BURST_SIZE 32
+
+#define MBUF_DROP 255
+
+static struct rte_distributor *d;
+static int term;
+static int port_cnt;
+
+struct xmit_buf {
+ struct rte_mbuf *bufs[BURST_SIZE];
+ int cnt;
+};
+
+int distributor_init(int ded)
+{
+ d = rte_distributor_create("distributor", rte_socket_id(), rte_lcore_count() - ded);
+ return d == NULL;
+}
+
+static void flush_port(int port, struct xmit_buf *xb)
+{
+ int nb;
+
+ if (likely(port < port_cnt))
+ nb = rte_eth_tx_burst(port, 0, xb->bufs, xb->cnt);
+ else
+ nb = kni_dev_tx_burst(port - port_cnt, 0, xb->bufs, xb->cnt);
+
+ if (unlikely(nb < xb->cnt)) {
+ do {
+ rte_pktmbuf_free(xb->bufs[nb]);
+ } while (++nb < xb->cnt);
+ }
+}
+
+static void distributor_tx(struct rte_mbuf **bufs, int nb, struct xmit_buf *xmit_bufs)
+{
+ struct rte_mbuf *mb;
+ int i, p;
+
+ _mm_prefetch(bufs[0], 0);
+ _mm_prefetch(bufs[1], 0);
+ _mm_prefetch(bufs[2], 0);
+ for (i = 0; i < nb; i++) {
+ _mm_prefetch(bufs[i + 3], 0);
+
+ mb = bufs[i];
+
+ p = mb->port;
+
+ if (likely(p != MBUF_DROP)) {
+ struct xmit_buf *xb = &xmit_bufs[p];
+ xb->bufs[xb->cnt++] = mb;
+ if (xb->cnt == BURST_SIZE)
+ flush_port(p, xb);
+ } else
+ rte_pktmbuf_free(mb);
+ }
+}
+
+void distributor_loop(int chk_event)
+{
+ int kni_port_cnt = kni_dev_count();
+ struct rte_mbuf *bufs[BURST_SIZE*2];
+ int port, nb, i;
+ struct xmit_buf *xmit_bufs;
+ int tot_port_cnt;
+ struct xmit_buf *xb;
+
+ port_cnt = rte_eth_dev_count();
+ tot_port_cnt = port_cnt + kni_port_cnt;
+
+ xmit_bufs = rte_malloc(NULL, (tot_port_cnt * sizeof(struct xmit_buf)), 0);
+
+ for (i = 0; i < tot_port_cnt; i++)
+ xmit_bufs[i].cnt = 0;
+
+ while (!term) {
+ for (port = 0; port < port_cnt; port++) {
+ nb = rte_eth_rx_burst(port, 0, bufs, BURST_SIZE);
+
+ if (likely(nb))
+ rte_distributor_process(d, bufs, nb);
+
+ nb = rte_distributor_returned_pkts(d, bufs, BURST_SIZE*2);
+
+ if (likely(nb))
+ distributor_tx(bufs, nb, xmit_bufs);
+ }
+
+ for (port = 0; port < kni_port_cnt; port++) {
+ nb = kni_dev_rx_burst(port, 0, bufs, BURST_SIZE);
+
+ if (likely(nb))
+ rte_distributor_process(d, bufs, nb);
+
+ nb = rte_distributor_returned_pkts(d, bufs, BURST_SIZE*2);
+
+ if (likely(nb))
+ distributor_tx(bufs, nb, xmit_bufs);
+ }
+
+ _mm_prefetch(&xmit_bufs[0], 0);
+ _mm_prefetch(&xmit_bufs[1], 0);
+ _mm_prefetch(&xmit_bufs[2], 0);
+ for (i = 0; i < tot_port_cnt; i++) {
+ _mm_prefetch(&xmit_bufs[i + 3], 0);
+
+ xb = &xmit_bufs[i];
+
+ if (likely(xb->cnt))
+ flush_port(i, xb);
+ }
+
+ if (chk_event)
+ event_process(0);
+ }
+}
+
+int lcore_worker(void *a)
+{
+ struct rte_mbuf *mb = NULL;
+
+ while (!term) {
+ mb = rte_distributor_get_pkt(d, 0, mb);
+ mb->port = MBUF_DROP;
+ }
+
+ return 0;
+}
+
+int lcore_distributor(void *a)
+{
+ distributor_loop(0);
+
+ return 0;
+}