suricata
source-dpdk.c
Go to the documentation of this file.
1 /* Copyright (C) 2021-2025 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \defgroup dpdk DPDK running mode
20  *
21  * @{
22  */
23 
24 /**
25  * \file
26  *
27  * \author Lukas Sismis <lukas.sismis@gmail.com>
28  *
29  * DPDK capture interface
30  *
31  */
32 
33 #include "suricata-common.h"
34 #include "runmodes.h"
35 #include "decode.h"
36 #include "packet.h"
37 #include "source-dpdk.h"
38 #include "suricata.h"
39 #include "threads.h"
40 #include "threadvars.h"
41 #include "tm-threads.h"
42 #include "tmqh-packetpool.h"
43 #include "util-privs.h"
44 #include "util-device-private.h"
45 #include "action-globals.h"
46 
47 #ifndef HAVE_DPDK
48 
49 TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **);
50 
52 {
53  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
60 }
61 
62 /**
63  * \brief Registration Function for DecodeDPDK.
64  */
66 {
67  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
74 }
75 
76 /**
77  * \brief this function prints an error message and exits.
78  */
79 TmEcode NoDPDKSupportExit(ThreadVars *tv, const void *initdata, void **data)
80 {
81  FatalError("Error creating thread %s: you do not have "
82  "support for DPDK enabled, on Linux host please recompile "
83  "with --enable-dpdk",
84  tv->name);
85 }
86 
87 #else /* We have DPDK support */
88 
89 #include "util-affinity.h"
90 #include "util-dpdk.h"
91 #include "util-dpdk-i40e.h"
92 #include "util-dpdk-ice.h"
93 #include "util-dpdk-ixgbe.h"
94 #include "util-dpdk-mlx5.h"
95 #include "util-dpdk-bonding.h"
96 #include <numa.h>
97 
98 #define BURST_SIZE 32
99 // interrupt mode constants
100 #define MIN_ZERO_POLL_COUNT 10U
101 #define MIN_ZERO_POLL_COUNT_TO_SLEEP 10U
102 #define MINIMUM_SLEEP_TIME_US 1U
103 #define STANDARD_SLEEP_TIME_US 100U
104 #define MAX_EPOLL_TIMEOUT_MS 500U
105 static rte_spinlock_t intr_lock[RTE_MAX_ETHPORTS];
106 
107 /**
108  * \brief Structure to hold thread specific variables of a DPDK capture worker; allocated and filled in by ReceiveDPDKThreadInit.
109  */
110 typedef struct DPDKThreadVars_ {
111  /* counters */
112  uint64_t pkts; /* packets received by this thread (sum of the rte_eth_rx_burst results) */
113  ThreadVars *tv; /* Suricata thread this state belongs to */
114  TmSlot *slot; /* slot following the receive slot; received packets are handed to it */
115  LiveDevice *livedev; /* live device looked up by the configured interface name */
116  ChecksumValidationMode checksum_mode; /* copied from the interface configuration */
117  bool intr_enabled; /* true when DPDK_IRQ_MODE is set in the interface configuration flags */
118  /* stats counter ids returned by StatsRegisterCounter for the packet and drop counters */
119  uint16_t capture_dpdk_packets;
120  uint16_t capture_dpdk_rx_errs;
121  uint16_t capture_dpdk_imissed;
122  uint16_t capture_dpdk_rx_no_mbufs;
123  uint16_t capture_dpdk_ierrors;
124  uint16_t capture_dpdk_tx_errs;
125  unsigned int flags;
126  int threads; /* thread count from the interface configuration; passed to the PMD RSS setup */
127  /* for IPS */
128  DpdkCopyModeEnum copy_mode;
129  uint16_t out_port_id; /* port the packets are transmitted on in TAP/IPS copy mode */
130  /* Entry in the peers_list -- NOTE(review): no such member follows here, comment looks stale; confirm */
131 
132  uint64_t bytes; /* zeroed at thread init; NOTE(review): no update visible in this file, confirm */
133  uint64_t accepted;
134  uint64_t dropped;
135  uint16_t port_id; /* port this thread receives from */
136  uint16_t queue_id; /* RX queue polled by this thread; also used as the TX queue id of its packets */
137  int32_t port_socket_id; /* socket id of the port from the interface configuration; may be SOCKET_ID_ANY */
138  struct rte_mbuf *received_mbufs[BURST_SIZE]; /* burst buffer filled by rte_eth_rx_burst */
139  DPDKWorkerSync *workers_sync; /* taken from the interface configuration; used for the shutdown check-in */
140 } DPDKThreadVars;
141 
142 static TmEcode ReceiveDPDKThreadInit(ThreadVars *, const void *, void **);
143 static void ReceiveDPDKThreadExitStats(ThreadVars *, void *);
144 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *, void *);
145 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot);
146 
147 static TmEcode DecodeDPDKThreadInit(ThreadVars *, const void *, void **);
148 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data);
149 static TmEcode DecodeDPDK(ThreadVars *, Packet *, void *);
150 
151 static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset);
152 static bool InterruptsRXEnable(uint16_t port_id, uint16_t queue_id)
153 {
154  uint32_t event_data = port_id << UINT16_WIDTH | queue_id;
155  int32_t ret = rte_eth_dev_rx_intr_ctl_q(port_id, queue_id, RTE_EPOLL_PER_THREAD,
156  RTE_INTR_EVENT_ADD, (void *)((uintptr_t)event_data));
157 
158  if (ret != 0) {
159  SCLogError("%s-Q%d: failed to enable interrupt mode: %s", DPDKGetPortNameByPortID(port_id),
160  queue_id, rte_strerror(-ret));
161  return false;
162  }
163  return true;
164 }
165 
166 static inline uint32_t InterruptsSleepHeuristic(uint32_t no_pkt_polls_count)
167 {
168  if (no_pkt_polls_count < MIN_ZERO_POLL_COUNT_TO_SLEEP)
169  return MINIMUM_SLEEP_TIME_US;
170 
171  return STANDARD_SLEEP_TIME_US;
172 }
173 
174 static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool on)
175 {
176  rte_spinlock_lock(&(intr_lock[port_id]));
177 
178  if (on)
179  rte_eth_dev_rx_intr_enable(port_id, queue_id);
180  else
181  rte_eth_dev_rx_intr_disable(port_id, queue_id);
182 
183  rte_spinlock_unlock(&(intr_lock[port_id]));
184 }
185 
/**
 * \brief Frees a run of mbufs stored in an array.
 *
 * Frees the entries mbuf_array[offset] .. mbuf_array[offset + mbuf_cnt - 1].
 *
 * \param mbuf_array array of mbuf pointers
 * \param mbuf_cnt number of mbufs to free
 * \param offset index of the first mbuf to free
 */
