suricata
source-af-xdp.c
Go to the documentation of this file.
1 /* Copyright (C) 2011-2022 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \defgroup afxdppacket AF_XDP running mode
20  *
21  * @{
22  */
23 
24 /**
25  * \file
26  *
27  * \author Richard McConnell <richard_mcconnell@rapid7.com>
28  *
29  * AF_XDP socket acquisition support
30  *
31  */
32 #define PCAP_DONT_INCLUDE_PCAP_BPF_H 1
33 #define SC_PCAP_DONT_INCLUDE_PCAP_H 1
35 #include "suricata.h"
36 #include "decode.h"
37 #include "packet-queue.h"
38 #include "threads.h"
39 #include "threadvars.h"
40 #include "tm-queuehandlers.h"
41 #include "tm-modules.h"
42 #include "tm-threads.h"
43 #include "tm-threads-common.h"
44 #include "conf.h"
45 #include "util-cpu.h"
46 #include "util-datalink.h"
47 #include "util-debug.h"
48 #include "util-device.h"
49 #include "util-ebpf.h"
50 #include "util-error.h"
51 #include "util-privs.h"
52 #include "util-optimize.h"
53 #include "util-checksum.h"
54 #include "util-ioctl.h"
55 #include "util-host-info.h"
56 #include "util-sysfs.h"
57 #include "tmqh-packetpool.h"
58 #include "source-af-xdp.h"
59 #include "runmodes.h"
60 #include "flow-storage.h"
61 #include "util-validate.h"
62 
63 #ifdef HAVE_AF_XDP
64 #include <xdp/xsk.h>
65 #include <net/if.h>
66 #endif
67 
68 #if HAVE_LINUX_IF_ETHER_H
69 #include <linux/if_ether.h>
70 #endif
71 
72 #ifndef HAVE_AF_XDP
73 
74 TmEcode NoAFXDPSupportExit(ThreadVars *, const void *, void **);
75 
77 {
78  tmm_modules[TMM_RECEIVEAFXDP].name = "ReceiveAFXDP";
85 }
86 
87 /**
88  * \brief Registration Function for DecodeAFXDP.
89  */
91 {
92  tmm_modules[TMM_DECODEAFXDP].name = "DecodeAFXDP";
99 }
100 
101 /**
102  * \brief this function prints an error message and exits.
103  */
104 TmEcode NoAFXDPSupportExit(ThreadVars *tv, const void *initdata, void **data)
105 {
106  SCLogError("Error creating thread %s: you do not have "
107  "support for AF_XDP enabled, on Linux host please recompile "
108  "with --enable-af-xdp",
109  tv->name);
110  exit(EXIT_FAILURE);
111 }
112 
113 #else /* We have AF_XDP support */
114 
/* Timeout handed to poll() in the receive loop (poll() takes milliseconds). */
115 #define POLL_TIMEOUT 100
/* Base frame count; the fill ring is initialised with NUM_FRAMES * 2 entries. */
116 #define NUM_FRAMES XSK_RING_PROD__DEFAULT_NUM_DESCS
/* Byte stride between the frame addresses placed on the fill ring. */
117 #define FRAME_SIZE XSK_UMEM__DEFAULT_FRAME_SIZE
/* Size of the anonymous mmap() region that backs the UMEM. */
118 #define MEM_BYTES (NUM_FRAMES * FRAME_SIZE * 2)
/* Delay handed to usleep() (microseconds) between socket reopen attempts. */
119 #define RECONNECT_TIMEOUT 500000
120 
121 /* Interface state */
/* Values stored in AFXDPThreadVars.afxdp_state; DOWN makes the receive loop
 * try to reopen the socket. */
