suricata
source-dpdk.c
Go to the documentation of this file.
1 /* Copyright (C) 2021-2025 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \defgroup dpdk DPDK running mode
20  *
21  * @{
22  */
23 
24 /**
25  * \file
26  *
27  * \author Lukas Sismis <lukas.sismis@gmail.com>
28  *
29  * DPDK capture interface
30  *
31  */
32 
33 #include "suricata-common.h"
34 #include "runmodes.h"
35 #include "decode.h"
36 #include "packet.h"
37 #include "source-dpdk.h"
38 #include "suricata.h"
39 #include "threads.h"
40 #include "threadvars.h"
41 #include "tm-threads.h"
42 #include "tmqh-packetpool.h"
43 #include "util-privs.h"
44 #include "util-device-private.h"
45 #include "action-globals.h"
46 
#ifndef HAVE_DPDK

/* Forward declaration of the error-and-exit stub used in builds without DPDK support. */
TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **);
50 
{
    /* Fallback registration: this build has no DPDK support (HAVE_DPDK is undefined). */
    tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
}
61 
/**
 * \brief Registration Function for DecodeDPDK (build without DPDK support).
 */
{
    tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
}
75 
76 /**
77  * \brief this function prints an error message and exits.
78  */
79 TmEcode NoDPDKSupportExit(ThreadVars *tv, const void *initdata, void **data)
80 {
81  FatalError("Error creating thread %s: you do not have "
82  "support for DPDK enabled, on Linux host please recompile "
83  "with --enable-dpdk",
84  tv->name);
85 }
86 
87 #else /* We have DPDK support */
88 
89 #include "util-affinity.h"
90 #include "util-dpdk.h"
91 #include "util-dpdk-i40e.h"
92 #include "util-dpdk-ice.h"
93 #include "util-dpdk-ixgbe.h"
94 #include "util-dpdk-mlx5.h"
95 #include "util-dpdk-bonding.h"
96 #include <numa.h>
97 
/* Maximum number of mbufs fetched by one rte_eth_rx_burst() call; also sizes the
 * per-thread receive array. */
#define BURST_SIZE 32
// interrupt mode constants
/* Consecutive empty polls tolerated before an interrupt-mode worker starts backing off. */
#define MIN_ZERO_POLL_COUNT 10U
/* Empty-poll count below which InterruptsSleepHeuristic() suggests the minimum sleep. */
#define MIN_ZERO_POLL_COUNT_TO_SLEEP 10U
/* Short busy-wait (microseconds) suggested by InterruptsSleepHeuristic(). */
#define MINIMUM_SLEEP_TIME_US 1U
/* Sleep hint (microseconds) at or above which the worker arms RX interrupts and waits. */
#define STANDARD_SLEEP_TIME_US 100U
/* Upper bound (milliseconds) of a single rte_epoll_wait() while waiting for RX interrupts. */
#define MAX_EPOLL_TIMEOUT_MS 500U
/* Per-port lock serializing rte_eth_dev_rx_intr_enable/disable calls from worker threads. */
static rte_spinlock_t intr_lock[RTE_MAX_ETHPORTS];
106 
/**
 * \brief Structure to hold thread specific variables.
 *
 * One instance per receive thread; each thread polls RX queue queue_id of port port_id.
 */