static inline void DPDKFreeMbufArray(
        struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
    /* mbuf_cnt is a count relative to offset (the receive loop passes the number of
     * mbufs remaining in the burst), so the end index is offset + mbuf_cnt. Using
     * mbuf_cnt itself as the end index leaves the tail of the burst unfreed.
     * The sum is computed in 32 bits so it cannot wrap around. */
    const uint32_t end = (uint32_t)offset + (uint32_t)mbuf_cnt;

    for (uint32_t idx = offset; idx < end; idx++) {
        rte_pktmbuf_free(mbuf_array[idx]);
    }
}
193 
194 static void DevicePostStartPMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
195 {
196  if (strcmp(driver_name, "net_bonding") == 0)
197  driver_name = BondingDeviceDriverGet(ptv->port_id);
198  if (strcmp(driver_name, "net_i40e") == 0)
199  i40eDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
200  else if (strcmp(driver_name, "net_ixgbe") == 0)
201  ixgbeDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
202  else if (strcmp(driver_name, "net_ice") == 0)
203  iceDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
204  else if (strcmp(driver_name, "mlx5_pci") == 0)
205  mlx5DeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
206 }
207 
208 static void DevicePreClosePMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
209 {
210  if (strcmp(driver_name, "net_bonding") == 0) {
211  driver_name = BondingDeviceDriverGet(ptv->port_id);
212  }
213 
214  if (
215 #if RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0)
216  strcmp(driver_name, "net_i40e") == 0 ||
217 #endif /* RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0) */
218  strcmp(driver_name, "net_ixgbe") == 0 || strcmp(driver_name, "net_ice") == 0 ||
219  strcmp(driver_name, "mlx5_pci") == 0) {
220  // Flush the RSS rules that have been inserted in the post start section
221  struct rte_flow_error flush_error = { 0 };
222  int32_t retval = rte_flow_flush(ptv->port_id, &flush_error);
223  if (retval != 0) {
224  SCLogError("%s: unable to flush rte_flow rules: %s Flush error msg: %s",
225  ptv->livedev->dev, rte_strerror(-retval), flush_error.message);
226  }
227  }
228 }
229 
/**
 * Attempts to retrieve the id of the NUMA node on which the caller runs.
 *
 * On Linux the current CPU is looked up with sched_getcpu() and mapped to its
 * node with numa_node_of_cpu(). On other systems a warning is logged.
 *
 * @return NUMA id on success, -1 otherwise
 */
