suricata
source-dpdk.c
Go to the documentation of this file.
1 /* Copyright (C) 2021 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \defgroup dpdk DPDK running mode
20  *
21  * @{
22  */
23 
24 /**
25  * \file
26  *
27  * \author Lukas Sismis <lukas.sismis@gmail.com>
28  *
29  * DPDK capture interface
30  *
31  */
32 
33 #include "suricata-common.h"
34 #include "runmodes.h"
35 #include "decode.h"
36 #include "packet.h"
37 #include "source-dpdk.h"
38 #include "suricata.h"
39 #include "threads.h"
40 #include "threadvars.h"
41 #include "tm-threads.h"
42 #include "tmqh-packetpool.h"
43 #include "util-privs.h"
44 #include "action-globals.h"
45 
46 #ifndef HAVE_DPDK
47 
48 TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **);
49 
51 {
52  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
59 }
60 
61 /**
62  * \brief Registration Function for DecodeDPDK.
63  */
65 {
66  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
73 }
74 
75 /**
76  * \brief this function prints an error message and exits.
77  */
78 TmEcode NoDPDKSupportExit(ThreadVars *tv, const void *initdata, void **data)
79 {
80  FatalError("Error creating thread %s: you do not have "
81  "support for DPDK enabled, on Linux host please recompile "
82  "with --enable-dpdk",
83  tv->name);
84 }
85 
86 #else /* We have DPDK support */
87 
88 #include "util-affinity.h"
89 #include "util-dpdk.h"
90 #include "util-dpdk-i40e.h"
91 #include "util-dpdk-bonding.h"
92 #include <numa.h>
93 
94 #define BURST_SIZE 32
95 static struct timeval machine_start_time = { 0, 0 };
96 // interrupt mode constants
97 #define MIN_ZERO_POLL_COUNT 10U
98 #define MIN_ZERO_POLL_COUNT_TO_SLEEP 10U
99 #define MINIMUM_SLEEP_TIME_US 1U
100 #define STANDARD_SLEEP_TIME_US 100U
101 #define MAX_EPOLL_TIMEOUT_MS 500U
102 static rte_spinlock_t intr_lock[RTE_MAX_ETHPORTS];
103 
104 /**
105  * \brief Structure to hold thread specific variables.
106  */
107 typedef struct DPDKThreadVars_ {
108  /* counters */
109  uint64_t pkts;
110  ThreadVars *tv;
111  TmSlot *slot;
112  LiveDevice *livedev;
113  ChecksumValidationMode checksum_mode;
114  bool intr_enabled;
115  /* references to packet and drop counters */
116  uint16_t capture_dpdk_packets;
117  uint16_t capture_dpdk_rx_errs;
118  uint16_t capture_dpdk_imissed;
119  uint16_t capture_dpdk_rx_no_mbufs;
120  uint16_t capture_dpdk_ierrors;
121  uint16_t capture_dpdk_tx_errs;
122  unsigned int flags;
123  int threads;
124  /* for IPS */
125  DpdkCopyModeEnum copy_mode;
126  uint16_t out_port_id;
127  /* Entry in the peers_list */
128 
129  uint64_t bytes;
130  uint64_t accepted;
131  uint64_t dropped;
132  uint16_t port_id;
133  uint16_t queue_id;
134  int32_t port_socket_id;
135  struct rte_mempool *pkt_mempool;
136  struct rte_mbuf *received_mbufs[BURST_SIZE];
137  DPDKWorkerSync *workers_sync;
138 } DPDKThreadVars;
139 
140 static TmEcode ReceiveDPDKThreadInit(ThreadVars *, const void *, void **);
141 static void ReceiveDPDKThreadExitStats(ThreadVars *, void *);
142 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *, void *);
143 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot);
144 
145 static TmEcode DecodeDPDKThreadInit(ThreadVars *, const void *, void **);
146 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data);
147 static TmEcode DecodeDPDK(ThreadVars *, Packet *, void *);
148 
149 static uint64_t CyclesToMicroseconds(uint64_t cycles);
150 static uint64_t CyclesToSeconds(uint64_t cycles);
151 static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset);
152 static uint64_t DPDKGetSeconds(void);
153 
154 static bool InterruptsRXEnable(uint16_t port_id, uint16_t queue_id)
155 {
156  uint32_t event_data = port_id << UINT16_WIDTH | queue_id;
157  int32_t ret = rte_eth_dev_rx_intr_ctl_q(port_id, queue_id, RTE_EPOLL_PER_THREAD,
158  RTE_INTR_EVENT_ADD, (void *)((uintptr_t)event_data));
159 
160  if (ret != 0) {
161  SCLogError("%s-Q%d: failed to enable interrupt mode: %s", DPDKGetPortNameByPortID(port_id),
162  queue_id, rte_strerror(-ret));
163  return false;
164  }
165  return true;
166 }
167 
168 static inline uint32_t InterruptsSleepHeuristic(uint32_t no_pkt_polls_count)
169 {
170  if (no_pkt_polls_count < MIN_ZERO_POLL_COUNT_TO_SLEEP)
171  return MINIMUM_SLEEP_TIME_US;
172 
173  return STANDARD_SLEEP_TIME_US;
174 }
175 
176 static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool on)
177 {
178  rte_spinlock_lock(&(intr_lock[port_id]));
179 
180  if (on)
181  rte_eth_dev_rx_intr_enable(port_id, queue_id);
182  else
183  rte_eth_dev_rx_intr_disable(port_id, queue_id);
184 
185  rte_spinlock_unlock(&(intr_lock[port_id]));
186 }
187 
/**
 * \brief Frees a run of mbufs from an array.
 *
 * \param mbuf_array array holding the mbufs
 * \param mbuf_cnt number of mbufs to free
 * \param offset index of the first mbuf to free
 *
 * The receive loop calls this with the count of not yet processed mbufs and the index
 * of the first one. Using mbuf_cnt as the end index (instead of offset + mbuf_cnt)
 * would skip some or all of them and leak the buffers.
 */