typedef struct DPDKThreadVars_ {
    /* counters */
    uint64_t pkts; /* packets received by this thread (sum of rx burst sizes) */
    ThreadVars *tv;
    TmSlot *slot; /* next pipeline slot that received packets are handed to */
    LiveDevice *livedev;
    ChecksumValidationMode checksum_mode;
    bool intr_enabled; /* RX interrupt mode (DPDK_IRQ_MODE) instead of pure busy polling */
    /* references to packet and drop counters */
    StatsCounterId capture_dpdk_packets;
    StatsCounterId capture_dpdk_rx_errs;
    StatsCounterId capture_dpdk_imissed;
    StatsCounterId capture_dpdk_rx_no_mbufs;
    StatsCounterId capture_dpdk_ierrors;
    StatsCounterId capture_dpdk_tx_errs;
    unsigned int flags;
    uint16_t threads; /* number of worker threads configured for the port */
    /* for IPS */
    DpdkCopyModeEnum copy_mode; /* TAP/IPS forward packets to out_port_id, IDS does not */
    uint16_t out_port_id;       /* port that released packets are transmitted on */
    /* NOTE(review): in the code visible here `bytes` is only zero-initialized and
     * `accepted`/`dropped` are never referenced; confirm they are still needed. */

    uint64_t bytes;
    uint64_t accepted;
    uint64_t dropped;
    uint16_t port_id;  /* RX port */
    uint16_t queue_id; /* RX queue; also used as the TX queue on out_port_id */
    int32_t port_socket_id; /* NUMA socket of the NIC, SOCKET_ID_ANY if unknown */
    struct rte_mbuf *received_mbufs[BURST_SIZE]; /* storage for one rx burst */
    DPDKWorkerSync *workers_sync; /* shared by all workers of the port; freed by queue 0 */
} DPDKThreadVars;
141 
142 static TmEcode ReceiveDPDKThreadInit(ThreadVars *, const void *, void **);
143 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *, void *);
144 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot);
145 
146 static TmEcode DecodeDPDKThreadInit(ThreadVars *, const void *, void **);
147 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data);
148 static TmEcode DecodeDPDK(ThreadVars *, Packet *, void *);
149 
150 static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset);
151 static bool InterruptsRXEnable(uint16_t port_id, uint16_t queue_id)
152 {
153  uint32_t event_data = (uint32_t)port_id << UINT16_WIDTH | queue_id;
154  int32_t ret = rte_eth_dev_rx_intr_ctl_q(port_id, queue_id, RTE_EPOLL_PER_THREAD,
155  RTE_INTR_EVENT_ADD, (void *)((uintptr_t)event_data));
156 
157  if (ret != 0) {
158  SCLogError("%s-Q%d: failed to enable interrupt mode: %s", DPDKGetPortNameByPortID(port_id),
159  queue_id, rte_strerror(-ret));
160  return false;
161  }
162  return true;
163 }
164 
165 static inline uint32_t InterruptsSleepHeuristic(uint32_t no_pkt_polls_count)
166 {
167  if (no_pkt_polls_count < MIN_ZERO_POLL_COUNT_TO_SLEEP)
168  return MINIMUM_SLEEP_TIME_US;
169 
170  return STANDARD_SLEEP_TIME_US;
171 }
172 
173 static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool on)
174 {
175  rte_spinlock_lock(&(intr_lock[port_id]));
176 
177  if (on)
178  rte_eth_dev_rx_intr_enable(port_id, queue_id);
179  else
180  rte_eth_dev_rx_intr_disable(port_id, queue_id);
181 
182  rte_spinlock_unlock(&(intr_lock[port_id]));
183 }
184 
/**
 * \brief Free a contiguous run of mbufs stored in an array.
 *
 * Frees mbuf_array[offset] .. mbuf_array[offset + mbuf_cnt - 1]. The second argument is a
 * count, which matches the receive loop's failure path. That path passes the number of
 * not-yet-processed mbufs of the burst (nb_rx - i - 1) together with their first index
 * (i + 1). The previous implementation used mbuf_cnt as an end index and so leaked the tail
 * of the burst.
 *
 * \param mbuf_array array holding the mbufs
 * \param mbuf_cnt   number of mbufs to free
 * \param offset     index of the first mbuf to free
 */
static inline void DPDKFreeMbufArray(
        struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
    for (uint32_t i = 0; i < mbuf_cnt; i++) {
        rte_pktmbuf_free(mbuf_array[(uint32_t)offset + i]);
    }
}
192 
193 static void DevicePostStartPMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
194 {
195  if (strcmp(driver_name, "net_bonding") == 0)
196  driver_name = BondingDeviceDriverGet(ptv->port_id);
197  if (strcmp(driver_name, "net_i40e") == 0)
198  i40eDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
199  else if (strcmp(driver_name, "net_ixgbe") == 0)
200  ixgbeDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
201  else if (strcmp(driver_name, "net_ice") == 0)
202  iceDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
203  else if (strcmp(driver_name, "mlx5_pci") == 0)
204  mlx5DeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
205 }
206 
207 static void DevicePreClosePMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
208 {
209  if (strcmp(driver_name, "net_bonding") == 0) {
210  driver_name = BondingDeviceDriverGet(ptv->port_id);
211  }
212 
213  if (
214 #if RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0)
215  strcmp(driver_name, "net_i40e") == 0 ||
216 #endif /* RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0) */
217  strcmp(driver_name, "net_ixgbe") == 0 || strcmp(driver_name, "net_ice") == 0 ||
218  strcmp(driver_name, "mlx5_pci") == 0) {
219  // Flush the RSS rules that have been inserted in the post start section
220  struct rte_flow_error flush_error = { 0 };
221  int32_t retval = rte_flow_flush(ptv->port_id, &flush_error);
222  if (retval != 0) {
223  SCLogError("%s: unable to flush rte_flow rules: %s Flush error msg: %s",
224  ptv->livedev->dev, rte_strerror(-retval), flush_error.message);
225  }
226  }
227 }
228 
/**
 * Attempts to retrieve the NUMA node id of the CPU the caller currently runs on.
 *
 * libnuma requires numa_available() to succeed before any other libnuma function is used.
 * sched_getcpu() may also fail. Both failures are reported as -1, which callers treat as
 * "unknown".
 *
 * @return NUMA id on success, -1 otherwise (libnuma unavailable, CPU lookup failure,
 *         or non-Linux OS)
 */
