suricata
source-dpdk.c
Go to the documentation of this file.
1 /* Copyright (C) 2021-2025 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \defgroup dpdk DPDK running mode
20  *
21  * @{
22  */
23 
24 /**
25  * \file
26  *
27  * \author Lukas Sismis <lukas.sismis@gmail.com>
28  *
29  * DPDK capture interface
30  *
31  */
32 
33 #include "suricata-common.h"
34 #include "runmodes.h"
35 #include "decode.h"
36 #include "packet.h"
37 #include "source-dpdk.h"
38 #include "suricata.h"
39 #include "threads.h"
40 #include "threadvars.h"
41 #include "tm-threads.h"
42 #include "tmqh-packetpool.h"
43 #include "util-privs.h"
44 #include "util-device-private.h"
45 #include "action-globals.h"
46 
47 #ifndef HAVE_DPDK
48 
49 TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **);
50 
52 {
/* Build without DPDK support (HAVE_DPDK undefined): registers the
 * "ReceiveDPDK" module name.
 * NOTE(review): the signature and the other tmm_modules[TMM_RECEIVEDPDK]
 * assignments of this function are not visible in this view. */
53  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
60 }
61 
62 /**
63  * \brief Registration Function for DecodeDPDK (build without DPDK support).
 *
 * NOTE(review): only the module-name assignment of this function is visible
 * in this view; its signature and remaining assignments are not shown.
64  */
66 {
67  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
74 }
75 
76 /**
77  * \brief this function prints an error message and exits.
78  */
79 TmEcode NoDPDKSupportExit(ThreadVars *tv, const void *initdata, void **data)
80 {
81  FatalError("Error creating thread %s: you do not have "
82  "support for DPDK enabled, on Linux host please recompile "
83  "with --enable-dpdk",
84  tv->name);
85 }
86 
87 #else /* We have DPDK support */
88 
89 #include "util-affinity.h"
90 #include "util-dpdk.h"
91 #include "util-dpdk-i40e.h"
92 #include "util-dpdk-ice.h"
93 #include "util-dpdk-ixgbe.h"
94 #include "util-dpdk-mlx5.h"
95 #include "util-dpdk-bonding.h"
96 #include <numa.h>
97 
/* Upper bound of mbufs requested per rte_eth_rx_burst() call; also sizes
 * DPDKThreadVars.received_mbufs. */
98 #define BURST_SIZE 32
99 // interrupt mode constants (only consulted when DPDK_IRQ_MODE is enabled)
/* Number of consecutive empty polls that are busy-polled before backing off. */
100 #define MIN_ZERO_POLL_COUNT 10U
/* Empty-poll count from which the sleep heuristic returns the standard sleep. */
101 #define MIN_ZERO_POLL_COUNT_TO_SLEEP 10U
/* Short back-off (microseconds) returned below MIN_ZERO_POLL_COUNT_TO_SLEEP. */
102 #define MINIMUM_SLEEP_TIME_US 1U
/* Back-off hint (microseconds) that selects waiting on the RX interrupt. */
103 #define STANDARD_SLEEP_TIME_US 100U
/* Timeout (milliseconds) of one rte_epoll_wait() on the RX interrupt. */
104 #define MAX_EPOLL_TIMEOUT_MS 500U
/* Per-port lock serialising rte_eth_dev_rx_intr_enable/disable calls. */
105 static rte_spinlock_t intr_lock[RTE_MAX_ETHPORTS];
106 
107 /**
108  * \brief Structure to hold thread specific variables.
 *
 * One instance per DPDK receive thread: allocated in ReceiveDPDKThreadInit
 * and freed in ReceiveDPDKThreadDeinit.
109  */
110 typedef struct DPDKThreadVars_ {
111  /* counters */
/* packets received by this worker, accumulated per RX burst */
112  uint64_t pkts;
113  ThreadVars *tv;
/* slot following the receive slot; received packets are passed to it */
114  TmSlot *slot;
115  LiveDevice *livedev;
116  ChecksumValidationMode checksum_mode;
/* true when the interface config has the DPDK_IRQ_MODE flag */
117  bool intr_enabled;
118  /* references to packet and drop counters */
119  StatsCounterId capture_dpdk_packets;
120  StatsCounterId capture_dpdk_rx_errs;
121  StatsCounterId capture_dpdk_imissed;
122  StatsCounterId capture_dpdk_rx_no_mbufs;
123  StatsCounterId capture_dpdk_ierrors;
124  StatsCounterId capture_dpdk_tx_errs;
/* NOTE(review): not referenced in this file */
125  unsigned int flags;
/* number of worker threads configured for the interface */
126  uint16_t threads;
127  /* for IPS */
128  DpdkCopyModeEnum copy_mode;
/* port that packets are transmitted to in TAP/IPS copy modes */
129  uint16_t out_port_id;
130  /* NOTE(review): stale comment "Entry in the peers_list" - no such entry follows */
131

/* bytes is zeroed at init; accepted/dropped are not referenced in this file */
132  uint64_t bytes;
133  uint64_t accepted;
134  uint64_t dropped;
135  uint16_t port_id;
136  uint16_t queue_id;
/* NUMA socket of the NIC from the interface config (may be SOCKET_ID_ANY) */
137  int32_t port_socket_id;
/* mbufs filled by the latest rte_eth_rx_burst() */
138  struct rte_mbuf *received_mbufs[BURST_SIZE];
/* check-in barrier shared by the port's workers; freed by the queue 0 worker */
139  DPDKWorkerSync *workers_sync;
140 } DPDKThreadVars;
141 
142 static TmEcode ReceiveDPDKThreadInit(ThreadVars *, const void *, void **);
143 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *, void *);
144 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot);
145 
146 static TmEcode DecodeDPDKThreadInit(ThreadVars *, const void *, void **);
147 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data);
148 static TmEcode DecodeDPDK(ThreadVars *, Packet *, void *);
149 
150 static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset);
151 static bool InterruptsRXEnable(uint16_t port_id, uint16_t queue_id)
152 {
153  uint32_t event_data = (uint32_t)port_id << UINT16_WIDTH | queue_id;
154  int32_t ret = rte_eth_dev_rx_intr_ctl_q(port_id, queue_id, RTE_EPOLL_PER_THREAD,
155  RTE_INTR_EVENT_ADD, (void *)((uintptr_t)event_data));
156 
157  if (ret != 0) {
158  SCLogError("%s-Q%d: failed to enable interrupt mode: %s", DPDKGetPortNameByPortID(port_id),
159  queue_id, rte_strerror(-ret));
160  return false;
161  }
162  return true;
163 }
164 
165 static inline uint32_t InterruptsSleepHeuristic(uint32_t no_pkt_polls_count)
166 {
167  if (no_pkt_polls_count < MIN_ZERO_POLL_COUNT_TO_SLEEP)
168  return MINIMUM_SLEEP_TIME_US;
169 
170  return STANDARD_SLEEP_TIME_US;
171 }
172 
173 static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool on)
174 {
175  rte_spinlock_lock(&(intr_lock[port_id]));
176 
177  if (on)
178  rte_eth_dev_rx_intr_enable(port_id, queue_id);
179  else
180  rte_eth_dev_rx_intr_disable(port_id, queue_id);
181 
182  rte_spinlock_unlock(&(intr_lock[port_id]));
183 }
184 
/**
 * \brief Free a contiguous run of mbufs stored in an array.
 *
 * Frees the \p mbuf_cnt entries that start at index \p offset, i.e. the
 * half-open range [offset, offset + mbuf_cnt). This is what the receive loop
 * relies on when it calls DPDKFreeMbufArray(mbufs, nb_rx - i - 1, i + 1) to
 * release every mbuf that follows the packet at index i. The previous loop
 * bound `i < mbuf_cnt` treated the count as an end index and leaked those
 * mbufs.
 *
 * \param mbuf_array array of mbuf pointers
 * \param mbuf_cnt   number of mbufs to free
 * \param offset     index of the first mbuf to free
 */
