32 #define SC_PCAP_DONT_INCLUDE_PCAP_H 1
64 #include <bpf/libbpf.h>
66 #include <xdp/libxdp.h>
69 #if HAVE_LINUX_IF_ETHER_H
70 #include <linux/if_ether.h>
107 SCLogError(
"Error creating thread %s: you do not have "
108 "support for AF_XDP enabled, on Linux host please recompile "
109 "with --enable-af-xdp",
/*
 * Capture tunables.
 *  - NUM_FRAMES_PROD / NUM_FRAMES_CONS reuse the XSK_RING_*__DEFAULT_NUM_DESCS
 *    descriptor counts; NUM_FRAMES aliases the producer count.
 *  - MEM_BYTES is the byte size handed to mmap(), xsk_umem__create() and
 *    munmap() for the UMEM buffer: 2 * NUM_FRAMES frames of FRAME_SIZE bytes.
 *  - RECONNECT_TIMEOUT is passed to usleep(), i.e. microseconds
 *    (500000 us = 0.5 s).
 *  - POLL_TIMEOUT: NOTE(review): the consumer is not visible here; presumably
 *    a poll() timeout in milliseconds -- confirm at the call site.
 */
116 #define POLL_TIMEOUT 100
117 #define NUM_FRAMES_PROD XSK_RING_PROD__DEFAULT_NUM_DESCS
118 #define NUM_FRAMES_CONS XSK_RING_CONS__DEFAULT_NUM_DESCS
119 #define NUM_FRAMES NUM_FRAMES_PROD
120 #define FRAME_SIZE XSK_UMEM__DEFAULT_FRAME_SIZE
121 #define MEM_BYTES (NUM_FRAMES * FRAME_SIZE * 2)
122 #define RECONNECT_TIMEOUT 500000
/* Socket state kept in ptv->afxdp_state via AFXDPSwitchState(): set to UP once
 * the XSK socket has been opened, set to DOWN from the receive loop, which
 * then goes through AFXDPTryReopen(). */
125 enum state { AFXDP_STATE_DOWN, AFXDP_STATE_UP };
/* NOTE(review): the members of XskInitProtect and the header of the struct
 * owning the fields below are not visible in this view. The fields below are
 * the ones accessed as ptv->umem.* throughout the file. */
127 struct XskInitProtect {
/* UMEM handle filled in by xsk_umem__create(), released by xsk_umem__delete() */
134 struct xsk_umem *umem;
/* fill ring: slots are reserved, filled with frame addresses and submitted */
135 struct xsk_ring_prod fq;
/* second ring handed to xsk_umem__create() alongside the fill ring */
136 struct xsk_ring_cons cq;
/* UMEM configuration (fill_size, comp_size, frame_size, frame_headroom, flags) */
137 struct xsk_umem_config cfg;
/* extra flag OR-ed into the mmap() flags; set to MAP_HUGETLB when
 * cfg.flags == XDP_UMEM_UNALIGNED_CHUNK_FLAG */
138 int mmap_alignment_flag;
/* NOTE(review): the members of QueueAssignment (used elsewhere as
 * queue.queue_num and queue.assigned) and the header of the struct owning the
 * fields below are not visible in this view. The fields below are the ones
 * accessed as ptv->xsk.* throughout the file. */
141 struct QueueAssignment {
/* RX and TX rings handed to xsk_socket__create() */
147 struct xsk_ring_cons rx;
148 struct xsk_ring_prod tx;
/* socket handle created by xsk_socket__create() */
149 struct xsk_socket *xsk;
/* NIC queue this thread binds to */
152 struct QueueAssignment queue;
/* socket configuration (rx_size, tx_size, xdp_flags, bind_flags) */
155 struct xsk_socket_config cfg;
/* when set, busy-poll socket options are applied and the socket is always
 * kicked in WakeupSocket() */
156 bool enable_busy_poll;
/* value written to SO_BUSY_POLL */
157 uint32_t busy_poll_time;
/* value written to SO_BUSY_POLL_BUDGET; also the batch size passed to
 * xsk_ring_cons__peek() in the receive loop */
158 uint32_t busy_poll_budget;
/* Per-thread AF_XDP capture state. Only part of the member list is visible in
 * this view. */
166 typedef struct AFXDPThreadVars_ {
/* UMEM buffer, rings and configuration for this thread */
179 struct UmemInfo umem;
/* XSK socket, its rings, queue assignment and busy-poll settings */
180 struct XskSockInfo xsk;
/* NOTE(review): consumers of the two tunables below are not visible here;
 * presumably written out by WriteLinuxTunables() -- confirm. */
181 uint32_t gro_flush_timeout;
182 uint32_t napi_defer_hard_irqs;
/* Forward declaration; the definition appears further down in this file. */
202 static void ReceiveAFXDPThreadExitStats(
ThreadVars *,
void *);
/**
 * \brief Read the kernel XDP_STATISTICS for this thread's XSK socket.
 *
 * The statistics are fetched with getsockopt(SOL_XDP, XDP_STATISTICS) on the
 * fd of ptv->xsk.xsk. Nothing in the visible code runs when getsockopt()
 * fails.
 *
 * NOTE(review): the statements between the drop sum and the debug log are not
 * visible in this view.
 *
 * \param ptv AF_XDP thread vars owning the socket to query
 */
242 static inline void AFXDPDumpCounters(AFXDPThreadVars *ptv)
244 struct xdp_statistics stats;
245 socklen_t
len =
sizeof(
struct xdp_statistics);
246 int fd = xsk_socket__fd(ptv->xsk.xsk);
248 if (getsockopt(fd, SOL_XDP, XDP_STATISTICS, &stats, &
len) >= 0) {
/* a "drop" is any of: dropped, invalid descriptor, or RX ring full */
249 uint64_t rx_dropped = stats.rx_dropped + stats.rx_invalid_descs + stats.rx_ring_full;
258 SCLogDebug(
"(%s) Kernel: Packets %" PRIu64
", bytes %" PRIu64
", dropped %" PRIu64
"",
/**
 * \brief Give this thread a queue number, once.
 *
 * If the thread has no queue yet, the number is read from the atomic
 * xsk_protect.queue_num and the assignment is marked as done, so later calls
 * leave it untouched.
 *
 * NOTE(review): the statements between the read and the 'assigned' update are
 * not visible in this view (presumably the shared counter is advanced there).
 *
 * \param ptv AF_XDP thread vars
 */
286 static TmEcode AFXDPAssignQueueID(AFXDPThreadVars *ptv)
288 if (!ptv->xsk.queue.assigned) {
289 ptv->xsk.queue.queue_num =
SC_ATOMIC_GET(xsk_protect.queue_num);
293 ptv->xsk.queue.assigned =
true;
/**
 * \brief Log once the thread holding the last queue index is running.
 *
 * The debug message is emitted when this thread's queue number equals
 * ptv->threads - 1.
 *
 * \param ptv AF_XDP thread vars
 */
298 static void AFXDPAllThreadsRunning(AFXDPThreadVars *ptv)
301 if ((ptv->threads - 1) == (
int)ptv->xsk.queue.queue_num) {
302 SCLogDebug(
"All AF_XDP capture threads are running.");
/**
 * \brief Map the anonymous memory region used as the UMEM buffer.
 *
 * MEM_BYTES are mapped read/write, private and anonymous;
 * ptv->umem.mmap_alignment_flag is OR-ed into the mmap() flags. The result is
 * stored in ptv->umem.buf and checked against MAP_FAILED.
 *
 * \param ptv AF_XDP thread vars
 */
307 static TmEcode AcquireBuffer(AFXDPThreadVars *ptv)
309 int mmap_flags = MAP_PRIVATE | MAP_ANONYMOUS | ptv->umem.mmap_alignment_flag;
310 ptv->umem.buf = mmap(NULL, MEM_BYTES, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
312 if (ptv->umem.buf == MAP_FAILED) {
/**
 * \brief Register ptv->umem.buf (MEM_BYTES long) as UMEM.
 *
 * xsk_umem__create() fills in the UMEM handle and the fq/cq rings. On
 * failure the error is logged using strerror(errno).
 *
 * \param ptv AF_XDP thread vars; ptv->umem.buf must already be mapped
 */
320 static TmEcode ConfigureXSKUmem(AFXDPThreadVars *ptv)
322 if (xsk_umem__create(&ptv->umem.umem, ptv->umem.buf, MEM_BYTES, &ptv->umem.fq, &ptv->umem.cq,
324 SCLogError(
"failed to create umem: %s", strerror(errno));
/**
 * \brief Populate the fill ring with the first \a cnt frames of the UMEM.
 *
 * Reserves \a cnt slots on ptv->umem.fq, writes frame address i * FRAME_SIZE
 * into slot i, then submits all \a cnt slots.
 *
 * NOTE(review): the condition guarding the error log is not visible in this
 * view.
 *
 * \param ptv AF_XDP thread vars
 * \param cnt number of fill-ring slots to initialise
 */
331 static TmEcode InitFillRing(AFXDPThreadVars *ptv,
const uint32_t
cnt)
335 uint32_t ret = xsk_ring_prod__reserve(&ptv->umem.fq,
cnt, &idx_fq);
337 SCLogError(
"Failed to initialise the fill ring.");
341 for (uint32_t i = 0; i <
cnt; i++) {
/* frames are laid out back to back in the UMEM buffer */
342 *xsk_ring_prod__fill_addr(&ptv->umem.fq, idx_fq++) = i * FRAME_SIZE;
345 xsk_ring_prod__submit(&ptv->umem.fq,
cnt);
354 static TmEcode WriteLinuxTunables(AFXDPThreadVars *ptv)
/**
 * \brief Apply the busy-poll socket options to the XSK socket.
 *
 * The first visible statement tests ptv->xsk.enable_busy_poll being unset;
 * the branch body is elided. When the SO_* constants are available at build
 * time, three options are set on the socket fd in order:
 * SO_PREFER_BUSY_POLL, SO_BUSY_POLL (ptv->xsk.busy_poll_time) and
 * SO_BUSY_POLL_BUDGET (ptv->xsk.busy_poll_budget).
 *
 * NOTE(review): the kernel version check that triggers the warning, the
 * initial value of sock_opt and the error branches are not visible in this
 * view.
 *
 * \param ptv AF_XDP thread vars
 */
379 static TmEcode ConfigureBusyPolling(AFXDPThreadVars *ptv)
381 if (!ptv->xsk.enable_busy_poll) {
389 SCLogWarning(
"Kernel version older than required: v5.11,"
390 " upgrade kernel version to use 'enable-busy-poll' option.");
/* the socket options below only exist with sufficiently recent headers */
394 #if defined SO_PREFER_BUSY_POLL && defined SO_BUSY_POLL && defined SO_BUSY_POLL_BUDGET
395 const int fd = xsk_socket__fd(ptv->xsk.xsk);
402 if (setsockopt(fd, SOL_SOCKET, SO_PREFER_BUSY_POLL, (
void *)&sock_opt,
sizeof(sock_opt)) < 0) {
406 sock_opt = ptv->xsk.busy_poll_time;
407 if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL, (
void *)&sock_opt,
sizeof(sock_opt)) < 0) {
411 sock_opt = ptv->xsk.busy_poll_budget;
412 if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL_BUDGET, (
void *)&sock_opt,
sizeof(sock_opt)) < 0) {
419 "Kernel does not support busy poll, upgrade kernel or disable \"enable-busy-poll\".");
/**
 * \brief Record the new socket state in the thread vars.
 *
 * \param ptv AF_XDP thread vars
 * \param state value to store; callers in this file pass AFXDP_STATE_UP or
 *              AFXDP_STATE_DOWN
 */
424 static void AFXDPSwitchState(AFXDPThreadVars *ptv,
int state)
426 ptv->afxdp_state = state;
/**
 * \brief Create the XSK socket for this thread's queue and mark it UP.
 *
 * The socket is created on ptv->livedev->dev / ptv->xsk.queue.queue_num on
 * top of the already-created UMEM. A non-zero return from
 * xsk_socket__create() is treated as failure and logged via strerror(-ret).
 * On the visible success path the socket fd is stored in ptv->xsk.fd with
 * POLLIN as the requested event, and the state is switched to
 * AFXDP_STATE_UP.
 *
 * \param ptv AF_XDP thread vars
 */
429 static TmEcode OpenXSKSocket(AFXDPThreadVars *ptv)
440 if ((ret = xsk_socket__create(&ptv->xsk.xsk, ptv->livedev->dev, ptv->xsk.queue.queue_num,
441 ptv->umem.umem, &ptv->xsk.rx, &ptv->xsk.tx, &ptv->xsk.cfg))) {
442 SCLogError(
"Failed to create socket: %s", strerror(-ret));
446 SCLogDebug(
"bind to %s on queue %u", ptv->iface, ptv->xsk.queue.queue_num);
/* prepare the pollfd entry: wait for readable data on the XSK fd */
449 ptv->xsk.fd.fd = xsk_socket__fd(ptv->xsk.xsk);
450 ptv->xsk.fd.events = POLLIN;
453 AFXDPSwitchState(ptv, AFXDP_STATE_UP);
/**
 * \brief Tear down the XSK socket and UMEM so they can be recreated.
 *
 * Deletes the socket, deletes the UMEM if one exists (and clears the
 * pointer), then zeroes the fq and cq ring structures. The mmap()ed buffer
 * is not unmapped in the visible code.
 *
 * NOTE(review): any guard around xsk_socket__delete() is not visible in this
 * view.
 *
 * \param ptv AF_XDP thread vars
 */
459 static void AFXDPCloseSocket(AFXDPThreadVars *ptv)
462 xsk_socket__delete(ptv->xsk.xsk);
466 if (ptv->umem.umem) {
467 xsk_umem__delete(ptv->umem.umem);
468 ptv->umem.umem = NULL;
/* reset ring bookkeeping so stale producer/consumer state is not reused */
471 memset(&ptv->umem.fq, 0,
sizeof(
struct xsk_ring_prod));
472 memset(&ptv->umem.cq, 0,
sizeof(
struct xsk_ring_cons));
/**
 * \brief Bring up the socket side of the capture thread.
 *
 * The visible steps are: initialising the fill ring with NUM_FRAMES * 2
 * entries, then looking up the id of the XDP program on ptv->ifindex into
 * ptv->prog_id, using bpf_xdp_query_id() when HAVE_BPF_XDP_QUERY_ID is
 * defined and bpf_get_link_xdp_id() otherwise.
 *
 * NOTE(review): both lookup calls are queries, yet the error text says
 * "Failed to attach eBPF program" -- confirm the wording is intended.
 * NOTE(review): the remaining steps of this function are not visible in this
 * view.
 *
 * \param ptv AF_XDP thread vars
 */
475 static TmEcode AFXDPSocketCreation(AFXDPThreadVars *ptv)
481 if (InitFillRing(ptv, NUM_FRAMES * 2) !=
TM_ECODE_OK) {
492 " performance may be reduced.");
496 #ifdef HAVE_BPF_XDP_QUERY_ID
497 if (bpf_xdp_query_id(ptv->ifindex, ptv->xsk.cfg.xdp_flags, &ptv->prog_id)) {
498 SCLogError(
"Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
502 if (bpf_get_link_xdp_id(ptv->ifindex, &ptv->prog_id, ptv->xsk.cfg.xdp_flags)) {
503 SCLogError(
"Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
/**
 * \brief Attempt to recover a capture socket after the interface went away.
 *
 * Closes the current socket/UMEM, sleeps RECONNECT_TIMEOUT microseconds, then
 * inspects the interface flags: a lookup failure (-1) or an interface with
 * neither IFF_UP nor IFF_RUNNING set is only logged at debug level.
 *
 * NOTE(review): the return statements and the re-creation step preceding the
 * "is back" message are not visible in this view.
 *
 * \param ptv AF_XDP thread vars
 */
517 static TmEcode AFXDPTryReopen(AFXDPThreadVars *ptv)
519 AFXDPCloseSocket(ptv);
520 usleep(RECONNECT_TIMEOUT);
522 int if_flags = GetIfaceFlags(ptv->iface);
523 if (if_flags == -1) {
524 SCLogDebug(
"Couldn't get flags for interface '%s'", ptv->iface);
526 }
else if ((if_flags & (IFF_UP | IFF_RUNNING)) == 0) {
527 SCLogDebug(
"Interface '%s' is down", ptv->iface);
535 SCLogInfo(
"Interface '%s' is back", ptv->iface);
/**
 * \brief Hand a packet's frame back to the fill ring.
 *
 * Writes into the fill-ring slot recorded on the packet (p->afxdp_v.fq at
 * index p->afxdp_v.fq_idx).
 *
 * NOTE(review): the value assigned to the slot is not visible in this view;
 * presumably the frame address saved in p->afxdp_v.orig -- confirm.
 *
 * \param p packet being released
 */
548 static void AFXDPReleasePacket(
Packet *p)
550 *xsk_ring_prod__fill_addr((
struct xsk_ring_prod *)p->afxdp_v.fq, p->afxdp_v.fq_idx) =
/**
 * \brief Refresh the kernel counters at most once per wall-clock second.
 *
 * Compares time(NULL) with *last_dump; when they differ the counters are
 * dumped and *last_dump is updated.
 *
 * NOTE(review): the update of stats_dumped and the return statement are not
 * visible in this view.
 *
 * \param ptv AF_XDP thread vars
 * \param last_dump in/out: second at which the counters were last dumped
 */
556 static inline int DumpStatsEverySecond(AFXDPThreadVars *ptv, time_t *last_dump)
558 int stats_dumped = 0;
559 time_t current_time = time(NULL);
561 if (current_time != *last_dump) {
562 AFXDPDumpCounters(ptv);
563 *last_dump = current_time;
/**
 * \brief Kick the kernel so it processes the socket's rings.
 *
 * A zero-length, non-blocking recvfrom() is issued on the XSK fd when busy
 * polling is enabled or when the fill ring reports that it needs a wakeup.
 *
 * NOTE(review): the initial value of res and the return statement are not
 * visible in this view.
 *
 * \param data AFXDPThreadVars pointer passed as void *
 */
572 static inline ssize_t WakeupSocket(
void *data)
575 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
578 if (ptv->xsk.enable_busy_poll || xsk_ring_prod__needs_wakeup(&ptv->umem.fq)) {
580 res = recvfrom(xsk_socket__fd(ptv->xsk.xsk), NULL, 0, MSG_DONTWAIT, NULL, NULL);
/*
 * Thread initialisation body.
 * NOTE(review): the enclosing function's signature and many statements are
 * not visible in this view. What is visible: the thread vars are allocated
 * zeroed, then interface, socket and UMEM settings are copied from
 * afxdpconfig or set to defaults, and ReceiveAFXDPThreadDeinit() is called
 * with the partially built ptv (presumably on a failure path -- confirm).
 */
601 if (initdata == NULL) {
606 AFXDPThreadVars *ptv =
SCCalloc(1,
sizeof(AFXDPThreadVars));
616 ptv->ifindex = if_nametoindex(ptv->iface);
619 if (ptv->livedev == NULL) {
625 ptv->promisc = afxdpconfig->
promisc;
626 if (ptv->promisc != 0) {
/* promiscuous mode also forces the interface up */
628 if (SetIfaceFlags(ptv->iface, IFF_PROMISC | IFF_UP) != 0) {
629 SCLogError(
"Failed to switch interface (%s) to promiscuous, error %s", ptv->iface,
636 ptv->threads = afxdpconfig->
threads;
/* socket ring sizes and XDP/bind flags */
639 ptv->xsk.cfg.rx_size = NUM_FRAMES_CONS;
640 ptv->xsk.cfg.tx_size = NUM_FRAMES_PROD;
641 ptv->xsk.cfg.xdp_flags = afxdpconfig->
mode;
642 ptv->xsk.cfg.bind_flags = afxdpconfig->
bind_flags;
/* UMEM layout: the fill ring is twice the producer default, matching the
 * NUM_FRAMES * 2 entries passed to InitFillRing() */
645 ptv->umem.cfg.fill_size = NUM_FRAMES_PROD * 2;
646 ptv->umem.cfg.comp_size = NUM_FRAMES_CONS;
647 ptv->umem.cfg.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE;
648 ptv->umem.cfg.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM;
/* NOTE(review): equality test, not a bit test -- this only matches when the
 * unaligned-chunk flag is the sole flag set; confirm that is intended. */
652 if (ptv->umem.cfg.flags == XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
653 ptv->umem.mmap_alignment_flag = MAP_HUGETLB;
/* per-thread capture counters; the right-hand sides are not visible here */
667 ptv->capture_afxdp_poll_timeout =
669 ptv->capture_afxdp_poll_failed =
671 ptv->capture_afxdp_empty_reads =
673 ptv->capture_afxdp_failed_reads =
675 ptv->capture_afxdp_acquire_pkt_failed =
685 ReceiveAFXDPThreadDeinit(
tv, ptv);
/*
 * Receive loop body.
 * NOTE(review): the enclosing function's signature, the loop construct and
 * most branch conditions are not visible in this view. Visible flow per
 * iteration:
 *   1. if the socket is DOWN: sleep RECONNECT_TIMEOUT and try to reopen;
 *   2. peek up to busy_poll_budget descriptors from the RX ring;
 *   3. reserve the same number of fill-ring slots, waking the socket and
 *      retrying until the full amount is reserved;
 *   4. for each descriptor, locate the packet data in the UMEM buffer, record
 *      the fill-ring slot and original address on the packet and pass it to
 *      TmThreadsSlotProcessPkt();
 *   5. submit the fill-ring slots and release the RX descriptors.
 */
702 time_t last_dump = 0;
704 uint32_t idx_rx = 0, idx_fq = 0, rcvd;
706 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
711 AFXDPAllThreadsRunning(ptv);
720 if (
unlikely(ptv->afxdp_state == AFXDP_STATE_DOWN)) {
722 usleep(RECONNECT_TIMEOUT);
726 r = AFXDPTryReopen(ptv);
732 AFXDPDumpCounters(ptv);
739 if (!ptv->xsk.enable_busy_poll) {
751 AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
754 DumpStatsEverySecond(ptv, &last_dump);
/* busy_poll_budget doubles as the maximum batch size per peek */
759 rcvd = xsk_ring_cons__peek(&ptv->xsk.rx, ptv->xsk.busy_poll_budget, &idx_rx);
762 ssize_t ret = WakeupSocket(ptv);
765 AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
767 DumpStatsEverySecond(ptv, &last_dump);
/* one fill-ring slot is needed per received frame so it can be recycled */
771 uint32_t res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
772 while (res != rcvd) {
774 ssize_t ret = WakeupSocket(ptv);
777 AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
780 res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
783 gettimeofday(&
ts, NULL);
785 for (uint32_t i = 0; i < rcvd; i++) {
800 uint64_t addr = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx)->addr;
801 uint32_t
len = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx++)->len;
/* keep the extracted base address; addr itself is adjusted by its offset
 * before the data pointer is computed */
802 uint64_t orig = xsk_umem__extract_addr(addr);
803 addr = xsk_umem__add_offset_to_addr(addr);
805 uint8_t *pkt_data = xsk_umem__get_data(ptv->umem.buf, addr);
/* remember where this frame goes back in the fill ring on release */
809 p->afxdp_v.fq_idx = idx_fq++;
810 p->afxdp_v.orig = orig;
811 p->afxdp_v.fq = &ptv->umem.fq;
815 if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) !=
TM_ECODE_OK) {
821 xsk_ring_prod__submit(&ptv->umem.fq, rcvd);
822 xsk_ring_cons__release(&ptv->xsk.rx, rcvd);
825 DumpStatsEverySecond(ptv, &last_dump);
/**
 * \brief Detach the XDP programs attached to an interface.
 *
 * Looks up the multiprog for the interface, detaches every program returned
 * by xdp_multiprog__next_prog() using the multiprog's attach mode, then
 * detaches the main program if it is still reported as attached.
 *
 * NOTE(review): the NULL/error checks on progs and the conditions guarding
 * the debug logs are not visible in this view.
 * NOTE(review): the logs print strerror(errno) rather than the value returned
 * by xdp_program__detach(); confirm errno is set by libxdp here. The message
 * text also misspells "detach".
 *
 * \param iface_name name of the interface to clean up
 */
835 static void RunModeAFXDPRemoveProg(
char *iface_name)
837 unsigned int ifindex = if_nametoindex(iface_name);
839 struct xdp_multiprog *progs = xdp_multiprog__get_from_ifindex(ifindex);
843 enum xdp_attach_mode mode = xdp_multiprog__attach_mode(progs);
845 struct xdp_program *prog = NULL;
/* first pass: every component program of the multiprog */
848 for (prog = xdp_multiprog__next_prog(NULL, progs); prog;
849 prog = xdp_multiprog__next_prog(prog, progs)) {
850 int ret = xdp_program__detach(prog, ifindex, mode, 0);
852 SCLogDebug(
"Error: cannot detatch XDP program: %s\n", strerror(errno));
/* second pass: the main program, if something is still attached */
856 prog = xdp_multiprog__main_prog(progs);
857 if (xdp_program__is_attached(prog, ifindex) != XDP_MODE_UNSPEC) {
858 int ret = xdp_program__detach(prog, ifindex, mode, 0);
860 SCLogDebug(
"Error: cannot detatch XDP program: %s\n", strerror(errno));
/*
 * Thread teardown body.
 * NOTE(review): the enclosing function's signature and the guards around the
 * calls below are not visible in this view. Visible order: remove the XDP
 * program(s) from the interface, delete the socket, delete the UMEM (clearing
 * the pointer), and finally unmap the MEM_BYTES buffer obtained from mmap().
 */
874 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
882 RunModeAFXDPRemoveProg(ptv->iface);
886 xsk_socket__delete(ptv->xsk.xsk);
890 if (ptv->umem.umem) {
891 xsk_umem__delete(ptv->umem.umem);
892 ptv->umem.umem = NULL;
/* the UMEM must be deleted before its backing memory is unmapped */
894 munmap(ptv->umem.buf, MEM_BYTES);
905 static void ReceiveAFXDPThreadExitStats(
ThreadVars *
tv,
void *data)
908 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
910 AFXDPDumpCounters(ptv);
912 SCLogPerf(
"(%s) Kernel: Packets %" PRIu64
", bytes %" PRIu64
", dropped %" PRIu64
"",
tv->
name,