static int GetNumaNode(void)
{
    int node = -1;

#if defined(__linux__)
    if (numa_available() < 0)
        return -1;

    int cpu = sched_getcpu();
    if (cpu < 0)
        return -1;

    node = numa_node_of_cpu(cpu);
#else
    SCLogWarning("NUMA node retrieval is not supported on this OS.");
#endif

    return node;
}
247 
/**
 * \brief Registration Function for ReceiveDPDK.
 *
 * Sets the receive module's per-thread init (ReceiveDPDKThreadInit), its packet acquisition
 * loop (ReceiveDPDKLoop) and its per-thread deinit (ReceiveDPDKThreadDeinit).
 * \todo Unit tests are needed for this module.
 */
{
    tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
    tmm_modules[TMM_RECEIVEDPDK].ThreadInit = ReceiveDPDKThreadInit;
    tmm_modules[TMM_RECEIVEDPDK].PktAcqLoop = ReceiveDPDKLoop;
    tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = ReceiveDPDKThreadDeinit;
}
264 
/**
 * \brief Registration Function for DecodeDPDK.
 *
 * Sets the decode module's per-thread init (DecodeDPDKThreadInit), its per-packet function
 * (DecodeDPDK) and its per-thread deinit (DecodeDPDKThreadDeinit).
 * \todo Unit tests are needed for this module.
 */
{
    tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
    tmm_modules[TMM_DECODEDPDK].ThreadInit = DecodeDPDKThreadInit;
    tmm_modules[TMM_DECODEDPDK].Func = DecodeDPDK;
    tmm_modules[TMM_DECODEDPDK].ThreadDeinit = DecodeDPDKThreadDeinit;
}
279 
/**
 * \brief Publish DPDK capture statistics into the thread's Suricata counters.
 *
 * The queue-0 worker reads port-wide stats with rte_eth_stats_get(). It sets the capture
 * packet/error counters and the live device packet count from them. Every other worker
 * only publishes its own received-packet count.
 */
static inline void DPDKDumpCounters(DPDKThreadVars *ptv)
{
    /* Some NICs (e.g. Intel) do not support queue statistics and the drops can be fetched only on
     * the port level. Therefore setting it to the first worker to have at least continuous update
     * on the dropped packets. */
    if (ptv->queue_id == 0) {
        struct rte_eth_stats eth_stats;
        int retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
        if (unlikely(retval != 0)) {
            SCLogError("%s: failed to get stats: %s", ptv->livedev->dev, rte_strerror(-retval));
            return;
        }

        /* capture.packets = packets received by this thread + all port-level drops/errors */
        StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_packets,
                ptv->pkts + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
        SC_ATOMIC_SET(ptv->livedev->pkts,
                eth_stats.ipackets + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
        StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_rx_errs,
                eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
        StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_imissed, eth_stats.imissed);
        StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_rx_no_mbufs, eth_stats.rx_nombuf);
        StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_ierrors, eth_stats.ierrors);
        StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_tx_errs, eth_stats.oerrors);
                ptv->livedev->drop, eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
    } else {
        StatsCounterSetI64(&ptv->tv->stats, ptv->capture_dpdk_packets, ptv->pkts);
    }
}
309 
/**
 * \brief ReleasePacket callback for DPDK packets (installed by PacketInitFromMbuf()).
 *
 * In TAP mode, and in IPS mode unless the packet's verdict is drop, the mbuf is transmitted
 * on out_port_id/out_queue_id. A failed transmit is retried once after
 * DPDK_BURST_TX_WAIT_US. If the retry also fails, or the packet is not forwarded at all,
 * the mbuf is freed and p->dpdk_v.mbuf is cleared.
 */
static void DPDKReleasePacket(Packet *p)
{
    int retval;
    /* Need to be in copy mode and need to detect early release
       where Ethernet header could not be set (and pseudo packet)
       When enabling promiscuous mode on Intel cards, 2 ICMPv6 packets are generated.
       These get into the infinite cycle between the NIC and the switch in some cases */
    /* ICMPv6 type 143 is the MLDv2 Multicast Listener Report (RFC 3810). */
    if ((p->dpdk_v.copy_mode == DPDK_COPY_MODE_TAP ||
                (p->dpdk_v.copy_mode == DPDK_COPY_MODE_IPS && !PacketCheckAction(p, ACTION_DROP)))
#if defined(RTE_LIBRTE_I40E_PMD) || defined(RTE_LIBRTE_IXGBE_PMD) || defined(RTE_LIBRTE_ICE_PMD)
            && !(PacketIsICMPv6(p) && PacketGetICMPv6(p)->type == 143)
#endif
    ) {
        retval =
                rte_eth_tx_burst(p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
        // rte_eth_tx_burst can return only 0 (failure) or 1 (success) because we are only
        // transmitting burst of size 1 and the function rte_eth_tx_burst returns number of
        // successfully sent packets.
        if (unlikely(retval < 1)) {
            // sometimes a repeated transmit can help to send out the packet
            rte_delay_us(DPDK_BURST_TX_WAIT_US);
            retval = rte_eth_tx_burst(
                    p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
            if (unlikely(retval < 1)) {
                SCLogDebug("Unable to transmit the packet on port %u queue %u",
                        p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id);
                rte_pktmbuf_free(p->dpdk_v.mbuf);
                p->dpdk_v.mbuf = NULL;
            }
        }
    } else {
        rte_pktmbuf_free(p->dpdk_v.mbuf);
        p->dpdk_v.mbuf = NULL;
    }

}
348 
/**
 * \brief Per-thread setup executed once at the start of ReceiveDPDKLoop().
 *
 * Waits for the packet pool and resets the port's basic and extended statistics; note that
 * every worker of the port performs these port-wide resets. In interrupt mode it also adds
 * the RX queue's interrupt to this thread's epoll set via InterruptsRXEnable().
 */
static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv)
{
    SCEnter();
    // Indicate that the thread is actually running its application level
    // code (i.e., it can poll packets)
    PacketPoolWait();

    rte_eth_stats_reset(ptv->port_id);
    rte_eth_xstats_reset(ptv->port_id);

    if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))

}
365 
366 static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv)
367 {
368  static thread_local uint64_t last_timeout_msec = 0;
369  SCTime_t t = TimeGet();
370  uint64_t msecs = SCTIME_MSECS(t);
371  if (msecs > last_timeout_msec + 100) {
372  TmThreadsCaptureHandleTimeout(tv, NULL);
373  last_timeout_msec = msecs;
374  }
375 }
376 
377 /**
378  * \brief Decides if it should retry the packet poll or continue with the packet processing
379  * \return true if the poll should be retried, false otherwise
380  */
381 static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx)
382 {
383  static thread_local uint32_t zero_pkt_polls_cnt = 0;
384 
385  if (nb_rx > 0) {
386  zero_pkt_polls_cnt = 0;
387  return false;
388  }
389 
390  LoopHandleTimeoutOnIdle(tv);
391  if (!ptv->intr_enabled)
392  return true;
393 
394  zero_pkt_polls_cnt++;
395  if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
396  return true;
397 
398  uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
399  if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
400  rte_delay_us(pwd_idle_hint);
401  } else {
402  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
403  struct rte_epoll_event event;
404  rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
405  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
406  return true;
407  }
408 
409  return false;
410 }
411 
/**
 * \brief Initializes a packet from an mbuf
 *
 * Attaches the mbuf and sets the timestamp, DPDKReleasePacket() as release callback, the
 * copy mode, the output port/queue (the RX queue id is reused as TX queue) and the live
 * device. With CHECKSUM_VALIDATION_OFFLOAD, bad IP/L4 checksums reported by the NIC are
 * recorded as csum_set with csum 0.
 *
 * \return the initialized packet, or NULL if no packet could be obtained; in that case the
 *         mbuf is not attached and the caller still owns it
 */
