32 #define SC_PCAP_DONT_INCLUDE_PCAP_H 1
64 #include <bpf/libbpf.h>
66 #include <xdp/libxdp.h>
69 #if HAVE_LINUX_IF_ETHER_H
70 #include <linux/if_ether.h>
107 SCLogError(
"Error creating thread %s: you do not have "
108 "support for AF_XDP enabled, on Linux host please recompile "
109 "with --enable-af-xdp",
/* Sizing and timing constants for the AF_XDP capture path. */
/* NOTE(review): presumably the poll() timeout in milliseconds; its use is not
 * visible in this excerpt -- confirm. */
116 #define POLL_TIMEOUT 100
/* Ring sizes, taken from the XSK_* default descriptor counts. */
117 #define NUM_FRAMES_PROD XSK_RING_PROD__DEFAULT_NUM_DESCS
118 #define NUM_FRAMES_CONS XSK_RING_CONS__DEFAULT_NUM_DESCS
119 #define NUM_FRAMES NUM_FRAMES_PROD
/* Size of one UMEM frame; fill-ring addresses are multiples of this value. */
120 #define FRAME_SIZE XSK_UMEM__DEFAULT_FRAME_SIZE
/* Byte size of the buffer passed to mmap(), xsk_umem__create() and munmap():
 * room for twice NUM_FRAMES frames. */
121 #define MEM_BYTES (NUM_FRAMES * FRAME_SIZE * 2)
/* Passed to usleep() on the reconnect path, so the unit is microseconds (0.5 s). */
122 #define RECONNECT_TIMEOUT 500000
/* Socket state stored in ptv->afxdp_state via AFXDPSwitchState(): set to UP at
 * the end of OpenXSKSocket(), set to DOWN by the receive path, which then
 * sleeps and calls AFXDPTryReopen() while the state is DOWN. */
125 enum state { AFXDP_STATE_DOWN, AFXDP_STATE_UP };
/* NOTE(review): no members of XskInitProtect are visible in this excerpt; a
 * later function reads xsk_protect.queue_num atomically -- confirm that
 * xsk_protect is an instance of this struct. */
127 struct XskInitProtect {
/* The fields below are reached as ptv->umem.<field> elsewhere in this file, so
 * they belong to the UMEM bookkeeping struct whose opening line is not visible
 * here (presumably struct UmemInfo -- confirm). */
/* UMEM handle filled in by xsk_umem__create() and released with xsk_umem__delete(). */
134 struct xsk_umem *umem;
/* Fill ring: slots are reserved, written with frame addresses and submitted. */
135 struct xsk_ring_prod fq;
/* Completion ring; in the code visible here it is only zeroed on socket close. */
136 struct xsk_ring_cons cq;
/* UMEM configuration: fill_size, comp_size, frame_size, frame_headroom, flags. */
137 struct xsk_umem_config cfg;
/* Extra flag ORed into the mmap() flags; set to MAP_HUGETLB when cfg.flags is
 * XDP_UMEM_UNALIGNED_CHUNK_FLAG. */
138 int mmap_alignment_flag;
/* NOTE(review): the members of QueueAssignment are not visible in this excerpt;
 * code below uses queue.assigned and queue.queue_num. */
141 struct QueueAssignment {
/* The fields below are reached as ptv->xsk.<field> elsewhere in this file, so
 * they belong to the per-socket struct whose opening line is not visible here
 * (presumably struct XskSockInfo -- confirm). */
/* RX and TX rings handed to xsk_socket__create(). */
147 struct xsk_ring_cons rx;
148 struct xsk_ring_prod tx;
/* Socket handle produced by xsk_socket__create(). */
149 struct xsk_socket *xsk;
/* Queue number this socket is created on. */
152 struct QueueAssignment queue;
/* Socket configuration: rx_size, tx_size, xdp_flags, bind_flags. */
155 struct xsk_socket_config cfg;
/* Busy-poll settings. busy_poll_time and busy_poll_budget are applied with
 * setsockopt() in ConfigureBusyPolling(); busy_poll_budget is also the batch
 * size passed to xsk_ring_cons__peek() in the receive loop. */
156 bool enable_busy_poll;
157 uint32_t busy_poll_time;
158 uint32_t busy_poll_budget;
/* Per-thread state of an AF_XDP capture thread. Only some of the members are
 * visible in this excerpt. */
166 typedef struct AFXDPThreadVars_ {
/* UMEM buffer, rings and configuration. */
179 struct UmemInfo umem;
/* AF_XDP socket, its rings, queue assignment and busy-poll settings. */
180 struct XskSockInfo xsk;
/* NOTE(review): presumably values for the kernel tunables of the same names;
 * their use is not visible in this excerpt -- confirm. */
181 uint32_t gro_flush_timeout;
182 uint32_t napi_defer_hard_irqs;
/* Forward declaration; the definition appears near the end of this file. */
202 static void ReceiveAFXDPThreadExitStats(
ThreadVars *,
void *);
/**
 * \brief Read kernel statistics for the AF_XDP socket of this thread.
 *
 * Queries XDP_STATISTICS with getsockopt() on the socket fd. Nothing in the
 * visible code runs when the call fails.
 *
 * \param ptv thread variables holding the socket handle
 */
242 static inline void AFXDPDumpCounters(AFXDPThreadVars *ptv)
244 struct xdp_statistics stats;
245 socklen_t
len =
sizeof(
struct xdp_statistics);
246 int fd = xsk_socket__fd(ptv->xsk.xsk);
248 if (getsockopt(fd, SOL_XDP, XDP_STATISTICS, &stats, &
len) >= 0) {
/* Drop total combines the three kernel-side RX loss counters. */
249 uint64_t rx_dropped = stats.rx_dropped + stats.rx_invalid_descs + stats.rx_ring_full;
258 SCLogDebug(
"(%s) Kernel: Packets %" PRIu64
", bytes %" PRIu64
", dropped %" PRIu64
"",
/**
 * \brief Assign a queue number to this thread once.
 *
 * When queue.assigned is still false, the queue number is read atomically from
 * the shared xsk_protect.queue_num and the assignment is marked done, so later
 * calls keep the same queue.
 * NOTE(review): how the shared counter advances is not visible in this excerpt.
 *
 * \param ptv thread variables
 */
286 static TmEcode AFXDPAssignQueueID(AFXDPThreadVars *ptv)
288 if (!ptv->xsk.queue.assigned) {
289 ptv->xsk.queue.queue_num =
SC_ATOMIC_GET(xsk_protect.queue_num);
293 ptv->xsk.queue.assigned =
true;
/**
 * \brief Log once the last capture thread is up.
 *
 * The debug message is emitted only by the thread whose queue number equals
 * threads - 1, i.e. the highest queue index.
 *
 * \param ptv thread variables
 */
298 static void AFXDPAllThreadsRunning(AFXDPThreadVars *ptv)
/* queue_num is cast to int to match the signed threads count. */
301 if ((ptv->threads - 1) == (
int)ptv->xsk.queue.queue_num) {
302 SCLogDebug(
"All AF_XDP capture threads are running.");
/**
 * \brief Map the memory that backs the UMEM.
 *
 * Creates a private anonymous read/write mapping of MEM_BYTES bytes, adding
 * umem.mmap_alignment_flag to the mmap() flags, and stores it in umem.buf.
 * A MAP_FAILED result is detected; the handling is not visible in this excerpt.
 *
 * \param ptv thread variables
 */
307 static TmEcode AcquireBuffer(AFXDPThreadVars *ptv)
309 int mmap_flags = MAP_PRIVATE | MAP_ANONYMOUS | ptv->umem.mmap_alignment_flag;
310 ptv->umem.buf = mmap(NULL, MEM_BYTES, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
312 if (ptv->umem.buf == MAP_FAILED) {
/**
 * \brief Register the mapped buffer as a UMEM.
 *
 * Calls xsk_umem__create() over umem.buf (MEM_BYTES bytes), which fills in the
 * UMEM handle and the fill and completion rings. Failure is logged together
 * with strerror(errno).
 *
 * \param ptv thread variables
 */
320 static TmEcode ConfigureXSKUmem(AFXDPThreadVars *ptv)
322 if (xsk_umem__create(&ptv->umem.umem, ptv->umem.buf, MEM_BYTES, &ptv->umem.fq, &ptv->umem.cq,
324 SCLogError(
"failed to create umem: %s", strerror(errno));
/**
 * \brief Populate the fill ring with frame addresses.
 *
 * Reserves cnt slots on the fill ring, writes the address i * FRAME_SIZE into
 * slot i, and submits all cnt slots.
 * NOTE(review): the condition guarding the error log is not visible here;
 * presumably it fires when fewer than cnt slots were reserved -- confirm.
 *
 * \param ptv thread variables
 * \param cnt number of fill-ring slots to reserve and submit
 */
331 static TmEcode InitFillRing(AFXDPThreadVars *ptv,
const uint32_t
cnt)
335 uint32_t ret = xsk_ring_prod__reserve(&ptv->umem.fq,
cnt, &idx_fq);
337 SCLogError(
"Failed to initialise the fill ring.");
/* Frames are laid out back to back, so slot i gets offset i * FRAME_SIZE. */
341 for (uint32_t i = 0; i <
cnt; i++) {
342 *xsk_ring_prod__fill_addr(&ptv->umem.fq, idx_fq++) = i * FRAME_SIZE;
345 xsk_ring_prod__submit(&ptv->umem.fq,
cnt);
/* NOTE(review): the body is not visible in this excerpt. Presumably it applies
 * the gro_flush_timeout and napi_defer_hard_irqs values held in ptv -- confirm
 * against the full source. */
354 static TmEcode WriteLinuxTunables(AFXDPThreadVars *ptv)
/**
 * \brief Apply the busy-poll socket options when enable_busy_poll is set.
 *
 * Nothing is configured when busy polling is disabled. When the
 * SO_PREFER_BUSY_POLL, SO_BUSY_POLL and SO_BUSY_POLL_BUDGET options are all
 * defined at build time, each one is set on the socket fd with setsockopt().
 * The other visible paths only warn that the kernel is too old or lacks
 * busy-poll support.
 *
 * \param ptv thread variables
 */
379 static TmEcode ConfigureBusyPolling(AFXDPThreadVars *ptv)
381 if (!ptv->xsk.enable_busy_poll) {
389 SCLogWarning(
"Kernel version older than required: v5.11,"
390 " upgrade kernel version to use 'enable-busy-poll' option.");
394 #if defined SO_PREFER_BUSY_POLL && defined SO_BUSY_POLL && defined SO_BUSY_POLL_BUDGET
395 const int fd = xsk_socket__fd(ptv->xsk.xsk);
/* sock_opt is reused as the value for each of the three options in turn. */
402 if (setsockopt(fd, SOL_SOCKET, SO_PREFER_BUSY_POLL, (
void *)&sock_opt,
sizeof(sock_opt)) < 0) {
406 sock_opt = ptv->xsk.busy_poll_time;
407 if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL, (
void *)&sock_opt,
sizeof(sock_opt)) < 0) {
411 sock_opt = ptv->xsk.busy_poll_budget;
412 if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL_BUDGET, (
void *)&sock_opt,
sizeof(sock_opt)) < 0) {
419 "Kernel does not support busy poll, upgrade kernel or disable \"enable-busy-poll\".");
/**
 * \brief Record a new socket state in ptv->afxdp_state.
 *
 * \param ptv thread variables
 * \param state new state; callers in this file pass AFXDP_STATE_UP or
 *              AFXDP_STATE_DOWN
 */
424 static void AFXDPSwitchState(AFXDPThreadVars *ptv,
int state)
426 ptv->afxdp_state = state;
/**
 * \brief Create the AF_XDP socket for the assigned queue.
 *
 * Calls xsk_socket__create() on the live device and queue number, sharing the
 * UMEM and wiring up the RX and TX rings. On success the socket fd is stored
 * in xsk.fd with POLLIN as the requested event and the state is switched to
 * AFXDP_STATE_UP.
 *
 * \param ptv thread variables
 */
429 static TmEcode OpenXSKSocket(AFXDPThreadVars *ptv)
/* A non-zero return is passed negated to strerror(), i.e. treated as a
 * negative errno value. */
440 if ((ret = xsk_socket__create(&ptv->xsk.xsk, ptv->livedev->dev, ptv->xsk.queue.queue_num,
441 ptv->umem.umem, &ptv->xsk.rx, &ptv->xsk.tx, &ptv->xsk.cfg))) {
442 SCLogError(
"Failed to create socket: %s", strerror(-ret));
445 SCLogDebug(
"bind to %s on queue %u", ptv->iface, ptv->xsk.queue.queue_num);
448 ptv->xsk.fd.fd = xsk_socket__fd(ptv->xsk.xsk);
449 ptv->xsk.fd.events = POLLIN;
452 AFXDPSwitchState(ptv, AFXDP_STATE_UP);
/**
 * \brief Tear down the socket and UMEM so they can be created again.
 *
 * Deletes the socket, deletes the UMEM when one exists and clears its pointer,
 * then zeroes the fill and completion ring structures.
 * NOTE(review): the mapped buffer umem.buf is not released in the lines
 * visible here.
 *
 * \param ptv thread variables
 */
458 static void AFXDPCloseSocket(AFXDPThreadVars *ptv)
461 xsk_socket__delete(ptv->xsk.xsk);
465 if (ptv->umem.umem) {
466 xsk_umem__delete(ptv->umem.umem);
467 ptv->umem.umem = NULL;
/* Clear stale ring state left over from the deleted UMEM. */
470 memset(&ptv->umem.fq, 0,
sizeof(
struct xsk_ring_prod));
471 memset(&ptv->umem.cq, 0,
sizeof(
struct xsk_ring_cons));
/**
 * \brief Socket bring-up sequence; only part of it is visible in this excerpt.
 *
 * The visible steps fill the fill ring with NUM_FRAMES * 2 entries (the same
 * count as umem.cfg.fill_size set at thread init) and look up the id of the
 * XDP program on the interface into ptv->prog_id.
 *
 * \param ptv thread variables
 */
474 static TmEcode AFXDPSocketCreation(AFXDPThreadVars *ptv)
480 if (InitFillRing(ptv, NUM_FRAMES * 2) !=
TM_ECODE_OK) {
491 " performance may be reduced.");
/* Two lookup calls for the program id, selected by HAVE_BPF_XDP_QUERY_ID.
 * NOTE(review): both calls look up an id, yet the error text speaks of
 * attaching a program -- confirm the wording is intended. */
495 #ifdef HAVE_BPF_XDP_QUERY_ID
496 if (bpf_xdp_query_id(ptv->ifindex, ptv->xsk.cfg.xdp_flags, &ptv->prog_id)) {
497 SCLogError(
"Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
501 if (bpf_get_link_xdp_id(ptv->ifindex, &ptv->prog_id, ptv->xsk.cfg.xdp_flags)) {
502 SCLogError(
"Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
/**
 * \brief Attempt to bring the capture socket back after it went down.
 *
 * Closes the current socket, sleeps for RECONNECT_TIMEOUT microseconds and
 * checks the interface flags. A flags lookup failure, or an interface with
 * neither IFF_UP nor IFF_RUNNING set, is only logged at debug level in the
 * lines visible here. An info message is logged when the interface is back.
 *
 * \param ptv thread variables
 */
516 static TmEcode AFXDPTryReopen(AFXDPThreadVars *ptv)
518 AFXDPCloseSocket(ptv);
519 usleep(RECONNECT_TIMEOUT);
521 int if_flags = GetIfaceFlags(ptv->iface);
522 if (if_flags == -1) {
523 SCLogDebug(
"Couldn't get flags for interface '%s'", ptv->iface);
525 }
/* Treated as down only when both IFF_UP and IFF_RUNNING are clear. */
else if ((if_flags & (IFF_UP | IFF_RUNNING)) == 0) {
526 SCLogDebug(
"Interface '%s' is down", ptv->iface);
534 SCLogInfo(
"Interface '%s' is back", ptv->iface);
/**
 * \brief Packet release hook for AF_XDP packets.
 *
 * Writes into the fill-ring slot recorded in the packet (p->afxdp_v.fq at
 * index p->afxdp_v.fq_idx).
 * NOTE(review): the value written is not visible in this excerpt; presumably
 * it is the frame address saved in p->afxdp_v.orig -- confirm.
 *
 * \param p packet being released
 */
547 static void AFXDPReleasePacket(
Packet *p)
549 *xsk_ring_prod__fill_addr((
struct xsk_ring_prod *)p->afxdp_v.fq, p->afxdp_v.fq_idx) =
/**
 * \brief Refresh the kernel counters at most once per wall-clock second.
 *
 * Compares time(NULL) with *last_dump; when they differ, the counters are
 * dumped and *last_dump is updated.
 * NOTE(review): the return statement is not visible; presumably stats_dumped
 * is returned -- confirm.
 *
 * \param ptv thread variables
 * \param last_dump in/out timestamp of the previous dump
 */
555 static inline int DumpStatsEverySecond(AFXDPThreadVars *ptv, time_t *last_dump)
557 int stats_dumped = 0;
558 time_t current_time = time(NULL);
/* time_t has one-second resolution here, so inequality means a new second. */
560 if (current_time != *last_dump) {
561 AFXDPDumpCounters(ptv);
562 *last_dump = current_time;
/**
 * \brief Kick the kernel so it processes the rings of this socket.
 *
 * When busy polling is enabled, or the fill ring reports that a wakeup is
 * needed, issues a zero-length non-blocking recvfrom() on the socket fd.
 *
 * \param data AFXDPThreadVars pointer, passed as void *
 */
571 static inline ssize_t WakeupSocket(
void *data)
574 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
577 if (ptv->xsk.enable_busy_poll || xsk_ring_prod__needs_wakeup(&ptv->umem.fq)) {
/* No data is read: NULL buffer, zero length, MSG_DONTWAIT. */
579 res = recvfrom(xsk_socket__fd(ptv->xsk.xsk), NULL, 0, MSG_DONTWAIT, NULL, NULL);
600 if (initdata == NULL) {
605 AFXDPThreadVars *ptv =
SCCalloc(1,
sizeof(AFXDPThreadVars));
615 ptv->ifindex = if_nametoindex(ptv->iface);
618 if (ptv->livedev == NULL) {
624 ptv->promisc = afxdpconfig->
promisc;
625 if (ptv->promisc != 0) {
627 if (SetIfaceFlags(ptv->iface, IFF_PROMISC | IFF_UP) != 0) {
628 SCLogError(
"Failed to switch interface (%s) to promiscuous, error %s", ptv->iface,
635 ptv->threads = afxdpconfig->
threads;
638 ptv->xsk.cfg.rx_size = NUM_FRAMES_CONS;
639 ptv->xsk.cfg.tx_size = NUM_FRAMES_PROD;
640 ptv->xsk.cfg.xdp_flags = afxdpconfig->
mode;
641 ptv->xsk.cfg.bind_flags = afxdpconfig->
bind_flags;
644 ptv->umem.cfg.fill_size = NUM_FRAMES_PROD * 2;
645 ptv->umem.cfg.comp_size = NUM_FRAMES_CONS;
646 ptv->umem.cfg.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE;
647 ptv->umem.cfg.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM;
651 if (ptv->umem.cfg.flags == XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
652 ptv->umem.mmap_alignment_flag = MAP_HUGETLB;
666 ptv->capture_afxdp_poll_timeout =
668 ptv->capture_afxdp_poll_failed =
670 ptv->capture_afxdp_empty_reads =
672 ptv->capture_afxdp_failed_reads =
674 ptv->capture_afxdp_acquire_pkt_failed =
684 ReceiveAFXDPThreadDeinit(
tv, ptv);
701 time_t last_dump = 0;
703 uint32_t idx_rx = 0, idx_fq = 0, rcvd;
705 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
710 AFXDPAllThreadsRunning(ptv);
719 if (
unlikely(ptv->afxdp_state == AFXDP_STATE_DOWN)) {
721 usleep(RECONNECT_TIMEOUT);
725 r = AFXDPTryReopen(ptv);
731 AFXDPDumpCounters(ptv);
738 if (!ptv->xsk.enable_busy_poll) {
750 AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
753 DumpStatsEverySecond(ptv, &last_dump);
758 rcvd = xsk_ring_cons__peek(&ptv->xsk.rx, ptv->xsk.busy_poll_budget, &idx_rx);
761 ssize_t ret = WakeupSocket(ptv);
764 AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
766 DumpStatsEverySecond(ptv, &last_dump);
770 uint32_t res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
771 while (res != rcvd) {
773 ssize_t ret = WakeupSocket(ptv);
776 AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
779 res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
782 gettimeofday(&
ts, NULL);
784 for (uint32_t i = 0; i < rcvd; i++) {
799 uint64_t addr = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx)->addr;
800 uint32_t
len = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx++)->len;
801 uint64_t orig = xsk_umem__extract_addr(addr);
802 addr = xsk_umem__add_offset_to_addr(addr);
804 uint8_t *pkt_data = xsk_umem__get_data(ptv->umem.buf, addr);
808 p->afxdp_v.fq_idx = idx_fq++;
809 p->afxdp_v.orig = orig;
810 p->afxdp_v.fq = &ptv->umem.fq;
814 if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) !=
TM_ECODE_OK) {
820 xsk_ring_prod__submit(&ptv->umem.fq, rcvd);
821 xsk_ring_cons__release(&ptv->xsk.rx, rcvd);
824 DumpStatsEverySecond(ptv, &last_dump);
/**
 * \brief Detach the XDP programs attached to an interface.
 *
 * Resolves the interface index, fetches the multiprog for it and its attach
 * mode, then detaches every program returned by xdp_multiprog__next_prog().
 * The main program is then detached as well when it still reports an attach
 * mode other than XDP_MODE_UNSPEC. Detach failures are only logged at debug
 * level.
 * NOTE(review): the debug text misspells detach and ends in a newline.
 *
 * \param iface_name name of the network interface
 */
834 static void RunModeAFXDPRemoveProg(
char *iface_name)
836 unsigned int ifindex = if_nametoindex(iface_name);
838 struct xdp_multiprog *progs = xdp_multiprog__get_from_ifindex(ifindex);
842 enum xdp_attach_mode mode = xdp_multiprog__attach_mode(progs);
844 struct xdp_program *prog = NULL;
/* Passing NULL as the previous program starts the iteration. */
847 for (prog = xdp_multiprog__next_prog(NULL, progs); prog;
848 prog = xdp_multiprog__next_prog(prog, progs)) {
849 int ret = xdp_program__detach(prog, ifindex, mode, 0);
851 SCLogDebug(
"Error: cannot detatch XDP program: %s\n", strerror(errno));
855 prog = xdp_multiprog__main_prog(progs);
856 if (xdp_program__is_attached(prog, ifindex) != XDP_MODE_UNSPEC) {
857 int ret = xdp_program__detach(prog, ifindex, mode, 0);
859 SCLogDebug(
"Error: cannot detatch XDP program: %s\n", strerror(errno));
873 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
881 RunModeAFXDPRemoveProg(ptv->iface);
885 xsk_socket__delete(ptv->xsk.xsk);
889 if (ptv->umem.umem) {
890 xsk_umem__delete(ptv->umem.umem);
891 ptv->umem.umem = NULL;
893 munmap(ptv->umem.buf, MEM_BYTES);
904 static void ReceiveAFXDPThreadExitStats(
ThreadVars *
tv,
void *data)
907 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
909 AFXDPDumpCounters(ptv);
911 SCLogPerf(
"(%s) Kernel: Packets %" PRIu64
", bytes %" PRIu64
", dropped %" PRIu64
"",
tv->
name,