static inline void DPDKFreeMbufArray(
        struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
    for (uint32_t i = 0; i < mbuf_cnt; i++) {
        rte_pktmbuf_free(mbuf_array[(uint32_t)offset + i]);
    }
}
195 
/**
 * \brief Converts a TSC cycle count to microseconds.
 *
 * Whole seconds and the sub-second remainder are converted separately. Dividing by a
 * truncated "ticks per microsecond" value instead loses the fractional part of the
 * frequency and the error grows with the cycle count.
 *
 * \param cycles number of TSC cycles
 * \return the cycle count expressed in microseconds, 0 if the TSC frequency is reported as 0
 */
static uint64_t CyclesToMicroseconds(const uint64_t cycles)
{
    const uint64_t ticks_per_s = rte_get_tsc_hz();
    if (ticks_per_s == 0) {
        return 0;
    }

    const uint64_t whole_secs = cycles / ticks_per_s;
    /* always smaller than ticks_per_s, so the multiplication below cannot overflow for
     * any realistic TSC frequency */
    const uint64_t leftover_ticks = cycles % ticks_per_s;
    return whole_secs * 1000000 + (leftover_ticks * 1000000) / ticks_per_s;
}
204 
/**
 * \brief Converts a TSC cycle count to whole seconds.
 *
 * \param cycles number of TSC cycles
 * \return the cycle count divided by the TSC frequency, 0 if the frequency is reported as 0
 */
static uint64_t CyclesToSeconds(const uint64_t cycles)
{
    const uint64_t tsc_hz = rte_get_tsc_hz();

    return (tsc_hz != 0) ? cycles / tsc_hz : 0;
}
213 
/**
 * \brief Adds a TSC cycle count, converted to time, to a timeval.
 *
 * \param cycles number of TSC cycles to add
 * \param orig_tv base time
 * \param new_tv receives orig_tv advanced by the cycles; tv_usec is normalized below 1000000
 */
static void CyclesAddToTimeval(
        const uint64_t cycles, struct timeval *orig_tv, struct timeval *new_tv)
{
    const uint64_t usec_total = CyclesToMicroseconds(cycles) + orig_tv->tv_usec;
    const uint64_t carried_secs = usec_total / 1000000;

    new_tv->tv_usec = (usec_total % 1000000);
    new_tv->tv_sec = orig_tv->tv_sec + carried_secs;
}
221 
223 {
224  gettimeofday(&machine_start_time, NULL);
225  machine_start_time.tv_sec -= DPDKGetSeconds();
226 }
227 
228 /**
229  * Initializes real_tv to the correct real time. Adds TSC counter value to the timeval of
230  * the machine start
231  * @param machine_start_tv - timestamp when the machine was started
232  * @param real_tv
233  */
234 static SCTime_t DPDKSetTimevalReal(struct timeval *machine_start_tv)
235 {
236  struct timeval real_tv;
237  CyclesAddToTimeval(rte_get_tsc_cycles(), machine_start_tv, &real_tv);
238  return SCTIME_FROM_TIMEVAL(&real_tv);
239 }
240 
/**
 * \brief Returns the number of whole seconds elapsed since the TSC counter was reset
 *        (typically since the machine start).
 */