static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf)
{
    if (unlikely(p == NULL)) {
        return NULL;
    }
    if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
    }

    p->ts = TimeGet();
    p->dpdk_v.mbuf = mbuf;
    p->ReleasePacket = DPDKReleasePacket;
    p->dpdk_v.copy_mode = ptv->copy_mode;
    p->dpdk_v.out_port_id = ptv->out_port_id;
    p->dpdk_v.out_queue_id = ptv->queue_id;
    p->livedev = ptv->livedev;

    if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
    } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
        uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
        if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
                (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
            SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
        } else {
            if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
                SCLogDebug("HW detected BAD IP checksum");
                // chsum recalc will not be triggered but rule keyword check will be
                p->l3.csum_set = true;
                p->l3.csum = 0;
            }
            if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
                SCLogDebug("HW detected BAD L4 chsum");
                p->l4.csum_set = true;
                p->l4.csum = 0;
            }
        }
    }

    return p;
}
461 
462 static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf)
463 {
464  static thread_local bool segmented_mbufs_warned = false;
465  if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
466  char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
467  "Check your configuration or report the issue";
468  enum rte_proc_type_t eal_t = rte_eal_process_type();
469  if (eal_t == RTE_PROC_SECONDARY) {
470  SCLogWarning("%s. To avoid segmented mbufs, "
471  "try to increase mbuf size in your primary application",
472  warn_s);
473  } else if (eal_t == RTE_PROC_PRIMARY) {
474  SCLogWarning("%s. To avoid segmented mbufs, "
475  "try to increase MTU in your suricata.yaml",
476  warn_s);
477  }
478 
479  segmented_mbufs_warned = true;
480  }
481 }
482 
483 static void PrintDPDKPortXstats(uint16_t port_id, const char *port_name)
484 {
485  int ret = rte_eth_xstats_get(port_id, NULL, 0);
486  if (ret <= 0) {
487  SCLogPerf("%s: unable to obtain rte_eth_xstats (%s)", port_name,
488  ret == 0 ? "not supported" : rte_strerror(-ret));
489  return;
490  }
491  unsigned int len = (unsigned int)ret;
492  struct rte_eth_xstat_name *xstats_names = NULL;
493  struct rte_eth_xstat *xstats = SCCalloc(len, sizeof(*xstats));
494  if (xstats == NULL) {
495  SCLogWarning("Failed to allocate memory for the rte_eth_xstat structure");
496  return;
497  }
498 
499  ret = rte_eth_xstats_get(port_id, xstats, len);
500  if (ret < 0 || (unsigned int)ret > len) {
501  SCLogPerf("%s: unable to obtain rte_eth_xstats (%s)", port_name,
502  ret < 0 ? rte_strerror(-ret) : "table size too small");
503  goto cleanup;
504  }
505  xstats_names = SCCalloc(len, sizeof(*xstats_names));
506  if (xstats_names == NULL) {
507  SCLogWarning("Failed to allocate memory for the rte_eth_xstat_name array");
508  goto cleanup;
509  }
510  ret = rte_eth_xstats_get_names(port_id, xstats_names, len);
511  if (ret < 0 || (unsigned int)ret > len) {
512  SCLogPerf("%s: unable to obtain names of rte_eth_xstats (%s)", port_name,
513  ret < 0 ? rte_strerror(-ret) : "table size too small");
514  goto cleanup;
515  }
516  for (unsigned int i = 0; i < len; i++) {
517  if (xstats[i].value > 0)
518  SCLogPerf("Port %u (%s) - %s: %" PRIu64, port_id, port_name, xstats_names[i].name,
519  xstats[i].value);
520  }
521 
522 cleanup:
523  if (xstats != NULL)
524  SCFree(xstats);
525  if (xstats_names != NULL)
526  SCFree(xstats_names);
527 }
528 
/**
 * \brief Per-worker shutdown sequence, run from ReceiveDPDKLoop() once suricata_ctl_flags
 *        becomes non-zero.
 *
 * All workers of the port rendezvous on workers_sync->worker_checked_in, then each dumps
 * its counters. The queue-0 worker additionally prints the port xstats and resets the
 * rendezvous counter. It also stops a port: out_port_id in TAP/IPS mode, or its own
 * port_id in IDS mode.
 */