static inline void DPDKFreeMbufArray(
        struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
    /* Compute the end index in a wider type so offset + mbuf_cnt cannot wrap. */
    const uint32_t end = (uint32_t)offset + mbuf_cnt;
    for (uint32_t i = offset; i < end; i++) {
        rte_pktmbuf_free(mbuf_array[i]);
    }
}
192 
193 static void DevicePostStartPMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
194 {
195  if (strcmp(driver_name, "net_bonding") == 0)
196  driver_name = BondingDeviceDriverGet(ptv->port_id);
197  if (strcmp(driver_name, "net_i40e") == 0)
198  i40eDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
199  else if (strcmp(driver_name, "net_ixgbe") == 0)
200  ixgbeDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
201  else if (strcmp(driver_name, "net_ice") == 0)
202  iceDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
203  else if (strcmp(driver_name, "mlx5_pci") == 0)
204  mlx5DeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
205 }
206 
207 static void DevicePreClosePMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
208 {
209  if (strcmp(driver_name, "net_bonding") == 0) {
210  driver_name = BondingDeviceDriverGet(ptv->port_id);
211  }
212 
213  if (
214 #if RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0)
215  strcmp(driver_name, "net_i40e") == 0 ||
216 #endif /* RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0) */
217  strcmp(driver_name, "net_ixgbe") == 0 || strcmp(driver_name, "net_ice") == 0 ||
218  strcmp(driver_name, "mlx5_pci") == 0) {
219  // Flush the RSS rules that have been inserted in the post start section
220  struct rte_flow_error flush_error = { 0 };
221  int32_t retval = rte_flow_flush(ptv->port_id, &flush_error);
222  if (retval != 0) {
223  SCLogError("%s: unable to flush rte_flow rules: %s Flush error msg: %s",
224  ptv->livedev->dev, rte_strerror(-retval), flush_error.message);
225  }
226  }
227 }
228 
/**
 * Attempts to retrieve the NUMA node id of the CPU the caller runs on.
 *
 * On Linux this queries libnuma for the node of sched_getcpu()'s CPU. libnuma
 * must not be used unless numa_available() succeeds, and sched_getcpu() can
 * fail; both cases yield -1. Other OSes log a warning and return -1.
 *
 * @return NUMA id on success, -1 otherwise
 */