122 enum state { AFXDP_STATE_DOWN, AFXDP_STATE_UP };
123 
/* File-scope state shared by every AF_XDP capture thread (instance: xsk_protect). */
124 struct XskInitProtect {
/* Held while a thread claims its queue id and creates its socket. */
125  SCMutex queue_protect;
/* Next queue id to hand out; read then incremented when a thread is assigned. */
126  SC_ATOMIC_DECLARE(uint8_t, queue_num);
127 } xsk_protect;
128 
/* Per-thread UMEM state: the backing buffer plus the rings created with it. */
129 struct UmemInfo {
/* Anonymous mmap() region of MEM_BYTES bytes holding the packet frames. */
130  void *buf;
/* Handle filled in by xsk_umem__create(); NULL when no UMEM is open. */
131  struct xsk_umem *umem;
/* Fill ring: frame addresses handed to the kernel for inbound packets. */
132  struct xsk_ring_prod fq;
/* Completion ring registered with the UMEM (not otherwise read in this file). */
133  struct xsk_ring_cons cq;
/* Ring sizes, frame size, headroom and flags passed to xsk_umem__create(). */
134  struct xsk_umem_config cfg;
/* Extra mmap() flag; set to MAP_HUGETLB when unaligned chunk mode is requested. */
135  int mmap_alignment_flag;
136 };
137 
/* Records which interface queue a capture thread binds its socket to. */
138 struct QueueAssignment {
/* Queue id passed to xsk_socket__create(). */
139  uint32_t queue_num;
/* True once queue_num has been claimed, so reopening keeps the same queue. */
140  bool assigned;
141 };
142 
/* Per-thread AF_XDP socket state and its tuning options. */
143 struct XskSockInfo {
/* RX ring the receive loop peeks descriptors from. */
144  struct xsk_ring_cons rx;
/* TX ring; passed to xsk_socket__create() but not otherwise used in this file. */
145  struct xsk_ring_prod tx;
/* Socket handle; NULL when the socket is closed. */
146  struct xsk_socket *xsk;
147 
148  /* Queue assignment structure */
149  struct QueueAssignment queue;
150 
151  /* Configuration items */
152  struct xsk_socket_config cfg;
/* When false the receive loop waits in poll() before reading the RX ring. */
153  bool enable_busy_poll;
/* Value written to the SO_BUSY_POLL socket option. */
154  uint32_t busy_poll_time;
/* Value written to SO_BUSY_POLL_BUDGET; also the batch size for RX ring peeks. */
155  uint32_t busy_poll_budget;
156 
/* Socket fd and POLLIN event mask used by poll(). */
157  struct pollfd fd;
158 };
159 
160 /**
161  * \brief Structure to hold thread specific variables.
162  */
163 typedef struct AFXDPThreadVars_ {
/* Owning thread, the slot packets are pushed to, and the capture device. */
164  ThreadVars *tv;
165  TmSlot *slot;
166  LiveDevice *livedev;
167 
168  /* thread specific socket */
/* Non-zero: the interface is forced into promiscuous mode at thread init. */
169  int promisc;
/* Configured number of capture threads (copied from the interface config). */
170  int threads;
171 
/* Interface name and the index resolved from it with if_nametoindex(). */
172  char iface[AFXDP_IFACE_NAME_LENGTH];
173  uint32_t ifindex;
174 
175  /* AF_XDP structure */
176  struct UmemInfo umem;
177  struct XskSockInfo xsk;
/* Values written to the interface's sysfs tunables of the same names. */
178  uint32_t gro_flush_timeout;
179  uint32_t napi_defer_hard_irqs;
/* Id of the XDP program found on the interface after socket creation. */
180  uint32_t prog_id;
181 
182  /* Handle state */
/* One of enum state (AFXDP_STATE_DOWN / AFXDP_STATE_UP). */
183  uint8_t afxdp_state;
184 
185  /* Stats parameters */
/* Packets since the last counter dump (reset there); bytes is a running total. */
186  uint64_t pkts;
187  uint64_t bytes;
/* Counter ids returned by StatsRegisterCounter() at thread init. */
188  uint16_t capture_afxdp_packets;
189  uint16_t capture_kernel_drops;
190  uint16_t capture_afxdp_poll;
191  uint16_t capture_afxdp_poll_timeout;
192  uint16_t capture_afxdp_poll_failed;
193  uint16_t capture_afxdp_empty_reads;
194  uint16_t capture_afxdp_failed_reads;
195  uint16_t capture_afxdp_acquire_pkt_failed;
196 } AFXDPThreadVars;
197 
198 static TmEcode ReceiveAFXDPThreadInit(ThreadVars *, const void *, void **);
199 static void ReceiveAFXDPThreadExitStats(ThreadVars *, void *);
200 static TmEcode ReceiveAFXDPThreadDeinit(ThreadVars *, void *);
201 static TmEcode ReceiveAFXDPLoop(ThreadVars *tv, void *data, void *slot);
202 
203 static TmEcode DecodeAFXDPThreadInit(ThreadVars *, const void *, void **);
204 static TmEcode DecodeAFXDPThreadDeinit(ThreadVars *tv, void *data);
205 static TmEcode DecodeAFXDP(ThreadVars *, Packet *, void *);
206 
207 /**
208  * \brief Registration Function for RecieveAFXDP.
209  * \todo Unit tests are needed for this module.
210  */
212 {
213  tmm_modules[TMM_RECEIVEAFXDP].name = "ReceiveAFXDP";
214  tmm_modules[TMM_RECEIVEAFXDP].ThreadInit = ReceiveAFXDPThreadInit;
216  tmm_modules[TMM_RECEIVEAFXDP].PktAcqLoop = ReceiveAFXDPLoop;
218  tmm_modules[TMM_RECEIVEAFXDP].ThreadExitPrintStats = ReceiveAFXDPThreadExitStats;
219  tmm_modules[TMM_RECEIVEAFXDP].ThreadDeinit = ReceiveAFXDPThreadDeinit;
222 }
223 
224 /**
225  * \brief Registration Function for DecodeAFXDP.
226  * \todo Unit tests are needed for this module.
227  */
229 {
230  tmm_modules[TMM_DECODEAFXDP].name = "DecodeAFXDP";
231  tmm_modules[TMM_DECODEAFXDP].ThreadInit = DecodeAFXDPThreadInit;
232  tmm_modules[TMM_DECODEAFXDP].Func = DecodeAFXDP;
234  tmm_modules[TMM_DECODEAFXDP].ThreadDeinit = DecodeAFXDPThreadDeinit;
237 }
238 
239 static inline void AFXDPDumpCounters(AFXDPThreadVars *ptv)
240 {
241  struct xdp_statistics stats;
242  socklen_t len = sizeof(struct xdp_statistics);
243  int fd = xsk_socket__fd(ptv->xsk.xsk);
244 
245  if (getsockopt(fd, SOL_XDP, XDP_STATISTICS, &stats, &len) >= 0) {
246  uint64_t rx_dropped = stats.rx_dropped + stats.rx_invalid_descs + stats.rx_ring_full;
247 
248  StatsAddUI64(ptv->tv, ptv->capture_kernel_drops,
249  rx_dropped - StatsGetLocalCounterValue(ptv->tv, ptv->capture_kernel_drops));
250  StatsAddUI64(ptv->tv, ptv->capture_afxdp_packets, ptv->pkts);
251 
252  (void)SC_ATOMIC_SET(ptv->livedev->drop, rx_dropped);
253  (void)SC_ATOMIC_ADD(ptv->livedev->pkts, ptv->pkts);
254 
255  SCLogDebug("(%s) Kernel: Packets %" PRIu64 ", bytes %" PRIu64 ", dropped %" PRIu64 "",
256  ptv->tv->name, StatsGetLocalCounterValue(ptv->tv, ptv->capture_afxdp_packets),
257  ptv->bytes, StatsGetLocalCounterValue(ptv->tv, ptv->capture_kernel_drops));
258 
259  ptv->pkts = 0;
260  }
261 }
262 
263 /**
264  * \brief Init function for socket creation.
265  *
266  * Mutex used to synchronise initialisation - each socket opens a
267  * different queue. The specific order in which each queue is
268  * opened is not important, but it is vital the queue_num's
269  * are different.
270  *
271  * \param tv pointer to ThreadVars
272  */
274 {
275  SCEnter();
276 
277  SCMutexInit(&xsk_protect.queue_protect, NULL);
278  SC_ATOMIC_SET(xsk_protect.queue_num, 0);
280 }
281 
282 void AFXDPMutexClean(void)
283 {
284  SCMutexDestroy(&xsk_protect.queue_protect);
285 }
286 
287 static TmEcode AFXDPAssignQueueID(AFXDPThreadVars *ptv)
288 {
289  if (ptv->xsk.queue.assigned == false) {
290  ptv->xsk.queue.queue_num = SC_ATOMIC_GET(xsk_protect.queue_num);
291  SC_ATOMIC_ADD(xsk_protect.queue_num, 1);
292 
293  /* Queue only needs assigned once, on startup */
294  ptv->xsk.queue.assigned = true;
295  }
297 }
298 
299 static void AFXDPAllThreadsRunning(AFXDPThreadVars *ptv)
300 {
301  SCMutexLock(&xsk_protect.queue_protect);
302  if ((ptv->threads - 1) == (int)ptv->xsk.queue.queue_num) {
303  SCLogDebug("All AF_XDP capture threads are running.");
304  }
305  SCMutexUnlock(&xsk_protect.queue_protect);
306 }
307 
308 static TmEcode AcquireBuffer(AFXDPThreadVars *ptv)
309 {
310  int mmap_flags = MAP_PRIVATE | MAP_ANONYMOUS | ptv->umem.mmap_alignment_flag;
311  ptv->umem.buf = mmap(NULL, MEM_BYTES, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
312 
313  if (ptv->umem.buf == MAP_FAILED) {
314  SCLogError("mmap: failed to acquire memory");
316  }
317 
319 }
320 
321 static TmEcode ConfigureXSKUmem(AFXDPThreadVars *ptv)
322 {
323  if (xsk_umem__create(&ptv->umem.umem, ptv->umem.buf, MEM_BYTES, &ptv->umem.fq, &ptv->umem.cq,
324  &ptv->umem.cfg)) {
325  SCLogError("failed to create umem: %s", strerror(errno));
327  }
328 
330 }
331 
332 static TmEcode InitFillRing(AFXDPThreadVars *ptv, const uint32_t cnt)
333 {
334  uint32_t idx_fq = 0;
335 
336  uint32_t ret = xsk_ring_prod__reserve(&ptv->umem.fq, cnt, &idx_fq);
337  if (ret != cnt) {
338  SCLogError("Failed to initialise the fill ring.");
340  }
341 
342  for (uint32_t i = 0; i < cnt; i++) {
343  *xsk_ring_prod__fill_addr(&ptv->umem.fq, idx_fq++) = i * FRAME_SIZE;
344  }
345 
346  xsk_ring_prod__submit(&ptv->umem.fq, cnt);
348 }
349 
350 /**
351  * \brief Linux knobs are tuned to enable a NAPI polling context
352  *
353  * \param tv pointer to AFXDPThreadVars
354  */
355 static TmEcode WriteLinuxTunables(AFXDPThreadVars *ptv)
356 {
357  char fname[SYSFS_MAX_FILENAME_SIZE];
358 
359  if (snprintf(fname, SYSFS_MAX_FILENAME_SIZE, "class/net/%s/gro_flush_timeout", ptv->iface) <
360  0) {
362  }
363 
364  if (SysFsWriteValue(fname, ptv->gro_flush_timeout) != TM_ECODE_OK) {
366  }
367 
368  if (snprintf(fname, SYSFS_MAX_FILENAME_SIZE, "class/net/%s/napi_defer_hard_irqs", ptv->iface) <
369  0) {
371  }
372 
373  if (SysFsWriteValue(fname, ptv->napi_defer_hard_irqs) != TM_ECODE_OK) {
375  }
376 
378 }
379 
380 static TmEcode ConfigureBusyPolling(AFXDPThreadVars *ptv)
381 {
382  if (!ptv->xsk.enable_busy_poll) {
384  }
385 
386  /* Kernel version must be >= 5.11 to avail of SO_PREFER_BUSY_POLL
387  * see linux commit: 7fd3253a7de6a317a0683f83739479fb880bffc8
388  */
389  if (!SCKernelVersionIsAtLeast(5, 11)) {
390  SCLogWarning("Kernel version older than required: v5.11,"
391  " upgrade kernel version to use 'enable-busy-poll' option.");
393  }
394 
395 #if defined SO_PREFER_BUSY_POLL && defined SO_BUSY_POLL && defined SO_BUSY_POLL_BUDGET
396  const int fd = xsk_socket__fd(ptv->xsk.xsk);
397  int sock_opt = 1;
398 
399  if (WriteLinuxTunables(ptv) != TM_ECODE_OK) {
401  }
402 
403  if (setsockopt(fd, SOL_SOCKET, SO_PREFER_BUSY_POLL, (void *)&sock_opt, sizeof(sock_opt)) < 0) {
405  }
406 
407  sock_opt = ptv->xsk.busy_poll_time;
408  if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL, (void *)&sock_opt, sizeof(sock_opt)) < 0) {
410  }
411 
412  sock_opt = ptv->xsk.busy_poll_budget;
413  if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL_BUDGET, (void *)&sock_opt, sizeof(sock_opt)) < 0) {
415  }
416 
418 #else
419  SCLogWarning(
420  "Kernel does not support busy poll, upgrade kernel or disable \"enable-busy-poll\".");
422 #endif
423 }
424 
425 static void AFXDPSwitchState(AFXDPThreadVars *ptv, int state)
426 {
427  ptv->afxdp_state = state;
428 }
429 
430 static TmEcode OpenXSKSocket(AFXDPThreadVars *ptv)
431 {
432  int ret;
433 
434  SCMutexLock(&xsk_protect.queue_protect);
435 
436  if (AFXDPAssignQueueID(ptv) != TM_ECODE_OK) {
437  SCLogError("Failed to assign queue ID");
439  }
440 
441  if ((ret = xsk_socket__create(&ptv->xsk.xsk, ptv->livedev->dev, ptv->xsk.queue.queue_num,
442  ptv->umem.umem, &ptv->xsk.rx, &ptv->xsk.tx, &ptv->xsk.cfg))) {
443  SCLogError("Failed to create socket: %s", strerror(-ret));
445  }
446  SCLogDebug("bind to %s on queue %u", ptv->iface, ptv->xsk.queue.queue_num);
447 
448  /* For polling and socket options */
449  ptv->xsk.fd.fd = xsk_socket__fd(ptv->xsk.xsk);
450  ptv->xsk.fd.events = POLLIN;
451 
452  /* Set state */
453  AFXDPSwitchState(ptv, AFXDP_STATE_UP);
454 
455  SCMutexUnlock(&xsk_protect.queue_protect);
457 }
458 
459 static void AFXDPCloseSocket(AFXDPThreadVars *ptv)
460 {
461  if (ptv->xsk.xsk) {
462  xsk_socket__delete(ptv->xsk.xsk);
463  ptv->xsk.xsk = NULL;
464  }
465 
466  if (ptv->umem.umem) {
467  xsk_umem__delete(ptv->umem.umem);
468  ptv->umem.umem = NULL;
469  }
470 
471  memset(&ptv->umem.fq, 0, sizeof(struct xsk_ring_prod));
472  memset(&ptv->umem.cq, 0, sizeof(struct xsk_ring_cons));
473 }
474 
475 static TmEcode AFXDPSocketCreation(AFXDPThreadVars *ptv)
476 {
477  if (ConfigureXSKUmem(ptv) != TM_ECODE_OK) {
479  }
480 
481  if (InitFillRing(ptv, NUM_FRAMES * 2) != TM_ECODE_OK) {
483  }
484 
485  /* Open AF_XDP socket */
486  if (OpenXSKSocket(ptv) != TM_ECODE_OK) {
488  }
489 
490  if (ConfigureBusyPolling(ptv) != TM_ECODE_OK) {
491  SCLogWarning("Failed to configure busy polling"
492  " performance may be reduced.");
493  }
494 
495  /* Has the eBPF program successfully bound? */
496 #ifdef HAVE_BPF_XDP_QUERY_ID
497  if (bpf_xdp_query_id(ptv->ifindex, ptv->xsk.cfg.xdp_flags, &ptv->prog_id)) {
498  SCLogError("Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
500  }
501 #else
502  if (bpf_get_link_xdp_id(ptv->ifindex, &ptv->prog_id, ptv->xsk.cfg.xdp_flags)) {
503  SCLogError("Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
505  }
506 #endif
507 
509 }
510 
511 /**
512  * \brief Try to reopen AF_XDP socket
513  *
514  * \retval: TM_ECODE_OK in case of success
515  * TM_ECODE_FAILED if error occurs or a condition is not met.
516  */
517 static TmEcode AFXDPTryReopen(AFXDPThreadVars *ptv)
518 {
519  AFXDPCloseSocket(ptv);
520  usleep(RECONNECT_TIMEOUT);
521 
522  int if_flags = GetIfaceFlags(ptv->iface);
523  if (if_flags == -1) {
524  SCLogDebug("Couldn't get flags for interface '%s'", ptv->iface);
525  goto sock_err;
526  } else if ((if_flags & (IFF_UP | IFF_RUNNING)) == 0) {
527  SCLogDebug("Interface '%s' is down", ptv->iface);
528  goto sock_err;
529  }
530 
531  if (AFXDPSocketCreation(ptv) != TM_ECODE_OK) {
533  }
534 
535  SCLogInfo("Interface '%s' is back", ptv->iface);
537 
538 sock_err:
540 }
541 
542 /**
543  * \brief Write packet entry to the fill ring, freeing
544  * this slot for re/fill with inbound packet descriptor
545  * \param pointer to Packet
546  * \retval: None
547  */
548 static void AFXDPReleasePacket(Packet *p)
549 {
550  *xsk_ring_prod__fill_addr((struct xsk_ring_prod *)p->afxdp_v.fq, p->afxdp_v.fq_idx) =
551  p->afxdp_v.orig;
552 
554 }
555 
556 static inline int DumpStatsEverySecond(AFXDPThreadVars *ptv, time_t *last_dump)
557 {
558  int stats_dumped = 0;
559  time_t current_time = time(NULL);
560 
561  if (current_time != *last_dump) {
562  AFXDPDumpCounters(ptv);
563  *last_dump = current_time;
564  stats_dumped = 1;
565  }
566 
568 
569  return stats_dumped;
570 }
571 
572 static inline ssize_t WakeupSocket(void *data)
573 {
574  ssize_t res = 0;
575  AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
576 
577  /* Assuming kernel >= 5.11 in use if xdp_busy_poll is enabled */
578  if (ptv->xsk.enable_busy_poll || xsk_ring_prod__needs_wakeup(&ptv->umem.fq)) {
579  res = recvfrom(xsk_socket__fd(ptv->xsk.xsk), NULL, 0, MSG_DONTWAIT, NULL, NULL);
580  }
581 
582  return res;
583 }
584 
585 /**
586  * \brief Init function for ReceiveAFXDP.
587  *
588  * \param tv pointer to ThreadVars
589  * \param initdata pointer to the interface passed from the user
590  * \param data pointer gets populated with AFPThreadVars
591  *
592  * \todo Create a general AFP setup function.
593  */
594 static TmEcode ReceiveAFXDPThreadInit(ThreadVars *tv, const void *initdata, void **data)
595 {
596  SCEnter();
597 
598  AFXDPIfaceConfig *afxdpconfig = (AFXDPIfaceConfig *)initdata;
599 
600  if (initdata == NULL) {
601  SCLogError("initdata == NULL");
603  }
604 
605  AFXDPThreadVars *ptv = SCMalloc(sizeof(AFXDPThreadVars));
606  if (unlikely(ptv == NULL)) {
607  afxdpconfig->DerefFunc(afxdpconfig);
609  }
610  memset(ptv, 0, sizeof(AFXDPThreadVars));
611 
612  ptv->tv = tv;
613 
614  strlcpy(ptv->iface, afxdpconfig->iface, AFXDP_IFACE_NAME_LENGTH);
615  ptv->iface[AFXDP_IFACE_NAME_LENGTH - 1] = '\0';
616  ptv->ifindex = if_nametoindex(ptv->iface);
617 
618  ptv->livedev = LiveGetDevice(ptv->iface);
619  if (ptv->livedev == NULL) {
620  SCLogError("Unable to find Live device");
621  SCFree(ptv);
623  }
624 
625  ptv->promisc = afxdpconfig->promisc;
626  if (ptv->promisc != 0) {
627  /* Force promiscuous mode */
628  if (SetIfaceFlags(ptv->iface, IFF_PROMISC | IFF_UP) != 0) {
629  SCLogError("Failed to switch interface (%s) to promiscuous, error %s", ptv->iface,
630  strerror(errno));
631  SCFree(ptv);
633  }
634  }
635 
636  ptv->threads = afxdpconfig->threads;
637 
638  /* Socket configuration */
639  ptv->xsk.cfg.rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS;
640  ptv->xsk.cfg.tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS;
641  ptv->xsk.cfg.xdp_flags = afxdpconfig->mode;
642  ptv->xsk.cfg.bind_flags = afxdpconfig->bind_flags;
643 
644  /* UMEM configuration */
645  ptv->umem.cfg.fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2;
646  ptv->umem.cfg.comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS;
647  ptv->umem.cfg.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE;
648  ptv->umem.cfg.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM;
649  ptv->umem.cfg.flags = afxdpconfig->mem_alignment;
650 
651  /* Use hugepages if unaligned chunk mode */
652  if (ptv->umem.cfg.flags == XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
653  ptv->umem.mmap_alignment_flag = MAP_HUGETLB;
654  }
655 
656  /* Busy polling configuration */
657  ptv->xsk.enable_busy_poll = afxdpconfig->enable_busy_poll;
658  ptv->xsk.busy_poll_budget = afxdpconfig->busy_poll_budget;
659  ptv->xsk.busy_poll_time = afxdpconfig->busy_poll_time;
660  ptv->gro_flush_timeout = afxdpconfig->gro_flush_timeout;
661  ptv->napi_defer_hard_irqs = afxdpconfig->napi_defer_hard_irqs;
662 
663  /* Stats registration */
664  ptv->capture_afxdp_packets = StatsRegisterCounter("capture.afxdp_packets", ptv->tv);
665  ptv->capture_kernel_drops = StatsRegisterCounter("capture.kernel_drops", ptv->tv);
666  ptv->capture_afxdp_poll = StatsRegisterCounter("capture.afxdp.poll", ptv->tv);
667  ptv->capture_afxdp_poll_timeout = StatsRegisterCounter("capture.afxdp.poll_timeout", ptv->tv);
668  ptv->capture_afxdp_poll_failed = StatsRegisterCounter("capture.afxdp.poll_failed", ptv->tv);
669  ptv->capture_afxdp_empty_reads = StatsRegisterCounter("capture.afxdp.empty_reads", ptv->tv);
670  ptv->capture_afxdp_failed_reads = StatsRegisterCounter("capture.afxdp.failed_reads", ptv->tv);
671  ptv->capture_afxdp_acquire_pkt_failed =
672  StatsRegisterCounter("capture.afxdp.acquire_pkt_failed", ptv->tv);
673 
674  /* Reserve memory for umem */
675  if (AcquireBuffer(ptv) != TM_ECODE_OK) {
676  SCFree(ptv);
678  }
679 
680  if (AFXDPSocketCreation(ptv) != TM_ECODE_OK) {
681  ReceiveAFXDPThreadDeinit(tv, ptv);
683  }
684 
685  *data = (void *)ptv;
686  afxdpconfig->DerefFunc(afxdpconfig);
688 }
689 
690 /**
691  * \brief Main AF_XDP reading Loop function
692  */
/* Packet acquisition loop for one AF_XDP capture thread.
 *
 * Each iteration: reopen the socket if the state is DOWN, stop if
 * suricata_ctl_flags is set, optionally wait in poll(), peek a batch of RX
 * descriptors, reserve the same number of fill ring slots, hand each frame
 * to the next slot as a zero-copy Packet, then submit the fill slots and
 * release the RX descriptors.
 *
 * tv   - thread vars (unused in the visible lines; ptv->tv is used instead)
 * data - AFXDPThreadVars set up by ReceiveAFXDPThreadInit
 * slot - this module's TmSlot; packets go to slot->slot_next
 */
693 static TmEcode ReceiveAFXDPLoop(ThreadVars *tv, void *data, void *slot)
694 {
695  SCEnter();
696 
697  Packet *p;
698  time_t last_dump = 0;
699  struct timeval ts;
700  uint32_t idx_rx = 0, idx_fq = 0, rcvd;
701  int r;
702  AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
703  TmSlot *s = (TmSlot *)slot;
704 
705  ptv->slot = s->slot_next;
706 
707  AFXDPAllThreadsRunning(ptv);
708 
709  // Indicate that the thread is actually running its application level code (i.e., it can poll
710  // packets)
/* NOTE(review): the listing jumps from 710 to 712 here, so the statement the
 * comment above describes is not visible - confirm against the original file. */
712 
713  PacketPoolWait();
714  while (1) {
715  /* Start by checking the state of our interface */
716  if (unlikely(ptv->afxdp_state == AFXDP_STATE_DOWN)) {
/* Retry until the socket is recreated or shutdown is requested. */
717  do {
718  usleep(RECONNECT_TIMEOUT);
719  if (unlikely(suricata_ctl_flags != 0)) {
720  break;
721  }
722  r = AFXDPTryReopen(ptv);
723  } while (r != TM_ECODE_OK);
724  }
725 
726  if (unlikely(suricata_ctl_flags != 0)) {
727  SCLogDebug("Stopping Suricata!");
728  AFXDPDumpCounters(ptv);
729  break;
730  }
731 
732  /* Busy polling is not set, using poll() to maintain (relatively) decent
733  * performance. xdp_busy_poll must be disabled for kernels < 5.11
734  */
735  if (!ptv->xsk.enable_busy_poll) {
736  StatsIncr(ptv->tv, ptv->capture_afxdp_poll);
737 
738  r = poll(&ptv->xsk.fd, 1, POLL_TIMEOUT);
739 
740  /* Report poll results */
741  if (r <= 0) {
742  if (r == 0) {
743  StatsIncr(ptv->tv, ptv->capture_afxdp_poll_timeout);
744  } else if (r < 0) {
745  StatsIncr(ptv->tv, ptv->capture_afxdp_poll_failed);
746  SCLogWarning("poll failed with retval %d", r);
/* A poll error marks the interface down; the next iteration tries to reopen. */
747  AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
748  }
749 
750  DumpStatsEverySecond(ptv, &last_dump);
751  continue;
752  }
753  }
754 
/* Batch size is the busy poll budget, whether or not busy polling is enabled. */
755  rcvd = xsk_ring_cons__peek(&ptv->xsk.rx, ptv->xsk.busy_poll_budget, &idx_rx);
756  if (!rcvd) {
757  StatsIncr(ptv->tv, ptv->capture_afxdp_empty_reads);
758  ssize_t ret = WakeupSocket(ptv);
759  if (ret < 0) {
760  SCLogWarning("recv failed with retval %ld", ret);
761  AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
762  }
763  DumpStatsEverySecond(ptv, &last_dump);
764  continue;
765  }
766 
/* One fill ring slot is reserved per received descriptor. */
767  uint32_t res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
768  while (res != rcvd) {
769  StatsIncr(ptv->tv, ptv->capture_afxdp_failed_reads);
770  ssize_t ret = WakeupSocket(ptv);
771  if (ret < 0) {
772  SCLogWarning("recv failed with retval %ld", ret);
773  AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
/* NOTE(review): this `continue` re-tests `res != rcvd` without a new reserve,
 * so the inner loop repeats for as long as WakeupSocket() keeps failing, and
 * the DOWN state is not acted on until it exits - confirm this is intended. */
774  continue;
775  }
776  res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
777  }
778 
/* One timestamp is taken per batch and shared by all of its packets. */
779  gettimeofday(&ts, NULL);
780  ptv->pkts += rcvd;
781  for (uint32_t i = 0; i < rcvd; i++) {
/* NOTE(review): listing line 782 is not visible; `p` is tested below without a
 * visible assignment - presumably a packet is acquired here, confirm. */
783  if (unlikely(p == NULL)) {
784  StatsIncr(ptv->tv, ptv->capture_afxdp_acquire_pkt_failed);
/* NOTE(review): idx_rx and idx_fq are not advanced on this path, so the next
 * iteration reads the same descriptor - verify this is intended. */
785  continue;
786  }
787 
/* NOTE(review): listing lines 788-789 and 792 are not visible in this view. */
790  p->livedev = ptv->livedev;
791  p->ReleasePacket = AFXDPReleasePacket;
793 
794  p->ts = SCTIME_FROM_TIMEVAL(&ts);
795 
796  uint64_t addr = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx)->addr;
797  uint32_t len = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx++)->len;
/* orig is kept for the fill ring write in AFXDPReleasePacket; the offset
 * adjusted addr is only used to locate the packet data. */
798  uint64_t orig = xsk_umem__extract_addr(addr);
799  addr = xsk_umem__add_offset_to_addr(addr);
800 
801  uint8_t *pkt_data = xsk_umem__get_data(ptv->umem.buf, addr);
802 
803  ptv->bytes += len;
804 
/* Remember which fill ring slot this frame is written back to on release. */
805  p->afxdp_v.fq_idx = idx_fq++;
806  p->afxdp_v.orig = orig;
807  p->afxdp_v.fq = &ptv->umem.fq;
808 
/* The Packet points straight into the UMEM frame. */
809  PacketSetData(p, pkt_data, len);
810 
811  if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
812  TmqhOutputPacketpool(ptv->tv, p);
/* NOTE(review): returns EXIT_FAILURE from a TmEcode function - confirm the
 * caller treats this value as a failure. */
813  SCReturnInt(EXIT_FAILURE);
814  }
815  }
816 
/* Publish the reserved fill slots and give the RX descriptors back. */
817  xsk_ring_prod__submit(&ptv->umem.fq, rcvd);
818  xsk_ring_cons__release(&ptv->xsk.rx, rcvd);
819 
820  /* Trigger one dump of stats every second */
821  DumpStatsEverySecond(ptv, &last_dump);
822  }
823 
/* NOTE(review): listing line 824 is not visible; no return statement is shown
 * for this non-void function - confirm against the original file. */