static void HandleShutdown(DPDKThreadVars *ptv)
{
    SCLogDebug("Stopping Suricata!");
    /* Check in, then busy-wait in 10 us steps until every worker of the port has checked in. */
    SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
    while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) {
        rte_delay_us(10);
    }
    // Dump counters while device is still running - some drivers (e.g. BNXT) fail
    // to report stats after the device is stopped
    DPDKDumpCounters(ptv);
    if (ptv->queue_id == 0) {
        PrintDPDKPortXstats(ptv->port_id, ptv->livedev->dev);
        /* NOTE(review): resetting the counter relies on every other worker leaving the wait
         * loop above within this 20 us delay; a worker that re-reads the counter after the
         * reset sees 0 < worker_cnt and keeps spinning. Confirm this is acceptable. */
        rte_delay_us(20); // wait for all threads to get out of the sync loop
        SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
        // If Suricata runs in peered mode, the peer threads might still want to send
        // packets to our port. Instead, we know, that we are done with the peered port, so
        // we stop it. The peered threads will stop our port.
        if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
            rte_eth_dev_stop(ptv->out_port_id);
        } else {
            // in IDS we stop our port - no peer threads are running
            rte_eth_dev_stop(ptv->port_id);
        }
    }
}
554 
555 static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
556 {
557  static thread_local SCTime_t last_dump = { 0 };
558  SCTime_t current_time = TimeGet();
559  /* Trigger one dump of stats every second */
560  if (current_time.secs != last_dump.secs) {
561  DPDKDumpCounters(ptv);
562  last_dump = current_time;
563  }
564 }
565 
/**
 * \brief Main DPDK reading Loop function
 *
 * Each iteration does the following:
 * - checks for shutdown (HandleShutdown() then leave the loop);
 * - polls one burst of up to BURST_SIZE mbufs;
 * - lets RXPacketCountHeuristic() decide whether to re-poll;
 * - turns each mbuf into a Packet and passes it down the pipeline;
 * - refreshes the counters once per second.
 */
static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
{
    SCEnter();
    DPDKThreadVars *ptv = (DPDKThreadVars *)data;
    ptv->slot = ((TmSlot *)slot)->slot_next;
    TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
    if (ret != TM_ECODE_OK) {
        SCReturnInt(ret);
    }
    while (true) {
        if (unlikely(suricata_ctl_flags != 0)) {
            HandleShutdown(ptv);
            break;
        }

        uint16_t nb_rx =
                rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
        if (RXPacketCountHeuristic(tv, ptv, nb_rx)) {
            continue;
        }

        ptv->pkts += (uint64_t)nb_rx;
        for (uint16_t i = 0; i < nb_rx; i++) {
            Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
            if (p == NULL) {
                /* no packet available: drop the mbuf */
                rte_pktmbuf_free(ptv->received_mbufs[i]);
                continue;
            }
            DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
            PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
                    rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
            if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
                TmqhOutputPacketpool(ptv->tv, p);
                /* Release the mbufs of the rest of this burst that were never turned into
                 * packets: nb_rx - i - 1 of them, starting at index i + 1.
                 * NOTE(review): relies on DPDKFreeMbufArray() treating its second argument
                 * as a count starting at the offset. */
                DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
                SCReturnInt(EXIT_FAILURE);
            }
        }

        PeriodicDPDKDumpCounters(ptv);
    }

}
613 
/**
 * \brief Init function for ReceiveDPDK.
 *
 * Allocates the per-thread DPDKThreadVars and fills it from the interface configuration.
 * It registers the capture counters, records a NUMA mismatch between this thread and the
 * NIC, and takes the next queue id. The last thread to initialize then:
 * - starts the port;
 * - waits for link-up;
 * - applies post-start PMD actions;
 * - reports the NUMA placement;
 * - initializes the port's interrupt lock when interrupt mode is enabled.
 *
 * \param tv pointer to ThreadVars
 * \param initdata pointer to the interface passed from the user
 * \param data pointer gets populated with DPDKThreadVars
 *
 */
