suricata
source-dpdk.c
Go to the documentation of this file.
1 /* Copyright (C) 2021-2025 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \defgroup dpdk DPDK running mode
20  *
21  * @{
22  */
23 
24 /**
25  * \file
26  *
27  * \author Lukas Sismis <lukas.sismis@gmail.com>
28  *
29  * DPDK capture interface
30  *
31  */
32 
33 #include "suricata-common.h"
34 #include "runmodes.h"
35 #include "decode.h"
36 #include "packet.h"
37 #include "source-dpdk.h"
38 #include "suricata.h"
39 #include "threads.h"
40 #include "threadvars.h"
41 #include "tm-threads.h"
42 #include "tmqh-packetpool.h"
43 #include "util-privs.h"
44 #include "util-device-private.h"
45 #include "action-globals.h"
46 
47 #ifndef HAVE_DPDK
48 
49 TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **);
50 
52 {
53  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
60 }
61 
62 /**
63  * \brief Registration Function for DecodeDPDK.
64  */
66 {
67  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
74 }
75 
76 /**
77  * \brief this function prints an error message and exits.
78  */
79 TmEcode NoDPDKSupportExit(ThreadVars *tv, const void *initdata, void **data)
80 {
81  FatalError("Error creating thread %s: you do not have "
82  "support for DPDK enabled, on Linux host please recompile "
83  "with --enable-dpdk",
84  tv->name);
85 }
86 
87 #else /* We have DPDK support */
88 
89 #include "util-affinity.h"
90 #include "util-dpdk.h"
91 #include "util-dpdk-i40e.h"
92 #include "util-dpdk-ice.h"
93 #include "util-dpdk-ixgbe.h"
94 #include "util-dpdk-mlx5.h"
95 #include "util-dpdk-bonding.h"
96 #include <numa.h>
97 
98 #define BURST_SIZE 32
99 // interrupt mode constants
100 #define MIN_ZERO_POLL_COUNT 10U
101 #define MIN_ZERO_POLL_COUNT_TO_SLEEP 10U
102 #define MINIMUM_SLEEP_TIME_US 1U
103 #define STANDARD_SLEEP_TIME_US 100U
104 #define MAX_EPOLL_TIMEOUT_MS 500U
105 static rte_spinlock_t intr_lock[RTE_MAX_ETHPORTS];
106 
/**
 * \brief Structure to hold thread specific variables.
 *
 * One instance is allocated per receive thread in ReceiveDPDKThreadInit and
 * released in ReceiveDPDKThreadDeinit.
 */