static uint64_t DPDKGetSeconds(void)
{
    const uint64_t tsc_now = rte_get_tsc_cycles();

    return CyclesToSeconds(tsc_now);
}
246 
247 static void DevicePostStartPMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
248 {
249  if (strcmp(driver_name, "net_bonding") == 0) {
250  driver_name = BondingDeviceDriverGet(ptv->port_id);
251  }
252 
253  // The PMD Driver i40e has a special way to set the RSS, it can be set via rte_flow rules
254  // and only after the start of the port
255  if (strcmp(driver_name, "net_i40e") == 0)
256  i40eDeviceSetRSS(ptv->port_id, ptv->threads);
257 }
258 
259 static void DevicePreClosePMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
260 {
261  if (strcmp(driver_name, "net_bonding") == 0) {
262  driver_name = BondingDeviceDriverGet(ptv->port_id);
263  }
264 
265  if (strcmp(driver_name, "net_i40e") == 0) {
266 #if RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0)
267  // Flush the RSS rules that have been inserted in the post start section
268  struct rte_flow_error flush_error = { 0 };
269  int32_t retval = rte_flow_flush(ptv->port_id, &flush_error);
270  if (retval != 0) {
271  SCLogError("%s: unable to flush rte_flow rules: %s Flush error msg: %s",
272  ptv->livedev->dev, rte_strerror(-retval), flush_error.message);
273  }
274 #endif /* RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0) */
275  }
276 }
277 
/**
 * \brief Looks up the NUMA node of the CPU the calling thread currently runs on.
 *
 * Only implemented for Linux; on other systems a warning is logged.
 *
 * \return the value of numa_node_of_cpu() on Linux, -1 on other systems
 */
