81 FatalError(
"Error creating thread %s: you do not have "
82 "support for DPDK enabled, on Linux host please recompile "
/* Tuning knobs for the RX polling / interrupt-mode heuristics below. */
100 #define MIN_ZERO_POLL_COUNT 10U /* empty polls tolerated before the heuristic reacts */
101 #define MIN_ZERO_POLL_COUNT_TO_SLEEP 10U /* empty polls before switching to the long sleep */
102 #define MINIMUM_SLEEP_TIME_US 1U /* short nap used while the queue just went quiet */
103 #define STANDARD_SLEEP_TIME_US 100U /* long nap / threshold for epoll-based waiting */
104 #define MAX_EPOLL_TIMEOUT_MS 500U /* upper bound for rte_epoll_wait() blocking */
/* One spinlock per port: queues of the same port serialize their RX-interrupt
 * enable/disable calls through this array (see InterruptsTurnOnOff). */
105 static rte_spinlock_t intr_lock[RTE_MAX_ETHPORTS];
/* Per-worker capture state for one RX queue of a DPDK port.
 * NOTE(review): most members (lines 111-136 of the original) are missing from
 * this extract; only a few fields are visible here. */
110 typedef struct DPDKThreadVars_ {
129 uint16_t out_port_id; /* peer port used for IPS/TAP transmit (see DPDKReleasePacket) */
137 int32_t port_socket_id; /* NUMA socket of the NIC; SOCKET_ID_ANY when unknown */
138 struct rte_mbuf *received_mbufs[BURST_SIZE]; /* scratch array filled by rte_eth_rx_burst() */
/* Forward declaration: frees mbufs [offset, mbuf_cnt) of mbuf_array
 * (definition further below). */
150 static void DPDKFreeMbufArray(
struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t
offset);
/* Register port_id/queue_id with this thread's epoll instance so the receive
 * loop can block (rte_epoll_wait) instead of busy-polling when idle.
 * NOTE(review): the opening brace, the check of `ret` guarding SCLogError and
 * the boolean returns are missing from this extract — presumably returns
 * false on failure, true on success; confirm against the full file. */
151 static bool InterruptsRXEnable(uint16_t port_id, uint16_t queue_id)
153 uint32_t event_data = (uint32_t)port_id << UINT16_WIDTH | queue_id; /* port in upper 16 bits, queue in lower */
154 int32_t ret = rte_eth_dev_rx_intr_ctl_q(port_id, queue_id, RTE_EPOLL_PER_THREAD,
155 RTE_INTR_EVENT_ADD, (
void *)((uintptr_t)event_data));
158 SCLogError(
"%s-Q%d: failed to enable interrupt mode: %s", DPDKGetPortNameByPortID(port_id),
159 queue_id, rte_strerror(-ret));
165 static inline uint32_t InterruptsSleepHeuristic(uint32_t no_pkt_polls_count)
167 if (no_pkt_polls_count < MIN_ZERO_POLL_COUNT_TO_SLEEP)
168 return MINIMUM_SLEEP_TIME_US;
170 return STANDARD_SLEEP_TIME_US;
173 static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id,
bool on)
175 rte_spinlock_lock(&(intr_lock[port_id]));
178 rte_eth_dev_rx_intr_enable(port_id, queue_id);
180 rte_eth_dev_rx_intr_disable(port_id, queue_id);
182 rte_spinlock_unlock(&(intr_lock[port_id]));
/**
 * \brief Return a slice of an mbuf array back to its mempool.
 *
 * Frees mbuf_array[offset] .. mbuf_array[mbuf_cnt - 1]; entries below
 * offset are left untouched (they have already been consumed).
 *
 * \param mbuf_array array of mbuf pointers
 * \param mbuf_cnt   one past the last index to free
 * \param offset     first index to free
 */
static inline void DPDKFreeMbufArray(
        struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
    for (uint16_t idx = offset; idx < mbuf_cnt; idx++) {
        rte_pktmbuf_free(mbuf_array[idx]);
    }
}
/* Apply driver-specific RSS configuration after the port has been started.
 * For bonded devices the underlying slave driver is resolved first so the
 * correct PMD-specific RSS setup is applied. Drivers without a case here get
 * no post-start action. */
193 static void DevicePostStartPMDSpecificActions(DPDKThreadVars *ptv,
const char *driver_name)
195 if (strcmp(driver_name,
"net_bonding") == 0)
196 driver_name = BondingDeviceDriverGet(ptv->port_id); /* resolve real slave driver */
197 if (strcmp(driver_name,
"net_i40e") == 0)
198 i40eDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
199 else if (strcmp(driver_name,
"net_ixgbe") == 0)
200 ixgbeDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
201 else if (strcmp(driver_name,
"net_ice") == 0)
202 iceDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
203 else if (strcmp(driver_name,
"mlx5_pci") == 0)
204 mlx5DeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
/* Undo driver-specific configuration before the port is closed: for PMDs that
 * had RSS rte_flow rules installed (i40e/ixgbe/ice/mlx5), flush those rules.
 * NOTE(review): lines around the #if and the condition head are missing from
 * this extract — the visible strcmp chain is presumably part of a single
 * if-condition; confirm against the full file. */
207 static void DevicePreClosePMDSpecificActions(DPDKThreadVars *ptv,
const char *driver_name)
209 if (strcmp(driver_name,
"net_bonding") == 0) {
210 driver_name = BondingDeviceDriverGet(ptv->port_id); /* resolve real slave driver */
214 #
if RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0)
215 strcmp(driver_name,
"net_i40e") == 0 ||
217 strcmp(driver_name,
"net_ixgbe") == 0 || strcmp(driver_name,
"net_ice") == 0 ||
218 strcmp(driver_name,
"mlx5_pci") == 0) {
220 struct rte_flow_error flush_error = { 0 };
221 int32_t retval = rte_flow_flush(ptv->port_id, &flush_error);
223 SCLogError(
"%s: unable to flush rte_flow rules: %s Flush error msg: %s",
224 ptv->livedev->dev, rte_strerror(-retval), flush_error.message);
/* Determine the NUMA node of the CPU this thread currently runs on.
 * Linux-only (sched_getcpu + numa_node_of_cpu); other OSes log a warning.
 * NOTE(review): declarations of `cpu`/`node`, the #else/#endif and the return
 * statement are missing from this extract — presumably returns `node`, with a
 * negative value when unsupported; confirm against the full file. */
233 static int GetNumaNode(
void)
238 #if defined(__linux__)
239 cpu = sched_getcpu();
240 node = numa_node_of_cpu(cpu);
242 SCLogWarning(
"NUMA node retrieval is not supported on this OS.");
/* Push per-port ethdev statistics into Suricata's stats counters. Only the
 * worker on queue 0 reads the port-wide rte_eth_stats; the aggregate drop
 * figure (imissed + ierrors + rx_nombuf) is folded into several counters.
 * NOTE(review): "ð_stats" on the rte_eth_stats_get line is mojibake — the
 * HTML entity "&eth;" swallowed the original "&eth_stats" argument; restore
 * "&eth_stats" when fixing the file's encoding. Several counter-setting lines
 * (StatsSetUI64 heads) are also missing from this extract. */
280 static inline void DPDKDumpCounters(DPDKThreadVars *ptv)
285 if (ptv->queue_id == 0) {
286 struct rte_eth_stats eth_stats;
287 int retval = rte_eth_stats_get(ptv->port_id, ð_stats);
289 SCLogError(
"%s: failed to get stats: %s", ptv->livedev->dev, rte_strerror(-retval));
294 ptv->pkts + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
296 eth_stats.ipackets + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
298 eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
300 StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_rx_no_mbufs, eth_stats.rx_nombuf);
304 ptv->livedev->drop, eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
/* Release callback for packets captured via DPDK. In copy (IPS/TAP) mode the
 * mbuf is transmitted on the paired out-port/out-queue; if transmission fails
 * (or in non-copy mode) the mbuf is freed back to its pool. The pointer is
 * NULLed after free to prevent double-free on later release paths.
 * NOTE(review): the conditions selecting between the tx and free paths
 * (lines 311-331, 339-341 of the original) are missing from this extract.
 * The ICMPv6 type 143 (MLDv2 report) exclusion appears tied to the listed
 * Intel PMDs — confirm intent against the full file. */
310 static void DPDKReleasePacket(
Packet *p)
319 #
if defined(RTE_LIBRTE_I40E_PMD) || defined(RTE_LIBRTE_IXGBE_PMD) || defined(RTE_LIBRTE_ICE_PMD)
320 && !(PacketIsICMPv6(p) && PacketGetICMPv6(p)->
type == 143)
325 rte_eth_tx_burst(p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
332 retval = rte_eth_tx_burst(
333 p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
335 SCLogDebug(
"Unable to transmit the packet on port %u queue %u",
336 p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id);
337 rte_pktmbuf_free(p->dpdk_v.mbuf); /* tx failed: drop the mbuf ourselves */
338 p->dpdk_v.mbuf = NULL;
342 rte_pktmbuf_free(p->dpdk_v.mbuf); /* non-tx path: return mbuf to pool */
343 p->dpdk_v.mbuf = NULL;
/* NOTE(review): fragment of a loop-initialization routine whose header is not
 * visible in this extract. Resets port statistics and, when interrupt mode is
 * configured, registers this queue with the per-thread epoll instance. */
357 rte_eth_stats_reset(ptv->port_id);
358 rte_eth_xstats_reset(ptv->port_id);
360 if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
/* While the RX queue is idle, let the threading framework run its timeout
 * handling — but at most once per 100 ms, tracked per worker thread via the
 * thread-local last_timeout_msec.
 * NOTE(review): the computation of `msecs` (lines 369-370 of the original) is
 * missing from this extract — presumably derived from current time. */
366 static inline void LoopHandleTimeoutOnIdle(
ThreadVars *
tv)
368 static thread_local uint64_t last_timeout_msec = 0;
371 if (msecs > last_timeout_msec + 100) { /* throttle to ~10 Hz */
372 TmThreadsCaptureHandleTimeout(
tv, NULL);
373 last_timeout_msec = msecs;
/* Decide what to do after an RX burst returned nb_rx packets. A burst with
 * packets resets the thread-local empty-poll counter; an empty burst first
 * runs idle timeout handling, then — only in interrupt mode and after
 * MIN_ZERO_POLL_COUNT empty polls — either naps briefly (rte_delay_us) or
 * switches the queue to interrupt mode and blocks in rte_epoll_wait until
 * traffic or MAX_EPOLL_TIMEOUT_MS elapses.
 * NOTE(review): the return statements (presumably bool "skip processing")
 * are missing from this extract; confirm against the full file. */
381 static inline bool RXPacketCountHeuristic(
ThreadVars *
tv, DPDKThreadVars *ptv, uint16_t nb_rx)
383 static thread_local uint32_t zero_pkt_polls_cnt = 0;
386 zero_pkt_polls_cnt = 0; /* traffic seen: reset the idle streak */
390 LoopHandleTimeoutOnIdle(
tv);
391 if (!ptv->intr_enabled)
394 zero_pkt_polls_cnt++;
395 if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
398 uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
399 if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
400 rte_delay_us(pwd_idle_hint); /* short nap: stay in polling mode */
402 InterruptsTurnOnOff(ptv->port_id, ptv->queue_id,
true);
403 struct rte_epoll_event event;
404 rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS); /* block until IRQ or timeout */
405 InterruptsTurnOnOff(ptv->port_id, ptv->queue_id,
false);
/* Build a Suricata Packet around a received mbuf: attach the mbuf, record the
 * copy mode and the out-port/out-queue for a possible IPS transmit, and use
 * the NIC's RX checksum offload flags to decide whether software checksum
 * validation can be skipped (both IP and L4 reported GOOD) or a bad checksum
 * must be flagged.
 * NOTE(review): packet allocation, checksum-mode selection and the bodies of
 * the CKSUM_BAD branches are missing from this extract. */
416 static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv,
struct rte_mbuf *mbuf)
429 p->dpdk_v.mbuf = mbuf;
431 p->dpdk_v.copy_mode = ptv->copy_mode;
432 p->dpdk_v.out_port_id = ptv->out_port_id;
433 p->dpdk_v.out_queue_id = ptv->queue_id; /* same queue index on the peer port */
439 uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
440 if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
441 (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
442 SCLogDebug(
"HW detected GOOD IP and L4 chsum, ignoring validation");
445 if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
451 if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
/* Warn once per worker thread when a multi-segment (non-contiguous) mbuf is
 * seen — Suricata expects contiguous packet data. The advice differs by EAL
 * process type: a secondary process should grow the mbuf size in the primary
 * application; a primary process should grow the MTU in suricata.yaml.
 * NOTE(review): the SCLogWarning heads for both branches are missing from
 * this extract; only their message fragments are visible. */
462 static inline void DPDKSegmentedMbufWarning(
struct rte_mbuf *mbuf)
464 static thread_local
bool segmented_mbufs_warned =
false;
465 if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
466 char warn_s[] =
"Segmented mbufs detected! Redmine Ticket #6012 "
467 "Check your configuration or report the issue";
468 enum rte_proc_type_t eal_t = rte_eal_process_type();
469 if (eal_t == RTE_PROC_SECONDARY) {
471 "try to increase mbuf size in your primary application",
473 }
else if (eal_t == RTE_PROC_PRIMARY) {
475 "try to increase MTU in your suricata.yaml",
479 segmented_mbufs_warned =
true; /* warn only once per thread */
/* Log all non-zero extended statistics (xstats) of a port at Perf level.
 * Standard three-step xstats pattern: query the table size with a NULL
 * buffer, allocate value and name arrays of that size, then fetch both and
 * print every entry with a non-zero value.
 * NOTE(review): the goto/exit paths after the error logs and the cleanup
 * label freeing xstats/xstats_names are missing from this extract. */
483 static void PrintDPDKPortXstats(uint16_t port_id,
const char *port_name)
485 int ret = rte_eth_xstats_get(port_id, NULL, 0); /* size query: NULL buffer returns count */
487 SCLogPerf(
"%s: unable to obtain rte_eth_xstats (%s)", port_name,
488 ret == 0 ?
"not supported" : rte_strerror(-ret));
491 unsigned int len = (
unsigned int)ret;
492 struct rte_eth_xstat_name *xstats_names = NULL;
493 struct rte_eth_xstat *xstats =
SCCalloc(
len,
sizeof(*xstats));
494 if (xstats == NULL) {
495 SCLogWarning(
"Failed to allocate memory for the rte_eth_xstat structure");
499 ret = rte_eth_xstats_get(port_id, xstats,
len);
500 if (ret < 0 || (
unsigned int)ret >
len) { /* table grew between the two calls */
501 SCLogPerf(
"%s: unable to obtain rte_eth_xstats (%s)", port_name,
502 ret < 0 ? rte_strerror(-ret) :
"table size too small");
505 xstats_names =
SCCalloc(
len,
sizeof(*xstats_names));
506 if (xstats_names == NULL) {
507 SCLogWarning(
"Failed to allocate memory for the rte_eth_xstat_name array");
510 ret = rte_eth_xstats_get_names(port_id, xstats_names,
len);
511 if (ret < 0 || (
unsigned int)ret >
len) {
512 SCLogPerf(
"%s: unable to obtain names of rte_eth_xstats (%s)", port_name,
513 ret < 0 ? rte_strerror(-ret) :
"table size too small");
516 for (
unsigned int i = 0; i <
len; i++) {
517 if (xstats[i].value > 0) /* skip zero counters to keep the log readable */
518 SCLogPerf(
"Port %u (%s) - %s: %" PRIu64, port_id, port_name, xstats_names[i].
name,
525 if (xstats_names != NULL)
/* Coordinated worker shutdown: wait until every sibling worker of this port
 * has checked in, dump final stats, and stop the ethdev port(s). Only the
 * queue-0 worker prints the port xstats and (presumably only once) stops the
 * devices; the out-port is stopped as well when operating in a copy mode.
 * NOTE(review): the check-in bookkeeping, the loop body and the conditions
 * guarding the rte_eth_dev_stop calls are missing from this extract. */
529 static void HandleShutdown(DPDKThreadVars *ptv)
533 while (
SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) {
538 DPDKDumpCounters(ptv);
539 if (ptv->queue_id == 0) {
540 PrintDPDKPortXstats(ptv->port_id, ptv->livedev->dev);
547 rte_eth_dev_stop(ptv->out_port_id);
550 rte_eth_dev_stop(ptv->port_id);
/* Dump capture counters at most once per second per worker thread, using a
 * thread-local timestamp of the last dump.
 * NOTE(review): the line obtaining `current_time` (line 558-559 of the
 * original) is missing from this extract. */
555 static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
557 static thread_local
SCTime_t last_dump = { 0 };
560 if (current_time.
secs != last_dump.secs) { /* second boundary crossed */
561 DPDKDumpCounters(ptv);
562 last_dump = current_time;
/* NOTE(review): body fragment of the main receive loop; its function header
 * is not visible in this extract. Flow: initialize, then repeatedly burst-
 * receive up to BURST_SIZE mbufs, apply the idle heuristic, wrap each mbuf in
 * a Packet and hand it down the pipeline; on pipeline failure the unprocessed
 * tail of the burst is freed; counters are dumped periodically. */
572 DPDKThreadVars *ptv = (DPDKThreadVars *)data;
573 ptv->slot = ((
TmSlot *)slot)->slot_next;
574 TmEcode ret = ReceiveDPDKLoopInit(
tv, ptv);
585 rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
586 if (RXPacketCountHeuristic(
tv, ptv, nb_rx)) {
590 ptv->pkts += (uint64_t)nb_rx;
591 for (uint16_t i = 0; i < nb_rx; i++) {
592 Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
594 rte_pktmbuf_free(ptv->received_mbufs[i]); /* no Packet available: drop the mbuf */
597 DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
598 PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
599 rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
600 if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) !=
TM_ECODE_OK) {
602 DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1); /* free the unprocessed tail */
607 PeriodicDPDKDumpCounters(ptv);
/* NOTE(review): body fragment of the capture thread-init routine; its header
 * (and the lines declaring dpdk_config/queue_id, the error-path labels, and
 * most closing braces) are not visible in this extract. The visible flow:
 * validate initdata, allocate per-thread state, copy the port configuration,
 * warn about NUMA mismatches, and — in the last-initialized worker — start
 * the device, wait for link-up, run PMD-specific post-start actions and
 * initialize the interrupt spinlock. */
625 int retval, thread_numa;
626 DPDKThreadVars *ptv = NULL;
629 if (initdata == NULL) {
630 SCLogError(
"DPDK configuration is NULL in thread initialization");
634 ptv =
SCCalloc(1,
sizeof(DPDKThreadVars));
652 ptv->copy_mode = dpdk_config->copy_mode;
653 ptv->checksum_mode = dpdk_config->checksum_mode;
655 ptv->threads = dpdk_config->threads;
656 ptv->intr_enabled = (dpdk_config->flags &
DPDK_IRQ_MODE) ?
true :
false;
657 ptv->port_id = dpdk_config->port_id;
658 ptv->out_port_id = dpdk_config->out_port_id;
659 ptv->port_socket_id = dpdk_config->socket_id;
661 thread_numa = GetNumaNode();
662 if (thread_numa >= 0 && ptv->port_socket_id != SOCKET_ID_ANY &&
663 thread_numa != ptv->port_socket_id) { /* cross-NUMA access costs performance */
665 SCLogPerf(
"%s: NIC is on NUMA %d, thread on NUMA %d", dpdk_config->iface,
666 ptv->port_socket_id, thread_numa);
669 ptv->workers_sync = dpdk_config->workers_sync;
671 ptv->queue_id = queue_id;
674 if (queue_id == dpdk_config->threads - 1) { /* last worker starts the device */
675 retval = rte_eth_dev_start(ptv->port_id);
677 SCLogError(
"%s: error (%s) during device startup", dpdk_config->iface,
678 rte_strerror(-retval));
682 struct rte_eth_dev_info dev_info;
683 retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
685 SCLogError(
"%s: error (%s) when getting device info", dpdk_config->iface,
686 rte_strerror(-retval));
690 uint32_t timeout = dpdk_config->linkup_timeout * 10; /* poll in 100 ms steps */
691 while (timeout > 0) {
692 struct rte_eth_link link = { 0 };
693 retval = rte_eth_link_get_nowait(ptv->port_id, &link);
695 if (retval == -ENOTSUP) {
696 SCLogInfo(
"%s: link status not supported, skipping", dpdk_config->iface);
698 SCLogInfo(
"%s: error (%s) when getting link status, skipping",
699 dpdk_config->iface, rte_strerror(-retval));
703 if (link.link_status) {
704 char link_status_str[RTE_ETH_LINK_MAX_STR_LEN];
705 #if RTE_VERSION >= RTE_VERSION_NUM(20, 11, 0, 0)
706 #pragma GCC diagnostic push
707 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
708 rte_eth_link_to_str(link_status_str,
sizeof(link_status_str), &link);
709 #pragma GCC diagnostic pop
711 snprintf(link_status_str,
sizeof(link_status_str),
712 "Link Up, speed %u Mbps, %s",
714 (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
"full-duplex" :
"half-duplex");
717 SCLogInfo(
"%s: %s", dpdk_config->iface, link_status_str);
725 if (dpdk_config->linkup_timeout && timeout == 0) {
726 SCLogWarning(
"%s: link is down, trying to continue anyway", dpdk_config->iface);
730 DevicePostStartPMDSpecificActions(ptv, dev_info.driver_name);
732 uint16_t inconsistent_numa_cnt =
SC_ATOMIC_GET(dpdk_config->inconsistent_numa_cnt);
733 if (inconsistent_numa_cnt > 0 && ptv->port_socket_id != SOCKET_ID_ANY) {
734 SCLogWarning(
"%s: NIC is on NUMA %d, %u threads on different NUMA node(s)",
735 dpdk_config->iface, ptv->port_socket_id, inconsistent_numa_cnt);
736 }
else if (ptv->port_socket_id == SOCKET_ID_ANY && rte_socket_count() > 1) {
738 "%s: unable to determine NIC's NUMA node, degraded performance can be expected",
741 if (ptv->intr_enabled) {
742 rte_spinlock_init(&intr_lock[ptv->port_id]);
747 dpdk_config->DerefFunc(dpdk_config); /* success path: drop config reference */
751 if (dpdk_config != NULL)
752 dpdk_config->DerefFunc(dpdk_config); /* error path: drop config reference */
766 DPDKThreadVars *ptv = (DPDKThreadVars *)data;
768 if (ptv->queue_id == 0) {
769 struct rte_eth_dev_info dev_info;
770 int retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
772 SCLogError(
"%s: error (%s) when getting device info", ptv->livedev->dev,
773 rte_strerror(-retval));
777 DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);
779 if (ptv->workers_sync) {
780 SCFree(ptv->workers_sync);