typedef struct DPDKThreadVars_ {
    /* counters */
    /* number of packets received by this thread (incremented per rx burst) */
    uint64_t pkts;
    /* owning Suricata thread */
    ThreadVars *tv;
    /* next slot in the pipeline; packets are handed to it by the receive loop */
    TmSlot *slot;
    /* live device looked up from the configured interface name */
    LiveDevice *livedev;
    /* checksum handling mode copied from the interface configuration */
    ChecksumValidationMode checksum_mode;
    /* true when the interface configuration has DPDK_IRQ_MODE set */
    bool intr_enabled;
    /* references to packet and drop counters */
    StatsCounterId capture_dpdk_packets;     /* "capture.packets" */
    StatsCounterId capture_dpdk_rx_errs;     /* "capture.rx_errors" */
    StatsCounterId capture_dpdk_imissed;     /* "capture.dpdk.imissed" */
    StatsCounterId capture_dpdk_rx_no_mbufs; /* "capture.dpdk.no_mbufs" */
    StatsCounterId capture_dpdk_ierrors;     /* "capture.dpdk.ierrors" */
    StatsCounterId capture_dpdk_tx_errs;     /* "capture.tx_errors" */
    /* NOTE(review): no use of this field is visible in this file section */
    unsigned int flags;
    /* number of worker threads configured for the interface */
    uint16_t threads;
    /* for IPS */
    DpdkCopyModeEnum copy_mode;
    /* port that packets are transmitted to in the copy modes */
    uint16_t out_port_id;
    /* Entry in the peers_list */
    /* NOTE(review): no peers-list field follows; the comment above looks stale */

    /* zeroed at thread init; NOTE(review): no other update visible here */
    uint64_t bytes;
    /* NOTE(review): accepted/dropped are not updated in the visible code */
    uint64_t accepted;
    uint64_t dropped;
    /* port this thread receives from */
    uint16_t port_id;
    /* RX queue of this thread; also used as the TX queue on the out port */
    uint16_t queue_id;
    /* socket (NUMA) id of the port as given by the configuration */
    int32_t port_socket_id;
    /* destination array for one rx burst */
    struct rte_mbuf *received_mbufs[BURST_SIZE];
    /* shared state used to synchronize the workers during shutdown */
    DPDKWorkerSync *workers_sync;
} DPDKThreadVars;
141 
142 static TmEcode ReceiveDPDKThreadInit(ThreadVars *, const void *, void **);
143 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *, void *);
144 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot);
145 
146 static TmEcode DecodeDPDKThreadInit(ThreadVars *, const void *, void **);
147 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data);
148 static TmEcode DecodeDPDK(ThreadVars *, Packet *, void *);
149 
150 static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset);
151 static bool InterruptsRXEnable(uint16_t port_id, uint16_t queue_id)
152 {
153  uint32_t event_data = (uint32_t)port_id << UINT16_WIDTH | queue_id;
154  int32_t ret = rte_eth_dev_rx_intr_ctl_q(port_id, queue_id, RTE_EPOLL_PER_THREAD,
155  RTE_INTR_EVENT_ADD, (void *)((uintptr_t)event_data));
156 
157  if (ret != 0) {
158  SCLogError("%s-Q%d: failed to enable interrupt mode: %s", DPDKGetPortNameByPortID(port_id),
159  queue_id, rte_strerror(-ret));
160  return false;
161  }
162  return true;
163 }
164 
165 static inline uint32_t InterruptsSleepHeuristic(uint32_t no_pkt_polls_count)
166 {
167  if (no_pkt_polls_count < MIN_ZERO_POLL_COUNT_TO_SLEEP)
168  return MINIMUM_SLEEP_TIME_US;
169 
170  return STANDARD_SLEEP_TIME_US;
171 }
172 
173 static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool on)
174 {
175  rte_spinlock_lock(&(intr_lock[port_id]));
176 
177  if (on)
178  rte_eth_dev_rx_intr_enable(port_id, queue_id);
179  else
180  rte_eth_dev_rx_intr_disable(port_id, queue_id);
181 
182  rte_spinlock_unlock(&(intr_lock[port_id]));
183 }
184 
/**
 * \brief Frees a run of mbufs from an mbuf array.
 *
 * Releases the mbufs at indices [offset, offset + mbuf_cnt). The receive loop
 * calls this on its error path with the number of mbufs that are still
 * unprocessed and the index of the first one, so mbuf_cnt is treated as a
 * count and not as an end index. Using it as an end index left mbufs
 * unreleased whenever offset was non-zero.
 *
 * \param mbuf_array array holding the mbufs
 * \param mbuf_cnt number of mbufs to free, starting at offset
 * \param offset index of the first mbuf to free
 */
static inline void DPDKFreeMbufArray(
        struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
    /* computed in 32 bits so that offset + mbuf_cnt cannot wrap */
    const uint32_t end = (uint32_t)offset + (uint32_t)mbuf_cnt;

    for (uint32_t i = offset; i < end; i++) {
        rte_pktmbuf_free(mbuf_array[i]);
    }
}
192 
193 static void DevicePostStartPMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
194 {
195  if (strcmp(driver_name, "net_bonding") == 0)
196  driver_name = BondingDeviceDriverGet(ptv->port_id);
197  if (strcmp(driver_name, "net_i40e") == 0)
198  i40eDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
199  else if (strcmp(driver_name, "net_ixgbe") == 0)
200  ixgbeDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
201  else if (strcmp(driver_name, "net_ice") == 0)
202  iceDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
203  else if (strcmp(driver_name, "mlx5_pci") == 0)
204  mlx5DeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
205 }
206 
207 static void DevicePreClosePMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
208 {
209  if (strcmp(driver_name, "net_bonding") == 0) {
210  driver_name = BondingDeviceDriverGet(ptv->port_id);
211  }
212 
213  if (
214 #if RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0)
215  strcmp(driver_name, "net_i40e") == 0 ||
216 #endif /* RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0) */
217  strcmp(driver_name, "net_ixgbe") == 0 || strcmp(driver_name, "net_ice") == 0 ||
218  strcmp(driver_name, "mlx5_pci") == 0) {
219  // Flush the RSS rules that have been inserted in the post start section
220  struct rte_flow_error flush_error = { 0 };
221  int32_t retval = rte_flow_flush(ptv->port_id, &flush_error);
222  if (retval != 0) {
223  SCLogError("%s: unable to flush rte_flow rules: %s Flush error msg: %s",
224  ptv->livedev->dev, rte_strerror(-retval), flush_error.message);
225  }
226  }
227 }
228 
/**
 * \brief Looks up the NUMA node of the CPU the calling thread currently runs on.
 *
 * On Linux the result of numa_node_of_cpu() for the current CPU is returned
 * as is. On other systems a warning is logged and no lookup is attempted.
 *
 * \return NUMA node id on success, -1 otherwise
 */