static int GetNumaNode(void)
{
#if defined(__linux__)
    /* libnuma: numa_available() must succeed before any other libnuma call. */
    if (numa_available() < 0)
        return -1;

    const int cpu = sched_getcpu();
    if (cpu < 0)
        return -1;

    /* numa_node_of_cpu() itself returns -1 on error. */
    return numa_node_of_cpu(cpu);
#else
    SCLogWarning("NUMA node retrieval is not supported on this OS.");
    return -1;
#endif
}
247 
248 /**
249  * \brief Registration Function for ReceiveDPDK.
 *
 * Installs the receive module callbacks: ReceiveDPDKThreadInit as thread
 * init, ReceiveDPDKLoop as packet acquisition loop and
 * ReceiveDPDKThreadDeinit as thread deinit.
 * NOTE(review): the signature and some assignments of this function are not
 * visible in this view.
250  * \todo Unit tests are needed for this module.
251  */
253 {
254  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
255  tmm_modules[TMM_RECEIVEDPDK].ThreadInit = ReceiveDPDKThreadInit;
257  tmm_modules[TMM_RECEIVEDPDK].PktAcqLoop = ReceiveDPDKLoop;
260  tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = ReceiveDPDKThreadDeinit;
263 }
264 
265 /**
266  * \brief Registration Function for DecodeDPDK.
 *
 * Installs DecodeDPDKThreadInit, DecodeDPDK (per-packet function) and
 * DecodeDPDKThreadDeinit for the decode module.
 * NOTE(review): the signature and some assignments of this function are not
 * visible in this view.
267  * \todo Unit tests are needed for this module.
268  */
270 {
271  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
272  tmm_modules[TMM_DECODEDPDK].ThreadInit = DecodeDPDKThreadInit;
273  tmm_modules[TMM_DECODEDPDK].Func = DecodeDPDK;
275  tmm_modules[TMM_DECODEDPDK].ThreadDeinit = DecodeDPDKThreadDeinit;
278 }
279 
/**
 * \brief Publish capture statistics into this thread's counters.
 *
 * The queue 0 worker reads the port-level rte_eth_stats. It reports:
 * - packets: its own pkts plus imissed + ierrors + rx_nombuf;
 * - the rx/tx error breakdown;
 * - the live device's packet total: ipackets plus the same error sums.
 * If rte_eth_stats_get() fails, it logs and updates nothing. Every other
 * worker only reports its own received-packet count.
 * NOTE(review): the line starting the statement that ends in
 * "ptv->livedev->drop, ..." is not visible in this view (presumably an atomic
 * set of the device drop counter) - confirm.
 */
280 static inline void DPDKDumpCounters(DPDKThreadVars *ptv)
281 {
282  /* Some NICs (e.g. Intel) do not support queue statistics and the drops can be fetched only on
283  * the port level. Therefore setting it to the first worker to have at least continuous update
284  * on the dropped packets. */
285  if (ptv->queue_id == 0) {
286  struct rte_eth_stats eth_stats;
287  int retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
288  if (unlikely(retval != 0)) {
289  SCLogError("%s: failed to get stats: %s", ptv->livedev->dev, rte_strerror(-retval));
290  return;
291  }
292
293  StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_packets,
294  ptv->pkts + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
295  SC_ATOMIC_SET(ptv->livedev->pkts,
296  eth_stats.ipackets + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
297  StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_rx_errs,
298  eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
299  StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_imissed, eth_stats.imissed);
300  StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_rx_no_mbufs, eth_stats.rx_nombuf);
301  StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_ierrors, eth_stats.ierrors);
302  StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_tx_errs, eth_stats.oerrors);
304  ptv->livedev->drop, eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
305  } else {
306  StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_packets, ptv->pkts);
307  }
308 }
309 
/**
 * \brief ReleasePacket callback installed on packets built from DPDK mbufs.
 *
 * The mbuf is transmitted on dpdk_v.out_port_id / dpdk_v.out_queue_id in two
 * cases: TAP mode, or IPS mode when the packet was not dropped. In builds
 * with the i40e/ixgbe/ice PMDs, ICMPv6 type 143 packets are never forwarded.
 * A failed transmit is retried once after DPDK_BURST_TX_WAIT_US. When the
 * packet is not forwarded, or both attempts fail, the mbuf is freed here and
 * dpdk_v.mbuf is set to NULL.
 * NOTE(review): two lines of the body are not visible in this view
 * (presumably the Packet itself is returned afterwards, e.g. via
 * PacketFreeOrRelease) - confirm.
 */
