suricata
source-dpdk.c
Go to the documentation of this file.
1 /* Copyright (C) 2021-2025 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \defgroup dpdk DPDK running mode
20  *
21  * @{
22  */
23 
24 /**
25  * \file
26  *
27  * \author Lukas Sismis <lukas.sismis@gmail.com>
28  *
29  * DPDK capture interface
30  *
31  */
32 
33 #include "suricata-common.h"
34 #include "runmodes.h"
35 #include "decode.h"
36 #include "packet.h"
37 #include "source-dpdk.h"
38 #include "suricata.h"
39 #include "threads.h"
40 #include "threadvars.h"
41 #include "tm-threads.h"
42 #include "tmqh-packetpool.h"
43 #include "util-privs.h"
44 #include "action-globals.h"
45 
46 #ifndef HAVE_DPDK
47 
48 TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **);
49 
51 {
52  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
59 }
60 
61 /**
62  * \brief Registration Function for DecodeDPDK.
63  */
65 {
66  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
73 }
74 
75 /**
76  * \brief this function prints an error message and exits.
77  */
78 TmEcode NoDPDKSupportExit(ThreadVars *tv, const void *initdata, void **data)
79 {
80  FatalError("Error creating thread %s: you do not have "
81  "support for DPDK enabled, on Linux host please recompile "
82  "with --enable-dpdk",
83  tv->name);
84 }
85 
86 #else /* We have DPDK support */
87 
88 #include "util-affinity.h"
89 #include "util-dpdk.h"
90 #include "util-dpdk-i40e.h"
91 #include "util-dpdk-ice.h"
92 #include "util-dpdk-ixgbe.h"
93 #include "util-dpdk-mlx5.h"
94 #include "util-dpdk-bonding.h"
95 #include <numa.h>
96 
97 #define BURST_SIZE 32
98 // interrupt mode constants
99 #define MIN_ZERO_POLL_COUNT 10U
100 #define MIN_ZERO_POLL_COUNT_TO_SLEEP 10U
101 #define MINIMUM_SLEEP_TIME_US 1U
102 #define STANDARD_SLEEP_TIME_US 100U
103 #define MAX_EPOLL_TIMEOUT_MS 500U
104 static rte_spinlock_t intr_lock[RTE_MAX_ETHPORTS];
105 
106 /**
107  * \brief Structure to hold thread specific variables.
108  */
109 typedef struct DPDKThreadVars_ {
110  /* counters */
111  uint64_t pkts;
112  ThreadVars *tv;
113  TmSlot *slot;
114  LiveDevice *livedev;
115  ChecksumValidationMode checksum_mode;
116  bool intr_enabled;
117  /* references to packet and drop counters */
118  uint16_t capture_dpdk_packets;
119  uint16_t capture_dpdk_rx_errs;
120  uint16_t capture_dpdk_imissed;
121  uint16_t capture_dpdk_rx_no_mbufs;
122  uint16_t capture_dpdk_ierrors;
123  uint16_t capture_dpdk_tx_errs;
124  unsigned int flags;
125  int threads;
126  /* for IPS */
127  DpdkCopyModeEnum copy_mode;
128  uint16_t out_port_id;
129  /* Entry in the peers_list */
130 
131  uint64_t bytes;
132  uint64_t accepted;
133  uint64_t dropped;
134  uint16_t port_id;
135  uint16_t queue_id;
136  int32_t port_socket_id;
137  struct rte_mbuf *received_mbufs[BURST_SIZE];
138  DPDKWorkerSync *workers_sync;
139 } DPDKThreadVars;
140 
141 static TmEcode ReceiveDPDKThreadInit(ThreadVars *, const void *, void **);
142 static void ReceiveDPDKThreadExitStats(ThreadVars *, void *);
143 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *, void *);
144 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot);
145 
146 static TmEcode DecodeDPDKThreadInit(ThreadVars *, const void *, void **);
147 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data);
148 static TmEcode DecodeDPDK(ThreadVars *, Packet *, void *);
149 
150 static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset);
151 static bool InterruptsRXEnable(uint16_t port_id, uint16_t queue_id)
152 {
153  uint32_t event_data = port_id << UINT16_WIDTH | queue_id;
154  int32_t ret = rte_eth_dev_rx_intr_ctl_q(port_id, queue_id, RTE_EPOLL_PER_THREAD,
155  RTE_INTR_EVENT_ADD, (void *)((uintptr_t)event_data));
156 
157  if (ret != 0) {
158  SCLogError("%s-Q%d: failed to enable interrupt mode: %s", DPDKGetPortNameByPortID(port_id),
159  queue_id, rte_strerror(-ret));
160  return false;
161  }
162  return true;
163 }
164 
165 static inline uint32_t InterruptsSleepHeuristic(uint32_t no_pkt_polls_count)
166 {
167  if (no_pkt_polls_count < MIN_ZERO_POLL_COUNT_TO_SLEEP)
168  return MINIMUM_SLEEP_TIME_US;
169 
170  return STANDARD_SLEEP_TIME_US;
171 }
172 
173 static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool on)
174 {
175  rte_spinlock_lock(&(intr_lock[port_id]));
176 
177  if (on)
178  rte_eth_dev_rx_intr_enable(port_id, queue_id);
179  else
180  rte_eth_dev_rx_intr_disable(port_id, queue_id);
181 
182  rte_spinlock_unlock(&(intr_lock[port_id]));
183 }
184 
/**
 * \brief Frees a run of mbufs from an mbuf array.
 *
 * Releases exactly \p mbuf_cnt mbufs, beginning with mbuf_array[offset], i.e.
 * the elements mbuf_array[offset] .. mbuf_array[offset + mbuf_cnt - 1].
 *
 * \param mbuf_array array holding the mbuf pointers
 * \param mbuf_cnt number of mbufs to free
 * \param offset index of the first mbuf to free
 */
