suricata
source-dpdk.c
Go to the documentation of this file.
1 /* Copyright (C) 2021 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \defgroup dpdk DPDK running mode
20  *
21  * @{
22  */
23 
24 /**
25  * \file
26  *
27  * \author Lukas Sismis <lukas.sismis@gmail.com>
28  *
29  * DPDK capture interface
30  *
31  */
32 
33 #include "suricata-common.h"
34 #include "runmodes.h"
35 #include "decode.h"
36 #include "packet.h"
37 #include "source-dpdk.h"
38 #include "suricata.h"
39 #include "threads.h"
40 #include "threadvars.h"
41 #include "tm-threads.h"
42 #include "tmqh-packetpool.h"
43 #include "util-privs.h"
44 #include "action-globals.h"
45 
46 #ifndef HAVE_DPDK
47 
48 TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **);
49 
51 {
52  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
59 }
60 
61 /**
62  * \brief Registration Function for DecodeDPDK.
63  */
65 {
66  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
73 }
74 
75 /**
76  * \brief this function prints an error message and exits.
77  */
78 TmEcode NoDPDKSupportExit(ThreadVars *tv, const void *initdata, void **data)
79 {
80  FatalError("Error creating thread %s: you do not have "
81  "support for DPDK enabled, on Linux host please recompile "
82  "with --enable-dpdk",
83  tv->name);
84 }
85 
86 #else /* We have DPDK support */
87 
88 #include "util-affinity.h"
89 #include "util-dpdk.h"
90 #include "util-dpdk-i40e.h"
91 #include "util-dpdk-bonding.h"
92 #include <numa.h>
93 
94 #define BURST_SIZE 32
95 static struct timeval machine_start_time = { 0, 0 };
96 // interrupt mode constants
97 #define MIN_ZERO_POLL_COUNT 10U
98 #define MIN_ZERO_POLL_COUNT_TO_SLEEP 10U
99 #define MINIMUM_SLEEP_TIME_US 1U
100 #define STANDARD_SLEEP_TIME_US 100U
101 #define MAX_EPOLL_TIMEOUT_MS 500U
102 static rte_spinlock_t intr_lock[RTE_MAX_ETHPORTS];
103 
104 /**
105  * \brief Structure to hold thread specific variables.
106  */
107 typedef struct DPDKThreadVars_ {
108  /* counters */
109  uint64_t pkts;
110  ThreadVars *tv;
111  TmSlot *slot;
112  LiveDevice *livedev;
113  ChecksumValidationMode checksum_mode;
114  bool intr_enabled;
115  /* references to packet and drop counters */
116  uint16_t capture_dpdk_packets;
117  uint16_t capture_dpdk_rx_errs;
118  uint16_t capture_dpdk_imissed;
119  uint16_t capture_dpdk_rx_no_mbufs;
120  uint16_t capture_dpdk_ierrors;
121  uint16_t capture_dpdk_tx_errs;
122  unsigned int flags;
123  int threads;
124  /* for IPS */
125  DpdkCopyModeEnum copy_mode;
126  uint16_t out_port_id;
127  /* Entry in the peers_list */
128 
129  uint64_t bytes;
130  uint64_t accepted;
131  uint64_t dropped;
132  uint16_t port_id;
133  uint16_t queue_id;
134  int32_t port_socket_id;
135  struct rte_mempool *pkt_mempool;
136  struct rte_mbuf *received_mbufs[BURST_SIZE];
137  DPDKWorkerSync *workers_sync;
138 } DPDKThreadVars;
139 
140 static TmEcode ReceiveDPDKThreadInit(ThreadVars *, const void *, void **);
141 static void ReceiveDPDKThreadExitStats(ThreadVars *, void *);
142 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *, void *);
143 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot);
144 
145 static TmEcode DecodeDPDKThreadInit(ThreadVars *, const void *, void **);
146 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data);
147 static TmEcode DecodeDPDK(ThreadVars *, Packet *, void *);
148 
149 static uint64_t CyclesToMicroseconds(uint64_t cycles);
150 static uint64_t CyclesToSeconds(uint64_t cycles);
151 static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset);
152 static uint64_t DPDKGetSeconds(void);
153 
154 static bool InterruptsRXEnable(uint16_t port_id, uint16_t queue_id)
155 {
156  uint32_t event_data = port_id << UINT16_WIDTH | queue_id;
157  int32_t ret = rte_eth_dev_rx_intr_ctl_q(port_id, queue_id, RTE_EPOLL_PER_THREAD,
158  RTE_INTR_EVENT_ADD, (void *)((uintptr_t)event_data));
159 
160  if (ret != 0) {
161  SCLogError("%s-Q%d: failed to enable interrupt mode: %s", DPDKGetPortNameByPortID(port_id),
162  queue_id, rte_strerror(-ret));
163  return false;
164  }
165  return true;
166 }
167 
168 static inline uint32_t InterruptsSleepHeuristic(uint32_t no_pkt_polls_count)
169 {
170  if (no_pkt_polls_count < MIN_ZERO_POLL_COUNT_TO_SLEEP)
171  return MINIMUM_SLEEP_TIME_US;
172 
173  return STANDARD_SLEEP_TIME_US;
174 }
175 
176 static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool on)
177 {
178  rte_spinlock_lock(&(intr_lock[port_id]));
179 
180  if (on)
181  rte_eth_dev_rx_intr_enable(port_id, queue_id);
182  else
183  rte_eth_dev_rx_intr_disable(port_id, queue_id);
184 
185  rte_spinlock_unlock(&(intr_lock[port_id]));
186 }
187 
/**
 * \brief Frees \p mbuf_cnt mbufs of \p mbuf_array, starting at index \p offset.
 *
 * \param mbuf_array array of mbufs
 * \param mbuf_cnt   number of mbufs to free
 * \param offset     index of the first mbuf to free
 */