static TmEcode ReceiveDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
{
    SCEnter();
    int retval, thread_numa;
    DPDKThreadVars *ptv = NULL;
    DPDKIfaceConfig *dpdk_config = (DPDKIfaceConfig *)initdata;

    if (initdata == NULL) {
        SCLogError("DPDK configuration is NULL in thread initialization");
        goto fail;
    }

    ptv = SCCalloc(1, sizeof(DPDKThreadVars));
    if (unlikely(ptv == NULL)) {
        SCLogError("Unable to allocate memory");
        goto fail;
    }

    ptv->tv = tv;
    ptv->pkts = 0;
    ptv->bytes = 0;
    ptv->livedev = LiveGetDevice(dpdk_config->iface);

    ptv->capture_dpdk_packets = StatsRegisterCounter("capture.packets", &ptv->tv->stats);
    ptv->capture_dpdk_rx_errs = StatsRegisterCounter("capture.rx_errors", &ptv->tv->stats);
    ptv->capture_dpdk_tx_errs = StatsRegisterCounter("capture.tx_errors", &ptv->tv->stats);
    ptv->capture_dpdk_imissed = StatsRegisterCounter("capture.dpdk.imissed", &ptv->tv->stats);
    ptv->capture_dpdk_rx_no_mbufs = StatsRegisterCounter("capture.dpdk.no_mbufs", &ptv->tv->stats);
    ptv->capture_dpdk_ierrors = StatsRegisterCounter("capture.dpdk.ierrors", &ptv->tv->stats);

    ptv->copy_mode = dpdk_config->copy_mode;
    ptv->checksum_mode = dpdk_config->checksum_mode;

    ptv->threads = dpdk_config->threads;
    ptv->intr_enabled = (dpdk_config->flags & DPDK_IRQ_MODE) ? true : false;
    ptv->port_id = dpdk_config->port_id;
    ptv->out_port_id = dpdk_config->out_port_id;
    ptv->port_socket_id = dpdk_config->socket_id;

    /* Count threads placed on a different NUMA node than the NIC (only when both are known);
     * the last thread reports the total below. */
    thread_numa = GetNumaNode();
    if (thread_numa >= 0 && ptv->port_socket_id != SOCKET_ID_ANY &&
            thread_numa != ptv->port_socket_id) {
        SC_ATOMIC_ADD(dpdk_config->inconsistent_numa_cnt, 1);
        SCLogPerf("%s: NIC is on NUMA %d, thread on NUMA %d", dpdk_config->iface,
                ptv->port_socket_id, thread_numa);
    }

    ptv->workers_sync = dpdk_config->workers_sync;
    /* presumably SC_ATOMIC_ADD returns the pre-increment value, giving each thread a distinct
     * 0-based queue id (the last thread gets threads - 1) - confirm */
    uint16_t queue_id = SC_ATOMIC_ADD(dpdk_config->queue_id, 1);
    ptv->queue_id = queue_id;

    // the last thread starts the device
    if (queue_id == dpdk_config->threads - 1) {
        retval = rte_eth_dev_start(ptv->port_id);
        if (retval < 0) {
            SCLogError("%s: error (%s) during device startup", dpdk_config->iface,
                    rte_strerror(-retval));
            goto fail;
        }

        /* NOTE(review): from here on the port is started; the fail path below does not stop
         * it. */
        struct rte_eth_dev_info dev_info;
        retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
        if (retval != 0) {
            SCLogError("%s: error (%s) when getting device info", dpdk_config->iface,
                    rte_strerror(-retval));
            goto fail;
        }

        /* Poll the link every 100 ms, 10 polls per unit of linkup_timeout (i.e. seconds). */
        uint32_t timeout = dpdk_config->linkup_timeout * 10;
        while (timeout > 0) {
            struct rte_eth_link link = { 0 };
            retval = rte_eth_link_get_nowait(ptv->port_id, &link);
            if (retval != 0) {
                if (retval == -ENOTSUP) {
                    SCLogInfo("%s: link status not supported, skipping", dpdk_config->iface);
                } else {
                    SCLogInfo("%s: error (%s) when getting link status, skipping",
                            dpdk_config->iface, rte_strerror(-retval));
                }
                break;
            }
            if (link.link_status) {
                char link_status_str[RTE_ETH_LINK_MAX_STR_LEN];
#if RTE_VERSION >= RTE_VERSION_NUM(20, 11, 0, 0)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
                rte_eth_link_to_str(link_status_str, sizeof(link_status_str), &link);
#pragma GCC diagnostic pop
#else
                snprintf(link_status_str, sizeof(link_status_str),
                        "Link Up, speed %u Mbps, %s", // 22 chars + 10 for digits + 11 for duplex
                        link.link_speed,
                        (link.link_duplex == ETH_LINK_FULL_DUPLEX) ? "full-duplex" : "half-duplex");
#endif

                SCLogInfo("%s: %s", dpdk_config->iface, link_status_str);
                break;
            }

            rte_delay_ms(100);
            timeout--;
        }

        /* Only warn when a timeout was configured and every poll saw the link down. */
        if (dpdk_config->linkup_timeout && timeout == 0) {
            SCLogWarning("%s: link is down, trying to continue anyway", dpdk_config->iface);
        }

        // some PMDs requires additional actions only after the device has started
        DevicePostStartPMDSpecificActions(ptv, dev_info.driver_name);

        uint16_t inconsistent_numa_cnt = SC_ATOMIC_GET(dpdk_config->inconsistent_numa_cnt);
        if (inconsistent_numa_cnt > 0 && ptv->port_socket_id != SOCKET_ID_ANY) {
            SCLogWarning("%s: NIC is on NUMA %d, %u threads on different NUMA node(s)",
                    dpdk_config->iface, ptv->port_socket_id, inconsistent_numa_cnt);
        } else if (ptv->port_socket_id == SOCKET_ID_ANY && rte_socket_count() > 1) {
            SCLogNotice(
                    "%s: unable to determine NIC's NUMA node, degraded performance can be expected",
                    dpdk_config->iface);
        }
        if (ptv->intr_enabled) {
            rte_spinlock_init(&intr_lock[ptv->port_id]);
        }
    }

    *data = (void *)ptv;
    /* presumably drops this thread's reference to the interface config - confirm */
    dpdk_config->DerefFunc(dpdk_config);

fail:
    if (dpdk_config != NULL)
        dpdk_config->DerefFunc(dpdk_config);
    if (ptv != NULL)
        SCFree(ptv);
}
757 
/**
 * \brief Per-thread deinit of the DPDK receive module.
 *
 * Frees the thread's DPDKThreadVars. The queue-0 worker first runs the PMD pre-close
 * actions (DevicePreClosePMDSpecificActions()) and frees the workers_sync structure
 * shared by the port's workers.
 *
 * \param tv pointer to ThreadVars
 * \param data pointer that gets cast into DPDKThreadVars for ptv
 */
