suricata
source-dpdk.c
Go to the documentation of this file.
1 /* Copyright (C) 2021-2025 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \defgroup dpdk DPDK running mode
20  *
21  * @{
22  */
23 
24 /**
25  * \file
26  *
27  * \author Lukas Sismis <lukas.sismis@gmail.com>
28  *
29  * DPDK capture interface
30  *
31  */
32 
33 #include "suricata-common.h"
34 #include "runmodes.h"
35 #include "decode.h"
36 #include "packet.h"
37 #include "source-dpdk.h"
38 #include "suricata.h"
39 #include "threads.h"
40 #include "threadvars.h"
41 #include "tm-threads.h"
42 #include "tmqh-packetpool.h"
43 #include "util-privs.h"
44 #include "action-globals.h"
45 
46 #ifndef HAVE_DPDK
47 
48 TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **);
49 
51 {
52  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
59 }
60 
61 /**
62  * \brief Registration Function for DecodeDPDK.
63  */
65 {
66  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
73 }
74 
75 /**
76  * \brief this function prints an error message and exits.
77  */
78 TmEcode NoDPDKSupportExit(ThreadVars *tv, const void *initdata, void **data)
79 {
80  FatalError("Error creating thread %s: you do not have "
81  "support for DPDK enabled, on Linux host please recompile "
82  "with --enable-dpdk",
83  tv->name);
84 }
85 
86 #else /* We have DPDK support */
87 
88 #include "util-affinity.h"
89 #include "util-dpdk.h"
90 #include "util-dpdk-i40e.h"
91 #include "util-dpdk-ice.h"
92 #include "util-dpdk-ixgbe.h"
93 #include "util-dpdk-mlx5.h"
94 #include "util-dpdk-bonding.h"
95 #include <numa.h>
96 
97 #define BURST_SIZE 32
98 // interrupt mode constants
99 #define MIN_ZERO_POLL_COUNT 10U
100 #define MIN_ZERO_POLL_COUNT_TO_SLEEP 10U
101 #define MINIMUM_SLEEP_TIME_US 1U
102 #define STANDARD_SLEEP_TIME_US 100U
103 #define MAX_EPOLL_TIMEOUT_MS 500U
104 static rte_spinlock_t intr_lock[RTE_MAX_ETHPORTS];
105 
106 /**
107  * \brief Structure to hold thread specific variables.
108  */
109 typedef struct DPDKThreadVars_ {
110  /* counters */
111  uint64_t pkts;
112  ThreadVars *tv;
113  TmSlot *slot;
114  LiveDevice *livedev;
115  ChecksumValidationMode checksum_mode;
116  bool intr_enabled;
117  /* references to packet and drop counters */
118  uint16_t capture_dpdk_packets;
119  uint16_t capture_dpdk_rx_errs;
120  uint16_t capture_dpdk_imissed;
121  uint16_t capture_dpdk_rx_no_mbufs;
122  uint16_t capture_dpdk_ierrors;
123  uint16_t capture_dpdk_tx_errs;
124  unsigned int flags;
125  int threads;
126  /* for IPS */
127  DpdkCopyModeEnum copy_mode;
128  uint16_t out_port_id;
129  /* Entry in the peers_list */
130 
131  uint64_t bytes;
132  uint64_t accepted;
133  uint64_t dropped;
134  uint16_t port_id;
135  uint16_t queue_id;
136  int32_t port_socket_id;
137  struct rte_mempool *pkt_mempool;
138  struct rte_mbuf *received_mbufs[BURST_SIZE];
139  DPDKWorkerSync *workers_sync;
140 } DPDKThreadVars;
141 
142 static TmEcode ReceiveDPDKThreadInit(ThreadVars *, const void *, void **);
143 static void ReceiveDPDKThreadExitStats(ThreadVars *, void *);
144 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *, void *);
145 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot);
146 
147 static TmEcode DecodeDPDKThreadInit(ThreadVars *, const void *, void **);
148 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data);
149 static TmEcode DecodeDPDK(ThreadVars *, Packet *, void *);
150 
151 static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset);
152 static bool InterruptsRXEnable(uint16_t port_id, uint16_t queue_id)
153 {
154  uint32_t event_data = port_id << UINT16_WIDTH | queue_id;
155  int32_t ret = rte_eth_dev_rx_intr_ctl_q(port_id, queue_id, RTE_EPOLL_PER_THREAD,
156  RTE_INTR_EVENT_ADD, (void *)((uintptr_t)event_data));
157 
158  if (ret != 0) {
159  SCLogError("%s-Q%d: failed to enable interrupt mode: %s", DPDKGetPortNameByPortID(port_id),
160  queue_id, rte_strerror(-ret));
161  return false;
162  }
163  return true;
164 }
165 
166 static inline uint32_t InterruptsSleepHeuristic(uint32_t no_pkt_polls_count)
167 {
168  if (no_pkt_polls_count < MIN_ZERO_POLL_COUNT_TO_SLEEP)
169  return MINIMUM_SLEEP_TIME_US;
170 
171  return STANDARD_SLEEP_TIME_US;
172 }
173 
174 static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool on)
175 {
176  rte_spinlock_lock(&(intr_lock[port_id]));
177 
178  if (on)
179  rte_eth_dev_rx_intr_enable(port_id, queue_id);
180  else
181  rte_eth_dev_rx_intr_disable(port_id, queue_id);
182 
183  rte_spinlock_unlock(&(intr_lock[port_id]));
184 }
185 
/**
 * \brief Frees a contiguous range of mbufs from an mbuf array
 *
 * \param mbuf_array array of mbuf pointers
 * \param mbuf_cnt number of mbufs to free, counted from offset
 * \param offset index of the first mbuf to free
 */