static inline void DPDKFreeMbufArray(
        struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
    /* mbuf_cnt is a count, not an end index: the receive loop passes the number
     * of not yet processed mbufs together with the index of the first of them.
     * Iterating from offset up to mbuf_cnt would leave trailing mbufs unfreed. */
    for (uint16_t i = 0; i < mbuf_cnt; i++) {
        rte_pktmbuf_free(mbuf_array[offset + i]);
    }
}
192 
193 static void DevicePostStartPMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
194 {
195  if (strcmp(driver_name, "net_bonding") == 0)
196  driver_name = BondingDeviceDriverGet(ptv->port_id);
197  if (strcmp(driver_name, "net_i40e") == 0)
198  i40eDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
199  else if (strcmp(driver_name, "net_ixgbe") == 0)
200  ixgbeDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
201  else if (strcmp(driver_name, "net_ice") == 0)
202  iceDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
203  else if (strcmp(driver_name, "mlx5_pci") == 0)
204  mlx5DeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
205 }
206 
207 static void DevicePreClosePMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
208 {
209  if (strcmp(driver_name, "net_bonding") == 0) {
210  driver_name = BondingDeviceDriverGet(ptv->port_id);
211  }
212 
213  if (
214 #if RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0)
215  strcmp(driver_name, "net_i40e") == 0 ||
216 #endif /* RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0) */
217  strcmp(driver_name, "net_ixgbe") == 0 || strcmp(driver_name, "net_ice") == 0 ||
218  strcmp(driver_name, "mlx5_pci") == 0) {
219  // Flush the RSS rules that have been inserted in the post start section
220  struct rte_flow_error flush_error = { 0 };
221  int32_t retval = rte_flow_flush(ptv->port_id, &flush_error);
222  if (retval != 0) {
223  SCLogError("%s: unable to flush rte_flow rules: %s Flush error msg: %s",
224  ptv->livedev->dev, rte_strerror(-retval), flush_error.message);
225  }
226  }
227 }
228 
/**
 * Looks up the NUMA node of the CPU the calling thread currently runs on.
 *
 * On Linux this is the result of numa_node_of_cpu() for the CPU reported by
 * sched_getcpu(). On other systems a warning is logged and -1 is returned.
 *
 * @return NUMA id on success, -1 otherwise
 */