static int GetNumaNode(void)
{
#if defined(__linux__)
    const int cpu = sched_getcpu();
    if (cpu < 0) {
        /* the current CPU is unknown, so there is nothing to map to a node */
        return -1;
    }
    return numa_node_of_cpu(cpu);
#else
    SCLogWarning("NUMA node retrieval is not supported on this OS.");
    return -1;
#endif
}
248 
249 /**
250  * \brief Registration Function for ReceiveDPDK.
251  * \todo Unit tests are needed for this module.
252  */
254 {
255  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
256  tmm_modules[TMM_RECEIVEDPDK].ThreadInit = ReceiveDPDKThreadInit;
258  tmm_modules[TMM_RECEIVEDPDK].PktAcqLoop = ReceiveDPDKLoop;
260  tmm_modules[TMM_RECEIVEDPDK].ThreadExitPrintStats = ReceiveDPDKThreadExitStats;
261  tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = ReceiveDPDKThreadDeinit;
264 }
265 
266 /**
267  * \brief Registration Function for DecodeDPDK.
268  * \todo Unit tests are needed for this module.
269  */
271 {
272  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
273  tmm_modules[TMM_DECODEDPDK].ThreadInit = DecodeDPDKThreadInit;
274  tmm_modules[TMM_DECODEDPDK].Func = DecodeDPDK;
276  tmm_modules[TMM_DECODEDPDK].ThreadDeinit = DecodeDPDKThreadDeinit;
279 }
280 
281 static inline void DPDKDumpCounters(DPDKThreadVars *ptv)
282 {
283  /* Some NICs (e.g. Intel) do not support queue statistics and the drops can be fetched only on
284  * the port level. Therefore setting it to the first worker to have at least continuous update
285  * on the dropped packets. */
286  if (ptv->queue_id == 0) {
287  struct rte_eth_stats eth_stats;
288  int retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
289  if (unlikely(retval != 0)) {
290  SCLogError("%s: failed to get stats: %s", ptv->livedev->dev, rte_strerror(-retval));
291  return;
292  }
293 
294  StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets,
295  ptv->pkts + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
296  SC_ATOMIC_SET(ptv->livedev->pkts,
297  eth_stats.ipackets + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
298  StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_errs,
299  eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
300  StatsSetUI64(ptv->tv, ptv->capture_dpdk_imissed, eth_stats.imissed);
301  StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_no_mbufs, eth_stats.rx_nombuf);
302  StatsSetUI64(ptv->tv, ptv->capture_dpdk_ierrors, eth_stats.ierrors);
303  StatsSetUI64(ptv->tv, ptv->capture_dpdk_tx_errs, eth_stats.oerrors);
305  ptv->livedev->drop, eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
306  } else {
307  StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets, ptv->pkts);
308  }
309 }
310 
311 static void DPDKReleasePacket(Packet *p)
312 {
313  int retval;
314  /* Need to be in copy mode and need to detect early release
315  where Ethernet header could not be set (and pseudo packet)
316  When enabling promiscuous mode on Intel cards, 2 ICMPv6 packets are generated.
317  These get into the infinite cycle between the NIC and the switch in some cases */
318  if ((p->dpdk_v.copy_mode == DPDK_COPY_MODE_TAP ||
319  (p->dpdk_v.copy_mode == DPDK_COPY_MODE_IPS && !PacketCheckAction(p, ACTION_DROP)))
320 #if defined(RTE_LIBRTE_I40E_PMD) || defined(RTE_LIBRTE_IXGBE_PMD) || defined(RTE_LIBRTE_ICE_PMD)
321  && !(PacketIsICMPv6(p) && PacketGetICMPv6(p)->type == 143)
322 #endif
323  ) {
325  retval =
326  rte_eth_tx_burst(p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
327  // rte_eth_tx_burst can return only 0 (failure) or 1 (success) because we are only
328  // transmitting burst of size 1 and the function rte_eth_tx_burst returns number of
329  // successfully sent packets.
330  if (unlikely(retval < 1)) {
331  // sometimes a repeated transmit can help to send out the packet
332  rte_delay_us(DPDK_BURST_TX_WAIT_US);
333  retval = rte_eth_tx_burst(
334  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
335  if (unlikely(retval < 1)) {
336  SCLogDebug("Unable to transmit the packet on port %u queue %u",
337  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id);
338  rte_pktmbuf_free(p->dpdk_v.mbuf);
339  p->dpdk_v.mbuf = NULL;
340  }
341  }
342  } else {
343  rte_pktmbuf_free(p->dpdk_v.mbuf);
344  p->dpdk_v.mbuf = NULL;
345  }
346 
348 }
349 
350 static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv)
351 {
352  SCEnter();
353  // Indicate that the thread is actually running its application level
354  // code (i.e., it can poll packets)
356  PacketPoolWait();
357 
358  rte_eth_stats_reset(ptv->port_id);
359  rte_eth_xstats_reset(ptv->port_id);
360 
361  if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
363 
365 }
366 
367 static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv)
368 {
369  static thread_local uint64_t last_timeout_msec = 0;
370  SCTime_t t = TimeGet();
371  uint64_t msecs = SCTIME_MSECS(t);
372  if (msecs > last_timeout_msec + 100) {
373  TmThreadsCaptureHandleTimeout(tv, NULL);
374  last_timeout_msec = msecs;
375  }
376 }
377 
378 /**
379  * \brief Decides if it should retry the packet poll or continue with the packet processing
380  * \return true if the poll should be retried, false otherwise
381  */
382 static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx)
383 {
384  static thread_local uint32_t zero_pkt_polls_cnt = 0;
385 
386  if (nb_rx > 0) {
387  zero_pkt_polls_cnt = 0;
388  return false;
389  }
390 
391  LoopHandleTimeoutOnIdle(tv);
392  if (!ptv->intr_enabled)
393  return true;
394 
395  zero_pkt_polls_cnt++;
396  if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
397  return true;
398 
399  uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
400  if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
401  rte_delay_us(pwd_idle_hint);
402  } else {
403  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
404  struct rte_epoll_event event;
405  rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
406  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
407  return true;
408  }
409 
410  return false;
411 }
412 
413 /**
414  * \brief Initializes a packet from an mbuf
415  * \return true if the packet was initialized successfully, false otherwise
416  */
417 static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf)
418 {
420  if (unlikely(p == NULL)) {
421  return NULL;
422  }
425  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
427  }
428 
429  p->ts = TimeGet();
430  p->dpdk_v.mbuf = mbuf;
431  p->ReleasePacket = DPDKReleasePacket;
432  p->dpdk_v.copy_mode = ptv->copy_mode;
433  p->dpdk_v.out_port_id = ptv->out_port_id;
434  p->dpdk_v.out_queue_id = ptv->queue_id;
435  p->livedev = ptv->livedev;
436 
437  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
439  } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
440  uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
441  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
442  (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
443  SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
445  } else {
446  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
447  SCLogDebug("HW detected BAD IP checksum");
448  // chsum recalc will not be triggered but rule keyword check will be
449  p->l3.csum_set = true;
450  p->l3.csum = 0;
451  }
452  if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
453  SCLogDebug("HW detected BAD L4 chsum");
454  p->l4.csum_set = true;
455  p->l4.csum = 0;
456  }
457  }
458  }
459 
460  return p;
461 }
462 
463 static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf)
464 {
465  static thread_local bool segmented_mbufs_warned = false;
466  if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
467  char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
468  "Check your configuration or report the issue";
469  enum rte_proc_type_t eal_t = rte_eal_process_type();
470  if (eal_t == RTE_PROC_SECONDARY) {
471  SCLogWarning("%s. To avoid segmented mbufs, "
472  "try to increase mbuf size in your primary application",
473  warn_s);
474  } else if (eal_t == RTE_PROC_PRIMARY) {
475  SCLogWarning("%s. To avoid segmented mbufs, "
476  "try to increase MTU in your suricata.yaml",
477  warn_s);
478  }
479 
480  segmented_mbufs_warned = true;
481  }
482 }
483 
484 static void HandleShutdown(DPDKThreadVars *ptv)
485 {
486  SCLogDebug("Stopping Suricata!");
487  SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
488  while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) {
489  rte_delay_us(10);
490  }
491  if (ptv->queue_id == 0) {
492  rte_delay_us(20); // wait for all threads to get out of the sync loop
493  SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
494  // If Suricata runs in peered mode, the peer threads might still want to send
495  // packets to our port. Instead, we know, that we are done with the peered port, so
496  // we stop it. The peered threads will stop our port.
497  if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
498  rte_eth_dev_stop(ptv->out_port_id);
499  } else {
500  // in IDS we stop our port - no peer threads are running
501  rte_eth_dev_stop(ptv->port_id);
502  }
503  }
504  DPDKDumpCounters(ptv);
505 }
506 
507 static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
508 {
509  static thread_local SCTime_t last_dump = { 0 };
510  SCTime_t current_time = TimeGet();
511  /* Trigger one dump of stats every second */
512  if (current_time.secs != last_dump.secs) {
513  DPDKDumpCounters(ptv);
514  last_dump = current_time;
515  }
516 }
517 
518 /**
519  * \brief Main DPDK reading Loop function
520  */
521 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
522 {
523  SCEnter();
524  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
525  ptv->slot = ((TmSlot *)slot)->slot_next;
526  TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
527  if (ret != TM_ECODE_OK) {
528  SCReturnInt(ret);
529  }
530  while (true) {
531  if (unlikely(suricata_ctl_flags != 0)) {
532  HandleShutdown(ptv);
533  break;
534  }
535 
536  uint16_t nb_rx =
537  rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
538  if (RXPacketCountHeuristic(tv, ptv, nb_rx)) {
539  continue;
540  }
541 
542  ptv->pkts += (uint64_t)nb_rx;
543  for (uint16_t i = 0; i < nb_rx; i++) {
544  Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
545  if (p == NULL) {
546  rte_pktmbuf_free(ptv->received_mbufs[i]);
547  continue;
548  }
549  DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
550  PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
551  rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
552  if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
553  TmqhOutputPacketpool(ptv->tv, p);
554  DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
555  SCReturnInt(EXIT_FAILURE);
556  }
557  }
558 
559  PeriodicDPDKDumpCounters(ptv);
561  }
562 
564 }
565 
566 /**
567  * \brief Init function for ReceiveDPDK.
568  *
569  * \param tv pointer to ThreadVars
570  * \param initdata pointer to the interface passed from the user
571  * \param data pointer gets populated with DPDKThreadVars
572  *
573  */
574 static TmEcode ReceiveDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
575 {
576  SCEnter();
577  int retval, thread_numa;
578  DPDKThreadVars *ptv = NULL;
579  DPDKIfaceConfig *dpdk_config = (DPDKIfaceConfig *)initdata;
580 
581  if (initdata == NULL) {
582  SCLogError("DPDK configuration is NULL in thread initialization");
583  goto fail;
584  }
585 
586  ptv = SCCalloc(1, sizeof(DPDKThreadVars));
587  if (unlikely(ptv == NULL)) {
588  SCLogError("Unable to allocate memory");
589  goto fail;
590  }
591 
592  ptv->tv = tv;
593  ptv->pkts = 0;
594  ptv->bytes = 0;
595  ptv->livedev = LiveGetDevice(dpdk_config->iface);
596 
597  ptv->capture_dpdk_packets = StatsRegisterCounter("capture.packets", ptv->tv);
598  ptv->capture_dpdk_rx_errs = StatsRegisterCounter("capture.rx_errors", ptv->tv);
599  ptv->capture_dpdk_tx_errs = StatsRegisterCounter("capture.tx_errors", ptv->tv);
600  ptv->capture_dpdk_imissed = StatsRegisterCounter("capture.dpdk.imissed", ptv->tv);
601  ptv->capture_dpdk_rx_no_mbufs = StatsRegisterCounter("capture.dpdk.no_mbufs", ptv->tv);
602  ptv->capture_dpdk_ierrors = StatsRegisterCounter("capture.dpdk.ierrors", ptv->tv);
603 
604  ptv->copy_mode = dpdk_config->copy_mode;
605  ptv->checksum_mode = dpdk_config->checksum_mode;
606 
607  ptv->threads = dpdk_config->threads;
608  ptv->intr_enabled = (dpdk_config->flags & DPDK_IRQ_MODE) ? true : false;
609  ptv->port_id = dpdk_config->port_id;
610  ptv->out_port_id = dpdk_config->out_port_id;
611  ptv->port_socket_id = dpdk_config->socket_id;
612 
613  thread_numa = GetNumaNode();
614  if (thread_numa >= 0 && ptv->port_socket_id != SOCKET_ID_ANY &&
615  thread_numa != ptv->port_socket_id) {
616  SC_ATOMIC_ADD(dpdk_config->inconsistent_numa_cnt, 1);
617  SCLogPerf("%s: NIC is on NUMA %d, thread on NUMA %d", dpdk_config->iface,
618  ptv->port_socket_id, thread_numa);
619  }
620 
621  ptv->workers_sync = dpdk_config->workers_sync;
622  uint16_t queue_id = SC_ATOMIC_ADD(dpdk_config->queue_id, 1);
623  ptv->queue_id = queue_id;
624 
625  // the last thread starts the device
626  if (queue_id == dpdk_config->threads - 1) {
627  retval = rte_eth_dev_start(ptv->port_id);
628  if (retval < 0) {
629  SCLogError("%s: error (%s) during device startup", dpdk_config->iface,
630  rte_strerror(-retval));
631  goto fail;
632  }
633 
634  struct rte_eth_dev_info dev_info;
635  retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
636  if (retval != 0) {
637  SCLogError("%s: error (%s) when getting device info", dpdk_config->iface,
638  rte_strerror(-retval));
639  goto fail;
640  }
641 
642  uint32_t timeout = dpdk_config->linkup_timeout * 10;
643  while (timeout > 0) {
644  struct rte_eth_link link = { 0 };
645  retval = rte_eth_link_get_nowait(ptv->port_id, &link);
646  if (retval != 0) {
647  if (retval == -ENOTSUP) {
648  SCLogInfo("%s: link status not supported, skipping", dpdk_config->iface);
649  } else {
650  SCLogInfo("%s: error (%s) when getting link status, skipping",
651  dpdk_config->iface, rte_strerror(-retval));
652  }
653  break;
654  }
655  if (link.link_status) {
656  char link_status_str[RTE_ETH_LINK_MAX_STR_LEN];
657 #if RTE_VERSION >= RTE_VERSION_NUM(20, 11, 0, 0)
658 #pragma GCC diagnostic push
659 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
660  rte_eth_link_to_str(link_status_str, sizeof(link_status_str), &link);
661 #pragma GCC diagnostic pop
662 #else
663  snprintf(link_status_str, sizeof(link_status_str),
664  "Link Up, speed %u Mbps, %s", // 22 chars + 10 for digits + 11 for duplex
665  link.link_speed,
666  (link.link_duplex == ETH_LINK_FULL_DUPLEX) ? "full-duplex" : "half-duplex");
667 #endif
668 
669  SCLogInfo("%s: %s", dpdk_config->iface, link_status_str);
670  break;
671  }
672 
673  rte_delay_ms(100);
674  timeout--;
675  }
676 
677  if (dpdk_config->linkup_timeout && timeout == 0) {
678  SCLogWarning("%s: link is down, trying to continue anyway", dpdk_config->iface);
679  }
680 
681  // some PMDs require additional actions only after the device has started
682  DevicePostStartPMDSpecificActions(ptv, dev_info.driver_name);
683 
684  uint16_t inconsistent_numa_cnt = SC_ATOMIC_GET(dpdk_config->inconsistent_numa_cnt);
685  if (inconsistent_numa_cnt > 0 && ptv->port_socket_id != SOCKET_ID_ANY) {
686  SCLogWarning("%s: NIC is on NUMA %d, %u threads on different NUMA node(s)",
687  dpdk_config->iface, ptv->port_socket_id, inconsistent_numa_cnt);
688  } else if (ptv->port_socket_id == SOCKET_ID_ANY && rte_socket_count() > 1) {
689  SCLogNotice(
690  "%s: unable to determine NIC's NUMA node, degraded performance can be expected",
691  dpdk_config->iface);
692  }
693  if (ptv->intr_enabled) {
694  rte_spinlock_init(&intr_lock[ptv->port_id]);
695  }
696  }
697 
698  *data = (void *)ptv;
699  dpdk_config->DerefFunc(dpdk_config);
701 
702 fail:
703  if (dpdk_config != NULL)
704  dpdk_config->DerefFunc(dpdk_config);
705  if (ptv != NULL)
706  SCFree(ptv);
708 }
709 
710 static void PrintDPDKPortXstats(uint32_t port_id, const char *port_name)
711 {
712  struct rte_eth_xstat *xstats;
713  struct rte_eth_xstat_name *xstats_names;
714 
715  int32_t len = rte_eth_xstats_get(port_id, NULL, 0);
716  if (len < 0)
717  FatalError("Error (%s) getting count of rte_eth_xstats failed on port %s",
718  rte_strerror(-len), port_name);
719 
720  xstats = SCCalloc(len, sizeof(*xstats));
721  if (xstats == NULL)
722  FatalError("Failed to allocate memory for the rte_eth_xstat structure");
723 
724  int32_t ret = rte_eth_xstats_get(port_id, xstats, len);
725  if (ret < 0 || ret > len) {
726  SCFree(xstats);
727  FatalError("Error (%s) getting rte_eth_xstats failed on port %s", rte_strerror(-ret),
728  port_name);
729  }
730  xstats_names = SCCalloc(len, sizeof(*xstats_names));
731  if (xstats_names == NULL) {
732  SCFree(xstats);
733  FatalError("Failed to allocate memory for the rte_eth_xstat_name array");
734  }
735  ret = rte_eth_xstats_get_names(port_id, xstats_names, len);
736  if (ret < 0 || ret > len) {
737  SCFree(xstats);
738  SCFree(xstats_names);
739  FatalError("Error (%s) getting names of rte_eth_xstats failed on port %s",
740  rte_strerror(-ret), port_name);
741  }
742  for (int32_t i = 0; i < len; i++) {
743  if (xstats[i].value > 0)
744  SCLogPerf("Port %u (%s) - %s: %" PRIu64, port_id, port_name, xstats_names[i].name,
745  xstats[i].value);
746  }
747 
748  SCFree(xstats);
749  SCFree(xstats_names);
750 }
751 
752 /**
753  * \brief This function prints stats to the screen at exit.
754  * \param tv pointer to ThreadVars
755  * \param data pointer that gets cast into DPDKThreadVars for ptv
756  */
757 static void ReceiveDPDKThreadExitStats(ThreadVars *tv, void *data)
758 {
759  SCEnter();
760  int retval;
761  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
762 
763  if (ptv->queue_id == 0) {
764  struct rte_eth_stats eth_stats;
765  PrintDPDKPortXstats(ptv->port_id, ptv->livedev->dev);
766  retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
767  if (unlikely(retval != 0)) {
768  SCLogError("%s: failed to get stats (%s)", ptv->livedev->dev, strerror(-retval));
769  SCReturn;
770  }
771  SCLogPerf("%s: total RX stats: packets %" PRIu64 " bytes: %" PRIu64 " missed: %" PRIu64
772  " errors: %" PRIu64 " nombufs: %" PRIu64,
773  ptv->livedev->dev, eth_stats.ipackets, eth_stats.ibytes, eth_stats.imissed,
774  eth_stats.ierrors, eth_stats.rx_nombuf);
775  if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS)
776  SCLogPerf("%s: total TX stats: packets %" PRIu64 " bytes: %" PRIu64 " errors: %" PRIu64,
777  ptv->livedev->dev, eth_stats.opackets, eth_stats.obytes, eth_stats.oerrors);
778  }
779 
780  DPDKDumpCounters(ptv);
781  SCLogPerf("(%s) received packets %" PRIu64, tv->name, ptv->pkts);
782 }
783 
784 /**
785  * \brief DeInit function closes dpdk at exit.
786  * \param tv pointer to ThreadVars
787  * \param data pointer that gets cast into DPDKThreadVars for ptv
788  */
789 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *tv, void *data)
790 {
791  SCEnter();
792  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
793 
794  if (ptv->queue_id == 0) {
795  struct rte_eth_dev_info dev_info;
796  int retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
797  if (retval != 0) {
798  SCLogError("%s: error (%s) when getting device info", ptv->livedev->dev,
799  rte_strerror(-retval));
801  }
802 
803  DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);
804 
805  if (ptv->workers_sync) {
806  SCFree(ptv->workers_sync);
807  }
808  }
809 
810  SCFree(ptv);
812 }
813 
814 /**
815  * \brief This function passes off to link type decoders.
816  *
817  * DecodeDPDK decodes packets from DPDK and passes
818  * them off to the proper link type decoder.
819  *
820  * \param t pointer to ThreadVars
821  * \param p pointer to the current packet
822  * \param data pointer that gets cast into DPDKThreadVars for ptv
823  */
824 static TmEcode DecodeDPDK(ThreadVars *tv, Packet *p, void *data)
825 {
826  SCEnter();
828 
830 
831  /* update counters */
833 
834  /* If suri has set vlan during reading, we increase vlan counter */
835  if (p->vlan_idx) {
837  }
838 
839  /* call the decoder */
840  DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
841 
843 
845 }
846 
847 static TmEcode DecodeDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
848 {
849  SCEnter();
850  DecodeThreadVars *dtv = NULL;
851 
853 
854  if (dtv == NULL)
856 
858 
859  *data = (void *)dtv;
860 
862 }
863 
864 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data)
865 {
866  SCEnter();
867  if (data != NULL)
868  DecodeThreadVarsFree(tv, data);
870 }
871 
872 #endif /* HAVE_DPDK */
873 /* eof */
874 /**
875  * @}
876  */
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:73
PacketL4::csum_set
bool csum_set
Definition: decode.h:457
PacketCheckAction
bool PacketCheckAction(const Packet *p, const uint8_t a)
Definition: packet.c:49
util-device-private.h
tm-threads.h
len
uint8_t len
Definition: app-layer-dnp3.h:2
TMM_RECEIVEDPDK
@ TMM_RECEIVEDPDK
Definition: tm-threads-common.h:56
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:166
CHECKSUM_VALIDATION_OFFLOAD
@ CHECKSUM_VALIDATION_OFFLOAD
Definition: decode.h:48
offset
uint64_t offset
Definition: util-streaming-buffer.h:0
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
PacketFreeOrRelease
void PacketFreeOrRelease(Packet *p)
Return a packet to where it was allocated.
Definition: decode.c:250
SCTIME_MSECS
#define SCTIME_MSECS(t)
Definition: util-time.h:58
PKT_IS_PSEUDOPKT
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition: decode.h:1305
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
DPDK_COPY_MODE_IPS
@ DPDK_COPY_MODE_IPS
Definition: source-dpdk.h:34
PacketL4::csum
uint16_t csum
Definition: decode.h:458
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:101
TMM_DECODEDPDK
@ TMM_DECODEDPDK
Definition: tm-threads-common.h:57
action-globals.h
Packet_::flags
uint32_t flags
Definition: decode.h:535
DpdkCopyModeEnum
DpdkCopyModeEnum
Definition: source-dpdk.h:34
threads.h
Packet_::vlan_idx
uint8_t vlan_idx
Definition: decode.h:520
LiveDevice_
Definition: util-device-private.h:32
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:332
StatsSetUI64
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type double to the local counter.
Definition: counters.c:207
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
NoDPDKSupportExit
TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **)
this function prints an error message and exits.
Definition: source-dpdk.c:79
util-privs.h
CHECKSUM_VALIDATION_DISABLE
@ CHECKSUM_VALIDATION_DISABLE
Definition: decode.h:43
PacketDecodeFinalize
void PacketDecodeFinalize(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Finalize decoding of a packet.
Definition: decode.c:206
DPDKIfaceConfig_
Definition: source-dpdk.h:53
util-dpdk-ice.h
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:314
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:82
tmqh-packetpool.h
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:81
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
Packet_::datalink
int datalink
Definition: decode.h:630
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1309
DPDKWorkerSync_
Definition: source-dpdk.h:48
DecodeRegisterPerfCounters
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition: decode.c:602
TmModuleReceiveDPDKRegister
void TmModuleReceiveDPDKRegister(void)
Definition: source-dpdk.c:51
decode.h
PKT_SRC_WIRE
@ PKT_SRC_WIRE
Definition: decode.h:52
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
Packet_::ts
SCTime_t ts
Definition: decode.h:546
SCTime_t::secs
uint64_t secs
Definition: util-time.h:41
LiveGetDevice
LiveDevice * LiveGetDevice(const char *name)
Get a pointer to the device at idx.
Definition: util-device.c:253
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
GET_PKT_DATA
#define GET_PKT_DATA(p)
Definition: decode.h:209
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
util-affinity.h
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:317
PacketPoolWait
void PacketPoolWait(void)
Definition: tmqh-packetpool.c:80
SCReturn
#define SCReturn
Definition: util-debug.h:273
Packet_
Definition: decode.h:492
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
type
uint16_t type
Definition: decode-vlan.c:106
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:29
GET_PKT_LEN
#define GET_PKT_LEN(p)
Definition: decode.h:208
TimeGet
SCTime_t TimeGet(void)
Definition: util-time.c:152
Packet_::l4
struct PacketL4 l4
Definition: decode.h:592
TmSlot_
Definition: tm-threads.h:53
PKT_IGNORE_CHECKSUM
#define PKT_IGNORE_CHECKSUM
Definition: decode.h:1266
SCTime_t
Definition: util-time.h:40
Packet_::livedev
struct LiveDevice_ * livedev
Definition: decode.h:609
DPDK_BURST_TX_WAIT_US
#define DPDK_BURST_TX_WAIT_US
Definition: source-dpdk.h:36
TmEcode
TmEcode
Definition: tm-threads-common.h:80
name
const char * name
Definition: tm-threads.c:2123
TmModule_::name
const char * name
Definition: tm-modules.h:44
DecodeThreadVars_::counter_vlan
uint16_t counter_vlan
Definition: decode.h:991
runmodes.h
SCLogInfo
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:224
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
dtv
DecodeThreadVars * dtv
Definition: fuzz_decodepcapfile.c:33
PacketL3::csum_set
bool csum_set
Definition: decode.h:427
DPDK_IRQ_MODE
#define DPDK_IRQ_MODE
Definition: source-dpdk.h:42
Packet_::ReleasePacket
void(* ReleasePacket)(struct Packet_ *)
Definition: decode.h:582
util-dpdk.h
flags
uint8_t flags
Definition: decode-gre.h:0
DecodeThreadVarsFree
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition: decode.c:793
source-dpdk.h
ChecksumValidationMode
ChecksumValidationMode
Definition: decode.h:42
suricata-common.h
packet.h
ACTION_DROP
#define ACTION_DROP
Definition: action-globals.h:30
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
FatalError
#define FatalError(...)
Definition: util-debug.h:502
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
threadvars.h
Packet_::l3
struct PacketL3 l3
Definition: decode.h:591
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
SCFree
#define SCFree(p)
Definition: util-mem.h:61
DecodeThreadVars_
Structure to hold thread specific data for all decode modules.
Definition: decode.h:954
util-dpdk-bonding.h
util-dpdk-mlx5.h
DecodeThreadVarsAlloc
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition: decode.c:775
util-dpdk-ixgbe.h
PacketSetData
int PacketSetData(Packet *p, const uint8_t *pktdata, uint32_t pktlen)
Set data for Packet and set length when zero copy is used.
Definition: decode.c:813
util-dpdk-i40e.h
suricata.h
PacketL3::csum
uint16_t csum
Definition: decode.h:428
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition: counters.c:450
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
TmModuleDecodeDPDKRegister
void TmModuleDecodeDPDKRegister(void)
Registration Function for DecodeDPDK.
Definition: source-dpdk.c:65
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
StatsRegisterCounter
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:952
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:275
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packet.
Definition: decode.c:267
SC_CAP_NET_RAW
#define SC_CAP_NET_RAW
Definition: util-privs.h:32
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:76
DPDK_COPY_MODE_TAP
@ DPDK_COPY_MODE_TAP
Definition: source-dpdk.h:34
DecodeUpdatePacketCounters
void DecodeUpdatePacketCounters(ThreadVars *tv, const DecodeThreadVars *dtv, const Packet *p)
Definition: decode.c:741
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:171