static inline void DPDKFreeMbufArray(
        struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
    /* mbuf_cnt is a count relative to offset, not an end index. The receive loop passes
     * (nb_rx - i - 1, i + 1), i.e. all mbufs after the one that failed. Treating the count as an
     * end index leaked the tail of the burst. The wider type keeps offset + mbuf_cnt from
     * wrapping. */
    const uint32_t end = (uint32_t)offset + mbuf_cnt;
    for (uint32_t i = offset; i < end; i++) {
        rte_pktmbuf_free(mbuf_array[i]);
    }
}
195 
/**
 * \brief Converts a TSC cycle count to microseconds.
 *
 * \return microseconds, or 0 when rte_get_tsc_hz() reports 0
 */
static uint64_t CyclesToMicroseconds(const uint64_t cycles)
{
    const uint64_t ticks_per_s = rte_get_tsc_hz();
    if (ticks_per_s == 0) {
        return 0;
    }
    /* Dividing by a truncated ticks-per-microsecond value (hz / 1000000) skews the result
     * whenever the TSC frequency is not a whole number of MHz. Over a long uptime this adds
     * up to many seconds of timestamp drift. Convert whole seconds and the sub-second
     * remainder separately. The remainder is < ticks_per_s, so multiplying it by 1000000
     * stays well inside uint64_t for any realistic TSC frequency. */
    const uint64_t whole_secs = cycles / ticks_per_s;
    const uint64_t rem_cycles = cycles % ticks_per_s;
    return whole_secs * 1000000 + (rem_cycles * 1000000) / ticks_per_s;
}
204 
/**
 * \brief Converts a TSC cycle count to whole (truncated) seconds.
 *
 * \return seconds, or 0 when rte_get_tsc_hz() reports 0
 */
static uint64_t CyclesToSeconds(const uint64_t cycles)
{
    const uint64_t hz = rte_get_tsc_hz();
    return (hz != 0) ? cycles / hz : 0;
}
213 
/**
 * \brief Writes orig_tv advanced by \p cycles TSC cycles into new_tv.
 *
 * The microsecond field of the result is reduced modulo one second; the carry goes into
 * tv_sec.
 */
static void CyclesAddToTimeval(
        const uint64_t cycles, struct timeval *orig_tv, struct timeval *new_tv)
{
    const uint64_t usec_per_sec = 1000000;
    const uint64_t total_usec = CyclesToMicroseconds(cycles) + orig_tv->tv_usec;

    new_tv->tv_sec = orig_tv->tv_sec + total_usec / usec_per_sec;
    new_tv->tv_usec = total_usec % usec_per_sec;
}
221 
223 {
224  gettimeofday(&machine_start_time, NULL);
225  machine_start_time.tv_sec -= DPDKGetSeconds();
226 }
227 
228 /**
229  * Initializes real_tv to the correct real time. Adds TSC counter value to the timeval of
230  * the machine start
231  * @param machine_start_tv - timestamp when the machine was started
232  * @param real_tv
233  */
234 static SCTime_t DPDKSetTimevalReal(struct timeval *machine_start_tv)
235 {
236  struct timeval real_tv;
237  CyclesAddToTimeval(rte_get_tsc_cycles(), machine_start_tv, &real_tv);
238  return SCTIME_FROM_TIMEVAL(&real_tv);
239 }
240 
/**
 * \brief Returns the whole seconds elapsed since the TSC counter was reset
 *        (typically since the machine start).
 */