static int GetNumaNode(void)
{
#if defined(__linux__)
    const int current_cpu = sched_getcpu();
    return numa_node_of_cpu(current_cpu);
#else
    SCLogWarning("NUMA node retrieval is not supported on this OS.");
    return -1;
#endif
}
247 
248 /**
249  * \brief Registration Function for ReceiveDPDK.
250  * \todo Unit tests are needed for this module.
251  */
253 {
254  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
255  tmm_modules[TMM_RECEIVEDPDK].ThreadInit = ReceiveDPDKThreadInit;
257  tmm_modules[TMM_RECEIVEDPDK].PktAcqLoop = ReceiveDPDKLoop;
260  tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = ReceiveDPDKThreadDeinit;
263 }
264 
265 /**
266  * \brief Registration Function for DecodeDPDK.
267  * \todo Unit tests are needed for this module.
268  */
270 {
271  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
272  tmm_modules[TMM_DECODEDPDK].ThreadInit = DecodeDPDKThreadInit;
273  tmm_modules[TMM_DECODEDPDK].Func = DecodeDPDK;
275  tmm_modules[TMM_DECODEDPDK].ThreadDeinit = DecodeDPDKThreadDeinit;
278 }
279 
280 static inline void DPDKDumpCounters(DPDKThreadVars *ptv)
281 {
282  /* Some NICs (e.g. Intel) do not support queue statistics and the drops can be fetched only on
283  * the port level. Therefore setting it to the first worker to have at least continuous update
284  * on the dropped packets. */
285  if (ptv->queue_id == 0) {
286  struct rte_eth_stats eth_stats;
287  int retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
288  if (unlikely(retval != 0)) {
289  SCLogError("%s: failed to get stats: %s", ptv->livedev->dev, rte_strerror(-retval));
290  return;
291  }
292 
293  StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_packets,
294  ptv->pkts + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
295  SC_ATOMIC_SET(ptv->livedev->pkts,
296  eth_stats.ipackets + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
297  StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_rx_errs,
298  eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
299  StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_imissed, eth_stats.imissed);
300  StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_rx_no_mbufs, eth_stats.rx_nombuf);
301  StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_ierrors, eth_stats.ierrors);
302  StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_tx_errs, eth_stats.oerrors);
304  ptv->livedev->drop, eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
305  } else {
306  StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_packets, ptv->pkts);
307  }
308 }
309 
310 static void DPDKReleasePacket(Packet *p)
311 {
312  int retval;
313  /* Need to be in copy mode and need to detect early release
314  where Ethernet header could not be set (and pseudo packet)
315  When enabling promiscuous mode on Intel cards, 2 ICMPv6 packets are generated.
316  These get into the infinite cycle between the NIC and the switch in some cases */
317  if ((p->dpdk_v.copy_mode == DPDK_COPY_MODE_TAP ||
318  (p->dpdk_v.copy_mode == DPDK_COPY_MODE_IPS && !PacketCheckAction(p, ACTION_DROP)))
319 #if defined(RTE_LIBRTE_I40E_PMD) || defined(RTE_LIBRTE_IXGBE_PMD) || defined(RTE_LIBRTE_ICE_PMD)
320  && !(PacketIsICMPv6(p) && PacketGetICMPv6(p)->type == 143)
321 #endif
322  ) {
324  retval =
325  rte_eth_tx_burst(p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
326  // rte_eth_tx_burst can return only 0 (failure) or 1 (success) because we are only
327  // transmitting burst of size 1 and the function rte_eth_tx_burst returns number of
328  // successfully sent packets.
329  if (unlikely(retval < 1)) {
330  // sometimes a repeated transmit can help to send out the packet
331  rte_delay_us(DPDK_BURST_TX_WAIT_US);
332  retval = rte_eth_tx_burst(
333  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
334  if (unlikely(retval < 1)) {
335  SCLogDebug("Unable to transmit the packet on port %u queue %u",
336  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id);
337  rte_pktmbuf_free(p->dpdk_v.mbuf);
338  p->dpdk_v.mbuf = NULL;
339  }
340  }
341  } else {
342  rte_pktmbuf_free(p->dpdk_v.mbuf);
343  p->dpdk_v.mbuf = NULL;
344  }
345 
347 }
348 
349 static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv)
350 {
351  SCEnter();
352  // Indicate that the thread is actually running its application level
353  // code (i.e., it can poll packets)
355  PacketPoolWait();
356 
357  rte_eth_stats_reset(ptv->port_id);
358  rte_eth_xstats_reset(ptv->port_id);
359 
360  if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
362 
364 }
365 
366 static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv)
367 {
368  static thread_local uint64_t last_timeout_msec = 0;
369  SCTime_t t = TimeGet();
370  uint64_t msecs = SCTIME_MSECS(t);
371  if (msecs > last_timeout_msec + 100) {
372  TmThreadsCaptureHandleTimeout(tv, NULL);
373  last_timeout_msec = msecs;
374  }
375 }
376 
377 /**
378  * \brief Decides if it should retry the packet poll or continue with the packet processing
379  * \return true if the poll should be retried, false otherwise
380  */
381 static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx)
382 {
383  static thread_local uint32_t zero_pkt_polls_cnt = 0;
384 
385  if (nb_rx > 0) {
386  zero_pkt_polls_cnt = 0;
387  return false;
388  }
389 
390  LoopHandleTimeoutOnIdle(tv);
391  if (!ptv->intr_enabled)
392  return true;
393 
394  zero_pkt_polls_cnt++;
395  if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
396  return true;
397 
398  uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
399  if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
400  rte_delay_us(pwd_idle_hint);
401  } else {
402  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
403  struct rte_epoll_event event;
404  rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
405  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
406  return true;
407  }
408 
409  return false;
410 }
411 
412 /**
413  * \brief Initializes a packet from an mbuf
414  * \return pointer to the initialized Packet, or NULL if no packet could be obtained
415  */
416 static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf)
417 {
419  if (unlikely(p == NULL)) {
420  return NULL;
421  }
424  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
426  }
427 
428  p->ts = TimeGet();
429  p->dpdk_v.mbuf = mbuf;
430  p->ReleasePacket = DPDKReleasePacket;
431  p->dpdk_v.copy_mode = ptv->copy_mode;
432  p->dpdk_v.out_port_id = ptv->out_port_id;
433  p->dpdk_v.out_queue_id = ptv->queue_id;
434  p->livedev_id = ptv->livedev->id;
435 
436  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
438  } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
439  uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
440  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
441  (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
442  SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
444  } else {
445  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
446  SCLogDebug("HW detected BAD IP checksum");
447  // chsum recalc will not be triggered but rule keyword check will be
448  p->l3.csum_set = true;
449  p->l3.csum = 0;
450  }
451  if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
452  SCLogDebug("HW detected BAD L4 chsum");
453  p->l4.csum_set = true;
454  p->l4.csum = 0;
455  }
456  }
457  }
458 
459  return p;
460 }
461 
462 static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf)
463 {
464  static thread_local bool segmented_mbufs_warned = false;
465  if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
466  char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
467  "Check your configuration or report the issue";
468  enum rte_proc_type_t eal_t = rte_eal_process_type();
469  if (eal_t == RTE_PROC_SECONDARY) {
470  SCLogWarning("%s. To avoid segmented mbufs, "
471  "try to increase mbuf size in your primary application",
472  warn_s);
473  } else if (eal_t == RTE_PROC_PRIMARY) {
474  SCLogWarning("%s. To avoid segmented mbufs, "
475  "try to increase MTU in your suricata.yaml",
476  warn_s);
477  }
478 
479  segmented_mbufs_warned = true;
480  }
481 }
482 
483 static void PrintDPDKPortXstats(uint16_t port_id, const char *port_name)
484 {
485  int ret = rte_eth_xstats_get(port_id, NULL, 0);
486  if (ret <= 0) {
487  SCLogPerf("%s: unable to obtain rte_eth_xstats (%s)", port_name,
488  ret == 0 ? "not supported" : rte_strerror(-ret));
489  return;
490  }
491  unsigned int len = (unsigned int)ret;
492  struct rte_eth_xstat_name *xstats_names = NULL;
493  struct rte_eth_xstat *xstats = SCCalloc(len, sizeof(*xstats));
494  if (xstats == NULL) {
495  SCLogWarning("Failed to allocate memory for the rte_eth_xstat structure");
496  return;
497  }
498 
499  ret = rte_eth_xstats_get(port_id, xstats, len);
500  if (ret < 0 || (unsigned int)ret > len) {
501  SCLogPerf("%s: unable to obtain rte_eth_xstats (%s)", port_name,
502  ret < 0 ? rte_strerror(-ret) : "table size too small");
503  goto cleanup;
504  }
505  xstats_names = SCCalloc(len, sizeof(*xstats_names));
506  if (xstats_names == NULL) {
507  SCLogWarning("Failed to allocate memory for the rte_eth_xstat_name array");
508  goto cleanup;
509  }
510  ret = rte_eth_xstats_get_names(port_id, xstats_names, len);
511  if (ret < 0 || (unsigned int)ret > len) {
512  SCLogPerf("%s: unable to obtain names of rte_eth_xstats (%s)", port_name,
513  ret < 0 ? rte_strerror(-ret) : "table size too small");
514  goto cleanup;
515  }
516  for (unsigned int i = 0; i < len; i++) {
517  if (xstats[i].value > 0)
518  SCLogPerf("Port %u (%s) - %s: %" PRIu64, port_id, port_name, xstats_names[i].name,
519  xstats[i].value);
520  }
521 
522 cleanup:
523  if (xstats != NULL)
524  SCFree(xstats);
525  if (xstats_names != NULL)
526  SCFree(xstats_names);
527 }
528 
529 static void HandleShutdown(DPDKThreadVars *ptv)
530 {
531  SCLogDebug("Stopping Suricata!");
532  SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
533  while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) {
534  rte_delay_us(10);
535  }
536  // Dump counters while device is still running - some drivers (e.g. BNXT) fail
537  // to report stats after the device is stopped
538  DPDKDumpCounters(ptv);
539  if (ptv->queue_id == 0) {
540  PrintDPDKPortXstats(ptv->port_id, ptv->livedev->dev);
541  rte_delay_us(20); // wait for all threads to get out of the sync loop
542  SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
543  // If Suricata runs in peered mode, the peer threads might still want to send
544  // packets to our port. Instead, we know, that we are done with the peered port, so
545  // we stop it. The peered threads will stop our port.
546  if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
547  rte_eth_dev_stop(ptv->out_port_id);
548  } else {
549  // in IDS we stop our port - no peer threads are running
550  rte_eth_dev_stop(ptv->port_id);
551  }
552  }
553 }
554 
555 static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
556 {
557  static thread_local SCTime_t last_dump = { 0 };
558  SCTime_t current_time = TimeGet();
559  /* Trigger one dump of stats every second */
560  if (current_time.secs != last_dump.secs) {
561  DPDKDumpCounters(ptv);
562  last_dump = current_time;
563  }
564 }
565 
566 /**
567  * \brief Main DPDK reading Loop function
568  */
569 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
570 {
571  SCEnter();
572  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
573  ptv->slot = ((TmSlot *)slot)->slot_next;
574  uint32_t mp_sz = ptv->livedev->dpdk_vars->pkt_mp[ptv->queue_id]->size;
575  uint16_t burst_size = (uint16_t)MIN(BURST_SIZE, mp_sz);
576 
577  TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
578  if (ret != TM_ECODE_OK) {
579  SCReturnInt(ret);
580  }
581  while (true) {
582  if (unlikely(suricata_ctl_flags != 0)) {
583  HandleShutdown(ptv);
584  break;
585  }
586 
587  uint16_t nb_rx =
588  rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, burst_size);
589  if (RXPacketCountHeuristic(tv, ptv, nb_rx)) {
590  continue;
591  }
592 
593  ptv->pkts += (uint64_t)nb_rx;
594  for (uint16_t i = 0; i < nb_rx; i++) {
595  Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
596  if (p == NULL) {
597  rte_pktmbuf_free(ptv->received_mbufs[i]);
598  continue;
599  }
600  DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
601  PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
602  rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
603  if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
604  TmqhOutputPacketpool(ptv->tv, p);
605  DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
606  SCReturnInt(EXIT_FAILURE);
607  }
608  }
609 
610  PeriodicDPDKDumpCounters(ptv);
612  }
613 
615 }
616 
617 /**
618  * \brief Init function for ReceiveDPDK.
619  *
620  * \param tv pointer to ThreadVars
621  * \param initdata pointer to the interface passed from the user
622  * \param data pointer gets populated with DPDKThreadVars
623  *
624  */
625 static TmEcode ReceiveDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
626 {
627  SCEnter();
628  int retval, thread_numa;
629  DPDKThreadVars *ptv = NULL;
630  DPDKIfaceConfig *dpdk_config = (DPDKIfaceConfig *)initdata;
631 
632  if (initdata == NULL) {
633  SCLogError("DPDK configuration is NULL in thread initialization");
634  goto fail;
635  }
636 
637  ptv = SCCalloc(1, sizeof(DPDKThreadVars));
638  if (unlikely(ptv == NULL)) {
639  SCLogError("Unable to allocate memory");
640  goto fail;
641  }
642 
643  ptv->tv = tv;
644  ptv->pkts = 0;
645  ptv->bytes = 0;
646 
647  ptv->livedev = LiveGetDevice(dpdk_config->iface);
648  if (unlikely(ptv->livedev == NULL)) {
649  SCLogError("Unable to allocate memory for livedev %s", dpdk_config->iface);
650  goto fail;
651  }
652 
653  ptv->capture_dpdk_packets = StatsRegisterCounter("capture.packets", &ptv->tv->stats);
654  ptv->capture_dpdk_rx_errs = StatsRegisterCounter("capture.rx_errors", &ptv->tv->stats);
655  ptv->capture_dpdk_tx_errs = StatsRegisterCounter("capture.tx_errors", &ptv->tv->stats);
656  ptv->capture_dpdk_imissed = StatsRegisterCounter("capture.dpdk.imissed", &ptv->tv->stats);
657  ptv->capture_dpdk_rx_no_mbufs = StatsRegisterCounter("capture.dpdk.no_mbufs", &ptv->tv->stats);
658  ptv->capture_dpdk_ierrors = StatsRegisterCounter("capture.dpdk.ierrors", &ptv->tv->stats);
659 
660  ptv->copy_mode = dpdk_config->copy_mode;
661  ptv->checksum_mode = dpdk_config->checksum_mode;
662 
663  ptv->threads = dpdk_config->threads;
664  ptv->intr_enabled = (dpdk_config->flags & DPDK_IRQ_MODE) != 0;
665  ptv->port_id = dpdk_config->port_id;
666  ptv->out_port_id = dpdk_config->out_port_id;
667  ptv->port_socket_id = dpdk_config->socket_id;
668 
669  thread_numa = GetNumaNode();
670  if (thread_numa >= 0 && ptv->port_socket_id != SOCKET_ID_ANY &&
671  thread_numa != ptv->port_socket_id) {
672  SC_ATOMIC_ADD(dpdk_config->inconsistent_numa_cnt, 1);
673  SCLogPerf("%s: NIC is on NUMA %d, thread on NUMA %d", dpdk_config->iface,
674  ptv->port_socket_id, thread_numa);
675  }
676 
677  ptv->workers_sync = dpdk_config->workers_sync;
678  uint16_t queue_id = SC_ATOMIC_ADD(dpdk_config->queue_id, 1);
679  ptv->queue_id = queue_id;
680 
681  // the last thread starts the device
682  if (queue_id == dpdk_config->threads - 1) {
683  retval = rte_eth_dev_start(ptv->port_id);
684  if (retval < 0) {
685  SCLogError("%s: error (%s) during device startup", dpdk_config->iface,
686  rte_strerror(-retval));
687  goto fail;
688  }
689 
690  struct rte_eth_dev_info dev_info;
691  retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
692  if (retval != 0) {
693  SCLogError("%s: error (%s) when getting device info", dpdk_config->iface,
694  rte_strerror(-retval));
695  goto fail;
696  }
697 
698  uint32_t timeout = dpdk_config->linkup_timeout * 10;
699  while (timeout > 0) {
700  struct rte_eth_link link = { 0 };
701  retval = rte_eth_link_get_nowait(ptv->port_id, &link);
702  if (retval != 0) {
703  if (retval == -ENOTSUP) {
704  SCLogInfo("%s: link status not supported, skipping", dpdk_config->iface);
705  } else {
706  SCLogInfo("%s: error (%s) when getting link status, skipping",
707  dpdk_config->iface, rte_strerror(-retval));
708  }
709  break;
710  }
711  if (link.link_status) {
712  char link_status_str[RTE_ETH_LINK_MAX_STR_LEN];
713 #if RTE_VERSION >= RTE_VERSION_NUM(20, 11, 0, 0)
714 #pragma GCC diagnostic push
715 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
716  rte_eth_link_to_str(link_status_str, sizeof(link_status_str), &link);
717 #pragma GCC diagnostic pop
718 #else
719  snprintf(link_status_str, sizeof(link_status_str),
720  "Link Up, speed %u Mbps, %s", // 22 chars + 10 for digits + 11 for duplex
721  link.link_speed,
722  (link.link_duplex == ETH_LINK_FULL_DUPLEX) ? "full-duplex" : "half-duplex");
723 #endif
724 
725  SCLogInfo("%s: %s", dpdk_config->iface, link_status_str);
726  break;
727  }
728 
729  rte_delay_ms(100);
730  timeout--;
731  }
732 
733  if (dpdk_config->linkup_timeout && timeout == 0) {
734  SCLogWarning("%s: link is down, trying to continue anyway", dpdk_config->iface);
735  }
736 
737  // some PMDs requires additional actions only after the device has started
738  DevicePostStartPMDSpecificActions(ptv, dev_info.driver_name);
739 
740  uint16_t inconsistent_numa_cnt = SC_ATOMIC_GET(dpdk_config->inconsistent_numa_cnt);
741  if (inconsistent_numa_cnt > 0 && ptv->port_socket_id != SOCKET_ID_ANY) {
742  SCLogWarning("%s: NIC is on NUMA %d, %u threads on different NUMA node(s)",
743  dpdk_config->iface, ptv->port_socket_id, inconsistent_numa_cnt);
744  } else if (ptv->port_socket_id == SOCKET_ID_ANY && rte_socket_count() > 1) {
745  SCLogNotice(
746  "%s: unable to determine NIC's NUMA node, degraded performance can be expected",
747  dpdk_config->iface);
748  }
749  if (ptv->intr_enabled) {
750  rte_spinlock_init(&intr_lock[ptv->port_id]);
751  }
752  }
753 
754  *data = (void *)ptv;
755  dpdk_config->DerefFunc(dpdk_config);
757 
758 fail:
759  if (dpdk_config != NULL)
760  dpdk_config->DerefFunc(dpdk_config);
761  if (ptv != NULL)
762  SCFree(ptv);
764 }
765 
766 /**
767  * \brief DeInit function closes dpdk at exit.
768  * \param tv pointer to ThreadVars
769  * \param data pointer that gets cast into DPDKThreadVars for ptv
770  */
771 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *tv, void *data)
772 {
773  SCEnter();
774  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
775 
776  if (ptv->queue_id == 0) {
777  struct rte_eth_dev_info dev_info;
778  int retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
779  if (retval != 0) {
780  SCLogError("%s: error (%s) when getting device info", ptv->livedev->dev,
781  rte_strerror(-retval));
783  }
784 
785  DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);
786 
787  if (ptv->workers_sync) {
788  SCFree(ptv->workers_sync);
789  }
790  }
791 
792  SCFree(ptv);
794 }
795 
796 /**
797  * \brief This function passes off to link type decoders.
798  *
799  * DecodeDPDK decodes packets from DPDK and passes
800  * them off to the proper link type decoder.
801  *
802  * \param t pointer to ThreadVars
803  * \param p pointer to the current packet
804  * \param data pointer that gets cast into DPDKThreadVars for ptv
805  */
806 static TmEcode DecodeDPDK(ThreadVars *tv, Packet *p, void *data)
807 {
808  SCEnter();
810 
812 
813  /* update counters */
815 
816  /* If suri has set vlan during reading, we increase vlan counter */
817  if (p->vlan_idx) {
819  }
820 
821  /* call the decoder */
822  DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
823 
825 
827 }
828 
829 static TmEcode DecodeDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
830 {
831  SCEnter();
832  DecodeThreadVars *dtv = NULL;
833 
835 
836  if (dtv == NULL)
838 
840 
841  *data = (void *)dtv;
842 
844 }
845 
846 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data)
847 {
848  SCEnter();
849  if (data != NULL)
850  DecodeThreadVarsFree(tv, data);
852 }
853 
854 #endif /* HAVE_DPDK */
855 /* eof */
856 /**
857  * @}
858  */
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:77
PacketL4::csum_set
bool csum_set
Definition: decode.h:477
PacketCheckAction
bool PacketCheckAction(const Packet *p, const uint8_t a)
Definition: packet.c:50
util-device-private.h
tm-threads.h
len
uint8_t len
Definition: app-layer-dnp3.h:2
TMM_RECEIVEDPDK
@ TMM_RECEIVEDPDK
Definition: tm-threads-common.h:56
CHECKSUM_VALIDATION_OFFLOAD
@ CHECKSUM_VALIDATION_OFFLOAD
Definition: decode.h:48
offset
uint64_t offset
Definition: util-streaming-buffer.h:0
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
PacketFreeOrRelease
void PacketFreeOrRelease(Packet *p)
Return a packet to where it was allocated.
Definition: decode.c:282
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(StatsThreadContext *stats)
Definition: counters.c:482
SCTIME_MSECS
#define SCTIME_MSECS(t)
Definition: util-time.h:58
PKT_IS_PSEUDOPKT
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition: decode.h:1357
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
DPDK_COPY_MODE_IPS
@ DPDK_COPY_MODE_IPS
Definition: source-dpdk.h:34
PacketL4::csum
uint16_t csum
Definition: decode.h:478
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:282
StatsRegisterCounter
StatsCounterId StatsRegisterCounter(const char *name, StatsThreadContext *stats)
Registers a normal, unqualified counter.
Definition: counters.c:1039
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:103
name
const char * name
Definition: detect-engine-proto.c:48
TMM_DECODEDPDK
@ TMM_DECODEDPDK
Definition: tm-threads-common.h:57
action-globals.h
Packet_::flags
uint32_t flags
Definition: decode.h:560
DpdkCopyModeEnum
DpdkCopyModeEnum
Definition: source-dpdk.h:34
threads.h
Packet_::vlan_idx
uint8_t vlan_idx
Definition: decode.h:542
LiveDevice_
Definition: util-device-private.h:32
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:332
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
NoDPDKSupportExit
TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **)
this function prints an error message and exits.
Definition: source-dpdk.c:79
MIN
#define MIN(x, y)
Definition: suricata-common.h:416
util-privs.h
CHECKSUM_VALIDATION_DISABLE
@ CHECKSUM_VALIDATION_DISABLE
Definition: decode.h:43
StatsCounterId
Definition: counters.h:30
PacketDecodeFinalize
void PacketDecodeFinalize(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Finalize decoding of a packet.
Definition: decode.c:238
DPDKIfaceConfig_
Definition: source-dpdk.h:53
util-dpdk-ice.h
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:305
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:82
tmqh-packetpool.h
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:58
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:81
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:53
Packet_::datalink
int datalink
Definition: decode.h:650
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1359
DPDKWorkerSync_
Definition: source-dpdk.h:48
DecodeRegisterPerfCounters
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition: decode.c:634
TmModuleReceiveDPDKRegister
void TmModuleReceiveDPDKRegister(void)
Definition: source-dpdk.c:51
decode.h
PKT_SRC_WIRE
@ PKT_SRC_WIRE
Definition: decode.h:52
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:61
Packet_::ts
SCTime_t ts
Definition: decode.h:568
SCTime_t::secs
uint64_t secs
Definition: util-time.h:41
LiveGetDevice
LiveDevice * LiveGetDevice(const char *name)
Get a pointer to the device at idx.
Definition: util-device.c:269
SCEnter
#define SCEnter(...)
Definition: util-debug.h:284
GET_PKT_DATA
#define GET_PKT_DATA(p)
Definition: decode.h:210
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
util-affinity.h
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:56
StatsCounterIncr
void StatsCounterIncr(StatsThreadContext *stats, StatsCounterId id)
Increments the local counter.
Definition: counters.c:164
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:262
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:325
PacketPoolWait
void PacketPoolWait(void)
Definition: tmqh-packetpool.c:71
Packet_
Definition: decode.h:514
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
type
uint16_t type
Definition: decode-vlan.c:106
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:29
GET_PKT_LEN
#define GET_PKT_LEN(p)
Definition: decode.h:209
TimeGet
SCTime_t TimeGet(void)
Definition: util-time.c:152
Packet_::l4
struct PacketL4 l4
Definition: decode.h:614
TmSlot_
Definition: tm-threads.h:53
PKT_IGNORE_CHECKSUM
#define PKT_IGNORE_CHECKSUM
Definition: decode.h:1320
SCTime_t
Definition: util-time.h:40
DPDK_BURST_TX_WAIT_US
#define DPDK_BURST_TX_WAIT_US
Definition: source-dpdk.h:36
TmEcode
TmEcode
Definition: tm-threads-common.h:80
TmModule_::name
const char * name
Definition: tm-modules.h:48
runmodes.h
SCLogInfo
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:232
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
dtv
DecodeThreadVars * dtv
Definition: fuzz_decodepcapfile.c:34
PacketL3::csum_set
bool csum_set
Definition: decode.h:446
DPDK_IRQ_MODE
#define DPDK_IRQ_MODE
Definition: source-dpdk.h:42
Packet_::ReleasePacket
void(* ReleasePacket)(struct Packet_ *)
Definition: decode.h:604
util-dpdk.h
flags
uint8_t flags
Definition: decode-gre.h:0
DecodeThreadVarsFree
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition: decode.c:843
source-dpdk.h
ChecksumValidationMode
ChecksumValidationMode
Definition: decode.h:42
suricata-common.h
packet.h
Packet_::livedev_id
uint16_t livedev_id
Definition: decode.h:631
ACTION_DROP
#define ACTION_DROP
Definition: action-globals.h:30
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:241
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:51
FatalError
#define FatalError(...)
Definition: util-debug.h:517
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:33
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:52
threadvars.h
StatsCounterSetI64
void StatsCounterSetI64(StatsThreadContext *stats, StatsCounterId id, int64_t x)
Set (i.e. overwrite) the value of the local counter.
Definition: counters.c:203
Packet_::l3
struct PacketL3 l3
Definition: decode.h:613
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:274
SCFree
#define SCFree(p)
Definition: util-mem.h:61
DecodeThreadVars_
Structure to hold thread specific data for all decode modules.
Definition: decode.h:993
util-dpdk-bonding.h
util-dpdk-mlx5.h
DecodeThreadVarsAlloc
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition: decode.c:825
util-dpdk-ixgbe.h
PacketSetData
int PacketSetData(Packet *p, const uint8_t *pktdata, uint32_t pktlen)
Set data for Packet and set length when zero copy is used.
Definition: decode.c:863
util-dpdk-i40e.h
suricata.h
PacketL3::csum
uint16_t csum
Definition: decode.h:447
DecodeThreadVars_::counter_vlan
StatsCounterId counter_vlan
Definition: decode.h:1032
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
TmModuleDecodeDPDKRegister
void TmModuleDecodeDPDKRegister(void)
Registration Function for DecodeDPDK.
Definition: source-dpdk.c:65
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:250
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
ThreadVars_::stats
StatsThreadContext stats
Definition: threadvars.h:121
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:288
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we allocate a packet.
Definition: decode.c:299
SC_CAP_NET_RAW
#define SC_CAP_NET_RAW
Definition: util-privs.h:32
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:80
DPDK_COPY_MODE_TAP
@ DPDK_COPY_MODE_TAP
Definition: source-dpdk.h:34
DecodeUpdatePacketCounters
void DecodeUpdatePacketCounters(ThreadVars *tv, const DecodeThreadVars *dtv, const Packet *p)
Definition: decode.c:793
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:176