static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *tv, void *data)
{
    SCEnter();
    DPDKThreadVars *ptv = (DPDKThreadVars *)data;

    if (ptv->queue_id == 0) {
        struct rte_eth_dev_info dev_info;
        int retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
        if (retval != 0) {
            /* NOTE(review): dev_info is not filled in on failure, so this path must not reach
             * the dev_info.driver_name use below; confirm it returns and that ptv (and
             * workers_sync) are still released in that case. */
            SCLogError("%s: error (%s) when getting device info", ptv->livedev->dev,
                    rte_strerror(-retval));
        }

        DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);

        if (ptv->workers_sync) {
            SCFree(ptv->workers_sync);
        }
    }

    SCFree(ptv);
}
787 
/**
 * \brief This function passes off to link type decoders.
 *
 * DecodeDPDK decodes packets from DPDK and passes
 * them off to the proper link type decoder.
 *
 * \param tv pointer to ThreadVars
 * \param p pointer to the current packet
 * \param data per-thread DecodeThreadVars stored by DecodeDPDKThreadInit()
 */
static TmEcode DecodeDPDK(ThreadVars *tv, Packet *p, void *data)
{
    SCEnter();


    /* update counters */

    /* If suri has set vlan during reading, we increase vlan counter */
    if (p->vlan_idx) {
    }

    /* call the decoder */
    DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));


}
820 
/**
 * \brief Per-thread init of the DPDK decode module.
 *
 * Stores the thread's DecodeThreadVars in *data. DecodeDPDK() receives it from there
 * and DecodeDPDKThreadDeinit() frees it.
 */
static TmEcode DecodeDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
{
    SCEnter();
    DecodeThreadVars *dtv = NULL;


    if (dtv == NULL)


    *data = (void *)dtv;

}
837 
/**
 * \brief Per-thread deinit of the DPDK decode module.
 *
 * Frees the DecodeThreadVars stored by DecodeDPDKThreadInit(), if any.
 */
