suricata
source-af-xdp.c
Go to the documentation of this file.
1 /* Copyright (C) 2011-2022 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \defgroup afxdppacket AF_XDP running mode
20  *
21  * @{
22  */
23 
24 /**
25  * \file
26  *
27  * \author Richard McConnell <richard_mcconnell@rapid7.com>
28  *
29  * AF_XDP socket acquisition support
30  *
31  */
32 #define SC_PCAP_DONT_INCLUDE_PCAP_H 1
34 #include "suricata.h"
35 #include "decode.h"
36 #include "packet-queue.h"
37 #include "threads.h"
38 #include "threadvars.h"
39 #include "tm-queuehandlers.h"
40 #include "tm-modules.h"
41 #include "tm-threads.h"
42 #include "tm-threads-common.h"
43 #include "conf.h"
44 #include "util-cpu.h"
45 #include "util-datalink.h"
46 #include "util-debug.h"
47 #include "util-device-private.h"
48 #include "util-ebpf.h"
49 #include "util-error.h"
50 #include "util-privs.h"
51 #include "util-optimize.h"
52 #include "util-checksum.h"
53 #include "util-ioctl.h"
54 #include "util-host-info.h"
55 #include "util-sysfs.h"
56 #include "tmqh-packetpool.h"
57 #include "source-af-xdp.h"
58 #include "runmodes.h"
59 #include "flow-storage.h"
60 #include "util-validate.h"
61 
62 #ifdef HAVE_AF_XDP
63 #include <net/if.h>
64 #include <bpf/libbpf.h>
65 #include <xdp/xsk.h>
66 #include <xdp/libxdp.h>
67 #endif
68 
69 #if HAVE_LINUX_IF_ETHER_H
70 #include <linux/if_ether.h>
71 #endif
72 
73 #ifndef HAVE_AF_XDP
74 
75 TmEcode NoAFXDPSupportExit(ThreadVars *, const void *, void **);
76 
78 {
79  tmm_modules[TMM_RECEIVEAFXDP].name = "ReceiveAFXDP";
86 }
87 
88 /**
89  * \brief Registration Function for DecodeAFXDP.
90  */
92 {
93  tmm_modules[TMM_DECODEAFXDP].name = "DecodeAFXDP";
100 }
101 
/**
 * \brief Thread-init stub used when Suricata is built without AF_XDP support.
 *
 * Logs an error naming the thread that was being created and terminates the
 * process with EXIT_FAILURE; it never returns to the caller.
 *
 * \param tv       thread being created; only tv->name is read, for the message
 * \param initdata unused
 * \param data     unused
 */
TmEcode NoAFXDPSupportExit(ThreadVars *tv, const void *initdata, void **data)
{
    SCLogError("Error creating thread %s: you do not have "
               "support for AF_XDP enabled, on Linux host please recompile "
               "with --enable-af-xdp",
            tv->name);
    /* no recovery is possible: the requested capture method is not compiled in */
    exit(EXIT_FAILURE);
}
113 
114 #else /* We have AF_XDP support */
115 
/* Timeout passed to poll() in the receive loop; poll() takes milliseconds */
#define POLL_TIMEOUT 100
/* Descriptor counts for the producer/consumer rings: the library defaults */
#define NUM_FRAMES_PROD XSK_RING_PROD__DEFAULT_NUM_DESCS
#define NUM_FRAMES_CONS XSK_RING_CONS__DEFAULT_NUM_DESCS
#define NUM_FRAMES NUM_FRAMES_PROD
/* Size of a single UMEM frame: the library default */
#define FRAME_SIZE XSK_UMEM__DEFAULT_FRAME_SIZE
/* Size of the UMEM area: used as the mmap()/munmap() length and as the size
 * handed to xsk_umem__create(). It holds NUM_FRAMES * 2 frames, matching the
 * NUM_FRAMES * 2 entries placed on the fill ring at socket creation. */
#define MEM_BYTES (NUM_FRAMES * FRAME_SIZE * 2)
/* Delay passed to usleep() between reconnect attempts; usleep() takes
 * microseconds, so this is half a second */
#define RECONNECT_TIMEOUT 500000

/* Interface state. AFXDP_STATE_DOWN is 0, so a zero-initialised
 * AFXDPThreadVars starts in the DOWN state. */
enum state { AFXDP_STATE_DOWN, AFXDP_STATE_UP };
126 
/* State shared by all AF_XDP capture threads while they open their sockets */
struct XskInitProtect {
    /* Taken in OpenXSKSocket() around queue assignment and socket creation,
     * and in AFXDPAllThreadsRunning() */
    SCMutex queue_protect;
    /* Next queue id to hand out; read and then incremented in
     * AFXDPAssignQueueID().
     * NOTE(review): this counter is 8 bits wide while the per-thread
     * queue_num it feeds is uint32_t - confirm that more than 256
     * assignments cannot occur. */
    SC_ATOMIC_DECLARE(uint8_t, queue_num);
} xsk_protect;
131 
/* Per-thread UMEM: the packet buffer area and the rings that belong to it */
struct UmemInfo {
    /* MEM_BYTES of anonymous memory obtained with mmap() in AcquireBuffer()
     * and released with munmap() in ReceiveAFXDPThreadDeinit() */
    void *buf;
    /* Handle filled in by xsk_umem__create(); reset to NULL after
     * xsk_umem__delete() */
    struct xsk_umem *umem;
    /* Fill ring: frame addresses are written here to hand buffers back for
     * reception (see InitFillRing() and AFXDPReleasePacket()) */
    struct xsk_ring_prod fq;
    /* Completion ring handed to xsk_umem__create() */
    struct xsk_ring_cons cq;
    /* UMEM configuration passed to xsk_umem__create() */
    struct xsk_umem_config cfg;
    /* Extra flag OR-ed into the mmap() flags; set to MAP_HUGETLB when
     * cfg.flags equals XDP_UMEM_UNALIGNED_CHUNK_FLAG, otherwise left at 0 */
    int mmap_alignment_flag;
};
140 
/* Records which interface queue a capture thread is bound to */
struct QueueAssignment {
    /* Queue id passed to xsk_socket__create() */
    uint32_t queue_num;
    /* Set to true the first time AFXDPAssignQueueID() runs so that later
     * socket re-creations keep the same queue */
    bool assigned;
};
145 
/* Per-thread AF_XDP socket: its rings, handle, queue and configuration */
struct XskSockInfo {
    /* RX ring the receive loop peeks descriptors from */
    struct xsk_ring_cons rx;
    /* TX ring; only handed to xsk_socket__create() in this file */
    struct xsk_ring_prod tx;
    /* Socket handle; NULL when no socket is open */
    struct xsk_socket *xsk;