310 static void DPDKReleasePacket(Packet *p)
311 {
312  int retval;
313  /* Need to be in copy mode and need to detect early release
314  where Ethernet header could not be set (and pseudo packet)
315  When enabling promiscuous mode on Intel cards, 2 ICMPv6 packets are generated.
316  These get into the infinite cycle between the NIC and the switch in some cases */
317  if ((p->dpdk_v.copy_mode == DPDK_COPY_MODE_TAP ||
318  (p->dpdk_v.copy_mode == DPDK_COPY_MODE_IPS && !PacketCheckAction(p, ACTION_DROP)))
319 #if defined(RTE_LIBRTE_I40E_PMD) || defined(RTE_LIBRTE_IXGBE_PMD) || defined(RTE_LIBRTE_ICE_PMD)
320  && !(PacketIsICMPv6(p) && PacketGetICMPv6(p)->type == 143)
321 #endif
322  ) {
324  retval =
325  rte_eth_tx_burst(p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
326  // rte_eth_tx_burst can return only 0 (failure) or 1 (success) because we are only
327  // transmitting burst of size 1 and the function rte_eth_tx_burst returns number of
328  // successfully sent packets.
329  if (unlikely(retval < 1)) {
330  // sometimes a repeated transmit can help to send out the packet
331  rte_delay_us(DPDK_BURST_TX_WAIT_US);
332  retval = rte_eth_tx_burst(
333  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
334  if (unlikely(retval < 1)) {
335  SCLogDebug("Unable to transmit the packet on port %u queue %u",
336  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id);
337  rte_pktmbuf_free(p->dpdk_v.mbuf);
338  p->dpdk_v.mbuf = NULL;
339  }
340  }
341  } else {
342  rte_pktmbuf_free(p->dpdk_v.mbuf);
343  p->dpdk_v.mbuf = NULL;
344  }
345
347 }
348 
/**
 * \brief One-time preparation before the receive loop starts polling.
 *
 * Steps:
 * - wait for the packet pool;
 * - reset the port's basic and extended statistics;
 * - in interrupt mode, register the queue's RX interrupt with the per-thread
 *   epoll instance.
 * NOTE(review): several lines are not visible in this view (presumably
 * marking the thread as running, the failure return after
 * InterruptsRXEnable() and the final return) - confirm against full source.
 */
349 static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv)
350 {
351  SCEnter();
352  // Indicate that the thread is actually running its application level
353  // code (i.e., it can poll packets)
355  PacketPoolWait();
356
357  rte_eth_stats_reset(ptv->port_id);
358  rte_eth_xstats_reset(ptv->port_id);
359
360  if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
362
364 }
365 
366 static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv)
367 {
368  static thread_local uint64_t last_timeout_msec = 0;
369  SCTime_t t = TimeGet();
370  uint64_t msecs = SCTIME_MSECS(t);
371  if (msecs > last_timeout_msec + 100) {
372  TmThreadsCaptureHandleTimeout(tv, NULL);
373  last_timeout_msec = msecs;
374  }
375 }
376 
377 /**
378  * \brief Decides if it should retry the packet poll or continue with the packet processing
379  * \return true if the poll should be retried, false otherwise
380  */
381 static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx)
382 {
383  static thread_local uint32_t zero_pkt_polls_cnt = 0;
384 
385  if (nb_rx > 0) {
386  zero_pkt_polls_cnt = 0;
387  return false;
388  }
389 
390  LoopHandleTimeoutOnIdle(tv);
391  if (!ptv->intr_enabled)
392  return true;
393 
394  zero_pkt_polls_cnt++;
395  if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
396  return true;
397 
398  uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
399  if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
400  rte_delay_us(pwd_idle_hint);
401  } else {
402  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
403  struct rte_epoll_event event;
404  rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
405  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
406  return true;
407  }
408 
409  return false;
410 }
411 
412 /**
413  * \brief Initializes a packet from an mbuf
 *
 * Sets the timestamp, the DPDK release callback, copy mode, output port and
 * queue (the output queue equals this worker's RX queue id) and the live
 * device. With CHECKSUM_VALIDATION_OFFLOAD the mbuf's RX checksum offload
 * flags are inspected: an IP or L4 checksum flagged BAD by the NIC is
 * recorded as csum_set = true with csum = 0 on the L3/L4 header.
 * NOTE(review): several lines (the packet allocation and the statements in
 * the DISABLE / both-GOOD branches) are not visible in this view.
414  * \return the initialized Packet, or NULL if no packet could be obtained
415  */
416 static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf)
417 {
419  if (unlikely(p == NULL)) {
420  return NULL;
421  }
424  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
426  }
427
428  p->ts = TimeGet();
429  p->dpdk_v.mbuf = mbuf;
430  p->ReleasePacket = DPDKReleasePacket;
431  p->dpdk_v.copy_mode = ptv->copy_mode;
432  p->dpdk_v.out_port_id = ptv->out_port_id;
433  p->dpdk_v.out_queue_id = ptv->queue_id;
434  p->livedev = ptv->livedev;
435
436  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
438  } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
439  uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
440  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
441  (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
442  SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
444  } else {
445  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
446  SCLogDebug("HW detected BAD IP checksum");
447  // chsum recalc will not be triggered but rule keyword check will be
448  p->l3.csum_set = true;
449  p->l3.csum = 0;
450  }
451  if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
452  SCLogDebug("HW detected BAD L4 chsum");
453  p->l4.csum_set = true;
454  p->l4.csum = 0;
455  }
456  }
457  }
458
459  return p;
460 }
461 
462 static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf)
463 {
464  static thread_local bool segmented_mbufs_warned = false;
465  if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
466  char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
467  "Check your configuration or report the issue";
468  enum rte_proc_type_t eal_t = rte_eal_process_type();
469  if (eal_t == RTE_PROC_SECONDARY) {
470  SCLogWarning("%s. To avoid segmented mbufs, "
471  "try to increase mbuf size in your primary application",
472  warn_s);
473  } else if (eal_t == RTE_PROC_PRIMARY) {
474  SCLogWarning("%s. To avoid segmented mbufs, "
475  "try to increase MTU in your suricata.yaml",
476  warn_s);
477  }
478 
479  segmented_mbufs_warned = true;
480  }
481 }
482 
483 static void PrintDPDKPortXstats(uint16_t port_id, const char *port_name)
484 {
485  int ret = rte_eth_xstats_get(port_id, NULL, 0);
486  if (ret <= 0) {
487  SCLogPerf("%s: unable to obtain rte_eth_xstats (%s)", port_name,
488  ret == 0 ? "not supported" : rte_strerror(-ret));
489  return;
490  }
491  unsigned int len = (unsigned int)ret;
492  struct rte_eth_xstat_name *xstats_names = NULL;
493  struct rte_eth_xstat *xstats = SCCalloc(len, sizeof(*xstats));
494  if (xstats == NULL) {
495  SCLogWarning("Failed to allocate memory for the rte_eth_xstat structure");
496  return;
497  }
498 
499  ret = rte_eth_xstats_get(port_id, xstats, len);
500  if (ret < 0 || (unsigned int)ret > len) {
501  SCLogPerf("%s: unable to obtain rte_eth_xstats (%s)", port_name,
502  ret < 0 ? rte_strerror(-ret) : "table size too small");
503  goto cleanup;
504  }
505  xstats_names = SCCalloc(len, sizeof(*xstats_names));
506  if (xstats_names == NULL) {
507  SCLogWarning("Failed to allocate memory for the rte_eth_xstat_name array");
508  goto cleanup;
509  }
510  ret = rte_eth_xstats_get_names(port_id, xstats_names, len);
511  if (ret < 0 || (unsigned int)ret > len) {
512  SCLogPerf("%s: unable to obtain names of rte_eth_xstats (%s)", port_name,
513  ret < 0 ? rte_strerror(-ret) : "table size too small");
514  goto cleanup;
515  }
516  for (unsigned int i = 0; i < len; i++) {
517  if (xstats[i].value > 0)
518  SCLogPerf("Port %u (%s) - %s: %" PRIu64, port_id, port_name, xstats_names[i].name,
519  xstats[i].value);
520  }
521 
522 cleanup:
523  if (xstats != NULL)
524  SCFree(xstats);
525  if (xstats_names != NULL)
526  SCFree(xstats_names);
527 }
528 
529 static void HandleShutdown(DPDKThreadVars *ptv)
530 {
531  SCLogDebug("Stopping Suricata!");
532  SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
533  while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) {
534  rte_delay_us(10);
535  }
536  // Dump counters while device is still running - some drivers (e.g. BNXT) fail
537  // to report stats after the device is stopped
538  DPDKDumpCounters(ptv);
539  if (ptv->queue_id == 0) {
540  PrintDPDKPortXstats(ptv->port_id, ptv->livedev->dev);
541  rte_delay_us(20); // wait for all threads to get out of the sync loop
542  SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
543  // If Suricata runs in peered mode, the peer threads might still want to send
544  // packets to our port. Instead, we know, that we are done with the peered port, so
545  // we stop it. The peered threads will stop our port.
546  if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
547  rte_eth_dev_stop(ptv->out_port_id);
548  } else {
549  // in IDS we stop our port - no peer threads are running
550  rte_eth_dev_stop(ptv->port_id);
551  }
552  }
553 }
554 
555 static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
556 {
557  static thread_local SCTime_t last_dump = { 0 };
558  SCTime_t current_time = TimeGet();
559  /* Trigger one dump of stats every second */
560  if (current_time.secs != last_dump.secs) {
561  DPDKDumpCounters(ptv);
562  last_dump = current_time;
563  }
564 }
565 
566 /**
567  * \brief Main DPDK reading Loop function
 *
 * Polls RX bursts of up to MIN(BURST_SIZE, mempool size) mbufs from this
 * worker's queue. Each mbuf is wrapped into a Packet and passed to the next
 * slot, and the counters are dumped periodically. When suricata_ctl_flags
 * becomes non-zero, the loop runs HandleShutdown() and exits.
 * NOTE(review): two lines (inside the loop after the counter dump, and the
 * final return) are not visible in this view.
568  */
569 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
570 {
571  SCEnter();
572  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
573  ptv->slot = ((TmSlot *)slot)->slot_next;
574  uint32_t mp_sz = ptv->livedev->dpdk_vars->pkt_mp[ptv->queue_id]->size;
575  uint16_t burst_size = (uint16_t)MIN(BURST_SIZE, mp_sz);
576
577  TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
578  if (ret != TM_ECODE_OK) {
579  SCReturnInt(ret);
580  }
581  while (true) {
582  if (unlikely(suricata_ctl_flags != 0)) {
583  HandleShutdown(ptv);
584  break;
585  }
586
587  uint16_t nb_rx =
588  rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, burst_size);
/* true means "poll again" (no packets, or an idle back-off was performed) */
589  if (RXPacketCountHeuristic(tv, ptv, nb_rx)) {
590  continue;
591  }
592
593  ptv->pkts += (uint64_t)nb_rx;
594  for (uint16_t i = 0; i < nb_rx; i++) {
595  Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
596  if (p == NULL) {
597  rte_pktmbuf_free(ptv->received_mbufs[i]);
598  continue;
599  }
600  DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
601  PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
602  rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
603  if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
604  TmqhOutputPacketpool(ptv->tv, p);
/* NOTE(review): intended to free the nb_rx - i - 1 mbufs that follow index i;
 * this only works if DPDKFreeMbufArray treats its arguments as
 * (count, start offset). Also note EXIT_FAILURE is returned as a TmEcode. */
605  DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
606  SCReturnInt(EXIT_FAILURE);
607  }
608  }
609
610  PeriodicDPDKDumpCounters(ptv);
612  }
613
615 }
616 
617 /**
618  * \brief Init function for ReceiveDPDK.
619  *
 * Allocates the per-thread DPDKThreadVars, registers the capture counters
 * and copies the interface settings. It then counts threads running on a
 * NUMA node other than the NIC's, and claims the next queue id from the
 * config's atomic counter. The thread that claims the last queue id also:
 * - starts the port and polls the link every 100 ms, at most
 *   linkup_timeout * 10 times;
 * - runs the driver-specific post-start actions;
 * - reports NUMA placement;
 * - initialises the port's interrupt lock when interrupt mode is on.
 * Both the success and the failure path call the config's DerefFunc.
 * NOTE(review): the return statements after the success path and at the end
 * of the fail path are not visible in this view.
 * NOTE(review): a failure after rte_eth_dev_start() succeeded jumps to fail
 * without stopping the port - confirm this is intended.
 *
620  * \param tv pointer to ThreadVars
621  * \param initdata pointer to the interface passed from the user (a DPDKIfaceConfig)
622  * \param data pointer gets populated with DPDKThreadVars
623  *
624  */
625 static TmEcode ReceiveDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
626 {
627  SCEnter();
628  int retval, thread_numa;
629  DPDKThreadVars *ptv = NULL;
630  DPDKIfaceConfig *dpdk_config = (DPDKIfaceConfig *)initdata;
631
632  if (initdata == NULL) {
633  SCLogError("DPDK configuration is NULL in thread initialization");
634  goto fail;
635  }
636
637  ptv = SCCalloc(1, sizeof(DPDKThreadVars));
638  if (unlikely(ptv == NULL)) {
639  SCLogError("Unable to allocate memory");
640  goto fail;
641  }
642
643  ptv->tv = tv;
644  ptv->pkts = 0;
645  ptv->bytes = 0;
646  ptv->livedev = LiveGetDevice(dpdk_config->iface);
647
648  ptv->capture_dpdk_packets = StatsRegisterCounter("capture.packets", &ptv->tv->stats);
649  ptv->capture_dpdk_rx_errs = StatsRegisterCounter("capture.rx_errors", &ptv->tv->stats);
650  ptv->capture_dpdk_tx_errs = StatsRegisterCounter("capture.tx_errors", &ptv->tv->stats);
651  ptv->capture_dpdk_imissed = StatsRegisterCounter("capture.dpdk.imissed", &ptv->tv->stats);
652  ptv->capture_dpdk_rx_no_mbufs = StatsRegisterCounter("capture.dpdk.no_mbufs", &ptv->tv->stats);
653  ptv->capture_dpdk_ierrors = StatsRegisterCounter("capture.dpdk.ierrors", &ptv->tv->stats);
654
655  ptv->copy_mode = dpdk_config->copy_mode;
656  ptv->checksum_mode = dpdk_config->checksum_mode;
657
658  ptv->threads = dpdk_config->threads;
659  ptv->intr_enabled = (dpdk_config->flags & DPDK_IRQ_MODE) != 0;
660  ptv->port_id = dpdk_config->port_id;
661  ptv->out_port_id = dpdk_config->out_port_id;
662  ptv->port_socket_id = dpdk_config->socket_id;
663
/* count threads that run on a different NUMA node than the NIC */
664  thread_numa = GetNumaNode();
665  if (thread_numa >= 0 && ptv->port_socket_id != SOCKET_ID_ANY &&
666  thread_numa != ptv->port_socket_id) {
667  SC_ATOMIC_ADD(dpdk_config->inconsistent_numa_cnt, 1);
668  SCLogPerf("%s: NIC is on NUMA %d, thread on NUMA %d", dpdk_config->iface,
669  ptv->port_socket_id, thread_numa);
670  }
671
672  ptv->workers_sync = dpdk_config->workers_sync;
673  uint16_t queue_id = SC_ATOMIC_ADD(dpdk_config->queue_id, 1);
674  ptv->queue_id = queue_id;
675
676  // the last thread (the one claiming queue id threads - 1) starts the device
677  if (queue_id == dpdk_config->threads - 1) {
678  retval = rte_eth_dev_start(ptv->port_id);
679  if (retval < 0) {
680  SCLogError("%s: error (%s) during device startup", dpdk_config->iface,
681  rte_strerror(-retval));
682  goto fail;
683  }
684
685  struct rte_eth_dev_info dev_info;
686  retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
687  if (retval != 0) {
688  SCLogError("%s: error (%s) when getting device info", dpdk_config->iface,
689  rte_strerror(-retval));
690  goto fail;
691  }
692
/* poll the link every 100 ms, at most linkup_timeout * 10 times */
693  uint32_t timeout = dpdk_config->linkup_timeout * 10;
694  while (timeout > 0) {
695  struct rte_eth_link link = { 0 };
696  retval = rte_eth_link_get_nowait(ptv->port_id, &link);
697  if (retval != 0) {
698  if (retval == -ENOTSUP) {
699  SCLogInfo("%s: link status not supported, skipping", dpdk_config->iface);
700  } else {
701  SCLogInfo("%s: error (%s) when getting link status, skipping",
702  dpdk_config->iface, rte_strerror(-retval));
703  }
704  break;
705  }
706  if (link.link_status) {
707  char link_status_str[RTE_ETH_LINK_MAX_STR_LEN];
708 #if RTE_VERSION >= RTE_VERSION_NUM(20, 11, 0, 0)
709 #pragma GCC diagnostic push
710 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
711  rte_eth_link_to_str(link_status_str, sizeof(link_status_str), &link);
712 #pragma GCC diagnostic pop
713 #else
714  snprintf(link_status_str, sizeof(link_status_str),
715  "Link Up, speed %u Mbps, %s", // 22 chars + 10 for digits + 11 for duplex
716  link.link_speed,
717  (link.link_duplex == ETH_LINK_FULL_DUPLEX) ? "full-duplex" : "half-duplex");
718 #endif
719
720  SCLogInfo("%s: %s", dpdk_config->iface, link_status_str);
721  break;
722  }
723
724  rte_delay_ms(100);
725  timeout--;
726  }
727
728  if (dpdk_config->linkup_timeout && timeout == 0) {
729  SCLogWarning("%s: link is down, trying to continue anyway", dpdk_config->iface);
730  }
731
732  // some PMDs requires additional actions only after the device has started
733  DevicePostStartPMDSpecificActions(ptv, dev_info.driver_name);
734
735  uint16_t inconsistent_numa_cnt = SC_ATOMIC_GET(dpdk_config->inconsistent_numa_cnt);
736  if (inconsistent_numa_cnt > 0 && ptv->port_socket_id != SOCKET_ID_ANY) {
737  SCLogWarning("%s: NIC is on NUMA %d, %u threads on different NUMA node(s)",
738  dpdk_config->iface, ptv->port_socket_id, inconsistent_numa_cnt);
739  } else if (ptv->port_socket_id == SOCKET_ID_ANY && rte_socket_count() > 1) {
740  SCLogNotice(
741  "%s: unable to determine NIC's NUMA node, degraded performance can be expected",
742  dpdk_config->iface);
743  }
744  if (ptv->intr_enabled) {
745  rte_spinlock_init(&intr_lock[ptv->port_id]);
746  }
747  }
748
749  *data = (void *)ptv;
/* drop this thread's reference to the interface configuration */
750  dpdk_config->DerefFunc(dpdk_config);
752
753 fail:
754  if (dpdk_config != NULL)
755  dpdk_config->DerefFunc(dpdk_config);
756  if (ptv != NULL)
757  SCFree(ptv);
759 }
760 
761 /**
762  * \brief DeInit function of the DPDK receive thread.
 *
 * The queue 0 worker reads the device info and runs the driver-specific
 * pre-close actions (flushing rte_flow rules for some drivers). It also frees
 * the workers_sync structure shared by the port's workers. Every worker then
 * frees its own DPDKThreadVars.
 * NOTE(review): the line after the device-info error log and the final
 * return are not visible in this view; if the former is not a return,
 * dev_info is used uninitialised - confirm.
763  * \param tv pointer to ThreadVars
764  * \param data pointer that gets cast into DPDKThreadVars for ptv
765  */
766 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *tv, void *data)
767 {
768  SCEnter();
769  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
770
771  if (ptv->queue_id == 0) {
772  struct rte_eth_dev_info dev_info;
773  int retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
774  if (retval != 0) {
775  SCLogError("%s: error (%s) when getting device info", ptv->livedev->dev,
776  rte_strerror(-retval));
778  }
779
780  DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);
781
782  if (ptv->workers_sync) {
783  SCFree(ptv->workers_sync);
784  }
785  }
786
787  SCFree(ptv);
789 }
790 
791 /**
792  * \brief This function passes off to link type decoders.
793  *
794  * DecodeDPDK decodes packets from DPDK and passes
795  * them off to the proper link type decoder.
 *
 * NOTE(review): several lines (counter updates, the VLAN counter increment
 * and the finalisation/return) are not visible in this view.
796  *
797  * \param tv pointer to ThreadVars
798  * \param p pointer to the current packet
799  * \param data the DecodeThreadVars created by DecodeDPDKThreadInit
800  */
801 static TmEcode DecodeDPDK(ThreadVars *tv, Packet *p, void *data)
802 {
803  SCEnter();
805
807
808  /* update counters */
810
811  /* If suri has set vlan during reading, we increase vlan counter */
812  if (p->vlan_idx) {
814  }
815
816  /* call the decoder */
817  DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
818
820
822 }
823 
/**
 * \brief Thread init of the DPDK decode module.
 *
 * Stores the thread's DecodeThreadVars in *data for DecodeDPDK.
 * NOTE(review): the allocation, the failure return, the counter
 * registration and the final return are not visible in this view.
 */