static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data)
{
    SCEnter();
    if (data != NULL)
        DecodeThreadVarsFree(tv, data);
}
845 
846 #endif /* HAVE_DPDK */
847 /* eof */
848 /**
849  * @}
850  */
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:77
PacketL4::csum_set
bool csum_set
Definition: decode.h:466
PacketCheckAction
bool PacketCheckAction(const Packet *p, const uint8_t a)
Definition: packet.c:50
util-device-private.h
tm-threads.h
len
uint8_t len
Definition: app-layer-dnp3.h:2
TMM_RECEIVEDPDK
@ TMM_RECEIVEDPDK
Definition: tm-threads-common.h:56
CHECKSUM_VALIDATION_OFFLOAD
@ CHECKSUM_VALIDATION_OFFLOAD
Definition: decode.h:48
offset
uint64_t offset
Definition: util-streaming-buffer.h:0
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
PacketFreeOrRelease
void PacketFreeOrRelease(Packet *p)
Return a packet to where it was allocated.
Definition: decode.c:280
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(StatsThreadContext *stats)
Definition: counters.c:484
SCTIME_MSECS
#define SCTIME_MSECS(t)
Definition: util-time.h:58
PKT_IS_PSEUDOPKT
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition: decode.h:1322
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
DPDK_COPY_MODE_IPS
@ DPDK_COPY_MODE_IPS
Definition: source-dpdk.h:34
PacketL4::csum
uint16_t csum
Definition: decode.h:467
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:282
StatsRegisterCounter
StatsCounterId StatsRegisterCounter(const char *name, StatsThreadContext *stats)
Registers a normal, unqualified counter.
Definition: counters.c:1039
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:101
name
const char * name
Definition: detect-engine-proto.c:48
TMM_DECODEDPDK
@ TMM_DECODEDPDK
Definition: tm-threads-common.h:57
action-globals.h
Packet_::flags
uint32_t flags
Definition: decode.h:544
DpdkCopyModeEnum
DpdkCopyModeEnum
Definition: source-dpdk.h:34
threads.h
Packet_::vlan_idx
uint8_t vlan_idx
Definition: decode.h:529
LiveDevice_
Definition: util-device-private.h:32
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:332
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
NoDPDKSupportExit
TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **)
this function prints an error message and exits.
Definition: source-dpdk.c:79
util-privs.h
CHECKSUM_VALIDATION_DISABLE
@ CHECKSUM_VALIDATION_DISABLE
Definition: decode.h:43
StatsCounterId
Definition: counters.h:30
PacketDecodeFinalize
void PacketDecodeFinalize(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Finalize decoding of a packet.
Definition: decode.c:236
DPDKIfaceConfig_
Definition: source-dpdk.h:53
util-dpdk-ice.h
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:305
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:82
tmqh-packetpool.h
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:58
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:81
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:53
Packet_::datalink
int datalink
Definition: decode.h:635
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1326
DPDKWorkerSync_
Definition: source-dpdk.h:48
DecodeRegisterPerfCounters
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition: decode.c:632
TmModuleReceiveDPDKRegister
void TmModuleReceiveDPDKRegister(void)
Definition: source-dpdk.c:51
decode.h
PKT_SRC_WIRE
@ PKT_SRC_WIRE
Definition: decode.h:52
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:61
Packet_::ts
SCTime_t ts
Definition: decode.h:555
SCTime_t::secs
uint64_t secs
Definition: util-time.h:41
LiveGetDevice
LiveDevice * LiveGetDevice(const char *name)
Get a pointer to the device at idx.
Definition: util-device.c:268
SCEnter
#define SCEnter(...)
Definition: util-debug.h:284
GET_PKT_DATA
#define GET_PKT_DATA(p)
Definition: decode.h:209
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
util-affinity.h
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:56
StatsCounterIncr
void StatsCounterIncr(StatsThreadContext *stats, StatsCounterId id)
Increments the local counter.
Definition: counters.c:166
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:262
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:317
PacketPoolWait
void PacketPoolWait(void)
Definition: tmqh-packetpool.c:71
Packet_
Definition: decode.h:501
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
type
uint16_t type
Definition: decode-vlan.c:106
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:29
GET_PKT_LEN
#define GET_PKT_LEN(p)
Definition: decode.h:208
TimeGet
SCTime_t TimeGet(void)
Definition: util-time.c:152
Packet_::l4
struct PacketL4 l4
Definition: decode.h:601
TmSlot_
Definition: tm-threads.h:53
PKT_IGNORE_CHECKSUM
#define PKT_IGNORE_CHECKSUM
Definition: decode.h:1283
SCTime_t
Definition: util-time.h:40
Packet_::livedev
struct LiveDevice_ * livedev
Definition: decode.h:618
DPDK_BURST_TX_WAIT_US
#define DPDK_BURST_TX_WAIT_US
Definition: source-dpdk.h:36
TmEcode
TmEcode
Definition: tm-threads-common.h:80
TmModule_::name
const char * name
Definition: tm-modules.h:48
runmodes.h
SCLogInfo
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:232
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
dtv
DecodeThreadVars * dtv
Definition: fuzz_decodepcapfile.c:34
PacketL3::csum_set
bool csum_set
Definition: decode.h:436
DPDK_IRQ_MODE
#define DPDK_IRQ_MODE
Definition: source-dpdk.h:42
Packet_::ReleasePacket
void(* ReleasePacket)(struct Packet_ *)
Definition: decode.h:591
util-dpdk.h
flags
uint8_t flags
Definition: decode-gre.h:0
DecodeThreadVarsFree
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition: decode.c:840
source-dpdk.h
ChecksumValidationMode
ChecksumValidationMode
Definition: decode.h:42
suricata-common.h
packet.h
ACTION_DROP
#define ACTION_DROP
Definition: action-globals.h:30
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:241
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:51
FatalError
#define FatalError(...)
Definition: util-debug.h:517
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:33
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:52
threadvars.h
StatsCounterSetI64
void StatsCounterSetI64(StatsThreadContext *stats, StatsCounterId id, int64_t x)
set, so overwrite, the value of the local counter
Definition: counters.c:205
Packet_::l3
struct PacketL3 l3
Definition: decode.h:600
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:274
SCFree
#define SCFree(p)
Definition: util-mem.h:61
DecodeThreadVars_
Structure to hold thread specific data for all decode modules.
Definition: decode.h:959
util-dpdk-bonding.h
util-dpdk-mlx5.h
DecodeThreadVarsAlloc
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition: decode.c:822
util-dpdk-ixgbe.h
PacketSetData
int PacketSetData(Packet *p, const uint8_t *pktdata, uint32_t pktlen)
Set data for Packet and set length when zero copy is used.
Definition: decode.c:860
util-dpdk-i40e.h
suricata.h
PacketL3::csum
uint16_t csum
Definition: decode.h:437
DecodeThreadVars_::counter_vlan
StatsCounterId counter_vlan
Definition: decode.h:997
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
TmModuleDecodeDPDKRegister
void TmModuleDecodeDPDKRegister(void)
Registration Function for DecodeDPDK.
Definition: source-dpdk.c:65
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:250
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
ThreadVars_::stats
StatsThreadContext stats
Definition: threadvars.h:121
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:288
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:297
SC_CAP_NET_RAW
#define SC_CAP_NET_RAW
Definition: util-privs.h:32
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:80
DPDK_COPY_MODE_TAP
@ DPDK_COPY_MODE_TAP
Definition: source-dpdk.h:34
DecodeUpdatePacketCounters
void DecodeUpdatePacketCounters(ThreadVars *tv, const DecodeThreadVars *dtv, const Packet *p)
Definition: decode.c:790
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:176