static inline void DPDKFreeMbufArray(
        struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
    /* The receive loop passes the number of mbufs left behind the failed packet together
     * with the index of the first of them, so the range is [offset, offset + mbuf_cnt).
     * Treating mbuf_cnt as the end index left the tail of the burst unfreed.
     * The sum is computed in 32 bits so it cannot wrap. */
    const uint32_t end = (uint32_t)offset + (uint32_t)mbuf_cnt;

    for (uint32_t i = offset; i < end; i++) {
        rte_pktmbuf_free(mbuf_array[i]);
    }
}
193 
194 static void DevicePostStartPMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
195 {
196  if (strcmp(driver_name, "net_bonding") == 0)
197  driver_name = BondingDeviceDriverGet(ptv->port_id);
198  if (strcmp(driver_name, "net_i40e") == 0)
199  i40eDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
200  else if (strcmp(driver_name, "net_ixgbe") == 0)
201  ixgbeDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
202  else if (strcmp(driver_name, "net_ice") == 0)
203  iceDeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
204  else if (strcmp(driver_name, "mlx5_pci") == 0)
205  mlx5DeviceSetRSS(ptv->port_id, ptv->threads, ptv->livedev->dev);
206 }
207 
208 static void DevicePreClosePMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
209 {
210  if (strcmp(driver_name, "net_bonding") == 0) {
211  driver_name = BondingDeviceDriverGet(ptv->port_id);
212  }
213 
214  if (
215 #if RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0)
216  strcmp(driver_name, "net_i40e") == 0 ||
217 #endif /* RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0) */
218  strcmp(driver_name, "net_ixgbe") == 0 || strcmp(driver_name, "net_ice") == 0 ||
219  strcmp(driver_name, "mlx5_pci") == 0) {
220  // Flush the RSS rules that have been inserted in the post start section
221  struct rte_flow_error flush_error = { 0 };
222  int32_t retval = rte_flow_flush(ptv->port_id, &flush_error);
223  if (retval != 0) {
224  SCLogError("%s: unable to flush rte_flow rules: %s Flush error msg: %s",
225  ptv->livedev->dev, rte_strerror(-retval), flush_error.message);
226  }
227  }
228 }
229 
/**
 * Attempts to retrieve NUMA node id on which the caller runs
 *
 * Only implemented for Linux, other systems log a warning and report failure.
 *
 * @return NUMA id on success, -1 otherwise
 */