    /* Queue assignment structure */
    struct QueueAssignment queue;

    /* Configuration items */
    struct xsk_socket_config cfg;
    /* When true the receive loop skips poll() and WakeupSocket() always
     * issues its recvfrom() */
    bool enable_busy_poll;
    /* Value applied with the SO_BUSY_POLL socket option */
    uint32_t busy_poll_time;
    /* Value applied with the SO_BUSY_POLL_BUDGET socket option; also the
     * maximum number of descriptors requested per xsk_ring_cons__peek() */
    uint32_t busy_poll_budget;

    /* Socket fd and POLLIN event mask used by poll() in the receive loop */
    struct pollfd fd;
};
162 
/**
 * \brief Structure to hold thread specific variables.
 *
 * One instance is allocated (zero-initialised) per capture thread in
 * ReceiveAFXDPThreadInit() and freed in ReceiveAFXDPThreadDeinit().
 */
typedef struct AFXDPThreadVars_ {
    /* Owning thread; its stats context receives all counters below */
    ThreadVars *tv;
    /* Slot packets are handed to; set from slot_next in ReceiveAFXDPLoop() */
    TmSlot *slot;
    /* Live device looked up by interface name */
    LiveDevice *livedev;

    /* thread specific socket */
    /* Non-zero requests IFF_PROMISC | IFF_UP on the interface at init */
    int promisc;
    /* Thread count copied from the interface configuration */
    int threads;

    /* Interface name and the index if_nametoindex() returned for it */
    char iface[AFXDP_IFACE_NAME_LENGTH];
    uint32_t ifindex;

    /* AF_XDP structure */
    struct UmemInfo umem;
    struct XskSockInfo xsk;
    /* Values written to the interface's gro_flush_timeout and
     * napi_defer_hard_irqs sysfs entries by WriteLinuxTunables() */
    uint32_t gro_flush_timeout;
    uint32_t napi_defer_hard_irqs;
    /* Id of the XDP program found on the interface after socket creation */
    uint32_t prog_id;

    /* Handle state */
    /* One of enum state; written only through AFXDPSwitchState() */
    uint8_t afxdp_state;

    /* Stats parameters */
    /* Packets received since the last AFXDPDumpCounters(), which resets it */
    uint64_t pkts;
    /* Bytes received; accumulated and never reset in this file */
    uint64_t bytes;
    StatsCounterId capture_afxdp_packets;
    StatsCounterId capture_kernel_drops;
    StatsCounterId capture_afxdp_poll;
    StatsCounterId capture_afxdp_poll_timeout;
    StatsCounterId capture_afxdp_poll_failed;
    StatsCounterId capture_afxdp_empty_reads;
    StatsCounterId capture_afxdp_failed_reads;
    StatsCounterId capture_afxdp_acquire_pkt_failed;
} AFXDPThreadVars;
200 
201 static TmEcode ReceiveAFXDPThreadInit(ThreadVars *, const void *, void **);
202 static void ReceiveAFXDPThreadExitStats(ThreadVars *, void *);
203 static TmEcode ReceiveAFXDPThreadDeinit(ThreadVars *, void *);
204 static TmEcode ReceiveAFXDPLoop(ThreadVars *tv, void *data, void *slot);
205 
206 static TmEcode DecodeAFXDPThreadInit(ThreadVars *, const void *, void **);
207 static TmEcode DecodeAFXDPThreadDeinit(ThreadVars *tv, void *data);
208 static TmEcode DecodeAFXDP(ThreadVars *, Packet *, void *);
209 
210 /**
211  * \brief Registration Function for ReceiveAFXDP.
212  * \todo Unit tests are needed for this module.
213  */
215 {
216  tmm_modules[TMM_RECEIVEAFXDP].name = "ReceiveAFXDP";
217  tmm_modules[TMM_RECEIVEAFXDP].ThreadInit = ReceiveAFXDPThreadInit;
219  tmm_modules[TMM_RECEIVEAFXDP].PktAcqLoop = ReceiveAFXDPLoop;
221  tmm_modules[TMM_RECEIVEAFXDP].ThreadExitPrintStats = ReceiveAFXDPThreadExitStats;
222  tmm_modules[TMM_RECEIVEAFXDP].ThreadDeinit = ReceiveAFXDPThreadDeinit;
225 }
226 
227 /**
228  * \brief Registration Function for DecodeAFXDP.
229  * \todo Unit tests are needed for this module.
230  */
232 {
233  tmm_modules[TMM_DECODEAFXDP].name = "DecodeAFXDP";
234  tmm_modules[TMM_DECODEAFXDP].ThreadInit = DecodeAFXDPThreadInit;
235  tmm_modules[TMM_DECODEAFXDP].Func = DecodeAFXDP;
237  tmm_modules[TMM_DECODEAFXDP].ThreadDeinit = DecodeAFXDPThreadDeinit;
240 }
241 
242 static inline void AFXDPDumpCounters(AFXDPThreadVars *ptv)
243 {
244  struct xdp_statistics stats;
245  socklen_t len = sizeof(struct xdp_statistics);
246  int fd = xsk_socket__fd(ptv->xsk.xsk);
247 
248  if (getsockopt(fd, SOL_XDP, XDP_STATISTICS, &stats, &len) >= 0) {
249  uint64_t rx_dropped = stats.rx_dropped + stats.rx_invalid_descs + stats.rx_ring_full;
250 
251  StatsCounterAddI64(&ptv->tv->stats, ptv->capture_kernel_drops,
252  rx_dropped - StatsCounterGetLocalValue(&ptv->tv->stats, ptv->capture_kernel_drops));
253  StatsCounterAddI64(&ptv->tv->stats, ptv->capture_afxdp_packets, ptv->pkts);
254 
255  (void)SC_ATOMIC_SET(ptv->livedev->drop, rx_dropped);
256  (void)SC_ATOMIC_ADD(ptv->livedev->pkts, ptv->pkts);
257 
258  SCLogDebug("(%s) Kernel: Packets %" PRIu64 ", bytes %" PRIu64 ", dropped %" PRIu64 "",
259  ptv->tv->name,
260  StatsCounterGetLocalValue(&ptv->tv->stats, ptv->capture_afxdp_packets), ptv->bytes,
261  StatsCounterGetLocalValue(&ptv->tv->stats, ptv->capture_kernel_drops));
262 
263  ptv->pkts = 0;
264  }
265 }
266 
267 /**
268  * \brief Init function for socket creation.
269  *
270  * Mutex used to synchronise initialisation - each socket opens a
271  * different queue. The specific order in which each queue is
272  * opened is not important, but it is vital the queue_num's
273  * are different.
274  *
275  * \param tv pointer to ThreadVars
276  */
278 {
279  SCEnter();
280 
281  SCMutexInit(&xsk_protect.queue_protect, NULL);
282  SC_ATOMIC_SET(xsk_protect.queue_num, 0);
284 }
285 
286 static TmEcode AFXDPAssignQueueID(AFXDPThreadVars *ptv)
287 {
288  if (!ptv->xsk.queue.assigned) {
289  ptv->xsk.queue.queue_num = SC_ATOMIC_GET(xsk_protect.queue_num);
290  SC_ATOMIC_ADD(xsk_protect.queue_num, 1);
291 
292  /* Queue only needs assigned once, on startup */
293  ptv->xsk.queue.assigned = true;
294  }
296 }
297 
298 static void AFXDPAllThreadsRunning(AFXDPThreadVars *ptv)
299 {
300  SCMutexLock(&xsk_protect.queue_protect);
301  if ((ptv->threads - 1) == (int)ptv->xsk.queue.queue_num) {
302  SCLogDebug("All AF_XDP capture threads are running.");
303  }
304  SCMutexUnlock(&xsk_protect.queue_protect);
305 }
306 
307 static TmEcode AcquireBuffer(AFXDPThreadVars *ptv)
308 {
309  int mmap_flags = MAP_PRIVATE | MAP_ANONYMOUS | ptv->umem.mmap_alignment_flag;
310  ptv->umem.buf = mmap(NULL, MEM_BYTES, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
311 
312  if (ptv->umem.buf == MAP_FAILED) {
313  SCLogError("mmap: failed to acquire memory");
315  }
316 
318 }
319 
320 static TmEcode ConfigureXSKUmem(AFXDPThreadVars *ptv)
321 {
322  if (xsk_umem__create(&ptv->umem.umem, ptv->umem.buf, MEM_BYTES, &ptv->umem.fq, &ptv->umem.cq,
323  &ptv->umem.cfg)) {
324  SCLogError("failed to create umem: %s", strerror(errno));
326  }
327 
329 }
330 
331 static TmEcode InitFillRing(AFXDPThreadVars *ptv, const uint32_t cnt)
332 {
333  uint32_t idx_fq = 0;
334 
335  uint32_t ret = xsk_ring_prod__reserve(&ptv->umem.fq, cnt, &idx_fq);
336  if (ret != cnt) {
337  SCLogError("Failed to initialise the fill ring.");
339  }
340 
341  for (uint32_t i = 0; i < cnt; i++) {
342  *xsk_ring_prod__fill_addr(&ptv->umem.fq, idx_fq++) = i * FRAME_SIZE;
343  }
344 
345  xsk_ring_prod__submit(&ptv->umem.fq, cnt);
347 }
348 
349 /**
350  * \brief Linux knobs are tuned to enable a NAPI polling context
351  *
352  * \param ptv pointer to AFXDPThreadVars
353  */
354 static TmEcode WriteLinuxTunables(AFXDPThreadVars *ptv)
355 {
356  char fname[SYSFS_MAX_FILENAME_SIZE];
357 
358  if (snprintf(fname, SYSFS_MAX_FILENAME_SIZE, "class/net/%s/gro_flush_timeout", ptv->iface) <
359  0) {
361  }
362 
363  if (SysFsWriteValue(fname, ptv->gro_flush_timeout) != TM_ECODE_OK) {
365  }
366 
367  if (snprintf(fname, SYSFS_MAX_FILENAME_SIZE, "class/net/%s/napi_defer_hard_irqs", ptv->iface) <
368  0) {
370  }
371 
372  if (SysFsWriteValue(fname, ptv->napi_defer_hard_irqs) != TM_ECODE_OK) {
374  }
375 
377 }
378 
379 static TmEcode ConfigureBusyPolling(AFXDPThreadVars *ptv)
380 {
381  if (!ptv->xsk.enable_busy_poll) {
383  }
384 
385  /* Kernel version must be >= 5.11 to avail of SO_PREFER_BUSY_POLL
386  * see linux commit: 7fd3253a7de6a317a0683f83739479fb880bffc8
387  */
388  if (!SCKernelVersionIsAtLeast(5, 11)) {
389  SCLogWarning("Kernel version older than required: v5.11,"
390  " upgrade kernel version to use 'enable-busy-poll' option.");
392  }
393 
394 #if defined SO_PREFER_BUSY_POLL && defined SO_BUSY_POLL && defined SO_BUSY_POLL_BUDGET
395  const int fd = xsk_socket__fd(ptv->xsk.xsk);
396  int sock_opt = 1;
397 
398  if (WriteLinuxTunables(ptv) != TM_ECODE_OK) {
400  }
401 
402  if (setsockopt(fd, SOL_SOCKET, SO_PREFER_BUSY_POLL, (void *)&sock_opt, sizeof(sock_opt)) < 0) {
404  }
405 
406  sock_opt = ptv->xsk.busy_poll_time;
407  if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL, (void *)&sock_opt, sizeof(sock_opt)) < 0) {
409  }
410 
411  sock_opt = ptv->xsk.busy_poll_budget;
412  if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL_BUDGET, (void *)&sock_opt, sizeof(sock_opt)) < 0) {
414  }
415 
417 #else
418  SCLogWarning(
419  "Kernel does not support busy poll, upgrade kernel or disable \"enable-busy-poll\".");
421 #endif
422 }
423 
424 static void AFXDPSwitchState(AFXDPThreadVars *ptv, int state)
425 {
426  ptv->afxdp_state = state;
427 }
428 
429 static TmEcode OpenXSKSocket(AFXDPThreadVars *ptv)
430 {
431  int ret;
432 
433  SCMutexLock(&xsk_protect.queue_protect);
434 
435  if (AFXDPAssignQueueID(ptv) != TM_ECODE_OK) {
436  SCLogError("Failed to assign queue ID");
438  }
439 
440  if ((ret = xsk_socket__create(&ptv->xsk.xsk, ptv->livedev->dev, ptv->xsk.queue.queue_num,
441  ptv->umem.umem, &ptv->xsk.rx, &ptv->xsk.tx, &ptv->xsk.cfg))) {
442  SCLogError("Failed to create socket: %s", strerror(-ret));
444  }
445  SCLogDebug("bind to %s on queue %u", ptv->iface, ptv->xsk.queue.queue_num);
446 
447  /* For polling and socket options */
448  ptv->xsk.fd.fd = xsk_socket__fd(ptv->xsk.xsk);
449  ptv->xsk.fd.events = POLLIN;
450 
451  /* Set state */
452  AFXDPSwitchState(ptv, AFXDP_STATE_UP);
453 
454  SCMutexUnlock(&xsk_protect.queue_protect);
456 }
457 
458 static void AFXDPCloseSocket(AFXDPThreadVars *ptv)
459 {
460  if (ptv->xsk.xsk) {
461  xsk_socket__delete(ptv->xsk.xsk);
462  ptv->xsk.xsk = NULL;
463  }
464 
465  if (ptv->umem.umem) {
466  xsk_umem__delete(ptv->umem.umem);
467  ptv->umem.umem = NULL;
468  }
469 
470  memset(&ptv->umem.fq, 0, sizeof(struct xsk_ring_prod));
471  memset(&ptv->umem.cq, 0, sizeof(struct xsk_ring_cons));
472 }
473 
474 static TmEcode AFXDPSocketCreation(AFXDPThreadVars *ptv)
475 {
476  if (ConfigureXSKUmem(ptv) != TM_ECODE_OK) {
478  }
479 
480  if (InitFillRing(ptv, NUM_FRAMES * 2) != TM_ECODE_OK) {
482  }
483 
484  /* Open AF_XDP socket */
485  if (OpenXSKSocket(ptv) != TM_ECODE_OK) {
487  }
488 
489  if (ConfigureBusyPolling(ptv) != TM_ECODE_OK) {
490  SCLogWarning("Failed to configure busy polling"
491  " performance may be reduced.");
492  }
493 
494  /* Has the eBPF program successfully bound? */
495 #ifdef HAVE_BPF_XDP_QUERY_ID
496  if (bpf_xdp_query_id(ptv->ifindex, ptv->xsk.cfg.xdp_flags, &ptv->prog_id)) {
497  SCLogError("Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
499  }
500 #else
501  if (bpf_get_link_xdp_id(ptv->ifindex, &ptv->prog_id, ptv->xsk.cfg.xdp_flags)) {
502  SCLogError("Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
504  }
505 #endif
506 
508 }
509 
510 /**
511  * \brief Try to reopen AF_XDP socket
512  *
513  * \retval: TM_ECODE_OK in case of success
514  * TM_ECODE_FAILED if error occurs or a condition is not met.
515  */
516 static TmEcode AFXDPTryReopen(AFXDPThreadVars *ptv)
517 {
518  AFXDPCloseSocket(ptv);
519  usleep(RECONNECT_TIMEOUT);
520 
521  int if_flags = GetIfaceFlags(ptv->iface);
522  if (if_flags == -1) {
523  SCLogDebug("Couldn't get flags for interface '%s'", ptv->iface);
524  goto sock_err;
525  } else if ((if_flags & (IFF_UP | IFF_RUNNING)) == 0) {
526  SCLogDebug("Interface '%s' is down", ptv->iface);
527  goto sock_err;
528  }
529 
530  if (AFXDPSocketCreation(ptv) != TM_ECODE_OK) {
532  }
533 
534  SCLogInfo("Interface '%s' is back", ptv->iface);
536 
537 sock_err:
539 }
540 
541 /**
542  * \brief Write packet entry to the fill ring, freeing
543  * this slot for re/fill with inbound packet descriptor
544  * \param p pointer to Packet
545  * \retval: None
546  */
547 static void AFXDPReleasePacket(Packet *p)
548 {
549  *xsk_ring_prod__fill_addr((struct xsk_ring_prod *)p->afxdp_v.fq, p->afxdp_v.fq_idx) =
550  p->afxdp_v.orig;
551 
553 }
554 
555 static inline int DumpStatsEverySecond(AFXDPThreadVars *ptv, time_t *last_dump)
556 {
557  int stats_dumped = 0;
558  time_t current_time = time(NULL);
559 
560  if (current_time != *last_dump) {
561  AFXDPDumpCounters(ptv);
562  *last_dump = current_time;
563  stats_dumped = 1;
564  }
565 
566  StatsSyncCountersIfSignalled(&ptv->tv->stats);
567 
568  return stats_dumped;
569 }
570 
571 static inline ssize_t WakeupSocket(void *data)
572 {
573  ssize_t res = 0;
574  AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
575 
576  /* Assuming kernel >= 5.11 in use if xdp_busy_poll is enabled */
577  if (ptv->xsk.enable_busy_poll || xsk_ring_prod__needs_wakeup(&ptv->umem.fq)) {
578  // cppcheck-suppress nullPointer
579  res = recvfrom(xsk_socket__fd(ptv->xsk.xsk), NULL, 0, MSG_DONTWAIT, NULL, NULL);
580  }
581 
582  return res;
583 }
584 
585 /**
586  * \brief Init function for ReceiveAFXDP.
587  *
588  * \param tv pointer to ThreadVars
589  * \param initdata pointer to the interface passed from the user
590  * \param data pointer gets populated with AFXDPThreadVars
591  *
592  * \todo Create a general AFP setup function.
593  */
594 static TmEcode ReceiveAFXDPThreadInit(ThreadVars *tv, const void *initdata, void **data)
595 {
596  SCEnter();
597 
598  AFXDPIfaceConfig *afxdpconfig = (AFXDPIfaceConfig *)initdata;
599 
600  if (initdata == NULL) {
601  SCLogError("initdata == NULL");
603  }
604 
605  AFXDPThreadVars *ptv = SCCalloc(1, sizeof(AFXDPThreadVars));
606  if (unlikely(ptv == NULL)) {
607  afxdpconfig->DerefFunc(afxdpconfig);
609  }
610 
611  ptv->tv = tv;
612 
613  strlcpy(ptv->iface, afxdpconfig->iface, AFXDP_IFACE_NAME_LENGTH);
614  ptv->iface[AFXDP_IFACE_NAME_LENGTH - 1] = '\0';
615  ptv->ifindex = if_nametoindex(ptv->iface);
616 
617  ptv->livedev = LiveGetDevice(ptv->iface);
618  if (ptv->livedev == NULL) {
619  SCLogError("Unable to find Live device");
620  SCFree(ptv);
622  }
623 
624  ptv->promisc = afxdpconfig->promisc;
625  if (ptv->promisc != 0) {
626  /* Force promiscuous mode */
627  if (SetIfaceFlags(ptv->iface, IFF_PROMISC | IFF_UP) != 0) {
628  SCLogError("Failed to switch interface (%s) to promiscuous, error %s", ptv->iface,
629  strerror(errno));
630  SCFree(ptv);
632  }
633  }
634 
635  ptv->threads = afxdpconfig->threads;
636 
637  /* Socket configuration */
638  ptv->xsk.cfg.rx_size = NUM_FRAMES_CONS;
639  ptv->xsk.cfg.tx_size = NUM_FRAMES_PROD;
640  ptv->xsk.cfg.xdp_flags = afxdpconfig->mode;
641  ptv->xsk.cfg.bind_flags = afxdpconfig->bind_flags;
642 
643  /* UMEM configuration */
644  ptv->umem.cfg.fill_size = NUM_FRAMES_PROD * 2;
645  ptv->umem.cfg.comp_size = NUM_FRAMES_CONS;
646  ptv->umem.cfg.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE;
647  ptv->umem.cfg.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM;
648  ptv->umem.cfg.flags = afxdpconfig->mem_alignment;
649 
650  /* Use hugepages if unaligned chunk mode */
651  if (ptv->umem.cfg.flags == XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
652  ptv->umem.mmap_alignment_flag = MAP_HUGETLB;
653  }
654 
655  /* Busy polling configuration */
656  ptv->xsk.enable_busy_poll = afxdpconfig->enable_busy_poll;
657  ptv->xsk.busy_poll_budget = afxdpconfig->busy_poll_budget;
658  ptv->xsk.busy_poll_time = afxdpconfig->busy_poll_time;
659  ptv->gro_flush_timeout = afxdpconfig->gro_flush_timeout;
660  ptv->napi_defer_hard_irqs = afxdpconfig->napi_defer_hard_irqs;
661 
662  /* Stats registration */
663  ptv->capture_afxdp_packets = StatsRegisterCounter("capture.afxdp_packets", &ptv->tv->stats);
664  ptv->capture_kernel_drops = StatsRegisterCounter("capture.kernel_drops", &ptv->tv->stats);
665  ptv->capture_afxdp_poll = StatsRegisterCounter("capture.afxdp.poll", &ptv->tv->stats);
666  ptv->capture_afxdp_poll_timeout =
667  StatsRegisterCounter("capture.afxdp.poll_timeout", &ptv->tv->stats);
668  ptv->capture_afxdp_poll_failed =
669  StatsRegisterCounter("capture.afxdp.poll_failed", &ptv->tv->stats);
670  ptv->capture_afxdp_empty_reads =
671  StatsRegisterCounter("capture.afxdp.empty_reads", &ptv->tv->stats);
672  ptv->capture_afxdp_failed_reads =
673  StatsRegisterCounter("capture.afxdp.failed_reads", &ptv->tv->stats);
674  ptv->capture_afxdp_acquire_pkt_failed =
675  StatsRegisterCounter("capture.afxdp.acquire_pkt_failed", &ptv->tv->stats);
676 
677  /* Reserve memory for umem */
678  if (AcquireBuffer(ptv) != TM_ECODE_OK) {
679  SCFree(ptv);
681  }
682 
683  if (AFXDPSocketCreation(ptv) != TM_ECODE_OK) {
684  ReceiveAFXDPThreadDeinit(tv, ptv);
686  }
687 
688  *data = (void *)ptv;
689  afxdpconfig->DerefFunc(afxdpconfig);
691 }
692 
693 /**
694  * \brief Main AF_XDP reading Loop function
695  */
696 static TmEcode ReceiveAFXDPLoop(ThreadVars *tv, void *data, void *slot)
697 {
698  SCEnter();
699 
700  Packet *p;
701  time_t last_dump = 0;
702  struct timeval ts;
703  uint32_t idx_rx = 0, idx_fq = 0, rcvd;
704  int r;
705  AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
706  TmSlot *s = (TmSlot *)slot;
707 
708  ptv->slot = s->slot_next;
709 
710  AFXDPAllThreadsRunning(ptv);
711 
712  // Indicate that the thread is actually running its application level code (i.e., it can poll
713  // packets)
715 
716  PacketPoolWait();
717  while (1) {
718  /* Start by checking the state of our interface */
719  if (unlikely(ptv->afxdp_state == AFXDP_STATE_DOWN)) {
720  do {
721  usleep(RECONNECT_TIMEOUT);
722  if (unlikely(suricata_ctl_flags != 0)) {
723  break;
724  }
725  r = AFXDPTryReopen(ptv);
726  } while (r != TM_ECODE_OK);
727  }
728 
729  if (unlikely(suricata_ctl_flags != 0)) {
730  SCLogDebug("Stopping Suricata!");
731  AFXDPDumpCounters(ptv);
732  break;
733  }
734 
735  /* Busy polling is not set, using poll() to maintain (relatively) decent
736  * performance. xdp_busy_poll must be disabled for kernels < 5.11
737  */
738  if (!ptv->xsk.enable_busy_poll) {
739  StatsCounterIncr(&ptv->tv->stats, ptv->capture_afxdp_poll);
740 
741  r = poll(&ptv->xsk.fd, 1, POLL_TIMEOUT);
742 
743  /* Report poll results */
744  if (r <= 0) {
745  if (r == 0) {
746  StatsCounterIncr(&ptv->tv->stats, ptv->capture_afxdp_poll_timeout);
747  } else if (r < 0) {
748  StatsCounterIncr(&ptv->tv->stats, ptv->capture_afxdp_poll_failed);
749  SCLogWarning("poll failed with retval %d", r);
750  AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
751  }
752 
753  DumpStatsEverySecond(ptv, &last_dump);
754  continue;
755  }
756  }
757 
758  rcvd = xsk_ring_cons__peek(&ptv->xsk.rx, ptv->xsk.busy_poll_budget, &idx_rx);
759  if (!rcvd) {
760  StatsCounterIncr(&ptv->tv->stats, ptv->capture_afxdp_empty_reads);
761  ssize_t ret = WakeupSocket(ptv);
762  if (ret < 0) {
763  SCLogWarning("recv failed with retval %ld", ret);
764  AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
765  }
766  DumpStatsEverySecond(ptv, &last_dump);
767  continue;
768  }
769 
770  uint32_t res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
771  while (res != rcvd) {
772  StatsCounterIncr(&ptv->tv->stats, ptv->capture_afxdp_failed_reads);
773  ssize_t ret = WakeupSocket(ptv);
774  if (ret < 0) {
775  SCLogWarning("recv failed with retval %ld", ret);
776  AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
777  continue;
778  }
779  res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
780  }
781 
782  gettimeofday(&ts, NULL);
783  ptv->pkts += rcvd;
784  for (uint32_t i = 0; i < rcvd; i++) {
786  if (unlikely(p == NULL)) {
787  StatsCounterIncr(&ptv->tv->stats, ptv->capture_afxdp_acquire_pkt_failed);
788  continue;
789  }
790 
793  p->livedev = ptv->livedev;
794  p->ReleasePacket = AFXDPReleasePacket;
796 
797  p->ts = SCTIME_FROM_TIMEVAL(&ts);
798 
799  uint64_t addr = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx)->addr;
800  uint32_t len = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx++)->len;
801  uint64_t orig = xsk_umem__extract_addr(addr);
802  addr = xsk_umem__add_offset_to_addr(addr);
803 
804  uint8_t *pkt_data = xsk_umem__get_data(ptv->umem.buf, addr);
805 
806  ptv->bytes += len;
807 
808  p->afxdp_v.fq_idx = idx_fq++;
809  p->afxdp_v.orig = orig;
810  p->afxdp_v.fq = &ptv->umem.fq;
811 
812  PacketSetData(p, pkt_data, len);
813 
814  if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
815  TmqhOutputPacketpool(ptv->tv, p);
816  SCReturnInt(EXIT_FAILURE);
817  }
818  }
819 
820  xsk_ring_prod__submit(&ptv->umem.fq, rcvd);
821  xsk_ring_cons__release(&ptv->xsk.rx, rcvd);
822 
823  /* Trigger one dump of stats every second */
824  DumpStatsEverySecond(ptv, &last_dump);
825  }
826 
828 }
829 
830 /**
831  * \brief function to unload an AF_XDP program
832  *
833  */
834 static void RunModeAFXDPRemoveProg(char *iface_name)
835 {
836  unsigned int ifindex = if_nametoindex(iface_name);
837 
838  struct xdp_multiprog *progs = xdp_multiprog__get_from_ifindex(ifindex);
839  if (progs == NULL) {
840  return;
841  }
842  enum xdp_attach_mode mode = xdp_multiprog__attach_mode(progs);
843 
844  struct xdp_program *prog = NULL;
845 
846  // loop through the multiprogram struct, removing all the programs
847  for (prog = xdp_multiprog__next_prog(NULL, progs); prog;
848  prog = xdp_multiprog__next_prog(prog, progs)) {
849  int ret = xdp_program__detach(prog, ifindex, mode, 0);
850  if (ret) {
851  SCLogDebug("Error: cannot detatch XDP program: %s\n", strerror(errno));
852  }
853  }
854 
855  prog = xdp_multiprog__main_prog(progs);
856  if (xdp_program__is_attached(prog, ifindex) != XDP_MODE_UNSPEC) {
857  int ret = xdp_program__detach(prog, ifindex, mode, 0);
858  if (ret) {
859  SCLogDebug("Error: cannot detatch XDP program: %s\n", strerror(errno));
860  }
861  }
862 }
863 
864 /**
865  * \brief DeInit function closes af-xdp socket at exit.
866  * \param tv pointer to ThreadVars
867  * \param data pointer that gets cast into AFXDPThreadVars for ptv
868  */
869 static SCMutex sync_deinit = SCMUTEX_INITIALIZER;
870 
871 static TmEcode ReceiveAFXDPThreadDeinit(ThreadVars *tv, void *data)
872 {
873  AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
874 
875  /*
876  * If AF_XDP is enabled, the program must be detached before the AF_XDP sockets
877  * are closed to mitigate a bug that causes an IO_PAGEFAULT in linux kernel
878  * version 5.19, unknown as of now what other versions this affects.
879  */
880  SCMutexLock(&sync_deinit);
881  RunModeAFXDPRemoveProg(ptv->iface);
882  SCMutexUnlock(&sync_deinit);
883 
884  if (ptv->xsk.xsk) {
885  xsk_socket__delete(ptv->xsk.xsk);
886  ptv->xsk.xsk = NULL;
887  }
888 
889  if (ptv->umem.umem) {
890  xsk_umem__delete(ptv->umem.umem);
891  ptv->umem.umem = NULL;
892  }
893  munmap(ptv->umem.buf, MEM_BYTES);
894 
895  SCFree(ptv);
897 }
898 
899 /**
900  * \brief This function prints stats to the screen at exit.
901  * \param tv pointer to ThreadVars
902  * \param data pointer that gets cast into AFXDPThreadVars for ptv
903  */
static void ReceiveAFXDPThreadExitStats(ThreadVars *tv, void *data)
{
    SCEnter();
    AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;

    /* fold the latest kernel statistics and pending packet count into the
     * counters so the totals logged below are current */
    AFXDPDumpCounters(ptv);

    /* packets and drops come from the thread's stats counters, bytes from
     * the running total kept in ptv */
    SCLogPerf("(%s) Kernel: Packets %" PRIu64 ", bytes %" PRIu64 ", dropped %" PRIu64 "", tv->name,
            StatsCounterGetLocalValue(&tv->stats, ptv->capture_afxdp_packets), ptv->bytes,
            StatsCounterGetLocalValue(&tv->stats, ptv->capture_kernel_drops));
}
915 
916 /**
917  * \brief This function passes off to link type decoders.
918  *
919  * DecodeAFXDP decodes packets from AF_XDP and passes
920  * them off to the proper link type decoder.
921  *
922  * \param tv pointer to ThreadVars
923  * \param p pointer to the current packet
924  * \param data pointer that gets cast into AFXDPThreadVars for ptv
925  */
926 static TmEcode DecodeAFXDP(ThreadVars *tv, Packet *p, void *data)
927 {
928  SCEnter();
929 
931 
933 
934  /* update counters */
936 
937  /* If suri has set vlan during reading, we increase vlan counter */
938  if (p->vlan_idx) {
940  }
941 
942  /* call the decoder */
943  DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
944 
946 
948 }
949 
950 static TmEcode DecodeAFXDPThreadInit(ThreadVars *tv, const void *initdata, void **data)
951 {
952  SCEnter();
954  if (dtv == NULL)
956 
958 
959  *data = (void *)dtv;
960 
962 }
963 
964 static TmEcode DecodeAFXDPThreadDeinit(ThreadVars *tv, void *data)
965 {
966  if (data != NULL)
967  DecodeThreadVarsFree(tv, data);
969 }
970 
971 #endif /* HAVE_AF_XDP */
972 /* eof */
973 /**
974  * @}
975  */
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:77
util-device-private.h
tm-threads.h
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
PacketFreeOrRelease
void PacketFreeOrRelease(Packet *p)
Return a packet to where it was allocated.
Definition: decode.c:280
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(StatsThreadContext *stats)
Definition: counters.c:483
AFXDPIfaceConfig::mode
uint32_t mode
Definition: source-af-xdp.h:36
PKT_IS_PSEUDOPKT
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition: decode.h:1323
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
AFXDPIfaceConfig::iface
char iface[AFXDP_IFACE_NAME_LENGTH]
Definition: source-af-xdp.h:30
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:279
StatsRegisterCounter
StatsCounterId StatsRegisterCounter(const char *name, StatsThreadContext *stats)
Registers a normal, unqualified counter.
Definition: counters.c:1001
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:101
AFXDPIfaceConfig::mem_alignment
int mem_alignment
Definition: source-af-xdp.h:38
AFXDPIfaceConfig::gro_flush_timeout
uint32_t gro_flush_timeout
Definition: source-af-xdp.h:42
AFXDPIfaceConfig::DerefFunc
void(* DerefFunc)(void *)
Definition: source-af-xdp.h:46
util-checksum.h
Packet_::flags
uint32_t flags
Definition: decode.h:544
threads.h
TMM_RECEIVEAFXDP
@ TMM_RECEIVEAFXDP
Definition: tm-threads-common.h:53
Packet_::vlan_idx
uint8_t vlan_idx
Definition: decode.h:529
LiveDevice_
Definition: util-device-private.h:32
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:332
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
packet-queue.h
SCKernelVersionIsAtLeast
int SCKernelVersionIsAtLeast(int major, int minor)
Definition: util-host-info.c:37
tm-threads-common.h
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
tm-modules.h
AFXDP_IFACE_NAME_LENGTH
#define AFXDP_IFACE_NAME_LENGTH
Definition: source-af-xdp.h:27
util-privs.h
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:122
StatsCounterId
Definition: counters.h:30
PacketDecodeFinalize
void PacketDecodeFinalize(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Finalize decoding of a packet.
Definition: decode.c:236
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:305
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:82
AFXDPIfaceConfig::napi_defer_hard_irqs
uint32_t napi_defer_hard_irqs
Definition: source-af-xdp.h:43
tmqh-packetpool.h
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:58
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:81
NoAFXDPSupportExit
TmEcode NoAFXDPSupportExit(ThreadVars *, const void *, void **)
this function prints an error message and exits.
Definition: source-af-xdp.c:105
AFXDPQueueProtectionInit
TmEcode AFXDPQueueProtectionInit(void)
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:53
Packet_::datalink
int datalink
Definition: decode.h:639
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1327
DecodeRegisterPerfCounters
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition: decode.c:632
decode.h
util-sysfs.h
util-debug.h
PKT_SRC_WIRE
@ PKT_SRC_WIRE
Definition: decode.h:52
util-error.h
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:61
AFXDPIfaceConfig::enable_busy_poll
bool enable_busy_poll
Definition: source-af-xdp.h:39
util-cpu.h
SysFsWriteValue
TmEcode SysFsWriteValue(const char *path, int64_t value)
Definition: util-sysfs.c:28
Packet_::ts
SCTime_t ts
Definition: decode.h:555
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:120
LiveGetDevice
LiveDevice * LiveGetDevice(const char *name)
Get a pointer to the device at idx.
Definition: util-device.c:268
SCEnter
#define SCEnter(...)
Definition: util-debug.h:281
GET_PKT_DATA
#define GET_PKT_DATA(p)
Definition: decode.h:209
util-ebpf.h
AFXDPIfaceConfig::busy_poll_budget
uint32_t busy_poll_budget
Definition: source-af-xdp.h:41
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:79
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:56
TMM_DECODEAFXDP
@ TMM_DECODEAFXDP
Definition: tm-threads-common.h:55
AFXDPIfaceConfig::promisc
int promisc
Definition: source-af-xdp.h:33
StatsCounterIncr
void StatsCounterIncr(StatsThreadContext *stats, StatsCounterId id)
Increments the local counter.
Definition: counters.c:165
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:259
AFXDPIfaceConfig::bind_flags
uint32_t bind_flags
Definition: source-af-xdp.h:37
SC_ATOMIC_DECLARE
#define SC_ATOMIC_DECLARE(type, name)
wrapper for declaring atomic variables.
Definition: util-atomic.h:280
PacketPoolWait
void PacketPoolWait(void)
Definition: tmqh-packetpool.c:71
Packet_
Definition: decode.h:501
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:29
GET_PKT_LEN
#define GET_PKT_LEN(p)
Definition: decode.h:208
AFXDPIfaceConfig::threads
uint16_t threads
Definition: source-af-xdp.h:32
AFXDPIfaceConfig::busy_poll_time
uint32_t busy_poll_time
Definition: source-af-xdp.h:40
conf.h
TmSlot_
Definition: tm-threads.h:53
PKT_IGNORE_CHECKSUM
#define PKT_IGNORE_CHECKSUM
Definition: decode.h:1284
Packet_::livedev
struct LiveDevice_ * livedev
Definition: decode.h:618
TmEcode
TmEcode
Definition: tm-threads-common.h:80
util-host-info.h
TmModule_::name
const char * name
Definition: tm-modules.h:48
runmodes.h
SCLogInfo
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:229
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
dtv
DecodeThreadVars * dtv
Definition: fuzz_decodepcapfile.c:33
TmModuleDecodeAFXDPRegister
void TmModuleDecodeAFXDPRegister(void)
Registration Function for DecodeAFXDP.
Definition: source-af-xdp.c:91
tm-queuehandlers.h
Packet_::ReleasePacket
void(* ReleasePacket)(struct Packet_ *)
Definition: decode.h:591
flow-storage.h
cnt
uint32_t cnt
Definition: tmqh-packetpool.h:7
SYSFS_MAX_FILENAME_SIZE
#define SYSFS_MAX_FILENAME_SIZE
Definition: util-sysfs.h:32
DecodeThreadVarsFree
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition: decode.c:834
suricata-common.h
source-af-xdp.h
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:238
StatsCounterGetLocalValue
int64_t StatsCounterGetLocalValue(StatsThreadContext *stats, StatsCounterId id)
Get the value of the local copy of the counter that holds this id.
Definition: counters.c:1301
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:51
TmModuleReceiveAFXDPRegister
void TmModuleReceiveAFXDPRegister(void)
Definition: source-af-xdp.c:77
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:52
threadvars.h
util-validate.h
AFXDPIfaceConfig
Definition: source-af-xdp.h:29
POLL_TIMEOUT
#define POLL_TIMEOUT
Definition: source-af-packet.c:174
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:271
SCFree
#define SCFree(p)
Definition: util-mem.h:61
DecodeThreadVars_
Structure to hold thread specific data for all decode modules.
Definition: decode.h:963
util-ioctl.h
DecodeThreadVarsAlloc
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition: decode.c:816
PacketSetData
int PacketSetData(Packet *p, const uint8_t *pktdata, uint32_t pktlen)
Set data for Packet and set length when zero copy is used.
Definition: decode.c:854
suricata.h
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
DecodeThreadVars_::counter_vlan
StatsCounterId counter_vlan
Definition: decode.h:1001
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
ThreadVars_::stats
StatsThreadContext stats
Definition: threadvars.h:121
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:285
StatsCounterAddI64
void StatsCounterAddI64(StatsThreadContext *stats, StatsCounterId id, int64_t x)
Adds a value of type int64_t to the local counter.
Definition: counters.c:146
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packet...
Definition: decode.c:297
SC_CAP_NET_RAW
#define SC_CAP_NET_RAW
Definition: util-privs.h:32
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:102
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:80
DecodeUpdatePacketCounters
void DecodeUpdatePacketCounters(ThreadVars *tv, const DecodeThreadVars *dtv, const Packet *p)
Definition: decode.c:783
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:175