static int GetNumaNode(void)
{
#if defined(__linux__)
    const int current_cpu = sched_getcpu();
    return numa_node_of_cpu(current_cpu);
#else
    SCLogWarning("NUMA node retrieval is not supported on this OS.");
    return -1;
#endif
}
247 
248 /**
249  * \brief Registration Function for ReceiveDPDK.
250  * \todo Unit tests are needed for this module.
251  */
253 {
254  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
255  tmm_modules[TMM_RECEIVEDPDK].ThreadInit = ReceiveDPDKThreadInit;
257  tmm_modules[TMM_RECEIVEDPDK].PktAcqLoop = ReceiveDPDKLoop;
259  tmm_modules[TMM_RECEIVEDPDK].ThreadExitPrintStats = ReceiveDPDKThreadExitStats;
260  tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = ReceiveDPDKThreadDeinit;
263 }
264 
265 /**
266  * \brief Registration Function for DecodeDPDK.
267  * \todo Unit tests are needed for this module.
268  */
270 {
271  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
272  tmm_modules[TMM_DECODEDPDK].ThreadInit = DecodeDPDKThreadInit;
273  tmm_modules[TMM_DECODEDPDK].Func = DecodeDPDK;
275  tmm_modules[TMM_DECODEDPDK].ThreadDeinit = DecodeDPDKThreadDeinit;
278 }
279 
280 static inline void DPDKDumpCounters(DPDKThreadVars *ptv)
281 {
282  /* Some NICs (e.g. Intel) do not support queue statistics and the drops can be fetched only on
283  * the port level. Therefore setting it to the first worker to have at least continuous update
284  * on the dropped packets. */
285  if (ptv->queue_id == 0) {
286  struct rte_eth_stats eth_stats;
287  int retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
288  if (unlikely(retval != 0)) {
289  SCLogError("%s: failed to get stats: %s", ptv->livedev->dev, rte_strerror(-retval));
290  return;
291  }
292 
293  StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets,
294  ptv->pkts + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
295  SC_ATOMIC_SET(ptv->livedev->pkts,
296  eth_stats.ipackets + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
297  StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_errs,
298  eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
299  StatsSetUI64(ptv->tv, ptv->capture_dpdk_imissed, eth_stats.imissed);
300  StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_no_mbufs, eth_stats.rx_nombuf);
301  StatsSetUI64(ptv->tv, ptv->capture_dpdk_ierrors, eth_stats.ierrors);
302  StatsSetUI64(ptv->tv, ptv->capture_dpdk_tx_errs, eth_stats.oerrors);
304  ptv->livedev->drop, eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
305  } else {
306  StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets, ptv->pkts);
307  }
308 }
309 
310 static void DPDKReleasePacket(Packet *p)
311 {
312  int retval;
313  /* Need to be in copy mode and need to detect early release
314  where Ethernet header could not be set (and pseudo packet)
315  When enabling promiscuous mode on Intel cards, 2 ICMPv6 packets are generated.
316  These get into the infinite cycle between the NIC and the switch in some cases */
317  if ((p->dpdk_v.copy_mode == DPDK_COPY_MODE_TAP ||
318  (p->dpdk_v.copy_mode == DPDK_COPY_MODE_IPS && !PacketCheckAction(p, ACTION_DROP)))
319 #if defined(RTE_LIBRTE_I40E_PMD) || defined(RTE_LIBRTE_IXGBE_PMD) || defined(RTE_LIBRTE_ICE_PMD)
320  && !(PacketIsICMPv6(p) && PacketGetICMPv6(p)->type == 143)
321 #endif
322  ) {
324  retval =
325  rte_eth_tx_burst(p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
326  // rte_eth_tx_burst can return only 0 (failure) or 1 (success) because we are only
327  // transmitting burst of size 1 and the function rte_eth_tx_burst returns number of
328  // successfully sent packets.
329  if (unlikely(retval < 1)) {
330  // sometimes a repeated transmit can help to send out the packet
331  rte_delay_us(DPDK_BURST_TX_WAIT_US);
332  retval = rte_eth_tx_burst(
333  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
334  if (unlikely(retval < 1)) {
335  SCLogDebug("Unable to transmit the packet on port %u queue %u",
336  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id);
337  rte_pktmbuf_free(p->dpdk_v.mbuf);
338  p->dpdk_v.mbuf = NULL;
339  }
340  }
341  } else {
342  rte_pktmbuf_free(p->dpdk_v.mbuf);
343  p->dpdk_v.mbuf = NULL;
344  }
345 
347 }
348 
349 static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv)
350 {
351  SCEnter();
352  // Indicate that the thread is actually running its application level
353  // code (i.e., it can poll packets)
355  PacketPoolWait();
356 
357  rte_eth_stats_reset(ptv->port_id);
358  rte_eth_xstats_reset(ptv->port_id);
359 
360  if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
362 
364 }
365 
366 static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv)
367 {
368  static thread_local uint64_t last_timeout_msec = 0;
369  SCTime_t t = TimeGet();
370  uint64_t msecs = SCTIME_MSECS(t);
371  if (msecs > last_timeout_msec + 100) {
372  TmThreadsCaptureHandleTimeout(tv, NULL);
373  last_timeout_msec = msecs;
374  }
375 }
376 
377 /**
378  * \brief Decides if it should retry the packet poll or continue with the packet processing
379  * \return true if the poll should be retried, false otherwise
380  */
381 static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx)
382 {
383  static thread_local uint32_t zero_pkt_polls_cnt = 0;
384 
385  if (nb_rx > 0) {
386  zero_pkt_polls_cnt = 0;
387  return false;
388  }
389 
390  LoopHandleTimeoutOnIdle(tv);
391  if (!ptv->intr_enabled)
392  return true;
393 
394  zero_pkt_polls_cnt++;
395  if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
396  return true;
397 
398  uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
399  if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
400  rte_delay_us(pwd_idle_hint);
401  } else {
402  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
403  struct rte_epoll_event event;
404  rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
405  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
406  return true;
407  }
408 
409  return false;
410 }
411 
412 /**
413  * \brief Initializes a packet from an mbuf
414  * \return pointer to the initialized packet on success, NULL otherwise
415  */
416 static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf)
417 {
419  if (unlikely(p == NULL)) {
420  return NULL;
421  }
424  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
426  }
427 
428  p->ts = TimeGet();
429  p->dpdk_v.mbuf = mbuf;
430  p->ReleasePacket = DPDKReleasePacket;
431  p->dpdk_v.copy_mode = ptv->copy_mode;
432  p->dpdk_v.out_port_id = ptv->out_port_id;
433  p->dpdk_v.out_queue_id = ptv->queue_id;
434  p->livedev = ptv->livedev;
435 
436  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
438  } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
439  uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
440  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
441  (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
442  SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
444  } else {
445  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
446  SCLogDebug("HW detected BAD IP checksum");
447  // chsum recalc will not be triggered but rule keyword check will be
448  p->l3.csum_set = true;
449  p->l3.csum = 0;
450  }
451  if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
452  SCLogDebug("HW detected BAD L4 chsum");
453  p->l4.csum_set = true;
454  p->l4.csum = 0;
455  }
456  }
457  }
458 
459  return p;
460 }
461 
462 static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf)
463 {
464  static thread_local bool segmented_mbufs_warned = false;
465  if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
466  char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
467  "Check your configuration or report the issue";
468  enum rte_proc_type_t eal_t = rte_eal_process_type();
469  if (eal_t == RTE_PROC_SECONDARY) {
470  SCLogWarning("%s. To avoid segmented mbufs, "
471  "try to increase mbuf size in your primary application",
472  warn_s);
473  } else if (eal_t == RTE_PROC_PRIMARY) {
474  SCLogWarning("%s. To avoid segmented mbufs, "
475  "try to increase MTU in your suricata.yaml",
476  warn_s);
477  }
478 
479  segmented_mbufs_warned = true;
480  }
481 }
482 
483 static void HandleShutdown(DPDKThreadVars *ptv)
484 {
485  SCLogDebug("Stopping Suricata!");
486  SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
487  while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) {
488  rte_delay_us(10);
489  }
490  if (ptv->queue_id == 0) {
491  rte_delay_us(20); // wait for all threads to get out of the sync loop
492  SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
493  // If Suricata runs in peered mode, the peer threads might still want to send
494  // packets to our port. Instead, we know, that we are done with the peered port, so
495  // we stop it. The peered threads will stop our port.
496  if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
497  rte_eth_dev_stop(ptv->out_port_id);
498  } else {
499  // in IDS we stop our port - no peer threads are running
500  rte_eth_dev_stop(ptv->port_id);
501  }
502  }
503  DPDKDumpCounters(ptv);
504 }
505 
506 static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
507 {
508  static thread_local SCTime_t last_dump = { 0 };
509  SCTime_t current_time = TimeGet();
510  /* Trigger one dump of stats every second */
511  if (current_time.secs != last_dump.secs) {
512  DPDKDumpCounters(ptv);
513  last_dump = current_time;
514  }
515 }
516 
517 /**
518  * \brief Main DPDK reading Loop function
519  */
520 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
521 {
522  SCEnter();
523  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
524  ptv->slot = ((TmSlot *)slot)->slot_next;
525  TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
526  if (ret != TM_ECODE_OK) {
527  SCReturnInt(ret);
528  }
529  while (true) {
530  if (unlikely(suricata_ctl_flags != 0)) {
531  HandleShutdown(ptv);
532  break;
533  }
534 
535  uint16_t nb_rx =
536  rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
537  if (RXPacketCountHeuristic(tv, ptv, nb_rx)) {
538  continue;
539  }
540 
541  ptv->pkts += (uint64_t)nb_rx;
542  for (uint16_t i = 0; i < nb_rx; i++) {
543  Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
544  if (p == NULL) {
545  rte_pktmbuf_free(ptv->received_mbufs[i]);
546  continue;
547  }
548  DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
549  PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
550  rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
551  if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
552  TmqhOutputPacketpool(ptv->tv, p);
553  DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
554  SCReturnInt(EXIT_FAILURE);
555  }
556  }
557 
558  PeriodicDPDKDumpCounters(ptv);
560  }
561 
563 }
564 
565 /**
566  * \brief Init function for ReceiveDPDK.
567  *
568  * \param tv pointer to ThreadVars
569  * \param initdata pointer to the interface passed from the user
570  * \param data pointer gets populated with DPDKThreadVars
571  *
572  */
573 static TmEcode ReceiveDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
574 {
575  SCEnter();
576  int retval, thread_numa;
577  DPDKThreadVars *ptv = NULL;
578  DPDKIfaceConfig *dpdk_config = (DPDKIfaceConfig *)initdata;
579 
580  if (initdata == NULL) {
581  SCLogError("DPDK configuration is NULL in thread initialization");
582  goto fail;
583  }
584 
585  ptv = SCCalloc(1, sizeof(DPDKThreadVars));
586  if (unlikely(ptv == NULL)) {
587  SCLogError("Unable to allocate memory");
588  goto fail;
589  }
590 
591  ptv->tv = tv;
592  ptv->pkts = 0;
593  ptv->bytes = 0;
594  ptv->livedev = LiveGetDevice(dpdk_config->iface);
595 
596  ptv->capture_dpdk_packets = StatsRegisterCounter("capture.packets", ptv->tv);
597  ptv->capture_dpdk_rx_errs = StatsRegisterCounter("capture.rx_errors", ptv->tv);
598  ptv->capture_dpdk_tx_errs = StatsRegisterCounter("capture.tx_errors", ptv->tv);
599  ptv->capture_dpdk_imissed = StatsRegisterCounter("capture.dpdk.imissed", ptv->tv);
600  ptv->capture_dpdk_rx_no_mbufs = StatsRegisterCounter("capture.dpdk.no_mbufs", ptv->tv);
601  ptv->capture_dpdk_ierrors = StatsRegisterCounter("capture.dpdk.ierrors", ptv->tv);
602 
603  ptv->copy_mode = dpdk_config->copy_mode;
604  ptv->checksum_mode = dpdk_config->checksum_mode;
605 
606  ptv->threads = dpdk_config->threads;
607  ptv->intr_enabled = (dpdk_config->flags & DPDK_IRQ_MODE) ? true : false;
608  ptv->port_id = dpdk_config->port_id;
609  ptv->out_port_id = dpdk_config->out_port_id;
610  ptv->port_socket_id = dpdk_config->socket_id;
611 
612  thread_numa = GetNumaNode();
613  if (thread_numa >= 0 && ptv->port_socket_id != SOCKET_ID_ANY &&
614  thread_numa != ptv->port_socket_id) {
615  SC_ATOMIC_ADD(dpdk_config->inconsistent_numa_cnt, 1);
616  SCLogPerf("%s: NIC is on NUMA %d, thread on NUMA %d", dpdk_config->iface,
617  ptv->port_socket_id, thread_numa);
618  }
619 
620  ptv->workers_sync = dpdk_config->workers_sync;
621  uint16_t queue_id = SC_ATOMIC_ADD(dpdk_config->queue_id, 1);
622  ptv->queue_id = queue_id;
623 
624  // the last thread starts the device
625  if (queue_id == dpdk_config->threads - 1) {
626  retval = rte_eth_dev_start(ptv->port_id);
627  if (retval < 0) {
628  SCLogError("%s: error (%s) during device startup", dpdk_config->iface,
629  rte_strerror(-retval));
630  goto fail;
631  }
632 
633  struct rte_eth_dev_info dev_info;
634  retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
635  if (retval != 0) {
636  SCLogError("%s: error (%s) when getting device info", dpdk_config->iface,
637  rte_strerror(-retval));
638  goto fail;
639  }
640 
641  uint32_t timeout = dpdk_config->linkup_timeout * 10;
642  while (timeout > 0) {
643  struct rte_eth_link link = { 0 };
644  retval = rte_eth_link_get_nowait(ptv->port_id, &link);
645  if (retval != 0) {
646  if (retval == -ENOTSUP) {
647  SCLogInfo("%s: link status not supported, skipping", dpdk_config->iface);
648  } else {
649  SCLogInfo("%s: error (%s) when getting link status, skipping",
650  dpdk_config->iface, rte_strerror(-retval));
651  }
652  break;
653  }
654  if (link.link_status) {
655  char link_status_str[RTE_ETH_LINK_MAX_STR_LEN];
656 #if RTE_VERSION >= RTE_VERSION_NUM(20, 11, 0, 0)
657 #pragma GCC diagnostic push
658 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
659  rte_eth_link_to_str(link_status_str, sizeof(link_status_str), &link);
660 #pragma GCC diagnostic pop
661 #else
662  snprintf(link_status_str, sizeof(link_status_str),
663  "Link Up, speed %u Mbps, %s", // 22 chars + 10 for digits + 11 for duplex
664  link.link_speed,
665  (link.link_duplex == ETH_LINK_FULL_DUPLEX) ? "full-duplex" : "half-duplex");
666 #endif
667 
668  SCLogInfo("%s: %s", dpdk_config->iface, link_status_str);
669  break;
670  }
671 
672  rte_delay_ms(100);
673  timeout--;
674  }
675 
676  if (dpdk_config->linkup_timeout && timeout == 0) {
677  SCLogWarning("%s: link is down, trying to continue anyway", dpdk_config->iface);
678  }
679 
680  // some PMDs requires additional actions only after the device has started
681  DevicePostStartPMDSpecificActions(ptv, dev_info.driver_name);
682 
683  uint16_t inconsistent_numa_cnt = SC_ATOMIC_GET(dpdk_config->inconsistent_numa_cnt);
684  if (inconsistent_numa_cnt > 0 && ptv->port_socket_id != SOCKET_ID_ANY) {
685  SCLogWarning("%s: NIC is on NUMA %d, %u threads on different NUMA node(s)",
686  dpdk_config->iface, ptv->port_socket_id, inconsistent_numa_cnt);
687  } else if (ptv->port_socket_id == SOCKET_ID_ANY && rte_socket_count() > 1) {
688  SCLogNotice(
689  "%s: unable to determine NIC's NUMA node, degraded performance can be expected",
690  dpdk_config->iface);
691  }
692  if (ptv->intr_enabled) {
693  rte_spinlock_init(&intr_lock[ptv->port_id]);
694  }
695  }
696 
697  *data = (void *)ptv;
698  dpdk_config->DerefFunc(dpdk_config);
700 
701 fail:
702  if (dpdk_config != NULL)
703  dpdk_config->DerefFunc(dpdk_config);
704  if (ptv != NULL)
705  SCFree(ptv);
707 }
708 
709 static void PrintDPDKPortXstats(uint32_t port_id, const char *port_name)
710 {
711  struct rte_eth_xstat *xstats;
712  struct rte_eth_xstat_name *xstats_names;
713 
714  int32_t len = rte_eth_xstats_get(port_id, NULL, 0);
715  if (len < 0)
716  FatalError("Error (%s) getting count of rte_eth_xstats failed on port %s",
717  rte_strerror(-len), port_name);
718 
719  xstats = SCCalloc(len, sizeof(*xstats));
720  if (xstats == NULL)
721  FatalError("Failed to allocate memory for the rte_eth_xstat structure");
722 
723  int32_t ret = rte_eth_xstats_get(port_id, xstats, len);
724  if (ret < 0 || ret > len) {
725  SCFree(xstats);
726  FatalError("Error (%s) getting rte_eth_xstats failed on port %s", rte_strerror(-ret),
727  port_name);
728  }
729  xstats_names = SCCalloc(len, sizeof(*xstats_names));
730  if (xstats_names == NULL) {
731  SCFree(xstats);
732  FatalError("Failed to allocate memory for the rte_eth_xstat_name array");
733  }
734  ret = rte_eth_xstats_get_names(port_id, xstats_names, len);
735  if (ret < 0 || ret > len) {
736  SCFree(xstats);
737  SCFree(xstats_names);
738  FatalError("Error (%s) getting names of rte_eth_xstats failed on port %s",
739  rte_strerror(-ret), port_name);
740  }
741  for (int32_t i = 0; i < len; i++) {
742  if (xstats[i].value > 0)
743  SCLogPerf("Port %u (%s) - %s: %" PRIu64, port_id, port_name, xstats_names[i].name,
744  xstats[i].value);
745  }
746 
747  SCFree(xstats);
748  SCFree(xstats_names);
749 }
750 
751 /**
752  * \brief This function prints stats to the screen at exit.
753  * \param tv pointer to ThreadVars
754  * \param data pointer that gets cast into DPDKThreadVars for ptv
755  */
756 static void ReceiveDPDKThreadExitStats(ThreadVars *tv, void *data)
757 {
758  SCEnter();
759  int retval;
760  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
761 
762  if (ptv->queue_id == 0) {
763  struct rte_eth_stats eth_stats;
764  PrintDPDKPortXstats(ptv->port_id, ptv->livedev->dev);
765  retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
766  if (unlikely(retval != 0)) {
767  SCLogError("%s: failed to get stats (%s)", ptv->livedev->dev, strerror(-retval));
768  SCReturn;
769  }
770  SCLogPerf("%s: total RX stats: packets %" PRIu64 " bytes: %" PRIu64 " missed: %" PRIu64
771  " errors: %" PRIu64 " nombufs: %" PRIu64,
772  ptv->livedev->dev, eth_stats.ipackets, eth_stats.ibytes, eth_stats.imissed,
773  eth_stats.ierrors, eth_stats.rx_nombuf);
774  if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS)
775  SCLogPerf("%s: total TX stats: packets %" PRIu64 " bytes: %" PRIu64 " errors: %" PRIu64,
776  ptv->livedev->dev, eth_stats.opackets, eth_stats.obytes, eth_stats.oerrors);
777  }
778 
779  DPDKDumpCounters(ptv);
780  SCLogPerf("(%s) received packets %" PRIu64, tv->name, ptv->pkts);
781 }
782 
783 /**
784  * \brief DeInit function closes dpdk at exit.
785  * \param tv pointer to ThreadVars
786  * \param data pointer that gets cast into DPDKThreadVars for ptv
787  */
788 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *tv, void *data)
789 {
790  SCEnter();
791  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
792 
793  if (ptv->queue_id == 0) {
794  struct rte_eth_dev_info dev_info;
795  int retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
796  if (retval != 0) {
797  SCLogError("%s: error (%s) when getting device info", ptv->livedev->dev,
798  rte_strerror(-retval));
800  }
801 
802  DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);
803 
804  if (ptv->workers_sync) {
805  SCFree(ptv->workers_sync);
806  }
807  }
808 
809  SCFree(ptv);
811 }
812 
813 /**
814  * \brief This function passes off to link type decoders.
815  *
816  * DecodeDPDK decodes packets from DPDK and passes
817  * them off to the proper link type decoder.
818  *
819  * \param t pointer to ThreadVars
820  * \param p pointer to the current packet
821  * \param data pointer that gets cast into DPDKThreadVars for ptv
822  */
823 static TmEcode DecodeDPDK(ThreadVars *tv, Packet *p, void *data)
824 {
825  SCEnter();
827 
829 
830  /* update counters */
832 
833  /* If suri has set vlan during reading, we increase vlan counter */
834  if (p->vlan_idx) {
836  }
837 
838  /* call the decoder */
839  DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
840 
842 
844 }
845 
846 static TmEcode DecodeDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
847 {
848  SCEnter();
849  DecodeThreadVars *dtv = NULL;
850 
852 
853  if (dtv == NULL)
855 
857 
858  *data = (void *)dtv;
859 
861 }
862 
863 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data)
864 {
865  SCEnter();
866  if (data != NULL)
867  DecodeThreadVarsFree(tv, data);
869 }
870 
871 #endif /* HAVE_DPDK */
872 /* eof */
873 /**
874  * @}
875  */
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:73
PacketL4::csum_set
bool csum_set
Definition: decode.h:449
PacketCheckAction
bool PacketCheckAction(const Packet *p, const uint8_t a)
Definition: packet.c:49
tm-threads.h
len
uint8_t len
Definition: app-layer-dnp3.h:2
TMM_RECEIVEDPDK
@ TMM_RECEIVEDPDK
Definition: tm-threads-common.h:56
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:166
CHECKSUM_VALIDATION_OFFLOAD
@ CHECKSUM_VALIDATION_OFFLOAD
Definition: decode.h:48
offset
uint64_t offset
Definition: util-streaming-buffer.h:0
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
PacketFreeOrRelease
void PacketFreeOrRelease(Packet *p)
Return a packet to where it was allocated.
Definition: decode.c:250
SCTIME_MSECS
#define SCTIME_MSECS(t)
Definition: util-time.h:58
PKT_IS_PSEUDOPKT
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition: decode.h:1297
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
DPDK_COPY_MODE_IPS
@ DPDK_COPY_MODE_IPS
Definition: source-dpdk.h:34
PacketL4::csum
uint16_t csum
Definition: decode.h:450
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:101
TMM_DECODEDPDK
@ TMM_DECODEDPDK
Definition: tm-threads-common.h:57
action-globals.h
Packet_::flags
uint32_t flags
Definition: decode.h:527
DpdkCopyModeEnum
DpdkCopyModeEnum
Definition: source-dpdk.h:34
threads.h
Packet_::vlan_idx
uint8_t vlan_idx
Definition: decode.h:512
LiveDevice_
Definition: util-device.h:41
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:332
StatsSetUI64
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type uint64_t to the local counter.
Definition: counters.c:207
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
NoDPDKSupportExit
TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **)
this function prints an error message and exits.
Definition: source-dpdk.c:78
util-privs.h
CHECKSUM_VALIDATION_DISABLE
@ CHECKSUM_VALIDATION_DISABLE
Definition: decode.h:43
PacketDecodeFinalize
void PacketDecodeFinalize(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Finalize decoding of a packet.
Definition: decode.c:206
DPDKIfaceConfig_
Definition: source-dpdk.h:53
util-dpdk-ice.h
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:314
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:82
tmqh-packetpool.h
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:81
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
Packet_::datalink
int datalink
Definition: decode.h:622
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1301
DPDKWorkerSync_
Definition: source-dpdk.h:48
DecodeRegisterPerfCounters
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition: decode.c:602
TmModuleReceiveDPDKRegister
void TmModuleReceiveDPDKRegister(void)
Definition: source-dpdk.c:50
decode.h
PKT_SRC_WIRE
@ PKT_SRC_WIRE
Definition: decode.h:52
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
Packet_::ts
SCTime_t ts
Definition: decode.h:538
SCTime_t::secs
uint64_t secs
Definition: util-time.h:41
LiveGetDevice
LiveDevice * LiveGetDevice(const char *name)
Get a pointer to the device with the given name.
Definition: util-device.c:248
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
GET_PKT_DATA
#define GET_PKT_DATA(p)
Definition: decode.h:209
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
util-affinity.h
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:309
PacketPoolWait
void PacketPoolWait(void)
Definition: tmqh-packetpool.c:80
SCReturn
#define SCReturn
Definition: util-debug.h:273
Packet_
Definition: decode.h:484
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
type
uint16_t type
Definition: decode-vlan.c:106
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:29
GET_PKT_LEN
#define GET_PKT_LEN(p)
Definition: decode.h:208
TimeGet
SCTime_t TimeGet(void)
Definition: util-time.c:152
Packet_::l4
struct PacketL4 l4
Definition: decode.h:584
TmSlot_
Definition: tm-threads.h:53
PKT_IGNORE_CHECKSUM
#define PKT_IGNORE_CHECKSUM
Definition: decode.h:1258
SCTime_t
Definition: util-time.h:40
Packet_::livedev
struct LiveDevice_ * livedev
Definition: decode.h:601
DPDK_BURST_TX_WAIT_US
#define DPDK_BURST_TX_WAIT_US
Definition: source-dpdk.h:36
TmEcode
TmEcode
Definition: tm-threads-common.h:80
name
const char * name
Definition: tm-threads.c:2135
TmModule_::name
const char * name
Definition: tm-modules.h:44
DecodeThreadVars_::counter_vlan
uint16_t counter_vlan
Definition: decode.h:983
runmodes.h
SCLogInfo
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:224
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
dtv
DecodeThreadVars * dtv
Definition: fuzz_decodepcapfile.c:33
PacketL3::csum_set
bool csum_set
Definition: decode.h:419
DPDK_IRQ_MODE
#define DPDK_IRQ_MODE
Definition: source-dpdk.h:42
Packet_::ReleasePacket
void(* ReleasePacket)(struct Packet_ *)
Definition: decode.h:574
util-dpdk.h
flags
uint8_t flags
Definition: decode-gre.h:0
DecodeThreadVarsFree
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition: decode.c:793
source-dpdk.h
ChecksumValidationMode
ChecksumValidationMode
Definition: decode.h:42
suricata-common.h
packet.h
ACTION_DROP
#define ACTION_DROP
Definition: action-globals.h:30
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
FatalError
#define FatalError(...)
Definition: util-debug.h:502
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
threadvars.h
Packet_::l3
struct PacketL3 l3
Definition: decode.h:583
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
SCFree
#define SCFree(p)
Definition: util-mem.h:61
DecodeThreadVars_
Structure to hold thread specific data for all decode modules.
Definition: decode.h:946
util-dpdk-bonding.h
util-dpdk-mlx5.h
DecodeThreadVarsAlloc
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition: decode.c:775
util-dpdk-ixgbe.h
PacketSetData
int PacketSetData(Packet *p, const uint8_t *pktdata, uint32_t pktlen)
Set data for Packet and set length when zero copy is used.
Definition: decode.c:813
util-dpdk-i40e.h
suricata.h
PacketL3::csum
uint16_t csum
Definition: decode.h:420
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition: counters.c:450
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
TmModuleDecodeDPDKRegister
void TmModuleDecodeDPDKRegister(void)
Registration Function for DecodeDPDK.
Definition: source-dpdk.c:64
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
StatsRegisterCounter
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:952
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:275
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packet...
Definition: decode.c:267
SC_CAP_NET_RAW
#define SC_CAP_NET_RAW
Definition: util-privs.h:32
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:76
DPDK_COPY_MODE_TAP
@ DPDK_COPY_MODE_TAP
Definition: source-dpdk.h:34
DecodeUpdatePacketCounters
void DecodeUpdatePacketCounters(ThreadVars *tv, const DecodeThreadVars *dtv, const Packet *p)
Definition: decode.c:741
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:171