static int GetNumaNode(void)
{
    int node = -1;

#if defined(__linux__)
    /* sched_getcpu() reports failure with -1, do not pass that on as a CPU number */
    const int cpu = sched_getcpu();
    if (cpu >= 0) {
        node = numa_node_of_cpu(cpu);
    }
#else
    SCLogWarning("NUMA node retrieval is not supported on this OS.");
#endif

    return node;
}
248 
249 /**
250  * \brief Registration Function for ReceiveDPDK.
251  * \todo Unit tests are needed for this module.
252  */
254 {
255  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
256  tmm_modules[TMM_RECEIVEDPDK].ThreadInit = ReceiveDPDKThreadInit;
258  tmm_modules[TMM_RECEIVEDPDK].PktAcqLoop = ReceiveDPDKLoop;
260  tmm_modules[TMM_RECEIVEDPDK].ThreadExitPrintStats = ReceiveDPDKThreadExitStats;
261  tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = ReceiveDPDKThreadDeinit;
264 }
265 
266 /**
267  * \brief Registration Function for DecodeDPDK.
268  * \todo Unit tests are needed for this module.
269  */
271 {
272  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
273  tmm_modules[TMM_DECODEDPDK].ThreadInit = DecodeDPDKThreadInit;
274  tmm_modules[TMM_DECODEDPDK].Func = DecodeDPDK;
276  tmm_modules[TMM_DECODEDPDK].ThreadDeinit = DecodeDPDKThreadDeinit;
279 }
280 
281 static inline void DPDKDumpCounters(DPDKThreadVars *ptv)
282 {
283  /* Some NICs (e.g. Intel) do not support queue statistics and the drops can be fetched only on
284  * the port level. Therefore setting it to the first worker to have at least continuous update
285  * on the dropped packets. */
286  if (ptv->queue_id == 0) {
287  struct rte_eth_stats eth_stats;
288  int retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
289  if (unlikely(retval != 0)) {
290  SCLogError("%s: failed to get stats: %s", ptv->livedev->dev, rte_strerror(-retval));
291  return;
292  }
293 
294  StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets,
295  ptv->pkts + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
296  SC_ATOMIC_SET(ptv->livedev->pkts,
297  eth_stats.ipackets + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
298  StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_errs,
299  eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
300  StatsSetUI64(ptv->tv, ptv->capture_dpdk_imissed, eth_stats.imissed);
301  StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_no_mbufs, eth_stats.rx_nombuf);
302  StatsSetUI64(ptv->tv, ptv->capture_dpdk_ierrors, eth_stats.ierrors);
303  StatsSetUI64(ptv->tv, ptv->capture_dpdk_tx_errs, eth_stats.oerrors);
305  ptv->livedev->drop, eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
306  } else {
307  StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets, ptv->pkts);
308  }
309 }
310 
311 static void DPDKReleasePacket(Packet *p)
312 {
313  int retval;
314  /* Need to be in copy mode and need to detect early release
315  where Ethernet header could not be set (and pseudo packet)
316  When enabling promiscuous mode on Intel cards, 2 ICMPv6 packets are generated.
317  These get into the infinite cycle between the NIC and the switch in some cases */
318  if ((p->dpdk_v.copy_mode == DPDK_COPY_MODE_TAP ||
319  (p->dpdk_v.copy_mode == DPDK_COPY_MODE_IPS && !PacketCheckAction(p, ACTION_DROP)))
320 #if defined(RTE_LIBRTE_I40E_PMD) || defined(RTE_LIBRTE_IXGBE_PMD) || defined(RTE_LIBRTE_ICE_PMD)
321  && !(PacketIsICMPv6(p) && PacketGetICMPv6(p)->type == 143)
322 #endif
323  ) {
325  retval =
326  rte_eth_tx_burst(p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
327  // rte_eth_tx_burst can return only 0 (failure) or 1 (success) because we are only
328  // transmitting burst of size 1 and the function rte_eth_tx_burst returns number of
329  // successfully sent packets.
330  if (unlikely(retval < 1)) {
331  // sometimes a repeated transmit can help to send out the packet
332  rte_delay_us(DPDK_BURST_TX_WAIT_US);
333  retval = rte_eth_tx_burst(
334  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
335  if (unlikely(retval < 1)) {
336  SCLogDebug("Unable to transmit the packet on port %u queue %u",
337  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id);
338  rte_pktmbuf_free(p->dpdk_v.mbuf);
339  p->dpdk_v.mbuf = NULL;
340  }
341  }
342  } else {
343  rte_pktmbuf_free(p->dpdk_v.mbuf);
344  p->dpdk_v.mbuf = NULL;
345  }
346 
348 }
349 
350 static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv)
351 {
352  SCEnter();
353  // Indicate that the thread is actually running its application level
354  // code (i.e., it can poll packets)
356  PacketPoolWait();
357 
358  rte_eth_stats_reset(ptv->port_id);
359  rte_eth_xstats_reset(ptv->port_id);
360 
361  if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
363 
365 }
366 
367 static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv)
368 {
369  static thread_local uint64_t last_timeout_msec = 0;
370  SCTime_t t = TimeGet();
371  uint64_t msecs = SCTIME_MSECS(t);
372  if (msecs > last_timeout_msec + 100) {
373  TmThreadsCaptureHandleTimeout(tv, NULL);
374  last_timeout_msec = msecs;
375  }
376 }
377 
378 /**
379  * \brief Decides if it should retry the packet poll or continue with the packet processing
380  * \return true if the poll should be retried, false otherwise
381  */
382 static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx)
383 {
384  static thread_local uint32_t zero_pkt_polls_cnt = 0;
385 
386  if (nb_rx > 0) {
387  zero_pkt_polls_cnt = 0;
388  return false;
389  }
390 
391  LoopHandleTimeoutOnIdle(tv);
392  if (!ptv->intr_enabled)
393  return true;
394 
395  zero_pkt_polls_cnt++;
396  if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
397  return true;
398 
399  uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
400  if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
401  rte_delay_us(pwd_idle_hint);
402  } else {
403  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
404  struct rte_epoll_event event;
405  rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
406  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
407  return true;
408  }
409 
410  return false;
411 }
412 
413 /**
414  * \brief Initializes a packet from an mbuf
415  * \return pointer to the initialized packet, NULL if no packet could be obtained
416  */
417 static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf)
418 {
420  if (unlikely(p == NULL)) {
421  return NULL;
422  }
425  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
427  }
428 
429  p->ts = TimeGet();
430  p->dpdk_v.mbuf = mbuf;
431  p->ReleasePacket = DPDKReleasePacket;
432  p->dpdk_v.copy_mode = ptv->copy_mode;
433  p->dpdk_v.out_port_id = ptv->out_port_id;
434  p->dpdk_v.out_queue_id = ptv->queue_id;
435  p->livedev = ptv->livedev;
436 
437  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
439  } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
440  uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
441  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
442  (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
443  SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
445  } else {
446  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
447  SCLogDebug("HW detected BAD IP checksum");
448  // chsum recalc will not be triggered but rule keyword check will be
449  p->l3.csum_set = true;
450  p->l3.csum = 0;
451  }
452  if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
453  SCLogDebug("HW detected BAD L4 chsum");
454  p->l4.csum_set = true;
455  p->l4.csum = 0;
456  }
457  }
458  }
459 
460  return p;
461 }
462 
463 static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf)
464 {
465  static thread_local bool segmented_mbufs_warned = false;
466  if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
467  char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
468  "Check your configuration or report the issue";
469  enum rte_proc_type_t eal_t = rte_eal_process_type();
470  if (eal_t == RTE_PROC_SECONDARY) {
471  SCLogWarning("%s. To avoid segmented mbufs, "
472  "try to increase mbuf size in your primary application",
473  warn_s);
474  } else if (eal_t == RTE_PROC_PRIMARY) {
475  SCLogWarning("%s. To avoid segmented mbufs, "
476  "try to increase MTU in your suricata.yaml",
477  warn_s);
478  }
479 
480  segmented_mbufs_warned = true;
481  }
482 }
483 
484 static void HandleShutdown(DPDKThreadVars *ptv)
485 {
486  SCLogDebug("Stopping Suricata!");
487  SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
488  while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) {
489  rte_delay_us(10);
490  }
491  if (ptv->queue_id == 0) {
492  rte_delay_us(20); // wait for all threads to get out of the sync loop
493  SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
494  // If Suricata runs in peered mode, the peer threads might still want to send
495  // packets to our port. Instead, we know, that we are done with the peered port, so
496  // we stop it. The peered threads will stop our port.
497  if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
498  rte_eth_dev_stop(ptv->out_port_id);
499  } else {
500  // in IDS we stop our port - no peer threads are running
501  rte_eth_dev_stop(ptv->port_id);
502  }
503  }
504  DPDKDumpCounters(ptv);
505 }
506 
507 static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
508 {
509  static thread_local SCTime_t last_dump = { 0 };
510  SCTime_t current_time = TimeGet();
511  /* Trigger one dump of stats every second */
512  if (current_time.secs != last_dump.secs) {
513  DPDKDumpCounters(ptv);
514  last_dump = current_time;
515  }
516 }
517 
518 /**
519  * \brief Main DPDK reading Loop function
520  */
521 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
522 {
523  SCEnter();
524  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
525  ptv->slot = ((TmSlot *)slot)->slot_next;
526  TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
527  if (ret != TM_ECODE_OK) {
528  SCReturnInt(ret);
529  }
530  while (true) {
531  if (unlikely(suricata_ctl_flags != 0)) {
532  HandleShutdown(ptv);
533  break;
534  }
535 
536  uint16_t nb_rx =
537  rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
538  if (RXPacketCountHeuristic(tv, ptv, nb_rx)) {
539  continue;
540  }
541 
542  ptv->pkts += (uint64_t)nb_rx;
543  for (uint16_t i = 0; i < nb_rx; i++) {
544  Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
545  if (p == NULL) {
546  rte_pktmbuf_free(ptv->received_mbufs[i]);
547  continue;
548  }
549  DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
550  PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
551  rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
552  if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
553  TmqhOutputPacketpool(ptv->tv, p);
554  DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
555  SCReturnInt(EXIT_FAILURE);
556  }
557  }
558 
559  PeriodicDPDKDumpCounters(ptv);
561  }
562 
564 }
565 
566 /**
567  * \brief Init function for ReceiveDPDK.
568  *
569  * \param tv pointer to ThreadVars
570  * \param initdata pointer to the interface passed from the user
571  * \param data pointer gets populated with DPDKThreadVars
572  *
573  */
574 static TmEcode ReceiveDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
575 {
576  SCEnter();
577  int retval, thread_numa;
578  DPDKThreadVars *ptv = NULL;
579  DPDKIfaceConfig *dpdk_config = (DPDKIfaceConfig *)initdata;
580 
581  if (initdata == NULL) {
582  SCLogError("DPDK configuration is NULL in thread initialization");
583  goto fail;
584  }
585 
586  ptv = SCCalloc(1, sizeof(DPDKThreadVars));
587  if (unlikely(ptv == NULL)) {
588  SCLogError("Unable to allocate memory");
589  goto fail;
590  }
591 
592  ptv->tv = tv;
593  ptv->pkts = 0;
594  ptv->bytes = 0;
595  ptv->livedev = LiveGetDevice(dpdk_config->iface);
596 
597  ptv->capture_dpdk_packets = StatsRegisterCounter("capture.packets", ptv->tv);
598  ptv->capture_dpdk_rx_errs = StatsRegisterCounter("capture.rx_errors", ptv->tv);
599  ptv->capture_dpdk_tx_errs = StatsRegisterCounter("capture.tx_errors", ptv->tv);
600  ptv->capture_dpdk_imissed = StatsRegisterCounter("capture.dpdk.imissed", ptv->tv);
601  ptv->capture_dpdk_rx_no_mbufs = StatsRegisterCounter("capture.dpdk.no_mbufs", ptv->tv);
602  ptv->capture_dpdk_ierrors = StatsRegisterCounter("capture.dpdk.ierrors", ptv->tv);
603 
604  ptv->copy_mode = dpdk_config->copy_mode;
605  ptv->checksum_mode = dpdk_config->checksum_mode;
606 
607  ptv->threads = dpdk_config->threads;
608  ptv->intr_enabled = (dpdk_config->flags & DPDK_IRQ_MODE) ? true : false;
609  ptv->port_id = dpdk_config->port_id;
610  ptv->out_port_id = dpdk_config->out_port_id;
611  ptv->port_socket_id = dpdk_config->socket_id;
612  // pass the pointer to the mempool and then forget about it. Mempool is freed in thread deinit.
613  ptv->pkt_mempool = dpdk_config->pkt_mempool;
614  dpdk_config->pkt_mempool = NULL;
615 
616  thread_numa = GetNumaNode();
617  if (thread_numa >= 0 && ptv->port_socket_id != SOCKET_ID_ANY &&
618  thread_numa != ptv->port_socket_id) {
619  SC_ATOMIC_ADD(dpdk_config->inconsistent_numa_cnt, 1);
620  SCLogPerf("%s: NIC is on NUMA %d, thread on NUMA %d", dpdk_config->iface,
621  ptv->port_socket_id, thread_numa);
622  }
623 
624  ptv->workers_sync = dpdk_config->workers_sync;
625  uint16_t queue_id = SC_ATOMIC_ADD(dpdk_config->queue_id, 1);
626  ptv->queue_id = queue_id;
627 
628  // the last thread starts the device
629  if (queue_id == dpdk_config->threads - 1) {
630  retval = rte_eth_dev_start(ptv->port_id);
631  if (retval < 0) {
632  SCLogError("%s: error (%s) during device startup", dpdk_config->iface,
633  rte_strerror(-retval));
634  goto fail;
635  }
636 
637  struct rte_eth_dev_info dev_info;
638  retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
639  if (retval != 0) {
640  SCLogError("%s: error (%s) when getting device info", dpdk_config->iface,
641  rte_strerror(-retval));
642  goto fail;
643  }
644 
645  // some PMDs requires additional actions only after the device has started
646  DevicePostStartPMDSpecificActions(ptv, dev_info.driver_name);
647 
648  uint16_t inconsistent_numa_cnt = SC_ATOMIC_GET(dpdk_config->inconsistent_numa_cnt);
649  if (inconsistent_numa_cnt > 0 && ptv->port_socket_id != SOCKET_ID_ANY) {
650  SCLogWarning("%s: NIC is on NUMA %d, %u threads on different NUMA node(s)",
651  dpdk_config->iface, ptv->port_socket_id, inconsistent_numa_cnt);
652  } else if (ptv->port_socket_id == SOCKET_ID_ANY && rte_socket_count() > 1) {
653  SCLogNotice(
654  "%s: unable to determine NIC's NUMA node, degraded performance can be expected",
655  dpdk_config->iface);
656  }
657  if (ptv->intr_enabled) {
658  rte_spinlock_init(&intr_lock[ptv->port_id]);
659  }
660  }
661 
662  *data = (void *)ptv;
663  dpdk_config->DerefFunc(dpdk_config);
665 
666 fail:
667  if (dpdk_config != NULL)
668  dpdk_config->DerefFunc(dpdk_config);
669  if (ptv != NULL)
670  SCFree(ptv);
672 }
673 
674 static void PrintDPDKPortXstats(uint32_t port_id, const char *port_name)
675 {
676  struct rte_eth_xstat *xstats;
677  struct rte_eth_xstat_name *xstats_names;
678 
679  int32_t len = rte_eth_xstats_get(port_id, NULL, 0);
680  if (len < 0)
681  FatalError("Error (%s) getting count of rte_eth_xstats failed on port %s",
682  rte_strerror(-len), port_name);
683 
684  xstats = SCCalloc(len, sizeof(*xstats));
685  if (xstats == NULL)
686  FatalError("Failed to allocate memory for the rte_eth_xstat structure");
687 
688  int32_t ret = rte_eth_xstats_get(port_id, xstats, len);
689  if (ret < 0 || ret > len) {
690  SCFree(xstats);
691  FatalError("Error (%s) getting rte_eth_xstats failed on port %s", rte_strerror(-ret),
692  port_name);
693  }
694  xstats_names = SCCalloc(len, sizeof(*xstats_names));
695  if (xstats_names == NULL) {
696  SCFree(xstats);
697  FatalError("Failed to allocate memory for the rte_eth_xstat_name array");
698  }
699  ret = rte_eth_xstats_get_names(port_id, xstats_names, len);
700  if (ret < 0 || ret > len) {
701  SCFree(xstats);
702  SCFree(xstats_names);
703  FatalError("Error (%s) getting names of rte_eth_xstats failed on port %s",
704  rte_strerror(-ret), port_name);
705  }
706  for (int32_t i = 0; i < len; i++) {
707  if (xstats[i].value > 0)
708  SCLogPerf("Port %u (%s) - %s: %" PRIu64, port_id, port_name, xstats_names[i].name,
709  xstats[i].value);
710  }
711 
712  SCFree(xstats);
713  SCFree(xstats_names);
714 }
715 
716 /**
717  * \brief This function prints stats to the screen at exit.
718  * \param tv pointer to ThreadVars
719  * \param data pointer that gets cast into DPDKThreadVars for ptv
720  */
721 static void ReceiveDPDKThreadExitStats(ThreadVars *tv, void *data)
722 {
723  SCEnter();
724  int retval;
725  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
726 
727  if (ptv->queue_id == 0) {
728  struct rte_eth_stats eth_stats;
729  PrintDPDKPortXstats(ptv->port_id, ptv->livedev->dev);
730  retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
731  if (unlikely(retval != 0)) {
732  SCLogError("%s: failed to get stats (%s)", ptv->livedev->dev, strerror(-retval));
733  SCReturn;
734  }
735  SCLogPerf("%s: total RX stats: packets %" PRIu64 " bytes: %" PRIu64 " missed: %" PRIu64
736  " errors: %" PRIu64 " nombufs: %" PRIu64,
737  ptv->livedev->dev, eth_stats.ipackets, eth_stats.ibytes, eth_stats.imissed,
738  eth_stats.ierrors, eth_stats.rx_nombuf);
739  if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS)
740  SCLogPerf("%s: total TX stats: packets %" PRIu64 " bytes: %" PRIu64 " errors: %" PRIu64,
741  ptv->livedev->dev, eth_stats.opackets, eth_stats.obytes, eth_stats.oerrors);
742  }
743 
744  DPDKDumpCounters(ptv);
745  SCLogPerf("(%s) received packets %" PRIu64, tv->name, ptv->pkts);
746 }
747 
748 /**
749  * \brief DeInit function closes dpdk at exit.
750  * \param tv pointer to ThreadVars
751  * \param data pointer that gets cast into DPDKThreadVars for ptv
752  */
753 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *tv, void *data)
754 {
755  SCEnter();
756  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
757 
758  if (ptv->queue_id == 0) {
759  struct rte_eth_dev_info dev_info;
760  int retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
761  if (retval != 0) {
762  SCLogError("%s: error (%s) when getting device info", ptv->livedev->dev,
763  rte_strerror(-retval));
765  }
766 
767  DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);
768 
769  if (ptv->workers_sync) {
770  SCFree(ptv->workers_sync);
771  }
772  }
773 
774  ptv->pkt_mempool = NULL; // MP is released when device is closed
775 
776  SCFree(ptv);
778 }
779 
780 /**
781  * \brief This function passes off to link type decoders.
782  *
783  * DecodeDPDK decodes packets from DPDK and passes
784  * them off to the proper link type decoder.
785  *
786  * \param t pointer to ThreadVars
787  * \param p pointer to the current packet
788  * \param data pointer that gets cast into DPDKThreadVars for ptv
789  */
790 static TmEcode DecodeDPDK(ThreadVars *tv, Packet *p, void *data)
791 {
792  SCEnter();
794 
796 
797  /* update counters */
799 
800  /* If suri has set vlan during reading, we increase vlan counter */
801  if (p->vlan_idx) {
803  }
804 
805  /* call the decoder */
806  DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
807 
809 
811 }
812 
813 static TmEcode DecodeDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
814 {
815  SCEnter();
816  DecodeThreadVars *dtv = NULL;
817 
819 
820  if (dtv == NULL)
822 
824 
825  *data = (void *)dtv;
826 
828 }
829 
830 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data)
831 {
832  SCEnter();
833  if (data != NULL)
834  DecodeThreadVarsFree(tv, data);
836 }
837 
838 #endif /* HAVE_DPDK */
839 /* eof */
840 /**
841  * @}
842  */
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:73
PacketL4::csum_set
bool csum_set
Definition: decode.h:441
PacketCheckAction
bool PacketCheckAction(const Packet *p, const uint8_t a)
Definition: packet.c:49
tm-threads.h
len
uint8_t len
Definition: app-layer-dnp3.h:2
TMM_RECEIVEDPDK
@ TMM_RECEIVEDPDK
Definition: tm-threads-common.h:56
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:166
CHECKSUM_VALIDATION_OFFLOAD
@ CHECKSUM_VALIDATION_OFFLOAD
Definition: decode.h:47
offset
uint64_t offset
Definition: util-streaming-buffer.h:0
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
PacketFreeOrRelease
void PacketFreeOrRelease(Packet *p)
Return a packet to where it was allocated.
Definition: decode.c:250
SCTIME_MSECS
#define SCTIME_MSECS(t)
Definition: util-time.h:58
PKT_IS_PSEUDOPKT
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition: decode.h:1321
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
DPDK_COPY_MODE_IPS
@ DPDK_COPY_MODE_IPS
Definition: source-dpdk.h:33
PacketL4::csum
uint16_t csum
Definition: decode.h:442
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:101
TMM_DECODEDPDK
@ TMM_DECODEDPDK
Definition: tm-threads-common.h:57
action-globals.h
Packet_::flags
uint32_t flags
Definition: decode.h:513
DpdkCopyModeEnum
DpdkCopyModeEnum
Definition: source-dpdk.h:33
threads.h
Packet_::vlan_idx
uint8_t vlan_idx
Definition: decode.h:504
LiveDevice_
Definition: util-device.h:50
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:332
StatsSetUI64
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type uint64_t to the local counter.
Definition: counters.c:207
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
NoDPDKSupportExit
TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **)
this function prints an error message and exits.
Definition: source-dpdk.c:78
util-privs.h
CHECKSUM_VALIDATION_DISABLE
@ CHECKSUM_VALIDATION_DISABLE
Definition: decode.h:42
PacketDecodeFinalize
void PacketDecodeFinalize(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Finalize decoding of a packet.
Definition: decode.c:206
DPDKIfaceConfig_
Definition: source-dpdk.h:52
util-dpdk-ice.h
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:314
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:81
tmqh-packetpool.h
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:80
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
Packet_::datalink
int datalink
Definition: decode.h:608
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1325
DPDKWorkerSync_
Definition: source-dpdk.h:47
DecodeRegisterPerfCounters
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition: decode.c:602
TmModuleReceiveDPDKRegister
void TmModuleReceiveDPDKRegister(void)
Definition: source-dpdk.c:50
decode.h
PKT_SRC_WIRE
@ PKT_SRC_WIRE
Definition: decode.h:51
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
Packet_::ts
SCTime_t ts
Definition: decode.h:524
SCTime_t::secs
uint64_t secs
Definition: util-time.h:41
LiveGetDevice
LiveDevice * LiveGetDevice(const char *name)
Get a pointer to the device at idx.
Definition: util-device.c:248
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
GET_PKT_DATA
#define GET_PKT_DATA(p)
Definition: decode.h:205
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
util-affinity.h
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:309
PacketPoolWait
void PacketPoolWait(void)
Definition: tmqh-packetpool.c:80
SCReturn
#define SCReturn
Definition: util-debug.h:273
Packet_
Definition: decode.h:476
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
type
uint16_t type
Definition: decode-vlan.c:106
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:29
GET_PKT_LEN
#define GET_PKT_LEN(p)
Definition: decode.h:204
TimeGet
SCTime_t TimeGet(void)
Definition: util-time.c:152
Packet_::l4
struct PacketL4 l4
Definition: decode.h:570
TmSlot_
Definition: tm-threads.h:53
PKT_IGNORE_CHECKSUM
#define PKT_IGNORE_CHECKSUM
Definition: decode.h:1282
SCTime_t
Definition: util-time.h:40
Packet_::livedev
struct LiveDevice_ * livedev
Definition: decode.h:587
DPDK_BURST_TX_WAIT_US
#define DPDK_BURST_TX_WAIT_US
Definition: source-dpdk.h:35
TmEcode
TmEcode
Definition: tm-threads-common.h:79
name
const char * name
Definition: tm-threads.c:2081
TmModule_::name
const char * name
Definition: tm-modules.h:44
DecodeThreadVars_::counter_vlan
uint16_t counter_vlan
Definition: decode.h:969
runmodes.h
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
dtv
DecodeThreadVars * dtv
Definition: fuzz_decodepcapfile.c:33
PacketL3::csum_set
bool csum_set
Definition: decode.h:411
DPDK_IRQ_MODE
#define DPDK_IRQ_MODE
Definition: source-dpdk.h:41
Packet_::ReleasePacket
void(* ReleasePacket)(struct Packet_ *)
Definition: decode.h:560
util-dpdk.h
flags
uint8_t flags
Definition: decode-gre.h:0
DecodeThreadVarsFree
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition: decode.c:792
source-dpdk.h
ChecksumValidationMode
ChecksumValidationMode
Definition: decode.h:41
suricata-common.h
packet.h
ACTION_DROP
#define ACTION_DROP
Definition: action-globals.h:30
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
FatalError
#define FatalError(...)
Definition: util-debug.h:502
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
threadvars.h
Packet_::l3
struct PacketL3 l3
Definition: decode.h:569
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
SCFree
#define SCFree(p)
Definition: util-mem.h:61
DecodeThreadVars_
Structure to hold thread specific data for all decode modules.
Definition: decode.h:932
util-dpdk-bonding.h
util-dpdk-mlx5.h
DecodeThreadVarsAlloc
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition: decode.c:774
util-dpdk-ixgbe.h
PacketSetData
int PacketSetData(Packet *p, const uint8_t *pktdata, uint32_t pktlen)
Set data for Packet and set length when zero copy is used.
Definition: decode.c:812
util-dpdk-i40e.h
suricata.h
PacketL3::csum
uint16_t csum
Definition: decode.h:412
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition: counters.c:449
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
TmModuleDecodeDPDKRegister
void TmModuleDecodeDPDKRegister(void)
Registration Function for DecodeDPDK.
Definition: source-dpdk.c:64
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
StatsRegisterCounter
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:951
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:275
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:267
SC_CAP_NET_RAW
#define SC_CAP_NET_RAW
Definition: util-privs.h:32
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:76
DPDK_COPY_MODE_TAP
@ DPDK_COPY_MODE_TAP
Definition: source-dpdk.h:33
DecodeUpdatePacketCounters
void DecodeUpdatePacketCounters(ThreadVars *tv, const DecodeThreadVars *dtv, const Packet *p)
Definition: decode.c:740
LINKTYPE_ETHERNET
#define LINKTYPE_ETHERNET
Definition: decode.h:1233
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:169