80 FatalError(
"Error creating thread %s: you do not have "
81 "support for DPDK enabled, on Linux host please recompile "
/* Wall-clock time captured at start-up; base for converting TSC cycle counts
 * into real time (see DPDKSetTimevalReal() below). */
95 static struct timeval machine_start_time = { 0, 0 };
/* Tuning knobs for the interrupt-mode (power-saving) receive heuristics:
 * number of consecutive empty polls tolerated before any back-off ... */
97 #define MIN_ZERO_POLL_COUNT 10U
/* ... and before sleeping/parking is considered at all */
98 #define MIN_ZERO_POLL_COUNT_TO_SLEEP 10U
/* sleep lengths (microseconds) returned by InterruptsSleepHeuristic() */
99 #define MINIMUM_SLEEP_TIME_US 1U
100 #define STANDARD_SLEEP_TIME_US 100U
/* upper bound for rte_epoll_wait() when a worker parks on RX interrupts */
101 #define MAX_EPOLL_TIMEOUT_MS 500U
/* per-port lock serializing RX interrupt enable/disable across queues
 * (see InterruptsTurnOnOff(), indexed by port_id) */
102 static rte_spinlock_t intr_lock[RTE_MAX_ETHPORTS];
/* Per-worker-thread capture context for the DPDK receive path.
 * NOTE(review): several fields are elided in this view (original lines
 * 108-115, 122-125, 127-133) and the closing of the typedef is not visible. */
107 typedef struct DPDKThreadVars_ {
/* stats-API counter ids registered per thread (used via StatsSetUI64()) */
116 uint16_t capture_dpdk_packets;
117 uint16_t capture_dpdk_rx_errs;
118 uint16_t capture_dpdk_imissed;
119 uint16_t capture_dpdk_rx_no_mbufs;
120 uint16_t capture_dpdk_ierrors;
121 uint16_t capture_dpdk_tx_errs;
/* peer port used for TX in copy (IPS) mode */
126 uint16_t out_port_id;
/* NUMA socket of the NIC; SOCKET_ID_ANY when unknown */
134 int32_t port_socket_id;
/* mempool ownership is transferred here from the config in ThreadInit */
135 struct rte_mempool *pkt_mempool;
/* RX burst buffer; BURST_SIZE is defined elsewhere in this file */
136 struct rte_mbuf *received_mbufs[BURST_SIZE];
/* Forward declarations for file-local helpers defined below. */
141 static void ReceiveDPDKThreadExitStats(
ThreadVars *,
void *);
149 static uint64_t CyclesToMicroseconds(uint64_t cycles);
150 static uint64_t CyclesToSeconds(uint64_t cycles);
151 static void DPDKFreeMbufArray(
struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t
offset);
152 static uint64_t DPDKGetSeconds(
void);
/* Register this port/queue pair with the per-thread epoll instance so the
 * worker can later block on RX interrupts (rte_epoll_wait).
 * NOTE(review): the error guard and return statements are elided in this
 * view; presumably returns false on failure, true on success - confirm
 * against the full source. */
154 static bool InterruptsRXEnable(uint16_t port_id, uint16_t queue_id)
/* encode port+queue into the epoll event user data: port in the high
 * 16 bits, queue in the low 16 bits */
156 uint32_t event_data = port_id << UINT16_WIDTH | queue_id;
157 int32_t ret = rte_eth_dev_rx_intr_ctl_q(port_id, queue_id, RTE_EPOLL_PER_THREAD,
158 RTE_INTR_EVENT_ADD, (
void *)((uintptr_t)event_data));
/* failure path (guard line elided): report why interrupt mode failed */
161 SCLogError(
"%s-Q%d: failed to enable interrupt mode: %s", DPDKGetPortNameByPortID(port_id),
162 queue_id, rte_strerror(-ret));
168 static inline uint32_t InterruptsSleepHeuristic(uint32_t no_pkt_polls_count)
170 if (no_pkt_polls_count < MIN_ZERO_POLL_COUNT_TO_SLEEP)
171 return MINIMUM_SLEEP_TIME_US;
173 return STANDARD_SLEEP_TIME_US;
176 static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id,
bool on)
178 rte_spinlock_lock(&(intr_lock[port_id]));
181 rte_eth_dev_rx_intr_enable(port_id, queue_id);
183 rte_eth_dev_rx_intr_disable(port_id, queue_id);
185 rte_spinlock_unlock(&(intr_lock[port_id]));
/**
 * \brief Free a tail slice of an mbuf array.
 *
 * Releases every mbuf in mbuf_array[offset .. mbuf_cnt-1] back to its
 * mempool. Entries before offset are left untouched (they were already
 * consumed or handed off by the caller).
 *
 * \param mbuf_array array of received mbufs
 * \param mbuf_cnt   one past the last index to free
 * \param offset     first index to free
 */
static inline void DPDKFreeMbufArray(
        struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
    for (int idx = offset; idx < mbuf_cnt; idx++) {
        rte_pktmbuf_free(mbuf_array[idx]);
    }
}
/**
 * \brief Convert TSC cycles to microseconds.
 *
 * \param cycles cycle count from rte_get_tsc_cycles()
 * \return elapsed microseconds; 0 when the reported TSC frequency is below
 *         1 MHz (guards the division).
 */
static uint64_t CyclesToMicroseconds(const uint64_t cycles)
{
    const uint64_t ticks_per_us = rte_get_tsc_hz() / 1000000;
    if (ticks_per_us == 0) {
        return 0;
    }

    return cycles / ticks_per_us;
}
/**
 * \brief Convert TSC cycles to whole seconds.
 *
 * \param cycles cycle count from rte_get_tsc_cycles()
 * \return elapsed seconds; 0 when the TSC frequency reads as zero
 *         (guards the division).
 */
static uint64_t CyclesToSeconds(const uint64_t cycles)
{
    const uint64_t ticks_per_s = rte_get_tsc_hz();
    if (ticks_per_s == 0) {
        return 0;
    }

    return cycles / ticks_per_s;
}
/**
 * \brief Add a TSC cycle delta onto a base timeval.
 *
 * Converts cycles to microseconds, adds them to orig_tv and stores the
 * normalized result (usec < 1000000) in new_tv. orig_tv is not modified.
 *
 * \param cycles  cycle delta to add
 * \param orig_tv base time
 * \param new_tv  output: orig_tv advanced by the cycle delta
 */
static void CyclesAddToTimeval(
        const uint64_t cycles, struct timeval *orig_tv, struct timeval *new_tv)
{
    const uint64_t total_usec = CyclesToMicroseconds(cycles) + orig_tv->tv_usec;
    new_tv->tv_sec = orig_tv->tv_sec + total_usec / 1000000;
    new_tv->tv_usec = total_usec % 1000000;
}
/* NOTE(review): the enclosing function header is elided in this view. These
 * lines anchor machine_start_time at "now minus seconds-since-TSC-reset", so
 * that later (start + cycles) computations yield current wall-clock time. */
224 gettimeofday(&machine_start_time, NULL);
225 machine_start_time.tv_sec -= DPDKGetSeconds();
/* Derive the current wall-clock time from the TSC: the recorded machine
 * start time plus the cycles elapsed since TSC reset.
 * NOTE(review): the return statement is elided in this view - presumably it
 * converts real_tv into an SCTime_t; confirm against the full source. */
234 static SCTime_t DPDKSetTimevalReal(
struct timeval *machine_start_tv)
236 struct timeval real_tv;
237 CyclesAddToTimeval(rte_get_tsc_cycles(), machine_start_tv, &real_tv);
/**
 * \brief Seconds elapsed since the TSC started counting.
 *
 * \return current TSC reading converted to whole seconds
 */
static uint64_t DPDKGetSeconds(void)
{
    const uint64_t cycles_now = rte_get_tsc_cycles();
    return CyclesToSeconds(cycles_now);
}
/* Driver-specific setup to run after the device has been started. For bonded
 * devices the underlying slave driver is resolved first; i40e ports then get
 * their RSS configuration applied. (Some lines of this function are elided
 * in this view.) */
247 static void DevicePostStartPMDSpecificActions(DPDKThreadVars *ptv,
const char *driver_name)
249 if (strcmp(driver_name,
"net_bonding") == 0) {
/* look through the bond at the real PMD driving the slaves */
250 driver_name = BondingDeviceDriverGet(ptv->port_id);
255 if (strcmp(driver_name,
"net_i40e") == 0)
256 i40eDeviceSetRSS(ptv->port_id, ptv->threads);
/* Driver-specific teardown to run before the device is closed. For bonded
 * devices the underlying driver is resolved first; for i40e (on DPDK newer
 * than 20.0) any installed rte_flow rules are flushed. (Guard lines and
 * closing braces are elided in this view.) */
259 static void DevicePreClosePMDSpecificActions(DPDKThreadVars *ptv,
const char *driver_name)
261 if (strcmp(driver_name,
"net_bonding") == 0) {
262 driver_name = BondingDeviceDriverGet(ptv->port_id);
265 if (strcmp(driver_name,
"net_i40e") == 0) {
266 #if RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0)
268 struct rte_flow_error flush_error = { 0 };
269 int32_t retval = rte_flow_flush(ptv->port_id, &flush_error);
/* error guard elided: log both the DPDK errno and the flow error message */
271 SCLogError(
"%s: unable to flush rte_flow rules: %s Flush error msg: %s",
272 ptv->livedev->dev, rte_strerror(-retval), flush_error.message);
/* Best-effort lookup of the NUMA node the calling thread currently runs on.
 * Linux-only (sched_getcpu + libnuma); other platforms just log a warning.
 * NOTE(review): the local declarations of cpu/node and the return statement
 * are elided in this view - presumably node defaults to a negative value on
 * non-Linux hosts; confirm against the full source. */
282 static int GetNumaNode(
void)
287 #if defined(__linux__)
288 cpu = sched_getcpu();
289 node = numa_node_of_cpu(cpu);
291 SCLogWarning(
"NUMA node retrieval is not supported on this OS.");
/* Publish capture counters to the stats API. Only the worker on queue 0
 * fetches and publishes the port-wide rte_eth_stats; every worker publishes
 * its own per-thread packet count at the end. (Several StatsSetUI64 call
 * heads are elided in this view - only their argument tails remain.) */
329 static inline void DPDKDumpCounters(DPDKThreadVars *ptv)
334 if (ptv->queue_id == 0) {
335 struct rte_eth_stats eth_stats;
/* NOTE(review): "ð_stats" looks like a mis-encoding of "&eth_stats" -
 * fix when restoring the file */
336 int retval = rte_eth_stats_get(ptv->port_id, ð_stats);
/* retval error guard elided in this view */
338 SCLogError(
"%s: failed to get stats: %s", ptv->livedev->dev, rte_strerror(-retval));
/* the aggregates below fold imissed/ierrors/rx_nombuf into the packet
 * and rx-error totals */
343 ptv->pkts + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
345 eth_stats.ipackets + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
347 eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
348 StatsSetUI64(ptv->tv, ptv->capture_dpdk_imissed, eth_stats.imissed);
349 StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_no_mbufs, eth_stats.rx_nombuf);
350 StatsSetUI64(ptv->tv, ptv->capture_dpdk_ierrors, eth_stats.ierrors);
351 StatsSetUI64(ptv->tv, ptv->capture_dpdk_tx_errs, eth_stats.oerrors);
/* live-device drop counter gets the aggregate of missed/errored/no-mbuf */
353 ptv->livedev->drop, eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
/* per-thread packet count, published by every worker */
355 StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets, ptv->pkts);
/* Packet release callback. In copy (IPS) mode the mbuf appears to be
 * transmitted on the configured peer port before release; otherwise the mbuf
 * is freed straight back to its mempool. (The mode guards and surrounding
 * braces are elided in this view.) */
359 static void DPDKReleasePacket(
Packet *p)
/* PMD-specific branch for Intel drivers (i40e/ixgbe/ice) */
368 #
if defined(RTE_LIBRTE_I40E_PMD) || defined(RTE_LIBRTE_IXGBE_PMD) || defined(RTE_LIBRTE_ICE_PMD)
374 rte_eth_tx_burst(p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
381 retval = rte_eth_tx_burst(
382 p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
/* TX failed (guard elided): the mbuf is still ours, free it here */
384 SCLogDebug(
"Unable to transmit the packet on port %u queue %u",
385 p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id);
386 rte_pktmbuf_free(p->dpdk_v.mbuf);
387 p->dpdk_v.mbuf = NULL;
/* non-copy path: just return the mbuf to the pool */
391 rte_pktmbuf_free(p->dpdk_v.mbuf);
392 p->dpdk_v.mbuf = NULL;
/* NOTE(review): fragment of the receive-loop init path (the enclosing
 * function header is elided). Clears the port's basic and extended stats so
 * counters start from zero, then registers the queue for interrupt mode when
 * IRQ mode was requested; the failure branch of InterruptsRXEnable() is
 * elided here. */
406 rte_eth_stats_reset(ptv->port_id);
407 rte_eth_xstats_reset(ptv->port_id);
409 if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
/* On an idle RX queue, let the threading layer run its capture timeout
 * housekeeping, rate-limited to once per 100 ms per worker thread. */
415 static inline void LoopHandleTimeoutOnIdle(
ThreadVars *
tv)
417 static thread_local uint64_t last_timeout_msec = 0;
418 SCTime_t t = DPDKSetTimevalReal(&machine_start_time);
/* NOTE(review): the derivation of 'msecs' from t is elided in this view -
 * presumably milliseconds extracted from the SCTime_t; confirm */
420 if (msecs > last_timeout_msec + 100) {
421 TmThreadsCaptureHandleTimeout(
tv, NULL);
422 last_timeout_msec = msecs;
/* Heuristic run after every rte_eth_rx_burst(): tracks consecutive empty
 * polls and decides whether the caller should skip packet processing this
 * iteration. After a streak of empty polls the worker either busy-waits
 * briefly or, in interrupt mode with a long streak, parks on an RX interrupt
 * via epoll. (Several guards/returns are elided in this view.) */
430 static inline bool RXPacketCountHeuristic(
ThreadVars *
tv, DPDKThreadVars *ptv, uint16_t nb_rx)
432 static thread_local uint32_t zero_pkt_polls_cnt = 0;
/* packets arrived: reset the idle streak (guard elided) */
435 zero_pkt_polls_cnt = 0;
/* empty poll: run the rate-limited timeout housekeeping first */
439 LoopHandleTimeoutOnIdle(
tv);
440 if (!ptv->intr_enabled)
443 zero_pkt_polls_cnt++;
444 if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
447 uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
448 if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
/* short idle streak: just delay in place */
449 rte_delay_us(pwd_idle_hint);
/* long idle streak: enable the queue interrupt, block on epoll (bounded
 * by MAX_EPOLL_TIMEOUT_MS), then disable the interrupt again */
451 InterruptsTurnOnOff(ptv->port_id, ptv->queue_id,
true);
452 struct rte_epoll_event event;
453 rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
454 InterruptsTurnOnOff(ptv->port_id, ptv->queue_id,
false);
/* Build a Suricata Packet from a received mbuf: timestamp it, attach the
 * DPDK metadata needed by DPDKReleasePacket() (mbuf, copy mode, TX port and
 * queue), and consume the hardware checksum hints in ol_flags. (Packet
 * allocation and several branch bodies are elided in this view.) */
465 static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv,
struct rte_mbuf *mbuf)
/* timestamp derived from machine start time + TSC */
477 p->
ts = DPDKSetTimevalReal(&machine_start_time);
478 p->dpdk_v.mbuf = mbuf;
480 p->dpdk_v.copy_mode = ptv->copy_mode;
481 p->dpdk_v.out_port_id = ptv->out_port_id;
482 p->dpdk_v.out_queue_id = ptv->queue_id;
/* hardware checksum offload results, as reported by the PMD */
488 uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
489 if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
490 (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
491 SCLogDebug(
"HW detected GOOD IP and L4 chsum, ignoring validation");
/* bad-checksum branches (bodies elided in this view) */
494 if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
499 if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
509 static inline void DPDKSegmentedMbufWarning(
struct rte_mbuf *mbuf)
511 static thread_local
bool segmented_mbufs_warned =
false;
512 if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
513 char warn_s[] =
"Segmented mbufs detected! Redmine Ticket #6012 "
514 "Check your configuration or report the issue";
515 enum rte_proc_type_t eal_t = rte_eal_process_type();
516 if (eal_t == RTE_PROC_SECONDARY) {
518 "try to increase mbuf size in your primary application",
520 }
else if (eal_t == RTE_PROC_PRIMARY) {
522 "try to increase MTU in your suricata.yaml",
526 segmented_mbufs_warned =
true;
/* Shutdown path for a worker: wait until all workers of this port have
 * checked in on the shared sync object, then the queue-0 worker stops the
 * device(s) and publishes the final counter values. (Loop body, guards and
 * braces are elided in this view.) */
530 static void HandleShutdown(DPDKThreadVars *ptv)
534 while (
SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) {
537 if (ptv->queue_id == 0) {
/* in copy mode the peer port is stopped as well (guard elided) */
544 rte_eth_dev_stop(ptv->out_port_id);
547 rte_eth_dev_stop(ptv->port_id);
/* final stats flush before the thread exits */
550 DPDKDumpCounters(ptv);
553 static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
555 static thread_local time_t last_dump = 0;
556 time_t current_time = DPDKGetSeconds();
558 if (current_time != last_dump) {
559 DPDKDumpCounters(ptv);
560 last_dump = current_time;
/* NOTE(review): body fragment of the main receive loop (the enclosing
 * function header and the loop construct itself are elided in this view).
 * Per iteration: burst-receive into the per-thread mbuf array, run the
 * empty-poll heuristic, convert each mbuf into a Packet and hand it down the
 * pipeline, then run the rate-limited stats dump. */
570 DPDKThreadVars *ptv = (DPDKThreadVars *)data;
/* next slot in the thread-module pipeline */
571 ptv->slot = ((
TmSlot *)slot)->slot_next;
572 TmEcode ret = ReceiveDPDKLoopInit(
tv, ptv);
/* nb_rx assignment head elided - this is the RX burst */
583 rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
584 if (RXPacketCountHeuristic(
tv, ptv, nb_rx)) {
588 ptv->pkts += (uint64_t)nb_rx;
589 for (uint16_t i = 0; i < nb_rx; i++) {
590 Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
/* Packet allocation failed (guard elided): drop this mbuf */
592 rte_pktmbuf_free(ptv->received_mbufs[i]);
595 DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
596 PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
597 rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
598 if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) !=
TM_ECODE_OK) {
/* pipeline failure: free the not-yet-processed tail of the burst */
600 DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
605 PeriodicDPDKDumpCounters(ptv);
/* NOTE(review): body fragment of the receive-thread init function (the
 * header, including how dpdk_config and queue_id come into scope, is elided
 * in this view). Allocates the per-thread context, copies settings from the
 * thread-init config, checks NUMA locality, and - on the last queue's
 * thread - starts the device and runs post-start driver actions. */
623 int retval, thread_numa;
624 DPDKThreadVars *ptv = NULL;
627 if (initdata == NULL) {
628 SCLogError(
"DPDK configuration is NULL in thread initialization");
632 ptv =
SCCalloc(1,
sizeof(DPDKThreadVars));
/* copy per-port settings out of the shared config */
650 ptv->copy_mode = dpdk_config->copy_mode;
651 ptv->checksum_mode = dpdk_config->checksum_mode;
653 ptv->threads = dpdk_config->threads;
654 ptv->intr_enabled = (dpdk_config->flags &
DPDK_IRQ_MODE) ?
true :
false;
655 ptv->port_id = dpdk_config->port_id;
656 ptv->out_port_id = dpdk_config->out_port_id;
657 ptv->port_socket_id = dpdk_config->socket_id;
/* take ownership of the mempool so DerefFunc cannot release it */
659 ptv->pkt_mempool = dpdk_config->pkt_mempool;
660 dpdk_config->pkt_mempool = NULL;
/* warn when the worker runs on a different NUMA node than the NIC */
662 thread_numa = GetNumaNode();
663 if (thread_numa >= 0 && ptv->port_socket_id != SOCKET_ID_ANY &&
664 thread_numa != ptv->port_socket_id) {
666 SCLogPerf(
"%s: NIC is on NUMA %d, thread on NUMA %d", dpdk_config->iface,
667 ptv->port_socket_id, thread_numa);
670 ptv->workers_sync = dpdk_config->workers_sync;
672 ptv->queue_id = queue_id;
/* the thread handling the last queue starts the device */
675 if (queue_id == dpdk_config->threads - 1) {
676 retval = rte_eth_dev_start(ptv->port_id);
/* retval guard elided */
678 SCLogError(
"%s: error (%s) during device startup", dpdk_config->iface,
679 rte_strerror(-retval));
683 struct rte_eth_dev_info dev_info;
684 retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
/* retval guard elided */
686 SCLogError(
"%s: error (%s) when getting device info", dpdk_config->iface,
687 rte_strerror(-retval));
/* driver-specific post-start setup (RSS etc.) */
692 DevicePostStartPMDSpecificActions(ptv, dev_info.driver_name);
694 uint16_t inconsistent_numa_cnt =
SC_ATOMIC_GET(dpdk_config->inconsistent_numa_cnt);
695 if (inconsistent_numa_cnt > 0 && ptv->port_socket_id != SOCKET_ID_ANY) {
696 SCLogWarning(
"%s: NIC is on NUMA %d, %u threads on different NUMA node(s)",
697 dpdk_config->iface, ptv->port_socket_id, inconsistent_numa_cnt);
698 }
else if (ptv->port_socket_id == SOCKET_ID_ANY && rte_socket_count() > 1) {
/* SCLogNotice/SCLogWarning call head elided here */
700 "%s: unable to determine NIC's NUMA node, degraded performance can be expected",
703 if (ptv->intr_enabled) {
704 rte_spinlock_init(&intr_lock[ptv->port_id]);
/* success path: drop our reference on the config */
709 dpdk_config->DerefFunc(dpdk_config);
/* failure path (label elided): release the config reference as well */
713 if (dpdk_config != NULL)
714 dpdk_config->DerefFunc(dpdk_config);
/* Log every non-zero extended statistic of a port at exit. Queries the
 * xstat count first, allocates matching value and name arrays, fetches both,
 * and prints each non-zero entry via SCLogPerf. Any query or allocation
 * failure is fatal. (Several guard lines, SCFree calls and the closing of
 * the function are elided in this view.) */
720 static void PrintDPDKPortXstats(uint32_t port_id,
const char *port_name)
722 struct rte_eth_xstat *xstats;
723 struct rte_eth_xstat_name *xstats_names;
/* first call with NULL gets the number of available xstats */
725 int32_t
len = rte_eth_xstats_get(port_id, NULL, 0);
/* len < 0 guard elided */
727 FatalError(
"Error (%s) getting count of rte_eth_xstats failed on port %s",
728 rte_strerror(-
len), port_name);
/* xstats allocation + NULL check elided before this error */
732 FatalError(
"Failed to allocate memory for the rte_eth_xstat structure");
734 int32_t ret = rte_eth_xstats_get(port_id, xstats,
len);
735 if (ret < 0 || ret >
len) {
737 FatalError(
"Error (%s) getting rte_eth_xstats failed on port %s", rte_strerror(-ret),
740 xstats_names =
SCCalloc(
len,
sizeof(*xstats_names));
741 if (xstats_names == NULL) {
743 FatalError(
"Failed to allocate memory for the rte_eth_xstat_name array");
745 ret = rte_eth_xstats_get_names(port_id, xstats_names,
len);
746 if (ret < 0 || ret >
len) {
749 FatalError(
"Error (%s) getting names of rte_eth_xstats failed on port %s",
750 rte_strerror(-ret), port_name);
/* print only the counters that actually moved */
752 for (int32_t i = 0; i <
len; i++) {
753 if (xstats[i].value > 0)
754 SCLogPerf(
"Port %u (%s) - %s: %" PRIu64, port_id, port_name, xstats_names[i].name,
/* Thread-exit stats hook: the queue-0 worker prints the port's extended
 * stats plus the aggregate RX/TX totals; every worker then performs a final
 * counter dump. (retval declaration, guards and braces are elided in this
 * view. NOTE(review): this path uses strerror(-retval) where the rest of the
 * file uses rte_strerror - worth confirming/unifying in the full source.) */
767 static void ReceiveDPDKThreadExitStats(
ThreadVars *
tv,
void *data)
771 DPDKThreadVars *ptv = (DPDKThreadVars *)data;
773 if (ptv->queue_id == 0) {
774 struct rte_eth_stats eth_stats;
775 PrintDPDKPortXstats(ptv->port_id, ptv->livedev->dev);
776 retval = rte_eth_stats_get(ptv->port_id, ð_stats);
/* retval guard elided; also note "ð_stats" above looks like a
 * mis-encoding of "&eth_stats" */
778 SCLogError(
"%s: failed to get stats (%s)", ptv->livedev->dev, strerror(-retval));
781 SCLogPerf(
"%s: total RX stats: packets %" PRIu64
" bytes: %" PRIu64
" missed: %" PRIu64
782 " errors: %" PRIu64
" nombufs: %" PRIu64,
783 ptv->livedev->dev, eth_stats.ipackets, eth_stats.ibytes, eth_stats.imissed,
784 eth_stats.ierrors, eth_stats.rx_nombuf);
786 SCLogPerf(
"%s: total TX stats: packets %" PRIu64
" bytes: %" PRIu64
" errors: %" PRIu64,
787 ptv->livedev->dev, eth_stats.opackets, eth_stats.obytes, eth_stats.oerrors);
/* final per-thread counter flush */
790 DPDKDumpCounters(ptv);
802 DPDKThreadVars *ptv = (DPDKThreadVars *)data;
804 if (ptv->queue_id == 0) {
805 struct rte_eth_dev_info dev_info;
806 int retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
808 SCLogError(
"%s: error (%s) when getting device info", ptv->livedev->dev,
809 rte_strerror(-retval));
813 DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);
815 if (ptv->workers_sync) {
816 SCFree(ptv->workers_sync);
820 ptv->pkt_mempool = NULL;