static uint64_t DPDKGetSeconds(void)
{
    const uint64_t now_cycles = rte_get_tsc_cycles();
    return CyclesToSeconds(now_cycles);
}
246 
247 static void DevicePostStartPMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
248 {
249  if (strcmp(driver_name, "net_bonding") == 0) {
250  driver_name = BondingDeviceDriverGet(ptv->port_id);
251  }
252 
253  // The PMD Driver i40e has a special way to set the RSS, it can be set via rte_flow rules
254  // and only after the start of the port
255  if (strcmp(driver_name, "net_i40e") == 0)
256  i40eDeviceSetRSS(ptv->port_id, ptv->threads);
257 }
258 
259 static void DevicePreClosePMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
260 {
261  if (strcmp(driver_name, "net_bonding") == 0) {
262  driver_name = BondingDeviceDriverGet(ptv->port_id);
263  }
264 
265  if (strcmp(driver_name, "net_i40e") == 0) {
266 #if RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0)
267  // Flush the RSS rules that have been inserted in the post start section
268  struct rte_flow_error flush_error = { 0 };
269  int32_t retval = rte_flow_flush(ptv->port_id, &flush_error);
270  if (retval != 0) {
271  SCLogError("%s: unable to flush rte_flow rules: %s Flush error msg: %s",
272  ptv->livedev->dev, rte_strerror(-retval), flush_error.message);
273  }
274 #endif /* RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0) */
275  }
276 }
277 
/**
 * Attempts to retrieve NUMA node id on which the caller runs
 * @return NUMA id on success, -1 otherwise (NUMA unavailable, CPU lookup failure,
 *         or unsupported OS)
 */
