suricata
source-dpdk.c
Go to the documentation of this file.
1 /* Copyright (C) 2021 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \defgroup dpdk DPDK running mode
20  *
21  * @{
22  */
23 
24 /**
25  * \file
26  *
27  * \author Lukas Sismis <lukas.sismis@gmail.com>
28  *
29  * DPDK capture interface
30  *
31  */
32 
33 #include "suricata-common.h"
34 #include "runmodes.h"
35 #include "decode.h"
36 #include "packet.h"
37 #include "source-dpdk.h"
38 #include "suricata.h"
39 #include "threads.h"
40 #include "threadvars.h"
41 #include "tm-threads.h"
42 #include "tmqh-packetpool.h"
43 #include "util-privs.h"
44 #include "action-globals.h"
45 
46 #ifndef HAVE_DPDK
47 
48 TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **);
49 
51 {
52  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
59 }
60 
61 /**
62  * \brief Registration Function for DecodeDPDK.
63  */
65 {
66  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
73 }
74 
75 /**
76  * \brief this function prints an error message and exits.
77  */
78 TmEcode NoDPDKSupportExit(ThreadVars *tv, const void *initdata, void **data)
79 {
80  FatalError("Error creating thread %s: you do not have "
81  "support for DPDK enabled, on Linux host please recompile "
82  "with --enable-dpdk",
83  tv->name);
84 }
85 
86 #else /* We have DPDK support */
87 
88 #include "util-affinity.h"
89 #include "util-dpdk.h"
90 #include "util-dpdk-i40e.h"
91 #include "util-dpdk-bonding.h"
92 #include <numa.h>
93 
94 #define BURST_SIZE 32
95 // interrupt mode constants
96 #define MIN_ZERO_POLL_COUNT 10U
97 #define MIN_ZERO_POLL_COUNT_TO_SLEEP 10U
98 #define MINIMUM_SLEEP_TIME_US 1U
99 #define STANDARD_SLEEP_TIME_US 100U
100 #define MAX_EPOLL_TIMEOUT_MS 500U
101 static rte_spinlock_t intr_lock[RTE_MAX_ETHPORTS];
102 
103 /**
104  * \brief Structure to hold thread specific variables.
105  */
106 typedef struct DPDKThreadVars_ {
107  /* counters */
108  uint64_t pkts;
109  ThreadVars *tv;
110  TmSlot *slot;
111  LiveDevice *livedev;
112  ChecksumValidationMode checksum_mode;
113  bool intr_enabled;
114  /* references to packet and drop counters */
115  uint16_t capture_dpdk_packets;
116  uint16_t capture_dpdk_rx_errs;
117  uint16_t capture_dpdk_imissed;
118  uint16_t capture_dpdk_rx_no_mbufs;
119  uint16_t capture_dpdk_ierrors;
120  uint16_t capture_dpdk_tx_errs;
121  unsigned int flags;
122  int threads;
123  /* for IPS */
124  DpdkCopyModeEnum copy_mode;
125  uint16_t out_port_id;
126  /* Entry in the peers_list */
127 
128  uint64_t bytes;
129  uint64_t accepted;
130  uint64_t dropped;
131  uint16_t port_id;
132  uint16_t queue_id;
133  int32_t port_socket_id;
134  struct rte_mempool *pkt_mempool;
135  struct rte_mbuf *received_mbufs[BURST_SIZE];
136  DPDKWorkerSync *workers_sync;
137 } DPDKThreadVars;
138 
139 static TmEcode ReceiveDPDKThreadInit(ThreadVars *, const void *, void **);
140 static void ReceiveDPDKThreadExitStats(ThreadVars *, void *);
141 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *, void *);
142 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot);
143 
144 static TmEcode DecodeDPDKThreadInit(ThreadVars *, const void *, void **);
145 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data);
146 static TmEcode DecodeDPDK(ThreadVars *, Packet *, void *);
147 
148 static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset);
149 static bool InterruptsRXEnable(uint16_t port_id, uint16_t queue_id)
150 {
151  uint32_t event_data = port_id << UINT16_WIDTH | queue_id;
152  int32_t ret = rte_eth_dev_rx_intr_ctl_q(port_id, queue_id, RTE_EPOLL_PER_THREAD,
153  RTE_INTR_EVENT_ADD, (void *)((uintptr_t)event_data));
154 
155  if (ret != 0) {
156  SCLogError("%s-Q%d: failed to enable interrupt mode: %s", DPDKGetPortNameByPortID(port_id),
157  queue_id, rte_strerror(-ret));
158  return false;
159  }
160  return true;
161 }
162 
163 static inline uint32_t InterruptsSleepHeuristic(uint32_t no_pkt_polls_count)
164 {
165  if (no_pkt_polls_count < MIN_ZERO_POLL_COUNT_TO_SLEEP)
166  return MINIMUM_SLEEP_TIME_US;
167 
168  return STANDARD_SLEEP_TIME_US;
169 }
170 
171 static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool on)
172 {
173  rte_spinlock_lock(&(intr_lock[port_id]));
174 
175  if (on)
176  rte_eth_dev_rx_intr_enable(port_id, queue_id);
177  else
178  rte_eth_dev_rx_intr_disable(port_id, queue_id);
179 
180  rte_spinlock_unlock(&(intr_lock[port_id]));
181 }
182 
183 static inline void DPDKFreeMbufArray(
184  struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
185 {
186  for (int i = offset; i < mbuf_cnt; i++) {
187  rte_pktmbuf_free(mbuf_array[i]);
188  }
189 }
190 
191 static void DevicePostStartPMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
192 {
193  if (strcmp(driver_name, "net_bonding") == 0) {
194  driver_name = BondingDeviceDriverGet(ptv->port_id);
195  }
196 
197  // The PMD Driver i40e has a special way to set the RSS, it can be set via rte_flow rules
198  // and only after the start of the port
199  if (strcmp(driver_name, "net_i40e") == 0)
200  i40eDeviceSetRSS(ptv->port_id, ptv->threads);
201 }
202 
203 static void DevicePreClosePMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
204 {
205  if (strcmp(driver_name, "net_bonding") == 0) {
206  driver_name = BondingDeviceDriverGet(ptv->port_id);
207  }
208 
209  if (strcmp(driver_name, "net_i40e") == 0) {
210 #if RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0)
211  // Flush the RSS rules that have been inserted in the post start section
212  struct rte_flow_error flush_error = { 0 };
213  int32_t retval = rte_flow_flush(ptv->port_id, &flush_error);
214  if (retval != 0) {
215  SCLogError("%s: unable to flush rte_flow rules: %s Flush error msg: %s",
216  ptv->livedev->dev, rte_strerror(-retval), flush_error.message);
217  }
218 #endif /* RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0) */
219  }
220 }
221 
222 /**
223  * Attempts to retrieve NUMA node id on which the caller runs
224  * @return NUMA id on success, -1 otherwise
225  */
226 static int GetNumaNode(void)
227 {
228  int cpu = 0;
229  int node = -1;
230 
231 #if defined(__linux__)
232  cpu = sched_getcpu();
233  node = numa_node_of_cpu(cpu);
234 #else
235  SCLogWarning("NUMA node retrieval is not supported on this OS.");
236 #endif
237 
238  return node;
239 }
240 
241 /**
242  * \brief Registration Function for ReceiveDPDK.
243  * \todo Unit tests are needed for this module.
244  */
246 {
247  tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
248  tmm_modules[TMM_RECEIVEDPDK].ThreadInit = ReceiveDPDKThreadInit;
250  tmm_modules[TMM_RECEIVEDPDK].PktAcqLoop = ReceiveDPDKLoop;
252  tmm_modules[TMM_RECEIVEDPDK].ThreadExitPrintStats = ReceiveDPDKThreadExitStats;
253  tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = ReceiveDPDKThreadDeinit;
256 }
257 
258 /**
259  * \brief Registration Function for DecodeDPDK.
260  * \todo Unit tests are needed for this module.
261  */
263 {
264  tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
265  tmm_modules[TMM_DECODEDPDK].ThreadInit = DecodeDPDKThreadInit;
266  tmm_modules[TMM_DECODEDPDK].Func = DecodeDPDK;
268  tmm_modules[TMM_DECODEDPDK].ThreadDeinit = DecodeDPDKThreadDeinit;
271 }
272 
273 static inline void DPDKDumpCounters(DPDKThreadVars *ptv)
274 {
275  /* Some NICs (e.g. Intel) do not support queue statistics and the drops can be fetched only on
276  * the port level. Therefore setting it to the first worker to have at least continuous update
277  * on the dropped packets. */
278  if (ptv->queue_id == 0) {
279  struct rte_eth_stats eth_stats;
280  int retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
281  if (unlikely(retval != 0)) {
282  SCLogError("%s: failed to get stats: %s", ptv->livedev->dev, rte_strerror(-retval));
283  return;
284  }
285 
286  StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets,
287  ptv->pkts + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
288  SC_ATOMIC_SET(ptv->livedev->pkts,
289  eth_stats.ipackets + eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
290  StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_errs,
291  eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
292  StatsSetUI64(ptv->tv, ptv->capture_dpdk_imissed, eth_stats.imissed);
293  StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_no_mbufs, eth_stats.rx_nombuf);
294  StatsSetUI64(ptv->tv, ptv->capture_dpdk_ierrors, eth_stats.ierrors);
295  StatsSetUI64(ptv->tv, ptv->capture_dpdk_tx_errs, eth_stats.oerrors);
297  ptv->livedev->drop, eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf);
298  } else {
299  StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets, ptv->pkts);
300  }
301 }
302 
303 static void DPDKReleasePacket(Packet *p)
304 {
305  int retval;
306  /* Need to be in copy mode and need to detect early release
307  where Ethernet header could not be set (and pseudo packet)
308  When enabling promiscuous mode on Intel cards, 2 ICMPv6 packets are generated.
309  These get into the infinite cycle between the NIC and the switch in some cases */
310  if ((p->dpdk_v.copy_mode == DPDK_COPY_MODE_TAP ||
311  (p->dpdk_v.copy_mode == DPDK_COPY_MODE_IPS && !PacketCheckAction(p, ACTION_DROP)))
312 #if defined(RTE_LIBRTE_I40E_PMD) || defined(RTE_LIBRTE_IXGBE_PMD) || defined(RTE_LIBRTE_ICE_PMD)
313  && !(PacketIsICMPv6(p) && PacketGetICMPv6(p)->type == 143)
314 #endif
315  ) {
317  retval =
318  rte_eth_tx_burst(p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
319  // rte_eth_tx_burst can return only 0 (failure) or 1 (success) because we are only
320  // transmitting burst of size 1 and the function rte_eth_tx_burst returns number of
321  // successfully sent packets.
322  if (unlikely(retval < 1)) {
323  // sometimes a repeated transmit can help to send out the packet
324  rte_delay_us(DPDK_BURST_TX_WAIT_US);
325  retval = rte_eth_tx_burst(
326  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
327  if (unlikely(retval < 1)) {
328  SCLogDebug("Unable to transmit the packet on port %u queue %u",
329  p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id);
330  rte_pktmbuf_free(p->dpdk_v.mbuf);
331  p->dpdk_v.mbuf = NULL;
332  }
333  }
334  } else {
335  rte_pktmbuf_free(p->dpdk_v.mbuf);
336  p->dpdk_v.mbuf = NULL;
337  }
338 
340 }
341 
342 static TmEcode ReceiveDPDKLoopInit(ThreadVars *tv, DPDKThreadVars *ptv)
343 {
344  SCEnter();
345  // Indicate that the thread is actually running its application level
346  // code (i.e., it can poll packets)
348  PacketPoolWait();
349 
350  rte_eth_stats_reset(ptv->port_id);
351  rte_eth_xstats_reset(ptv->port_id);
352 
353  if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
355 
357 }
358 
359 static inline void LoopHandleTimeoutOnIdle(ThreadVars *tv)
360 {
361  static thread_local uint64_t last_timeout_msec = 0;
362  SCTime_t t = TimeGet();
363  uint64_t msecs = SCTIME_MSECS(t);
364  if (msecs > last_timeout_msec + 100) {
365  TmThreadsCaptureHandleTimeout(tv, NULL);
366  last_timeout_msec = msecs;
367  }
368 }
369 
370 /**
371  * \brief Decides if it should retry the packet poll or continue with the packet processing
372  * \return true if the poll should be retried, false otherwise
373  */
374 static inline bool RXPacketCountHeuristic(ThreadVars *tv, DPDKThreadVars *ptv, uint16_t nb_rx)
375 {
376  static thread_local uint32_t zero_pkt_polls_cnt = 0;
377 
378  if (nb_rx > 0) {
379  zero_pkt_polls_cnt = 0;
380  return false;
381  }
382 
383  LoopHandleTimeoutOnIdle(tv);
384  if (!ptv->intr_enabled)
385  return true;
386 
387  zero_pkt_polls_cnt++;
388  if (zero_pkt_polls_cnt <= MIN_ZERO_POLL_COUNT)
389  return true;
390 
391  uint32_t pwd_idle_hint = InterruptsSleepHeuristic(zero_pkt_polls_cnt);
392  if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
393  rte_delay_us(pwd_idle_hint);
394  } else {
395  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
396  struct rte_epoll_event event;
397  rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
398  InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
399  return true;
400  }
401 
402  return false;
403 }
404 
405 /**
406  * \brief Initializes a packet from an mbuf
407  * \return true if the packet was initialized successfully, false otherwise
408  */
409 static inline Packet *PacketInitFromMbuf(DPDKThreadVars *ptv, struct rte_mbuf *mbuf)
410 {
412  if (unlikely(p == NULL)) {
413  return NULL;
414  }
417  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
419  }
420 
421  p->ts = TimeGet();
422  p->dpdk_v.mbuf = mbuf;
423  p->ReleasePacket = DPDKReleasePacket;
424  p->dpdk_v.copy_mode = ptv->copy_mode;
425  p->dpdk_v.out_port_id = ptv->out_port_id;
426  p->dpdk_v.out_queue_id = ptv->queue_id;
427  p->livedev = ptv->livedev;
428 
429  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
431  } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
432  uint64_t ol_flags = p->dpdk_v.mbuf->ol_flags;
433  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
434  (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
435  SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
437  } else {
438  if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
439  SCLogDebug("HW detected BAD IP checksum");
440  // chsum recalc will not be triggered but rule keyword check will be
441  p->l3.csum_set = true;
442  p->l3.csum = 0;
443  }
444  if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
445  SCLogDebug("HW detected BAD L4 chsum");
446  p->l4.csum_set = true;
447  p->l4.csum = 0;
448  }
449  }
450  }
451 
452  return p;
453 }
454 
455 static inline void DPDKSegmentedMbufWarning(struct rte_mbuf *mbuf)
456 {
457  static thread_local bool segmented_mbufs_warned = false;
458  if (!segmented_mbufs_warned && !rte_pktmbuf_is_contiguous(mbuf)) {
459  char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
460  "Check your configuration or report the issue";
461  enum rte_proc_type_t eal_t = rte_eal_process_type();
462  if (eal_t == RTE_PROC_SECONDARY) {
463  SCLogWarning("%s. To avoid segmented mbufs, "
464  "try to increase mbuf size in your primary application",
465  warn_s);
466  } else if (eal_t == RTE_PROC_PRIMARY) {
467  SCLogWarning("%s. To avoid segmented mbufs, "
468  "try to increase MTU in your suricata.yaml",
469  warn_s);
470  }
471 
472  segmented_mbufs_warned = true;
473  }
474 }
475 
476 static void HandleShutdown(DPDKThreadVars *ptv)
477 {
478  SCLogDebug("Stopping Suricata!");
479  SC_ATOMIC_ADD(ptv->workers_sync->worker_checked_in, 1);
480  while (SC_ATOMIC_GET(ptv->workers_sync->worker_checked_in) < ptv->workers_sync->worker_cnt) {
481  rte_delay_us(10);
482  }
483  if (ptv->queue_id == 0) {
484  rte_delay_us(20); // wait for all threads to get out of the sync loop
485  SC_ATOMIC_SET(ptv->workers_sync->worker_checked_in, 0);
486  // If Suricata runs in peered mode, the peer threads might still want to send
487  // packets to our port. Instead, we know, that we are done with the peered port, so
488  // we stop it. The peered threads will stop our port.
489  if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
490  rte_eth_dev_stop(ptv->out_port_id);
491  } else {
492  // in IDS we stop our port - no peer threads are running
493  rte_eth_dev_stop(ptv->port_id);
494  }
495  }
496  DPDKDumpCounters(ptv);
497 }
498 
499 static void PeriodicDPDKDumpCounters(DPDKThreadVars *ptv)
500 {
501  static thread_local SCTime_t last_dump = { 0 };
502  SCTime_t current_time = TimeGet();
503  /* Trigger one dump of stats every second */
504  if (current_time.secs != last_dump.secs) {
505  DPDKDumpCounters(ptv);
506  last_dump = current_time;
507  }
508 }
509 
510 /**
511  * \brief Main DPDK reading Loop function
512  */
513 static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
514 {
515  SCEnter();
516  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
517  ptv->slot = ((TmSlot *)slot)->slot_next;
518  TmEcode ret = ReceiveDPDKLoopInit(tv, ptv);
519  if (ret != TM_ECODE_OK) {
520  SCReturnInt(ret);
521  }
522  while (true) {
523  if (unlikely(suricata_ctl_flags != 0)) {
524  HandleShutdown(ptv);
525  break;
526  }
527 
528  uint16_t nb_rx =
529  rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
530  if (RXPacketCountHeuristic(tv, ptv, nb_rx)) {
531  continue;
532  }
533 
534  ptv->pkts += (uint64_t)nb_rx;
535  for (uint16_t i = 0; i < nb_rx; i++) {
536  Packet *p = PacketInitFromMbuf(ptv, ptv->received_mbufs[i]);
537  if (p == NULL) {
538  rte_pktmbuf_free(ptv->received_mbufs[i]);
539  continue;
540  }
541  DPDKSegmentedMbufWarning(ptv->received_mbufs[i]);
542  PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
543  rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
544  if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
545  TmqhOutputPacketpool(ptv->tv, p);
546  DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
547  SCReturnInt(EXIT_FAILURE);
548  }
549  }
550 
551  PeriodicDPDKDumpCounters(ptv);
553  }
554 
556 }
557 
558 /**
559  * \brief Init function for ReceiveDPDK.
560  *
561  * \param tv pointer to ThreadVars
562  * \param initdata pointer to the interface passed from the user
563  * \param data pointer gets populated with DPDKThreadVars
564  *
565  */
566 static TmEcode ReceiveDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
567 {
568  SCEnter();
569  int retval, thread_numa;
570  DPDKThreadVars *ptv = NULL;
571  DPDKIfaceConfig *dpdk_config = (DPDKIfaceConfig *)initdata;
572 
573  if (initdata == NULL) {
574  SCLogError("DPDK configuration is NULL in thread initialization");
575  goto fail;
576  }
577 
578  ptv = SCCalloc(1, sizeof(DPDKThreadVars));
579  if (unlikely(ptv == NULL)) {
580  SCLogError("Unable to allocate memory");
581  goto fail;
582  }
583 
584  ptv->tv = tv;
585  ptv->pkts = 0;
586  ptv->bytes = 0;
587  ptv->livedev = LiveGetDevice(dpdk_config->iface);
588 
589  ptv->capture_dpdk_packets = StatsRegisterCounter("capture.packets", ptv->tv);
590  ptv->capture_dpdk_rx_errs = StatsRegisterCounter("capture.rx_errors", ptv->tv);
591  ptv->capture_dpdk_tx_errs = StatsRegisterCounter("capture.tx_errors", ptv->tv);
592  ptv->capture_dpdk_imissed = StatsRegisterCounter("capture.dpdk.imissed", ptv->tv);
593  ptv->capture_dpdk_rx_no_mbufs = StatsRegisterCounter("capture.dpdk.no_mbufs", ptv->tv);
594  ptv->capture_dpdk_ierrors = StatsRegisterCounter("capture.dpdk.ierrors", ptv->tv);
595 
596  ptv->copy_mode = dpdk_config->copy_mode;
597  ptv->checksum_mode = dpdk_config->checksum_mode;
598 
599  ptv->threads = dpdk_config->threads;
600  ptv->intr_enabled = (dpdk_config->flags & DPDK_IRQ_MODE) ? true : false;
601  ptv->port_id = dpdk_config->port_id;
602  ptv->out_port_id = dpdk_config->out_port_id;
603  ptv->port_socket_id = dpdk_config->socket_id;
604  // pass the pointer to the mempool and then forget about it. Mempool is freed in thread deinit.
605  ptv->pkt_mempool = dpdk_config->pkt_mempool;
606  dpdk_config->pkt_mempool = NULL;
607 
608  thread_numa = GetNumaNode();
609  if (thread_numa >= 0 && ptv->port_socket_id != SOCKET_ID_ANY &&
610  thread_numa != ptv->port_socket_id) {
611  SC_ATOMIC_ADD(dpdk_config->inconsistent_numa_cnt, 1);
612  SCLogPerf("%s: NIC is on NUMA %d, thread on NUMA %d", dpdk_config->iface,
613  ptv->port_socket_id, thread_numa);
614  }
615 
616  ptv->workers_sync = dpdk_config->workers_sync;
617  uint16_t queue_id = SC_ATOMIC_ADD(dpdk_config->queue_id, 1);
618  ptv->queue_id = queue_id;
619 
620  // the last thread starts the device
621  if (queue_id == dpdk_config->threads - 1) {
622  retval = rte_eth_dev_start(ptv->port_id);
623  if (retval < 0) {
624  SCLogError("%s: error (%s) during device startup", dpdk_config->iface,
625  rte_strerror(-retval));
626  goto fail;
627  }
628 
629  struct rte_eth_dev_info dev_info;
630  retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
631  if (retval != 0) {
632  SCLogError("%s: error (%s) when getting device info", dpdk_config->iface,
633  rte_strerror(-retval));
634  goto fail;
635  }
636 
637  // some PMDs requires additional actions only after the device has started
638  DevicePostStartPMDSpecificActions(ptv, dev_info.driver_name);
639 
640  uint16_t inconsistent_numa_cnt = SC_ATOMIC_GET(dpdk_config->inconsistent_numa_cnt);
641  if (inconsistent_numa_cnt > 0 && ptv->port_socket_id != SOCKET_ID_ANY) {
642  SCLogWarning("%s: NIC is on NUMA %d, %u threads on different NUMA node(s)",
643  dpdk_config->iface, ptv->port_socket_id, inconsistent_numa_cnt);
644  } else if (ptv->port_socket_id == SOCKET_ID_ANY && rte_socket_count() > 1) {
645  SCLogNotice(
646  "%s: unable to determine NIC's NUMA node, degraded performance can be expected",
647  dpdk_config->iface);
648  }
649  if (ptv->intr_enabled) {
650  rte_spinlock_init(&intr_lock[ptv->port_id]);
651  }
652  }
653 
654  *data = (void *)ptv;
655  dpdk_config->DerefFunc(dpdk_config);
657 
658 fail:
659  if (dpdk_config != NULL)
660  dpdk_config->DerefFunc(dpdk_config);
661  if (ptv != NULL)
662  SCFree(ptv);
664 }
665 
666 static void PrintDPDKPortXstats(uint32_t port_id, const char *port_name)
667 {
668  struct rte_eth_xstat *xstats;
669  struct rte_eth_xstat_name *xstats_names;
670 
671  int32_t len = rte_eth_xstats_get(port_id, NULL, 0);
672  if (len < 0)
673  FatalError("Error (%s) getting count of rte_eth_xstats failed on port %s",
674  rte_strerror(-len), port_name);
675 
676  xstats = SCCalloc(len, sizeof(*xstats));
677  if (xstats == NULL)
678  FatalError("Failed to allocate memory for the rte_eth_xstat structure");
679 
680  int32_t ret = rte_eth_xstats_get(port_id, xstats, len);
681  if (ret < 0 || ret > len) {
682  SCFree(xstats);
683  FatalError("Error (%s) getting rte_eth_xstats failed on port %s", rte_strerror(-ret),
684  port_name);
685  }
686  xstats_names = SCCalloc(len, sizeof(*xstats_names));
687  if (xstats_names == NULL) {
688  SCFree(xstats);
689  FatalError("Failed to allocate memory for the rte_eth_xstat_name array");
690  }
691  ret = rte_eth_xstats_get_names(port_id, xstats_names, len);
692  if (ret < 0 || ret > len) {
693  SCFree(xstats);
694  SCFree(xstats_names);
695  FatalError("Error (%s) getting names of rte_eth_xstats failed on port %s",
696  rte_strerror(-ret), port_name);
697  }
698  for (int32_t i = 0; i < len; i++) {
699  if (xstats[i].value > 0)
700  SCLogPerf("Port %u (%s) - %s: %" PRIu64, port_id, port_name, xstats_names[i].name,
701  xstats[i].value);
702  }
703 
704  SCFree(xstats);
705  SCFree(xstats_names);
706 }
707 
708 /**
709  * \brief This function prints stats to the screen at exit.
710  * \param tv pointer to ThreadVars
711  * \param data pointer that gets cast into DPDKThreadVars for ptv
712  */
713 static void ReceiveDPDKThreadExitStats(ThreadVars *tv, void *data)
714 {
715  SCEnter();
716  int retval;
717  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
718 
719  if (ptv->queue_id == 0) {
720  struct rte_eth_stats eth_stats;
721  PrintDPDKPortXstats(ptv->port_id, ptv->livedev->dev);
722  retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
723  if (unlikely(retval != 0)) {
724  SCLogError("%s: failed to get stats (%s)", ptv->livedev->dev, strerror(-retval));
725  SCReturn;
726  }
727  SCLogPerf("%s: total RX stats: packets %" PRIu64 " bytes: %" PRIu64 " missed: %" PRIu64
728  " errors: %" PRIu64 " nombufs: %" PRIu64,
729  ptv->livedev->dev, eth_stats.ipackets, eth_stats.ibytes, eth_stats.imissed,
730  eth_stats.ierrors, eth_stats.rx_nombuf);
731  if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS)
732  SCLogPerf("%s: total TX stats: packets %" PRIu64 " bytes: %" PRIu64 " errors: %" PRIu64,
733  ptv->livedev->dev, eth_stats.opackets, eth_stats.obytes, eth_stats.oerrors);
734  }
735 
736  DPDKDumpCounters(ptv);
737  SCLogPerf("(%s) received packets %" PRIu64, tv->name, ptv->pkts);
738 }
739 
740 /**
741  * \brief DeInit function closes dpdk at exit.
742  * \param tv pointer to ThreadVars
743  * \param data pointer that gets cast into DPDKThreadVars for ptv
744  */
745 static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *tv, void *data)
746 {
747  SCEnter();
748  DPDKThreadVars *ptv = (DPDKThreadVars *)data;
749 
750  if (ptv->queue_id == 0) {
751  struct rte_eth_dev_info dev_info;
752  int retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
753  if (retval != 0) {
754  SCLogError("%s: error (%s) when getting device info", ptv->livedev->dev,
755  rte_strerror(-retval));
757  }
758 
759  DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);
760 
761  if (ptv->workers_sync) {
762  SCFree(ptv->workers_sync);
763  }
764  }
765 
766  ptv->pkt_mempool = NULL; // MP is released when device is closed
767 
768  SCFree(ptv);
770 }
771 
772 /**
773  * \brief This function passes off to link type decoders.
774  *
775  * DecodeDPDK decodes packets from DPDK and passes
776  * them off to the proper link type decoder.
777  *
778  * \param t pointer to ThreadVars
779  * \param p pointer to the current packet
780  * \param data pointer that gets cast into DPDKThreadVars for ptv
781  */
782 static TmEcode DecodeDPDK(ThreadVars *tv, Packet *p, void *data)
783 {
784  SCEnter();
786 
788 
789  /* update counters */
791 
792  /* If suri has set vlan during reading, we increase vlan counter */
793  if (p->vlan_idx) {
795  }
796 
797  /* call the decoder */
798  DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
799 
801 
803 }
804 
805 static TmEcode DecodeDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
806 {
807  SCEnter();
808  DecodeThreadVars *dtv = NULL;
809 
811 
812  if (dtv == NULL)
814 
816 
817  *data = (void *)dtv;
818 
820 }
821 
822 static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data)
823 {
824  SCEnter();
825  if (data != NULL)
826  DecodeThreadVarsFree(tv, data);
828 }
829 
830 #endif /* HAVE_DPDK */
831 /* eof */
832 /**
833  * @}
834  */
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:73
PacketL4::csum_set
bool csum_set
Definition: decode.h:438
PacketCheckAction
bool PacketCheckAction(const Packet *p, const uint8_t a)
Definition: packet.c:49
tm-threads.h
len
uint8_t len
Definition: app-layer-dnp3.h:2
TMM_RECEIVEDPDK
@ TMM_RECEIVEDPDK
Definition: tm-threads-common.h:56
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:166
CHECKSUM_VALIDATION_OFFLOAD
@ CHECKSUM_VALIDATION_OFFLOAD
Definition: decode.h:47
offset
uint64_t offset
Definition: util-streaming-buffer.h:0
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
PacketFreeOrRelease
void PacketFreeOrRelease(Packet *p)
Return a packet to where it was allocated.
Definition: decode.c:250
SCTIME_MSECS
#define SCTIME_MSECS(t)
Definition: util-time.h:58
PKT_IS_PSEUDOPKT
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition: decode.h:1317
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
DPDK_COPY_MODE_IPS
@ DPDK_COPY_MODE_IPS
Definition: source-dpdk.h:33
PacketL4::csum
uint16_t csum
Definition: decode.h:439
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:101
TMM_DECODEDPDK
@ TMM_DECODEDPDK
Definition: tm-threads-common.h:57
action-globals.h
Packet_::flags
uint32_t flags
Definition: decode.h:510
DpdkCopyModeEnum
DpdkCopyModeEnum
Definition: source-dpdk.h:33
threads.h
Packet_::vlan_idx
uint8_t vlan_idx
Definition: decode.h:501
LiveDevice_
Definition: util-device.h:50
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:332
StatsSetUI64
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type double to the local counter.
Definition: counters.c:207
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
NoDPDKSupportExit
TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **)
this function prints an error message and exits.
Definition: source-dpdk.c:78
util-privs.h
CHECKSUM_VALIDATION_DISABLE
@ CHECKSUM_VALIDATION_DISABLE
Definition: decode.h:42
PacketDecodeFinalize
void PacketDecodeFinalize(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Finalize decoding of a packet.
Definition: decode.c:206
DPDKIfaceConfig_
Definition: source-dpdk.h:52
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:314
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:81
tmqh-packetpool.h
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:80
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
Packet_::datalink
int datalink
Definition: decode.h:605
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1320
DPDKWorkerSync_
Definition: source-dpdk.h:47
DecodeRegisterPerfCounters
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition: decode.c:602
TmModuleReceiveDPDKRegister
void TmModuleReceiveDPDKRegister(void)
Definition: source-dpdk.c:50
decode.h
PKT_SRC_WIRE
@ PKT_SRC_WIRE
Definition: decode.h:51
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
Packet_::ts
SCTime_t ts
Definition: decode.h:521
SCTime_t::secs
uint64_t secs
Definition: util-time.h:41
LiveGetDevice
LiveDevice * LiveGetDevice(const char *name)
Get a pointer to the device at idx.
Definition: util-device.c:248
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
GET_PKT_DATA
#define GET_PKT_DATA(p)
Definition: decode.h:205
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
util-affinity.h
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:300
PacketPoolWait
void PacketPoolWait(void)
Definition: tmqh-packetpool.c:80
SCReturn
#define SCReturn
Definition: util-debug.h:273
Packet_
Definition: decode.h:473
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
type
uint16_t type
Definition: decode-vlan.c:107
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:29
GET_PKT_LEN
#define GET_PKT_LEN(p)
Definition: decode.h:204
TimeGet
SCTime_t TimeGet(void)
Definition: util-time.c:152
Packet_::l4
struct PacketL4 l4
Definition: decode.h:567
TmSlot_
Definition: tm-threads.h:53
PKT_IGNORE_CHECKSUM
#define PKT_IGNORE_CHECKSUM
Definition: decode.h:1280
SCTime_t
Definition: util-time.h:40
Packet_::livedev
struct LiveDevice_ * livedev
Definition: decode.h:584
DPDK_BURST_TX_WAIT_US
#define DPDK_BURST_TX_WAIT_US
Definition: source-dpdk.h:35
TmEcode
TmEcode
Definition: tm-threads-common.h:79
TmModule_::name
const char * name
Definition: tm-modules.h:44
DecodeThreadVars_::counter_vlan
uint16_t counter_vlan
Definition: decode.h:965
runmodes.h
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
dtv
DecodeThreadVars * dtv
Definition: fuzz_decodepcapfile.c:33
PacketL3::csum_set
bool csum_set
Definition: decode.h:408
DPDK_IRQ_MODE
#define DPDK_IRQ_MODE
Definition: source-dpdk.h:41
Packet_::ReleasePacket
void(* ReleasePacket)(struct Packet_ *)
Definition: decode.h:557
util-dpdk.h
flags
uint8_t flags
Definition: decode-gre.h:0
DecodeThreadVarsFree
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition: decode.c:791
source-dpdk.h
ChecksumValidationMode
ChecksumValidationMode
Definition: decode.h:41
suricata-common.h
packet.h
ACTION_DROP
#define ACTION_DROP
Definition: action-globals.h:30
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
FatalError
#define FatalError(...)
Definition: util-debug.h:502
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
threadvars.h
Packet_::l3
struct PacketL3 l3
Definition: decode.h:566
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
SCFree
#define SCFree(p)
Definition: util-mem.h:61
DecodeThreadVars_
Structure to hold thread specific data for all decode modules.
Definition: decode.h:929
util-dpdk-bonding.h
DecodeThreadVarsAlloc
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition: decode.c:773
PacketSetData
int PacketSetData(Packet *p, const uint8_t *pktdata, uint32_t pktlen)
Set data for Packet and set length when zero copy is used.
Definition: decode.c:811
util-dpdk-i40e.h
suricata.h
PacketL3::csum
uint16_t csum
Definition: decode.h:409
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition: counters.c:449
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
TmModuleDecodeDPDKRegister
void TmModuleDecodeDPDKRegister(void)
Registration Function for DecodeDPDK.
Definition: source-dpdk.c:64
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
StatsRegisterCounter
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:951
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:275
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:267
SC_CAP_NET_RAW
#define SC_CAP_NET_RAW
Definition: util-privs.h:32
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:76
DPDK_COPY_MODE_TAP
@ DPDK_COPY_MODE_TAP
Definition: source-dpdk.h:33
DecodeUpdatePacketCounters
void DecodeUpdatePacketCounters(ThreadVars *tv, const DecodeThreadVars *dtv, const Packet *p)
Definition: decode.c:739
LINKTYPE_ETHERNET
#define LINKTYPE_ETHERNET
Definition: decode.h:1231
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:169