824 static TmEcode DecodeDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
825 {
826  SCEnter();
827  DecodeThreadVars *dtv = NULL;
828
830
831  if (dtv == NULL)
833
835
836  *data = (void *)dtv;
837
839 }
840 
/**
 * \brief Thread deinit of the DPDK decode module: frees the thread's
 *        DecodeThreadVars when present.
 * NOTE(review): the return statement is not visible in this view.
 */
841 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data)
842 {
843  SCEnter();
844  if (data != NULL)
845  DecodeThreadVarsFree(tv, data);
847 }
848 
849 #endif /* HAVE_DPDK */
850 /* eof */
851 /**
852  * @}
853  */
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:77
PacketL4::csum_set
bool csum_set
Definition: decode.h:468
PacketCheckAction
bool PacketCheckAction(const Packet *p, const uint8_t a)
Definition: packet.c:50
util-device-private.h
tm-threads.h
len
uint8_t len
Definition: app-layer-dnp3.h:2
TMM_RECEIVEDPDK
@ TMM_RECEIVEDPDK
Definition: tm-threads-common.h:56
CHECKSUM_VALIDATION_OFFLOAD
@ CHECKSUM_VALIDATION_OFFLOAD
Definition: decode.h:48
offset
uint64_t offset
Definition: util-streaming-buffer.h:0
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
PacketFreeOrRelease
void PacketFreeOrRelease(Packet *p)
Return a packet to where it was allocated.
Definition: decode.c:282
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(StatsThreadContext *stats)
Definition: counters.c:482
SCTIME_MSECS
#define SCTIME_MSECS(t)
Definition: util-time.h:58
PKT_IS_PSEUDOPKT
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition: decode.h:1346
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
DPDK_COPY_MODE_IPS
@ DPDK_COPY_MODE_IPS
Definition: source-dpdk.h:34
PacketL4::csum
uint16_t csum
Definition: decode.h:469
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:282
StatsRegisterCounter
StatsCounterId StatsRegisterCounter(const char *name, StatsThreadContext *stats)
Registers a normal, unqualified counter.
Definition: counters.c:1037
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:103
name
const char * name
Definition: detect-engine-proto.c:48
TMM_DECODEDPDK
@ TMM_DECODEDPDK
Definition: tm-threads-common.h:57
action-globals.h
Packet_::flags
uint32_t flags
Definition: decode.h:551
DpdkCopyModeEnum
DpdkCopyModeEnum
Definition: source-dpdk.h:34
threads.h
Packet_::vlan_idx
uint8_t vlan_idx
Definition: decode.h:533
LiveDevice_
Definition: util-device-private.h:32
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:332
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
NoDPDKSupportExit
TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **)
this function prints an error message and exits.
Definition: source-dpdk.c:79
MIN
#define MIN(x, y)
Definition: suricata-common.h:408
util-privs.h
CHECKSUM_VALIDATION_DISABLE
@ CHECKSUM_VALIDATION_DISABLE
Definition: decode.h:43
StatsCounterId
Definition: counters.h:30
PacketDecodeFinalize
void PacketDecodeFinalize(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Finalize decoding of a packet.
Definition: decode.c:238
DPDKIfaceConfig_
Definition: source-dpdk.h:53
util-dpdk-ice.h
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:305
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:82
tmqh-packetpool.h
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:58
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:81
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:53
Packet_::datalink
int datalink
Definition: decode.h:639
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1348
DPDKWorkerSync_
Definition: source-dpdk.h:48
DecodeRegisterPerfCounters
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition: decode.c:634
TmModuleReceiveDPDKRegister
void TmModuleReceiveDPDKRegister(void)
Definition: source-dpdk.c:51
decode.h
PKT_SRC_WIRE
@ PKT_SRC_WIRE
Definition: decode.h:52
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:61
Packet_::ts
SCTime_t ts
Definition: decode.h:559
SCTime_t::secs
uint64_t secs
Definition: util-time.h:41
LiveGetDevice
LiveDevice * LiveGetDevice(const char *name)
Get a pointer to the device at idx.
Definition: util-device.c:268
SCEnter
#define SCEnter(...)
Definition: util-debug.h:284
GET_PKT_DATA
#define GET_PKT_DATA(p)
Definition: decode.h:210
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
util-affinity.h
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:56
StatsCounterIncr
void StatsCounterIncr(StatsThreadContext *stats, StatsCounterId id)
Increments the local counter.
Definition: counters.c:164
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:262
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:317
PacketPoolWait
void PacketPoolWait(void)
Definition: tmqh-packetpool.c:71
Packet_
Definition: decode.h:505
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
type
uint16_t type
Definition: decode-vlan.c:106
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:29
GET_PKT_LEN
#define GET_PKT_LEN(p)
Definition: decode.h:209
TimeGet
SCTime_t TimeGet(void)
Definition: util-time.c:152
Packet_::l4
struct PacketL4 l4
Definition: decode.h:605
TmSlot_
Definition: tm-threads.h:53
PKT_IGNORE_CHECKSUM
#define PKT_IGNORE_CHECKSUM
Definition: decode.h:1309
SCTime_t
Definition: util-time.h:40
Packet_::livedev
struct LiveDevice_ * livedev
Definition: decode.h:622
DPDK_BURST_TX_WAIT_US
#define DPDK_BURST_TX_WAIT_US
Definition: source-dpdk.h:36
TmEcode
TmEcode
Definition: tm-threads-common.h:80
TmModule_::name
const char * name
Definition: tm-modules.h:48
runmodes.h
SCLogInfo
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:232
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
dtv
DecodeThreadVars * dtv
Definition: fuzz_decodepcapfile.c:34
PacketL3::csum_set
bool csum_set
Definition: decode.h:437
DPDK_IRQ_MODE
#define DPDK_IRQ_MODE
Definition: source-dpdk.h:42
Packet_::ReleasePacket
void(* ReleasePacket)(struct Packet_ *)
Definition: decode.h:595
util-dpdk.h
flags
uint8_t flags
Definition: decode-gre.h:0
DecodeThreadVarsFree
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition: decode.c:843
source-dpdk.h
ChecksumValidationMode
ChecksumValidationMode
Definition: decode.h:42
suricata-common.h
packet.h
ACTION_DROP
#define ACTION_DROP
Definition: action-globals.h:30
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:241
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:51
FatalError
#define FatalError(...)
Definition: util-debug.h:517
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:33
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:52
threadvars.h
StatsCounterSetI64
void StatsCounterSetI64(StatsThreadContext *stats, StatsCounterId id, int64_t x)
set, so overwrite, the value of the local counter
Definition: counters.c:203
Packet_::l3
struct PacketL3 l3
Definition: decode.h:604
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:274
SCFree
#define SCFree(p)
Definition: util-mem.h:61
DecodeThreadVars_
Structure to hold thread specific data for all decode modules.
Definition: decode.h:982
util-dpdk-bonding.h
util-dpdk-mlx5.h
DecodeThreadVarsAlloc
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition: decode.c:825
util-dpdk-ixgbe.h
PacketSetData
int PacketSetData(Packet *p, const uint8_t *pktdata, uint32_t pktlen)
Set data for Packet and set length when zero copy is used.
Definition: decode.c:863
util-dpdk-i40e.h
suricata.h
PacketL3::csum
uint16_t csum
Definition: decode.h:438
DecodeThreadVars_::counter_vlan
StatsCounterId counter_vlan
Definition: decode.h:1021
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
TmModuleDecodeDPDKRegister
void TmModuleDecodeDPDKRegister(void)
Registration Function for DecodeDPDK.
Definition: source-dpdk.c:65
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:250
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
ThreadVars_::stats
StatsThreadContext stats
Definition: threadvars.h:121
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:288
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:299
SC_CAP_NET_RAW
#define SC_CAP_NET_RAW
Definition: util-privs.h:32
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:80
DPDK_COPY_MODE_TAP
@ DPDK_COPY_MODE_TAP
Definition: source-dpdk.h:34
DecodeUpdatePacketCounters
void DecodeUpdatePacketCounters(ThreadVars *tv, const DecodeThreadVars *dtv, const Packet *p)
Definition: decode.c:793
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:176