static int GetNumaNode(void)
{
#if defined(__linux__)
    // libnuma requires numa_available() to succeed before any other libnuma call is made;
    // otherwise the behaviour of the other functions is undefined.
    if (numa_available() < 0)
        return -1;

    int cpu = sched_getcpu();
    if (cpu < 0)
        return -1;

    // numa_node_of_cpu() itself returns -1 on failure
    return numa_node_of_cpu(cpu);
#else
    SCLogWarning("NUMA node retrieval is not supported on this OS.");
    return -1;
#endif
}
296 
297 /**
298  * \brief Registration Function for ReceiveDPDK.
299  * \todo Unit tests are needed for this module.
300  */
302 {
303  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
304  tmm_modules[TMM_RECEIVEDPDK].ThreadInit = ReceiveDPDKThreadInit;
306  tmm_modules[TMM_RECEIVEDPDK].PktAcqLoop = ReceiveDPDKLoop;
308  tmm_modules[TMM_RECEIVEDPDK].ThreadExitPrintStats = ReceiveDPDKThreadExitStats;
309  tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = ReceiveDPDKThreadDeinit;
312 }
313 
314 /**
315  * \brief Registration Function for DecodeDPDK.
316  * \todo Unit tests are needed for this module.
317  */
319 {
320  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
321  tmm_modules[TMM_DECODEDPDK].ThreadInit = DecodeDPDKThreadInit;
322  tmm_modules[TMM_DECODEDPDK].Func = DecodeDPDK;
324  tmm_modules[TMM_DECODEDPDK].ThreadDeinit = DecodeDPDKThreadDeinit;
327 }
328 
329 static inline void DPDKDumpCounters(DPDKThreadVars *ptv)
330 {
331  /* Some NICs (e.g. Intel) do not support queue statistics and the drops can be fetched only on
332  * the port level. Therefore setting it to the first worker to have at least continuous update
333  * on the dropped packets. */
334  if (ptv->queue_id == 0) {
335  struct rte_eth_stats eth_stats;
336  int retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
337  if (unlikely(retval != 0)) {
338  SCLogError("%s: failed to get stats: %s", ptv->livedev->dev, rte_strerror(-retval));
339  return;
340  }
341 
342  StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets,
343  ptv->pkts + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
344  SC_ATOMIC_SET(ptv->livedev->pkts,
345  eth_stats.ipackets + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
346  StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_errs,
347  eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
348  StatsSetUI64(ptv->tv, ptv->capture_dpdk_imissed, eth_stats.imissed);
349  StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_no_mbufs, eth_stats.rx_nombuf);
350  StatsSetUI64(ptv->tv, ptv->capture_dpdk_ierrors, eth_stats.ierrors);
351  StatsSetUI64(ptv->tv, ptv->capture_dpdk_tx_errs, eth_stats.oerrors);
353  ptv->livedev->drop, eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
354  } else {
355  StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets, ptv->pkts);
356  }
357 }
358 
359 static void DPDKReleasePacket(Packet *p)
360 {
361  int retval;
362  /* Need to be in copy mode and need to detect early release
363  where Ethernet header could not be set (and pseudo packet)
364  When enabling promiscuous mode on Intel cards, 2 ICMPv6 packets are generated.
365  These get into the infinite cycle between the NIC and the switch in some cases */
366  if ((p->dpdk_v.copy_mode == DPDK_COPY_MODE_TAP ||
367  (p->dpdk_v.copy_mode == DPDK_COPY_MODE_IPS && !PacketCheckAction(p, ACTION_DROP)))
368 #if defined(RTE_LIBRTE_I40E_PMD) || defined(RTE_LIBRTE_IXGBE_PMD) || defined(RTE_LIBRTE_ICE_PMD)
369  && !(PKT_IS_ICMPV6(p) && p->icmpv6h->type == 143)
370 #endif
371  ) {
373  retval =
374  rte_eth_tx_burst(p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
375  // rte_eth_tx_burst can return only 0 (failure) or 1 (success) because we are only
376  // transmitting burst of size 1 and the function rte_eth_tx_burst returns number of
377  // successfully sent packets.
378  if (unlikely(retval < 1)) {
379  // sometimes a repeated transmit can help to send out the packet
380  rte_delay_us(DPDK_BURST_TX_WAIT_US);
381  retval = rte_eth_tx_burst(
382  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
383  if (unlikely(retval < 1)) {
384  SCLogDebug("Unable to transmit the packet on port %u queue %u",
385  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id);
386  rte_pktmbuf_free(p->dpdk_v.mbuf);
387  p->dpdk_v.mbuf = NULL;
388  }
389  }
390  } else {
391  rte_pktmbuf_free(p->dpdk_v.mbuf);
392  p->dpdk_v.mbuf = NULL;
393  }
394 
396 }
397 
398 static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv)
399 {
400  SCEnter();
401  // Indicate that the thread is actually running its application level
402  // code (i.e., it can poll packets)
404  PacketPoolWait();
405 
406  rte_eth_stats_reset(ptv->port_id);
407  rte_eth_xstats_reset(ptv->port_id);
408 
409  if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
411 
413 }
414 
415 static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv)
416 {
417  static thread_local uint64_t last_timeout_msec = 0;
418  SCTime_t t = DPDKSetTimevalReal(&machine_start_time);
419  uint64_t msecs = SCTIME_MSECS(t);
420  if (msecs > last_timeout_msec + 100) {
421  TmThreadsCaptureHandleTimeout(tv, NULL);
422  last_timeout_msec = msecs;
423  }
424 }
425 
426 /**
427  * \brief Decides if it should retry the packet poll or continue with the packet processing
428  * \return true if the poll should be retried, false otherwise
429  */
430 static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx)
431 {
432  static thread_local uint32_t zero_pkt_polls_cnt = 0;
433 
434  if (nb_rx > 0) {
435  zero_pkt_polls_cnt = 0;
436  return false;
437  }
438 
439  LoopHandleTimeoutOnIdle(tv);
440  if (!ptv->intr_enabled)
441  return true;
442 
443  zero_pkt_polls_cnt++;
444  if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
445  return true;
446 
447  uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
448  if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
449  rte_delay_us(pwd_idle_hint);
450  } else {
451  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
452  struct rte_epoll_event event;
453  rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
454  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
455  return true;
456  }
457 
458  return false;
459 }
460 
461 /**
462  * \brief Initializes a packet from an mbuf
463  * \return true if the packet was initialized successfully, false otherwise
464  */
465 static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf)
466 {
468  if (unlikely(p == NULL)) {
469  return NULL;
470  }
473  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
475  }
476 
477  p->ts = DPDKSetTimevalReal(&machine_start_time);
478  p->dpdk_v.mbuf = mbuf;
479  p->ReleasePacket = DPDKReleasePacket;
480  p->dpdk_v.copy_mode = ptv->copy_mode;
481  p->dpdk_v.out_port_id = ptv->out_port_id;
482  p->dpdk_v.out_queue_id = ptv->queue_id;
483  p->livedev = ptv->livedev;
484 
485  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
487  } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
488  uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
489  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
490  (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
491  SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
493  } else {
494  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
495  SCLogDebug("HW detected BAD IP checksum");
496  // chsum recalc will not be triggered but rule keyword check will be
497  p->level3_comp_csum = 0;
498  }
499  if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
500  SCLogDebug("HW detected BAD L4 chsum");
501  p->level4_comp_csum = 0;
502  }
503  }
504  }
505 
506  return p;
507 }
508 
509 static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf)
510 {
511  static thread_local bool segmented_mbufs_warned = false;
512  if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
513  char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
514  "Check your configuration or report the issue";
515  enum rte_proc_type_t eal_t = rte_eal_process_type();
516  if (eal_t == RTE_PROC_SECONDARY) {
517  SCLogWarning("%s. To avoid segmented mbufs, "
518  "try to increase mbuf size in your primary application",
519  warn_s);
520  } else if (eal_t == RTE_PROC_PRIMARY) {
521  SCLogWarning("%s. To avoid segmented mbufs, "
522  "try to increase MTU in your suricata.yaml",
523  warn_s);
524  }
525 
526  segmented_mbufs_warned = true;
527  }
528 }
529 
530 static void HandleShutdown(DPDKThreadVars *ptv)
531 {
532  SCLogDebug("Stopping Suricata!");
533  SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
534  while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) {
535  rte_delay_us(10);
536  }
537  if (ptv->queue_id == 0) {
538  rte_delay_us(20); // wait for all threads to get out of the sync loop
539  SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
540  // If Suricata runs in peered mode, the peer threads might still want to send
541  // packets to our port. Instead, we know, that we are done with the peered port, so
542  // we stop it. The peered threads will stop our port.
543  if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
544  rte_eth_dev_stop(ptv->out_port_id);
545  } else {
546  // in IDS we stop our port - no peer threads are running
547  rte_eth_dev_stop(ptv->port_id);
548  }
549  }
550  DPDKDumpCounters(ptv);
551 }
552 
553 static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
554 {
555  static thread_local time_t last_dump = 0;
556  time_t current_time = DPDKGetSeconds();
557  /* Trigger one dump of stats every second */
558  if (current_time != last_dump) {
559  DPDKDumpCounters(ptv);
560  last_dump = current_time;
561  }
562 }
563 
564 /**
565  * \brief Main DPDK reading Loop function
566  */
567 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
568 {
569  SCEnter();
570  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
571  ptv->slot = ((TmSlot *)slot)->slot_next;
572  TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
573  if (ret != TM_ECODE_OK) {
574  SCReturnInt(ret);
575  }
576  while (true) {
577  if (unlikely(suricata_ctl_flags != 0)) {
578  HandleShutdown(ptv);
579  break;
580  }
581 
582  uint16_t nb_rx =
583  rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
584  if (RXPacketCountHeuristic(tv, ptv, nb_rx)) {
585  continue;
586  }
587 
588  ptv->pkts += (uint64_t)nb_rx;
589  for (uint16_t i = 0; i < nb_rx; i++) {
590  Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
591  if (p == NULL) {
592  rte_pktmbuf_free(ptv->received_mbufs[i]);
593  continue;
594  }
595  DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
596  PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
597  rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
598  if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
599  TmqhOutputPacketpool(ptv->tv, p);
600  DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
601  SCReturnInt(EXIT_FAILURE);
602  }
603  }
604 
605  PeriodicDPDKDumpCounters(ptv);
607  }
608 
610 }
611 
612 /**
613  * \brief Init function for ReceiveDPDK.
614  *
615  * \param tv pointer to ThreadVars
616  * \param initdata pointer to the interface passed from the user
617  * \param data pointer gets populated with DPDKThreadVars
618  *
619  */
620 static TmEcode ReceiveDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
621 {
622  SCEnter();
623  int retval, thread_numa;
624  DPDKThreadVars *ptv = NULL;
625  DPDKIfaceConfig *dpdk_config = (DPDKIfaceConfig *)initdata;
626 
627  if (initdata == NULL) {
628  SCLogError("DPDK configuration is NULL in thread initialization");
629  goto fail;
630  }
631 
632  ptv = SCCalloc(1, sizeof(DPDKThreadVars));
633  if (unlikely(ptv == NULL)) {
634  SCLogError("Unable to allocate memory");
635  goto fail;
636  }
637 
638  ptv->tv = tv;
639  ptv->pkts = 0;
640  ptv->bytes = 0;
641  ptv->livedev = LiveGetDevice(dpdk_config->iface);
642 
643  ptv->capture_dpdk_packets = StatsRegisterCounter("capture.packets", ptv->tv);
644  ptv->capture_dpdk_rx_errs = StatsRegisterCounter("capture.rx_errors", ptv->tv);
645  ptv->capture_dpdk_tx_errs = StatsRegisterCounter("capture.tx_errors", ptv->tv);
646  ptv->capture_dpdk_imissed = StatsRegisterCounter("capture.dpdk.imissed", ptv->tv);
647  ptv->capture_dpdk_rx_no_mbufs = StatsRegisterCounter("capture.dpdk.no_mbufs", ptv->tv);
648  ptv->capture_dpdk_ierrors = StatsRegisterCounter("capture.dpdk.ierrors", ptv->tv);
649 
650  ptv->copy_mode = dpdk_config->copy_mode;
651  ptv->checksum_mode = dpdk_config->checksum_mode;
652 
653  ptv->threads = dpdk_config->threads;
654  ptv->intr_enabled = (dpdk_config->flags & DPDK_IRQ_MODE) ? true : false;
655  ptv->port_id = dpdk_config->port_id;
656  ptv->out_port_id = dpdk_config->out_port_id;
657  ptv->port_socket_id = dpdk_config->socket_id;
658  // pass the pointer to the mempool and then forget about it. Mempool is freed in thread deinit.
659  ptv->pkt_mempool = dpdk_config->pkt_mempool;
660  dpdk_config->pkt_mempool = NULL;
661 
662  thread_numa = GetNumaNode();
663  if (thread_numa >= 0 && ptv->port_socket_id != SOCKET_ID_ANY &&
664  thread_numa != ptv->port_socket_id) {
665  SC_ATOMIC_ADD(dpdk_config->inconsistent_numa_cnt, 1);
666  SCLogPerf("%s: NIC is on NUMA %d, thread on NUMA %d", dpdk_config->iface,
667  ptv->port_socket_id, thread_numa);
668  }
669 
670  ptv->workers_sync = dpdk_config->workers_sync;
671  uint16_t queue_id = SC_ATOMIC_ADD(dpdk_config->queue_id, 1);
672  ptv->queue_id = queue_id;
673 
674  // the last thread starts the device
675  if (queue_id == dpdk_config->threads - 1) {
676  retval = rte_eth_dev_start(ptv->port_id);
677  if (retval < 0) {
678  SCLogError("%s: error (%s) during device startup", dpdk_config->iface,
679  rte_strerror(-retval));
680  goto fail;
681  }
682 
683  struct rte_eth_dev_info dev_info;
684  retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
685  if (retval != 0) {
686  SCLogError("%s: error (%s) when getting device info", dpdk_config->iface,
687  rte_strerror(-retval));
688  goto fail;
689  }
690 
691  // some PMDs requires additional actions only after the device has started
692  DevicePostStartPMDSpecificActions(ptv, dev_info.driver_name);
693 
694  uint16_t inconsistent_numa_cnt = SC_ATOMIC_GET(dpdk_config->inconsistent_numa_cnt);
695  if (inconsistent_numa_cnt > 0 && ptv->port_socket_id != SOCKET_ID_ANY) {
696  SCLogWarning("%s: NIC is on NUMA %d, %u threads on different NUMA node(s)",
697  dpdk_config->iface, ptv->port_socket_id, inconsistent_numa_cnt);
698  } else if (ptv->port_socket_id == SOCKET_ID_ANY && rte_socket_count() > 1) {
699  SCLogNotice(
700  "%s: unable to determine NIC's NUMA node, degraded performance can be expected",
701  dpdk_config->iface);
702  }
703  if (ptv->intr_enabled) {
704  rte_spinlock_init(&intr_lock[ptv->port_id]);
705  }
706  }
707 
708  *data = (void *)ptv;
709  dpdk_config->DerefFunc(dpdk_config);
711 
712 fail:
713  if (dpdk_config != NULL)
714  dpdk_config->DerefFunc(dpdk_config);
715  if (ptv != NULL)
716  SCFree(ptv);
718 }
719 
720 static void PrintDPDKPortXstats(uint32_t port_id, const char *port_name)
721 {
722  struct rte_eth_xstat *xstats;
723  struct rte_eth_xstat_name *xstats_names;
724 
725  int32_t len = rte_eth_xstats_get(port_id, NULL, 0);
726  if (len < 0)
727  FatalError("Error (%s) getting count of rte_eth_xstats failed on port %s",
728  rte_strerror(-len), port_name);
729 
730  xstats = SCCalloc(len, sizeof(*xstats));
731  if (xstats == NULL)
732  FatalError("Failed to allocate memory for the rte_eth_xstat structure");
733 
734  int32_t ret = rte_eth_xstats_get(port_id, xstats, len);
735  if (ret < 0 || ret > len) {
736  SCFree(xstats);
737  FatalError("Error (%s) getting rte_eth_xstats failed on port %s", rte_strerror(-ret),
738  port_name);
739  }
740  xstats_names = SCCalloc(len, sizeof(*xstats_names));
741  if (xstats_names == NULL) {
742  SCFree(xstats);
743  FatalError("Failed to allocate memory for the rte_eth_xstat_name array");
744  }
745  ret = rte_eth_xstats_get_names(port_id, xstats_names, len);
746  if (ret < 0 || ret > len) {
747  SCFree(xstats);
748  SCFree(xstats_names);
749  FatalError("Error (%s) getting names of rte_eth_xstats failed on port %s",
750  rte_strerror(-ret), port_name);
751  }
752  for (int32_t i = 0; i < len; i++) {
753  if (xstats[i].value > 0)
754  SCLogPerf("Port %u (%s) - %s: %" PRIu64, port_id, port_name, xstats_names[i].name,
755  xstats[i].value);
756  }
757 
758  SCFree(xstats);
759  SCFree(xstats_names);
760 }
761 
762 /**
763  * \brief This function prints stats to the screen at exit.
764  * \param tv pointer to ThreadVars
765  * \param data pointer that gets cast into DPDKThreadVars for ptv
766  */
767 static void ReceiveDPDKThreadExitStats(ThreadVars *tv, void *data)
768 {
769  SCEnter();
770  int retval;
771  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
772 
773  if (ptv->queue_id == 0) {
774  struct rte_eth_stats eth_stats;
775  PrintDPDKPortXstats(ptv->port_id, ptv->livedev->dev);
776  retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
777  if (unlikely(retval != 0)) {
778  SCLogError("%s: failed to get stats (%s)", ptv->livedev->dev, strerror(-retval));
779  SCReturn;
780  }
781  SCLogPerf("%s: total RX stats: packets %" PRIu64 " bytes: %" PRIu64 " missed: %" PRIu64
782  " errors: %" PRIu64 " nombufs: %" PRIu64,
783  ptv->livedev->dev, eth_stats.ipackets, eth_stats.ibytes, eth_stats.imissed,
784  eth_stats.ierrors, eth_stats.rx_nombuf);
785  if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS)
786  SCLogPerf("%s: total TX stats: packets %" PRIu64 " bytes: %" PRIu64 " errors: %" PRIu64,
787  ptv->livedev->dev, eth_stats.opackets, eth_stats.obytes, eth_stats.oerrors);
788  }
789 
790  DPDKDumpCounters(ptv);
791  SCLogPerf("(%s) received packets %" PRIu64, tv->name, ptv->pkts);
792 }
793 
794 /**
795  * \brief DeInit function closes dpdk at exit.
796  * \param tv pointer to ThreadVars
797  * \param data pointer that gets cast into DPDKThreadVars for ptv
798  */
799 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *tv, void *data)
800 {
801  SCEnter();
802  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
803 
804  if (ptv->queue_id == 0) {
805  struct rte_eth_dev_info dev_info;
806  int retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
807  if (retval != 0) {
808  SCLogError("%s: error (%s) when getting device info", ptv->livedev->dev,
809  rte_strerror(-retval));
811  }
812 
813  DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);
814 
815  if (ptv->workers_sync) {
816  SCFree(ptv->workers_sync);
817  }
818  }
819 
820  ptv->pkt_mempool = NULL; // MP is released when device is closed
821 
822  SCFree(ptv);
824 }
825 
826 /**
827  * \brief This function passes off to link type decoders.
828  *
829  * DecodeDPDK decodes packets from DPDK and passes
830  * them off to the proper link type decoder.
831  *
832  * \param t pointer to ThreadVars
833  * \param p pointer to the current packet
834  * \param data pointer that gets cast into DPDKThreadVars for ptv
835  */
836 static TmEcode DecodeDPDK(ThreadVars *tv, Packet *p, void *data)
837 {
838  SCEnter();
840 
842 
843  /* update counters */
845 
846  /* If suri has set vlan during reading, we increase vlan counter */
847  if (p->vlan_idx) {
849  }
850 
851  /* call the decoder */
852  DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
853 
855 
857 }
858 
859 static TmEcode DecodeDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
860 {
861  SCEnter();
862  DecodeThreadVars *dtv = NULL;
863 
865 
866  if (dtv == NULL)
868 
870 
871  *data = (void *)dtv;
872 
874 }
875 
876 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data)
877 {
878  SCEnter();
879  if (data != NULL)
880  DecodeThreadVarsFree(tv, data);
882 }
883 
884 #endif /* HAVE_DPDK */
885 /* eof */
886 /**
887  * @}
888  */
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:74
PacketCheckAction
bool PacketCheckAction(const Packet *p, const uint8_t a)
Definition: packet.c:48
tm-threads.h
len
uint8_t len
Definition: app-layer-dnp3.h:2
TMM_RECEIVEDPDK
@ TMM_RECEIVEDPDK
Definition: tm-threads-common.h:58
ICMPV6Hdr_::type
uint8_t type
Definition: decode-icmpv6.h:145
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:167
CHECKSUM_VALIDATION_OFFLOAD
@ CHECKSUM_VALIDATION_OFFLOAD
Definition: decode.h:51
offset
uint64_t offset
Definition: util-streaming-buffer.h:0
DPDKSetTimevalOfMachineStart
void DPDKSetTimevalOfMachineStart(void)
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
PacketFreeOrRelease
void PacketFreeOrRelease(Packet *p)
Return a packet to where it was allocated.
Definition: decode.c:247
SCTIME_MSECS
#define SCTIME_MSECS(t)
Definition: util-time.h:58
PKT_IS_PSEUDOPKT
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition: decode.h:1075
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
DPDK_COPY_MODE_IPS
@ DPDK_COPY_MODE_IPS
Definition: source-dpdk.h:33
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:99
TMM_DECODEDPDK
@ TMM_DECODEDPDK
Definition: tm-threads-common.h:59
action-globals.h
Packet_::flags
uint32_t flags
Definition: decode.h:474
DpdkCopyModeEnum
DpdkCopyModeEnum
Definition: source-dpdk.h:33
threads.h
Packet_::vlan_idx
uint8_t vlan_idx
Definition: decode.h:465
LiveDevice_
Definition: util-device.h:50
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:332
StatsSetUI64
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type double to the local counter.
Definition: counters.c:210
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:54
NoDPDKSupportExit
TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **)
this function prints an error message and exits.
Definition: source-dpdk.c:78
util-privs.h
CHECKSUM_VALIDATION_DISABLE
@ CHECKSUM_VALIDATION_DISABLE
Definition: decode.h:46
PacketDecodeFinalize
void PacketDecodeFinalize(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Finalize decoding of a packet.
Definition: decode.c:203
DPDKIfaceConfig_
Definition: source-dpdk.h:52
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:317
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:85
tmqh-packetpool.h
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:55
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:84
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:50
Packet_::datalink
int datalink
Definition: decode.h:620
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1078
DPDKWorkerSync_
Definition: source-dpdk.h:47
DecodeRegisterPerfCounters
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition: decode.c:599
Packet_::level3_comp_csum
int32_t level3_comp_csum
Definition: decode.h:541
TmModuleReceiveDPDKRegister
void TmModuleReceiveDPDKRegister(void)
Definition: source-dpdk.c:50
decode.h
Packet_::level4_comp_csum
int32_t level4_comp_csum
Definition: decode.h:543
PKT_SRC_WIRE
@ PKT_SRC_WIRE
Definition: decode.h:55
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:58
Packet_::ts
SCTime_t ts
Definition: decode.h:485
LiveGetDevice
LiveDevice * LiveGetDevice(const char *name)
Get a pointer to the device at idx.
Definition: util-device.c:248
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
GET_PKT_DATA
#define GET_PKT_DATA(p)
Definition: decode.h:221
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
util-affinity.h
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:79
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:53
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
PKT_IS_ICMPV6
#define PKT_IS_ICMPV6(p)
Definition: decode.h:251
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:300
PacketPoolWait
void PacketPoolWait(void)
Definition: tmqh-packetpool.c:80
SCReturn
#define SCReturn
Definition: util-debug.h:273
Packet_::icmpv6h
ICMPV6Hdr * icmpv6h
Definition: decode.h:577
Packet_
Definition: decode.h:437
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:33
GET_PKT_LEN
#define GET_PKT_LEN(p)
Definition: decode.h:220
TmSlot_
Definition: tm-threads.h:53
PKT_IGNORE_CHECKSUM
#define PKT_IGNORE_CHECKSUM
Definition: decode.h:1038
SCTime_t
Definition: util-time.h:40
Packet_::livedev
struct LiveDevice_ * livedev
Definition: decode.h:599
DPDK_BURST_TX_WAIT_US
#define DPDK_BURST_TX_WAIT_US
Definition: source-dpdk.h:35
TmEcode
TmEcode
Definition: tm-threads-common.h:83
TmModule_::name
const char * name
Definition: tm-modules.h:45
DecodeThreadVars_::counter_vlan
uint16_t counter_vlan
Definition: decode.h:721
runmodes.h
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
dtv
DecodeThreadVars * dtv
Definition: fuzz_decodepcapfile.c:33
DPDK_IRQ_MODE
#define DPDK_IRQ_MODE
Definition: source-dpdk.h:41
Packet_::ReleasePacket
void(* ReleasePacket)(struct Packet_ *)
Definition: decode.h:529
util-dpdk.h
flags
uint8_t flags
Definition: decode-gre.h:0
DecodeThreadVarsFree
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition: decode.c:787
source-dpdk.h
ChecksumValidationMode
ChecksumValidationMode
Definition: decode.h:45
suricata-common.h
packet.h
ACTION_DROP
#define ACTION_DROP
Definition: action-globals.h:30
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:48
FatalError
#define FatalError(...)
Definition: util-debug.h:502
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:49
threadvars.h
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
SCFree
#define SCFree(p)
Definition: util-mem.h:61
DecodeThreadVars_
Structure to hold thread specific data for all decode modules.
Definition: decode.h:685
util-dpdk-bonding.h
DecodeThreadVarsAlloc
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition: decode.c:769
PacketSetData
int PacketSetData(Packet *p, const uint8_t *pktdata, uint32_t pktlen)
Set data for Packet and set length when zero copy is used.
Definition: decode.c:807
util-dpdk-i40e.h
suricata.h
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition: counters.c:461
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
TmModuleDecodeDPDKRegister
void TmModuleDecodeDPDKRegister(void)
Registration Function for DecodeDPDK.
Definition: source-dpdk.c:64
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
StatsRegisterCounter
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:971
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:275
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:264
SC_CAP_NET_RAW
#define SC_CAP_NET_RAW
Definition: util-privs.h:32
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:77
DPDK_COPY_MODE_TAP
@ DPDK_COPY_MODE_TAP
Definition: source-dpdk.h:33
DecodeUpdatePacketCounters
void DecodeUpdatePacketCounters(ThreadVars *tv, const DecodeThreadVars *dtv, const Packet *p)
Definition: decode.c:735
LINKTYPE_ETHERNET
#define LINKTYPE_ETHERNET
Definition: decode.h:989
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:172