825 }
826 
827 /**
828  * \brief DeInit function closes af-xdp socket at exit.
829  * \param tv pointer to ThreadVars
830  * \param data pointer that gets cast into AFXDPPThreadVars for ptv
831  */
832 static TmEcode ReceiveAFXDPThreadDeinit(ThreadVars *tv, void *data)
833 {
834  AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
835 
836  if (ptv->xsk.xsk) {
837  xsk_socket__delete(ptv->xsk.xsk);
838  ptv->xsk.xsk = NULL;
839  }
840 
841  if (ptv->umem.umem) {
842  xsk_umem__delete(ptv->umem.umem);
843  ptv->umem.umem = NULL;
844  }
845  munmap(ptv->umem.buf, MEM_BYTES);
846 
847  SCFree(ptv);
849 }
850 
851 /**
852  * \brief This function prints stats to the screen at exit.
853  * \param tv pointer to ThreadVars
854  * \param data pointer that gets cast into AFXDPThreadVars for ptv
855  */
856 static void ReceiveAFXDPThreadExitStats(ThreadVars *tv, void *data)
857 {
858  SCEnter();
859  AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
860 
861  AFXDPDumpCounters(ptv);
862 
863  SCLogPerf("(%s) Kernel: Packets %" PRIu64 ", bytes %" PRIu64 ", dropped %" PRIu64 "", tv->name,
864  StatsGetLocalCounterValue(tv, ptv->capture_afxdp_packets), ptv->bytes,
865  StatsGetLocalCounterValue(tv, ptv->capture_kernel_drops));
866 }
867 
868 /**
869  * \brief This function passes off to link type decoders.
870  *
871  * DecodeAFXDP decodes packets from AF_XDP and passes
872  * them off to the proper link type decoder.
873  *
874  * \param t pointer to ThreadVars
875  * \param p pointer to the current packet
876  * \param data pointer that gets cast into AFXDPThreadVars for ptv
877  */
878 static TmEcode DecodeAFXDP(ThreadVars *tv, Packet *p, void *data)
879 {
880  SCEnter();
881 
883 
885 
886  /* update counters */
888 
889  /* If suri has set vlan during reading, we increase vlan counter */
890  if (p->vlan_idx) {
892  }
893 
894  /* call the decoder */
895  DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
896 
898 
900 }
901 
902 static TmEcode DecodeAFXDPThreadInit(ThreadVars *tv, const void *initdata, void **data)
903 {
904  SCEnter();
906  if (dtv == NULL)
908 
910 
911  *data = (void *)dtv;
912 
914 }
915 
916 static TmEcode DecodeAFXDPThreadDeinit(ThreadVars *tv, void *data)
917 {
918  if (data != NULL)
919  DecodeThreadVarsFree(tv, data);
921 }
922 
923 #endif /* HAVE_AF_XDP */
924 /* eof */
925 /**
926  * @}
927  */
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:73
tm-threads.h
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:167
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
PacketFreeOrRelease
void PacketFreeOrRelease(Packet *p)
Return a packet to where it was allocated.
Definition: decode.c:191
AFXDPIfaceConfig::mode
uint32_t mode
Definition: source-af-xdp.h:36
PKT_IS_PSEUDOPKT
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition: decode.h:1053
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:387
AFXDPIfaceConfig::iface
char iface[AFXDP_IFACE_NAME_LENGTH]
Definition: source-af-xdp.h:30
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:99
AFXDPIfaceConfig::mem_alignment
int mem_alignment
Definition: source-af-xdp.h:38
AFXDPIfaceConfig::gro_flush_timeout
uint32_t gro_flush_timeout
Definition: source-af-xdp.h:42
AFXDPIfaceConfig::DerefFunc
void(* DerefFunc)(void *)
Definition: source-af-xdp.h:46
util-checksum.h
Packet_::flags
uint32_t flags
Definition: decode.h:467
threads.h
TMM_RECEIVEAFXDP
@ TMM_RECEIVEAFXDP
Definition: tm-threads-common.h:55
Packet_::vlan_idx
uint8_t vlan_idx
Definition: decode.h:458
LiveDevice_
Definition: util-device.h:49
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:333
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:54
packet-queue.h
SCKernelVersionIsAtLeast
int SCKernelVersionIsAtLeast(int major, int minor)
Definition: util-host-info.c:37
tm-threads-common.h
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
tm-modules.h
AFXDP_IFACE_NAME_LENGTH
#define AFXDP_IFACE_NAME_LENGTH
Definition: source-af-xdp.h:27
util-privs.h
StatsSyncCountersIfSignalled
#define StatsSyncCountersIfSignalled(tv)
Definition: counters.h:141
PacketDecodeFinalize
void PacketDecodeFinalize(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Finalize decoding of a packet.
Definition: decode.c:147
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:356
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:85
AFXDPIfaceConfig::napi_defer_hard_irqs
uint32_t napi_defer_hard_irqs
Definition: source-af-xdp.h:43
tmqh-packetpool.h
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:84
NoAFXDPSupportExit
TmEcode NoAFXDPSupportExit(ThreadVars *, const void *, void **)
this function prints an error message and exits.
Definition: source-af-xdp.c:104
AFXDPQueueProtectionInit
TmEcode AFXDPQueueProtectionInit(void)
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
Packet_::datalink
int datalink
Definition: decode.h:611
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1056
DecodeRegisterPerfCounters
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition: decode.c:525
decode.h
util-device.h
util-sysfs.h
util-debug.h
PKT_SRC_WIRE
@ PKT_SRC_WIRE
Definition: decode.h:54
util-error.h
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
AFXDPIfaceConfig::enable_busy_poll
bool enable_busy_poll
Definition: source-af-xdp.h:39
util-cpu.h
SysFsWriteValue
TmEcode SysFsWriteValue(const char *path, int64_t value)
Definition: util-sysfs.c:30
Packet_::ts
SCTime_t ts
Definition: decode.h:475
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
LiveGetDevice
LiveDevice * LiveGetDevice(const char *name)
Get a pointer to the device at idx.
Definition: util-device.c:248
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
GET_PKT_DATA
#define GET_PKT_DATA(p)
Definition: decode.h:220
util-ebpf.h
AFXDPIfaceConfig::busy_poll_budget
uint32_t busy_poll_budget
Definition: source-af-xdp.h:41
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:71
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
TMM_DECODEAFXDP
@ TMM_DECODEAFXDP
Definition: tm-threads-common.h:57
AFXDPIfaceConfig::promisc
int promisc
Definition: source-af-xdp.h:33
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
AFXDPIfaceConfig::bind_flags
uint32_t bind_flags
Definition: source-af-xdp.h:37
StatsGetLocalCounterValue
uint64_t StatsGetLocalCounterValue(ThreadVars *tv, uint16_t id)
Get the value of the local copy of the counter that hold this id.
Definition: counters.c:1260
SC_ATOMIC_DECLARE
#define SC_ATOMIC_DECLARE(type, name)
wrapper for declaring atomic variables.
Definition: util-atomic.h:281
AFXDPIfaceConfig::threads
int threads
Definition: source-af-xdp.h:32
PacketPoolWait
void PacketPoolWait(void)
Definition: tmqh-packetpool.c:69
Packet_
Definition: decode.h:430
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:32
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:33
GET_PKT_LEN
#define GET_PKT_LEN(p)
Definition: decode.h:219
AFXDPIfaceConfig::busy_poll_time
uint32_t busy_poll_time
Definition: source-af-xdp.h:40
conf.h
TmSlot_
Definition: tm-threads.h:53
PKT_IGNORE_CHECKSUM
#define PKT_IGNORE_CHECKSUM
Definition: decode.h:1016
Packet_::livedev
struct LiveDevice_ * livedev
Definition: decode.h:590
TmEcode
TmEcode
Definition: tm-threads-common.h:83
util-host-info.h
TmModule_::name
const char * name
Definition: tm-modules.h:44
DecodeThreadVars_::counter_vlan
uint16_t counter_vlan
Definition: decode.h:704
runmodes.h
SCLogInfo
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:224
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:31
dtv
DecodeThreadVars * dtv
Definition: fuzz_decodepcapfile.c:33
TmModuleDecodeAFXDPRegister
void TmModuleDecodeAFXDPRegister(void)
Registration Function for DecodeAFXDP.
Definition: source-af-xdp.c:90
tm-queuehandlers.h
Packet_::ReleasePacket
void(* ReleasePacket)(struct Packet_ *)
Definition: decode.h:519
flow-storage.h
SYSFS_MAX_FILENAME_SIZE
#define SYSFS_MAX_FILENAME_SIZE
Definition: util-sysfs.h:32
DecodeThreadVarsFree
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition: decode.c:707
suricata-common.h
source-af-xdp.h
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
TmModuleReceiveAFXDPRegister
void TmModuleReceiveAFXDPRegister(void)
Definition: source-af-xdp.c:76
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
threadvars.h
util-validate.h
StatsAddUI64
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition: counters.c:146
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
AFXDPIfaceConfig
Definition: source-af-xdp.h:29
POLL_TIMEOUT
#define POLL_TIMEOUT
Definition: source-af-packet.c:174
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
SCFree
#define SCFree(p)
Definition: util-mem.h:61
DecodeThreadVars_
Structure to hold thread specific data for all decode modules.
Definition: decode.h:668
util-ioctl.h
DecodeThreadVarsAlloc
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition: decode.c:688
PacketSetData
int PacketSetData(Packet *p, const uint8_t *pktdata, uint32_t pktlen)
Set data for Packet and set length when zero copy is used.
Definition: decode.c:727
suricata.h
AFXDPMutexClean
void AFXDPMutexClean(void)
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:376
StatsRegisterCounter
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:961
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:275
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:120
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:208
SC_CAP_NET_RAW
#define SC_CAP_NET_RAW
Definition: util-privs.h:32
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:104
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:76
DecodeUpdatePacketCounters
void DecodeUpdatePacketCounters(ThreadVars *tv, const DecodeThreadVars *dtv, const Packet *p)
Definition: decode.c:654
LINKTYPE_ETHERNET
#define LINKTYPE_ETHERNET
Definition: decode.h:969
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:172