static int GetNumaNode(void)
{
#if defined(__linux__)
    const int current_cpu = sched_getcpu();
    return numa_node_of_cpu(current_cpu);
#else
    SCLogWarning("NUMA node retrieval is not supported on this OS.");
    return -1;
#endif
}
296 
297 /**
298  * \brief Registration Function for ReceiveDPDK.
299  * \todo Unit tests are needed for this module.
300  */
302 {
303  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
304  tmm_modules[TMM_RECEIVEDPDK].ThreadInit = ReceiveDPDKThreadInit;
306  tmm_modules[TMM_RECEIVEDPDK].PktAcqLoop = ReceiveDPDKLoop;
308  tmm_modules[TMM_RECEIVEDPDK].ThreadExitPrintStats = ReceiveDPDKThreadExitStats;
309  tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = ReceiveDPDKThreadDeinit;
312 }
313 
314 /**
315  * \brief Registration Function for DecodeDPDK.
316  * \todo Unit tests are needed for this module.
317  */
319 {
320  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
321  tmm_modules[TMM_DECODEDPDK].ThreadInit = DecodeDPDKThreadInit;
322  tmm_modules[TMM_DECODEDPDK].Func = DecodeDPDK;
324  tmm_modules[TMM_DECODEDPDK].ThreadDeinit = DecodeDPDKThreadDeinit;
327 }
328 
329 static inline void DPDKDumpCounters(DPDKThreadVars *ptv)
330 {
331  /* Some NICs (e.g. Intel) do not support queue statistics and the drops can be fetched only on
332  * the port level. Therefore setting it to the first worker to have at least continuous update
333  * on the dropped packets. */
334  if (ptv->queue_id == 0) {
335  struct rte_eth_stats eth_stats;
336  int retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
337  if (unlikely(retval != 0)) {
338  SCLogError("%s: failed to get stats: %s", ptv->livedev->dev, rte_strerror(-retval));
339  return;
340  }
341 
342  StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets,
343  ptv->pkts + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
344  SC_ATOMIC_SET(ptv->livedev->pkts,
345  eth_stats.ipackets + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
346  StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_errs,
347  eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
348  StatsSetUI64(ptv->tv, ptv->capture_dpdk_imissed, eth_stats.imissed);
349  StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_no_mbufs, eth_stats.rx_nombuf);
350  StatsSetUI64(ptv->tv, ptv->capture_dpdk_ierrors, eth_stats.ierrors);
351  StatsSetUI64(ptv->tv, ptv->capture_dpdk_tx_errs, eth_stats.oerrors);
353  ptv->livedev->drop, eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
354  } else {
355  StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets, ptv->pkts);
356  }
357 }
358 
359 static void DPDKReleasePacket(Packet *p)
360 {
361  int retval;
362  /* Need to be in copy mode and need to detect early release
363  where Ethernet header could not be set (and pseudo packet)
364  When enabling promiscuous mode on Intel cards, 2 ICMPv6 packets are generated.
365  These get into the infinite cycle between the NIC and the switch in some cases */
366  if ((p->dpdk_v.copy_mode == DPDK_COPY_MODE_TAP ||
367  (p->dpdk_v.copy_mode == DPDK_COPY_MODE_IPS && !PacketCheckAction(p, ACTION_DROP)))
368 #if defined(RTE_LIBRTE_I40E_PMD) || defined(RTE_LIBRTE_IXGBE_PMD) || defined(RTE_LIBRTE_ICE_PMD)
369  && !(PacketIsICMPv6(p) && PacketGetICMPv6(p)->type == 143)
370 #endif
371  ) {
373  retval =
374  rte_eth_tx_burst(p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
375  // rte_eth_tx_burst can return only 0 (failure) or 1 (success) because we are only
376  // transmitting burst of size 1 and the function rte_eth_tx_burst returns number of
377  // successfully sent packets.
378  if (unlikely(retval < 1)) {
379  // sometimes a repeated transmit can help to send out the packet
380  rte_delay_us(DPDK_BURST_TX_WAIT_US);
381  retval = rte_eth_tx_burst(
382  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
383  if (unlikely(retval < 1)) {
384  SCLogDebug("Unable to transmit the packet on port %u queue %u",
385  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id);
386  rte_pktmbuf_free(p->dpdk_v.mbuf);
387  p->dpdk_v.mbuf = NULL;
388  }
389  }
390  } else {
391  rte_pktmbuf_free(p->dpdk_v.mbuf);
392  p->dpdk_v.mbuf = NULL;
393  }
394 
396 }
397 
398 static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv)
399 {
400  SCEnter();
401  // Indicate that the thread is actually running its application level
402  // code (i.e., it can poll packets)
404  PacketPoolWait();
405 
406  rte_eth_stats_reset(ptv->port_id);
407  rte_eth_xstats_reset(ptv->port_id);
408 
409  if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
411 
413 }
414 
415 static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv)
416 {
417  static thread_local uint64_t last_timeout_msec = 0;
418  SCTime_t t = DPDKSetTimevalReal(&machine_start_time);
419  uint64_t msecs = SCTIME_MSECS(t);
420  if (msecs > last_timeout_msec + 100) {
421  TmThreadsCaptureHandleTimeout(tv, NULL);
422  last_timeout_msec = msecs;
423  }
424 }
425 
426 /**
427  * \brief Decides if it should retry the packet poll or continue with the packet processing
428  * \return true if the poll should be retried, false otherwise
429  */
430 static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx)
431 {
432  static thread_local uint32_t zero_pkt_polls_cnt = 0;
433 
434  if (nb_rx > 0) {
435  zero_pkt_polls_cnt = 0;
436  return false;
437  }
438 
439  LoopHandleTimeoutOnIdle(tv);
440  if (!ptv->intr_enabled)
441  return true;
442 
443  zero_pkt_polls_cnt++;
444  if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
445  return true;
446 
447  uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
448  if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
449  rte_delay_us(pwd_idle_hint);
450  } else {
451  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
452  struct rte_epoll_event event;
453  rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
454  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
455  return true;
456  }
457 
458  return false;
459 }
460 
461 /**
462  * \brief Initializes a packet from an mbuf
463  * \return true if the packet was initialized successfully, false otherwise
464  */
465 static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf)
466 {
468  if (unlikely(p == NULL)) {
469  return NULL;
470  }
473  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
475  }
476 
477  p->ts = DPDKSetTimevalReal(&machine_start_time);
478  p->dpdk_v.mbuf = mbuf;
479  p->ReleasePacket = DPDKReleasePacket;
480  p->dpdk_v.copy_mode = ptv->copy_mode;
481  p->dpdk_v.out_port_id = ptv->out_port_id;
482  p->dpdk_v.out_queue_id = ptv->queue_id;
483  p->livedev = ptv->livedev;
484 
485  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
487  } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
488  uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
489  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
490  (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
491  SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
493  } else {
494  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
495  SCLogDebug("HW detected BAD IP checksum");
496  // chsum recalc will not be triggered but rule keyword check will be
497  p->l3.csum_set = true;
498  p->l3.csum = 0;
499  }
500  if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
501  SCLogDebug("HW detected BAD L4 chsum");
502  p->l4.csum_set = true;
503  p->l4.csum = 0;
504  }
505  }
506  }
507 
508  return p;
509 }
510 
511 static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf)
512 {
513  static thread_local bool segmented_mbufs_warned = false;
514  if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
515  char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
516  "Check your configuration or report the issue";
517  enum rte_proc_type_t eal_t = rte_eal_process_type();
518  if (eal_t == RTE_PROC_SECONDARY) {
519  SCLogWarning("%s. To avoid segmented mbufs, "
520  "try to increase mbuf size in your primary application",
521  warn_s);
522  } else if (eal_t == RTE_PROC_PRIMARY) {
523  SCLogWarning("%s. To avoid segmented mbufs, "
524  "try to increase MTU in your suricata.yaml",
525  warn_s);
526  }
527 
528  segmented_mbufs_warned = true;
529  }
530 }
531 
532 static void HandleShutdown(DPDKThreadVars *ptv)
533 {
534  SCLogDebug("Stopping Suricata!");
535  SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
536  while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) {
537  rte_delay_us(10);
538  }
539  if (ptv->queue_id == 0) {
540  rte_delay_us(20); // wait for all threads to get out of the sync loop
541  SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
542  // If Suricata runs in peered mode, the peer threads might still want to send
543  // packets to our port. Instead, we know, that we are done with the peered port, so
544  // we stop it. The peered threads will stop our port.
545  if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
546  rte_eth_dev_stop(ptv->out_port_id);
547  } else {
548  // in IDS we stop our port - no peer threads are running
549  rte_eth_dev_stop(ptv->port_id);
550  }
551  }
552  DPDKDumpCounters(ptv);
553 }
554 
555 static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
556 {
557  static thread_local time_t last_dump = 0;
558  time_t current_time = DPDKGetSeconds();
559  /* Trigger one dump of stats every second */
560  if (current_time != last_dump) {
561  DPDKDumpCounters(ptv);
562  last_dump = current_time;
563  }
564 }
565 
566 /**
567  * \brief Main DPDK reading Loop function
568  */
569 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
570 {
571  SCEnter();
572  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
573  ptv->slot = ((TmSlot *)slot)->slot_next;
574  TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
575  if (ret != TM_ECODE_OK) {
576  SCReturnInt(ret);
577  }
578  while (true) {
579  if (unlikely(suricata_ctl_flags != 0)) {
580  HandleShutdown(ptv);
581  break;
582  }
583 
584  uint16_t nb_rx =
585  rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
586  if (RXPacketCountHeuristic(tv, ptv, nb_rx)) {
587  continue;
588  }
589 
590  ptv->pkts += (uint64_t)nb_rx;
591  for (uint16_t i = 0; i < nb_rx; i++) {
592  Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
593  if (p == NULL) {
594  rte_pktmbuf_free(ptv->received_mbufs[i]);
595  continue;
596  }
597  DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
598  PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
599  rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
600  if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
601  TmqhOutputPacketpool(ptv->tv, p);
602  DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
603  SCReturnInt(EXIT_FAILURE);
604  }
605  }
606 
607  PeriodicDPDKDumpCounters(ptv);
609  }
610 
612 }
613 
614 /**
615  * \brief Init function for ReceiveDPDK.
616  *
617  * \param tv pointer to ThreadVars
618  * \param initdata pointer to the interface passed from the user
619  * \param data pointer gets populated with DPDKThreadVars
620  *
621  */
622 static TmEcode ReceiveDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
623 {
624  SCEnter();
625  int retval, thread_numa;
626  DPDKThreadVars *ptv = NULL;
627  DPDKIfaceConfig *dpdk_config = (DPDKIfaceConfig *)initdata;
628 
629  if (initdata == NULL) {
630  SCLogError("DPDK configuration is NULL in thread initialization");
631  goto fail;
632  }
633 
634  ptv = SCCalloc(1, sizeof(DPDKThreadVars));
635  if (unlikely(ptv == NULL)) {
636  SCLogError("Unable to allocate memory");
637  goto fail;
638  }
639 
640  ptv->tv = tv;
641  ptv->pkts = 0;
642  ptv->bytes = 0;
643  ptv->livedev = LiveGetDevice(dpdk_config->iface);
644 
645  ptv->capture_dpdk_packets = StatsRegisterCounter("capture.packets", ptv->tv);
646  ptv->capture_dpdk_rx_errs = StatsRegisterCounter("capture.rx_errors", ptv->tv);
647  ptv->capture_dpdk_tx_errs = StatsRegisterCounter("capture.tx_errors", ptv->tv);
648  ptv->capture_dpdk_imissed = StatsRegisterCounter("capture.dpdk.imissed", ptv->tv);
649  ptv->capture_dpdk_rx_no_mbufs = StatsRegisterCounter("capture.dpdk.no_mbufs", ptv->tv);
650  ptv->capture_dpdk_ierrors = StatsRegisterCounter("capture.dpdk.ierrors", ptv->tv);
651 
652  ptv->copy_mode = dpdk_config->copy_mode;
653  ptv->checksum_mode = dpdk_config->checksum_mode;
654 
655  ptv->threads = dpdk_config->threads;
656  ptv->intr_enabled = (dpdk_config->flags & DPDK_IRQ_MODE) ? true : false;
657  ptv->port_id = dpdk_config->port_id;
658  ptv->out_port_id = dpdk_config->out_port_id;
659  ptv->port_socket_id = dpdk_config->socket_id;
660  // pass the pointer to the mempool and then forget about it. Mempool is freed in thread deinit.
661  ptv->pkt_mempool = dpdk_config->pkt_mempool;
662  dpdk_config->pkt_mempool = NULL;
663 
664  thread_numa = GetNumaNode();
665  if (thread_numa >= 0 && ptv->port_socket_id != SOCKET_ID_ANY &&
666  thread_numa != ptv->port_socket_id) {
667  SC_ATOMIC_ADD(dpdk_config->inconsistent_numa_cnt, 1);
668  SCLogPerf("%s: NIC is on NUMA %d, thread on NUMA %d", dpdk_config->iface,
669  ptv->port_socket_id, thread_numa);
670  }
671 
672  ptv->workers_sync = dpdk_config->workers_sync;
673  uint16_t queue_id = SC_ATOMIC_ADD(dpdk_config->queue_id, 1);
674  ptv->queue_id = queue_id;
675 
676  // the last thread starts the device
677  if (queue_id == dpdk_config->threads - 1) {
678  retval = rte_eth_dev_start(ptv->port_id);
679  if (retval < 0) {
680  SCLogError("%s: error (%s) during device startup", dpdk_config->iface,
681  rte_strerror(-retval));
682  goto fail;
683  }
684 
685  struct rte_eth_dev_info dev_info;
686  retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
687  if (retval != 0) {
688  SCLogError("%s: error (%s) when getting device info", dpdk_config->iface,
689  rte_strerror(-retval));
690  goto fail;
691  }
692 
693  // some PMDs requires additional actions only after the device has started
694  DevicePostStartPMDSpecificActions(ptv, dev_info.driver_name);
695 
696  uint16_t inconsistent_numa_cnt = SC_ATOMIC_GET(dpdk_config->inconsistent_numa_cnt);
697  if (inconsistent_numa_cnt > 0 && ptv->port_socket_id != SOCKET_ID_ANY) {
698  SCLogWarning("%s: NIC is on NUMA %d, %u threads on different NUMA node(s)",
699  dpdk_config->iface, ptv->port_socket_id, inconsistent_numa_cnt);
700  } else if (ptv->port_socket_id == SOCKET_ID_ANY && rte_socket_count() > 1) {
701  SCLogNotice(
702  "%s: unable to determine NIC's NUMA node, degraded performance can be expected",
703  dpdk_config->iface);
704  }
705  if (ptv->intr_enabled) {
706  rte_spinlock_init(&intr_lock[ptv->port_id]);
707  }
708  }
709 
710  *data = (void *)ptv;
711  dpdk_config->DerefFunc(dpdk_config);
713 
714 fail:
715  if (dpdk_config != NULL)
716  dpdk_config->DerefFunc(dpdk_config);
717  if (ptv != NULL)
718  SCFree(ptv);
720 }
721 
722 static void PrintDPDKPortXstats(uint32_t port_id, const char *port_name)
723 {
724  struct rte_eth_xstat *xstats;
725  struct rte_eth_xstat_name *xstats_names;
726 
727  int32_t len = rte_eth_xstats_get(port_id, NULL, 0);
728  if (len < 0)
729  FatalError("Error (%s) getting count of rte_eth_xstats failed on port %s",
730  rte_strerror(-len), port_name);
731 
732  xstats = SCCalloc(len, sizeof(*xstats));
733  if (xstats == NULL)
734  FatalError("Failed to allocate memory for the rte_eth_xstat structure");
735 
736  int32_t ret = rte_eth_xstats_get(port_id, xstats, len);
737  if (ret < 0 || ret > len) {
738  SCFree(xstats);
739  FatalError("Error (%s) getting rte_eth_xstats failed on port %s", rte_strerror(-ret),
740  port_name);
741  }
742  xstats_names = SCCalloc(len, sizeof(*xstats_names));
743  if (xstats_names == NULL) {
744  SCFree(xstats);
745  FatalError("Failed to allocate memory for the rte_eth_xstat_name array");
746  }
747  ret = rte_eth_xstats_get_names(port_id, xstats_names, len);
748  if (ret < 0 || ret > len) {
749  SCFree(xstats);
750  SCFree(xstats_names);
751  FatalError("Error (%s) getting names of rte_eth_xstats failed on port %s",
752  rte_strerror(-ret), port_name);
753  }
754  for (int32_t i = 0; i < len; i++) {
755  if (xstats[i].value > 0)
756  SCLogPerf("Port %u (%s) - %s: %" PRIu64, port_id, port_name, xstats_names[i].name,
757  xstats[i].value);
758  }
759 
760  SCFree(xstats);
761  SCFree(xstats_names);
762 }
763 
764 /**
765  * \brief This function prints stats to the screen at exit.
766  * \param tv pointer to ThreadVars
767  * \param data pointer that gets cast into DPDKThreadVars for ptv
768  */
769 static void ReceiveDPDKThreadExitStats(ThreadVars *tv, void *data)
770 {
771  SCEnter();
772  int retval;
773  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
774 
775  if (ptv->queue_id == 0) {
776  struct rte_eth_stats eth_stats;
777  PrintDPDKPortXstats(ptv->port_id, ptv->livedev->dev);
778  retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
779  if (unlikely(retval != 0)) {
780  SCLogError("%s: failed to get stats (%s)", ptv->livedev->dev, strerror(-retval));
781  SCReturn;
782  }
783  SCLogPerf("%s: total RX stats: packets %" PRIu64 " bytes: %" PRIu64 " missed: %" PRIu64
784  " errors: %" PRIu64 " nombufs: %" PRIu64,
785  ptv->livedev->dev, eth_stats.ipackets, eth_stats.ibytes, eth_stats.imissed,
786  eth_stats.ierrors, eth_stats.rx_nombuf);
787  if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS)
788  SCLogPerf("%s: total TX stats: packets %" PRIu64 " bytes: %" PRIu64 " errors: %" PRIu64,
789  ptv->livedev->dev, eth_stats.opackets, eth_stats.obytes, eth_stats.oerrors);
790  }
791 
792  DPDKDumpCounters(ptv);
793  SCLogPerf("(%s) received packets %" PRIu64, tv->name, ptv->pkts);
794 }
795 
796 /**
797  * \brief DeInit function closes dpdk at exit.
798  * \param tv pointer to ThreadVars
799  * \param data pointer that gets cast into DPDKThreadVars for ptv
800  */
801 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *tv, void *data)
802 {
803  SCEnter();
804  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
805 
806  if (ptv->queue_id == 0) {
807  struct rte_eth_dev_info dev_info;
808  int retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
809  if (retval != 0) {
810  SCLogError("%s: error (%s) when getting device info", ptv->livedev->dev,
811  rte_strerror(-retval));
813  }
814 
815  DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);
816 
817  if (ptv->workers_sync) {
818  SCFree(ptv->workers_sync);
819  }
820  }
821 
822  ptv->pkt_mempool = NULL; // MP is released when device is closed
823 
824  SCFree(ptv);
826 }
827 
828 /**
829  * \brief This function passes off to link type decoders.
830  *
831  * DecodeDPDK decodes packets from DPDK and passes
832  * them off to the proper link type decoder.
833  *
834  * \param tv pointer to ThreadVars
835  * \param p pointer to the current packet
836  * \param data pointer that gets cast into DecodeThreadVars for dtv
837  */
838 static TmEcode DecodeDPDK(ThreadVars *tv, Packet *p, void *data)
839 {
840  SCEnter();
842 
844 
845  /* update counters */
847 
848  /* If suri has set vlan during reading, we increase vlan counter */
849  if (p->vlan_idx) {
851  }
852 
853  /* call the decoder */
854  DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
855 
857 
859 }
860 
861 static TmEcode DecodeDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
862 {
863  SCEnter();
864  DecodeThreadVars *dtv = NULL;
865 
867 
868  if (dtv == NULL)
870 
872 
873  *data = (void *)dtv;
874 
876 }
877 
878 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data)
879 {
880  SCEnter();
881  if (data != NULL)
882  DecodeThreadVarsFree(tv, data);
884 }
885 
886 #endif /* HAVE_DPDK */
887 /* eof */
888 /**
889  * @}
890  */
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:74
PacketL4::csum_set
bool csum_set
Definition: decode.h:447
PacketCheckAction
bool PacketCheckAction(const Packet *p, const uint8_t a)
Definition: packet.c:48
tm-threads.h
len
uint8_t len
Definition: app-layer-dnp3.h:2
TMM_RECEIVEDPDK
@ TMM_RECEIVEDPDK
Definition: tm-threads-common.h:58
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:166
CHECKSUM_VALIDATION_OFFLOAD
@ CHECKSUM_VALIDATION_OFFLOAD
Definition: decode.h:51
offset
uint64_t offset
Definition: util-streaming-buffer.h:0
DPDKSetTimevalOfMachineStart
void DPDKSetTimevalOfMachineStart(void)
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
PacketFreeOrRelease
void PacketFreeOrRelease(Packet *p)
Return a packet to where it was allocated.
Definition: decode.c:250
SCTIME_MSECS
#define SCTIME_MSECS(t)
Definition: util-time.h:58
PKT_IS_PSEUDOPKT
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition: decode.h:1334
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
DPDK_COPY_MODE_IPS
@ DPDK_COPY_MODE_IPS
Definition: source-dpdk.h:33
PacketL4::csum
uint16_t csum
Definition: decode.h:448
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:99
TMM_DECODEDPDK
@ TMM_DECODEDPDK
Definition: tm-threads-common.h:59
action-globals.h
Packet_::flags
uint32_t flags
Definition: decode.h:519
DpdkCopyModeEnum
DpdkCopyModeEnum
Definition: source-dpdk.h:33
threads.h
Packet_::vlan_idx
uint8_t vlan_idx
Definition: decode.h:510
LiveDevice_
Definition: util-device.h:50
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:332
StatsSetUI64
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type double to the local counter.
Definition: counters.c:207
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:54
NoDPDKSupportExit
TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **)
this function prints an error message and exits.
Definition: source-dpdk.c:78
util-privs.h
CHECKSUM_VALIDATION_DISABLE
@ CHECKSUM_VALIDATION_DISABLE
Definition: decode.h:46
PacketDecodeFinalize
void PacketDecodeFinalize(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Finalize decoding of a packet.
Definition: decode.c:206
DPDKIfaceConfig_
Definition: source-dpdk.h:52
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:317
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:85
tmqh-packetpool.h
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:55
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:84
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:50
Packet_::datalink
int datalink
Definition: decode.h:622
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1337
DPDKWorkerSync_
Definition: source-dpdk.h:47
DecodeRegisterPerfCounters
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition: decode.c:602
TmModuleReceiveDPDKRegister
void TmModuleReceiveDPDKRegister(void)
Definition: source-dpdk.c:50
decode.h
PKT_SRC_WIRE
@ PKT_SRC_WIRE
Definition: decode.h:55
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:58
Packet_::ts
SCTime_t ts
Definition: decode.h:530
LiveGetDevice
LiveDevice * LiveGetDevice(const char *name)
Get a pointer to the device with the given name.
Definition: util-device.c:248
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
GET_PKT_DATA
#define GET_PKT_DATA(p)
Definition: decode.h:214
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
util-affinity.h
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:79
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:53
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:300
PacketPoolWait
void PacketPoolWait(void)
Definition: tmqh-packetpool.c:80
SCReturn
#define SCReturn
Definition: util-debug.h:273
Packet_
Definition: decode.h:482
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
type
uint16_t type
Definition: decode-vlan.c:107
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:33
GET_PKT_LEN
#define GET_PKT_LEN(p)
Definition: decode.h:213
Packet_::l4
struct PacketL4 l4
Definition: decode.h:584
TmSlot_
Definition: tm-threads.h:53
PKT_IGNORE_CHECKSUM
#define PKT_IGNORE_CHECKSUM
Definition: decode.h:1297
SCTime_t
Definition: util-time.h:40
Packet_::livedev
struct LiveDevice_ * livedev
Definition: decode.h:601
DPDK_BURST_TX_WAIT_US
#define DPDK_BURST_TX_WAIT_US
Definition: source-dpdk.h:35
TmEcode
TmEcode
Definition: tm-threads-common.h:83
TmModule_::name
const char * name
Definition: tm-modules.h:45
DecodeThreadVars_::counter_vlan
uint16_t counter_vlan
Definition: decode.h:982
runmodes.h
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
dtv
DecodeThreadVars * dtv
Definition: fuzz_decodepcapfile.c:33
PacketL3::csum_set
bool csum_set
Definition: decode.h:417
DPDK_IRQ_MODE
#define DPDK_IRQ_MODE
Definition: source-dpdk.h:41
Packet_::ReleasePacket
void(* ReleasePacket)(struct Packet_ *)
Definition: decode.h:574
util-dpdk.h
flags
uint8_t flags
Definition: decode-gre.h:0
DecodeThreadVarsFree
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition: decode.c:791
source-dpdk.h
ChecksumValidationMode
ChecksumValidationMode
Definition: decode.h:45
suricata-common.h
packet.h
ACTION_DROP
#define ACTION_DROP
Definition: action-globals.h:30
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:48
FatalError
#define FatalError(...)
Definition: util-debug.h:502
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:49
threadvars.h
Packet_::l3
struct PacketL3 l3
Definition: decode.h:583
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
SCFree
#define SCFree(p)
Definition: util-mem.h:61
DecodeThreadVars_
Structure to hold thread specific data for all decode modules.
Definition: decode.h:946
util-dpdk-bonding.h
DecodeThreadVarsAlloc
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Allocate and set up DecodeThreadVars.
Definition: decode.c:773
PacketSetData
int PacketSetData(Packet *p, const uint8_t *pktdata, uint32_t pktlen)
Set data for Packet and set length when zero copy is used.
Definition: decode.c:811
util-dpdk-i40e.h
suricata.h
PacketL3::csum
uint16_t csum
Definition: decode.h:418
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition: counters.c:454
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
TmModuleDecodeDPDKRegister
void TmModuleDecodeDPDKRegister(void)
Registration Function for DecodeDPDK.
Definition: source-dpdk.c:64
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
StatsRegisterCounter
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:961
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:275
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packet...
Definition: decode.c:267
SC_CAP_NET_RAW
#define SC_CAP_NET_RAW
Definition: util-privs.h:32
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:77
DPDK_COPY_MODE_TAP
@ DPDK_COPY_MODE_TAP
Definition: source-dpdk.h:33
DecodeUpdatePacketCounters
void DecodeUpdatePacketCounters(ThreadVars *tv, const DecodeThreadVars *dtv, const Packet *p)
Definition: decode.c:739
LINKTYPE_ETHERNET
#define LINKTYPE_ETHERNET
Definition: decode.h:1248
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:171