#define SC_PCAP_DONT_INCLUDE_PCAP_H 1

#include <bpf/libbpf.h>
#include <xdp/libxdp.h>

#if HAVE_LINUX_IF_ETHER_H
#include <linux/if_ether.h>
    SCLogError("Error creating thread %s: you do not have "
               "support for AF_XDP enabled, on Linux host please recompile "
               "with --enable-af-xdp",
#define POLL_TIMEOUT      100
#define NUM_FRAMES_PROD   XSK_RING_PROD__DEFAULT_NUM_DESCS
#define NUM_FRAMES_CONS   XSK_RING_CONS__DEFAULT_NUM_DESCS
#define NUM_FRAMES        NUM_FRAMES_PROD
#define FRAME_SIZE        XSK_UMEM__DEFAULT_FRAME_SIZE
#define MEM_BYTES         (NUM_FRAMES * FRAME_SIZE * 2)
#define RECONNECT_TIMEOUT 500000
enum state { AFXDP_STATE_DOWN, AFXDP_STATE_UP };

struct XskInitProtect {

struct UmemInfo {
    struct xsk_umem *umem;
    struct xsk_ring_prod fq;
    struct xsk_ring_cons cq;
    struct xsk_umem_config cfg;
    int mmap_alignment_flag;
struct QueueAssignment {

struct XskSockInfo {
    struct xsk_ring_cons rx;
    struct xsk_ring_prod tx;
    struct xsk_socket *xsk;

    struct QueueAssignment queue;

    struct xsk_socket_config cfg;
    bool enable_busy_poll;
    uint32_t busy_poll_time;
    uint32_t busy_poll_budget;
typedef struct AFXDPThreadVars_ {

    struct UmemInfo umem;
    struct XskSockInfo xsk;
    uint32_t gro_flush_timeout;
    uint32_t napi_defer_hard_irqs;
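    /* Stats counter ids, used below with StatsIncr()/StatsAddUI64() */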
    uint16_t capture_afxdp_packets;
    uint16_t capture_kernel_drops;
    uint16_t capture_afxdp_poll;
    uint16_t capture_afxdp_poll_timeout;
    uint16_t capture_afxdp_poll_failed;
    uint16_t capture_afxdp_empty_reads;
    uint16_t capture_afxdp_failed_reads;
    uint16_t capture_afxdp_acquire_pkt_failed;
static void ReceiveAFXDPThreadExitStats(ThreadVars *, void *);
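/**
 * \brief Pull XDP_STATISTICS from the kernel via getsockopt() and update the
 *        per-thread packet and drop counters.
 */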
static inline void AFXDPDumpCounters(AFXDPThreadVars *ptv)
{
    struct xdp_statistics stats;
    socklen_t len = sizeof(struct xdp_statistics);
    int fd = xsk_socket__fd(ptv->xsk.xsk);

    if (getsockopt(fd, SOL_XDP, XDP_STATISTICS, &stats, &len) >= 0) {
        uint64_t rx_dropped = stats.rx_dropped + stats.rx_invalid_descs + stats.rx_ring_full;

        StatsAddUI64(ptv->tv, ptv->capture_afxdp_packets, ptv->pkts);

        SCLogDebug("(%s) Kernel: Packets %" PRIu64 ", bytes %" PRIu64 ", dropped %" PRIu64 "",
static TmEcode AFXDPAssignQueueID(AFXDPThreadVars *ptv)
{
    if (ptv->xsk.queue.assigned == false) {
        ptv->xsk.queue.queue_num = SC_ATOMIC_GET(xsk_protect.queue_num);

        ptv->xsk.queue.assigned = true;
static void AFXDPAllThreadsRunning(AFXDPThreadVars *ptv)
{
    if ((ptv->threads - 1) == (int)ptv->xsk.queue.queue_num) {
        SCLogDebug("All AF_XDP capture threads are running.");
static TmEcode AcquireBuffer(AFXDPThreadVars *ptv)
{
    int mmap_flags = MAP_PRIVATE | MAP_ANONYMOUS | ptv->umem.mmap_alignment_flag;
    ptv->umem.buf = mmap(NULL, MEM_BYTES, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);

    if (ptv->umem.buf == MAP_FAILED) {
static TmEcode ConfigureXSKUmem(AFXDPThreadVars *ptv)
{
    if (xsk_umem__create(&ptv->umem.umem, ptv->umem.buf, MEM_BYTES, &ptv->umem.fq, &ptv->umem.cq,
                &ptv->umem.cfg) != 0) {
        SCLogError("failed to create umem: %s", strerror(errno));
static TmEcode InitFillRing(AFXDPThreadVars *ptv, const uint32_t cnt)
{
    uint32_t idx_fq = 0;

    uint32_t ret = xsk_ring_prod__reserve(&ptv->umem.fq, cnt, &idx_fq);
    if (ret != cnt) {
        SCLogError("Failed to initialise the fill ring.");
        return TM_ECODE_FAILED;
    }

    for (uint32_t i = 0; i < cnt; i++) {
        *xsk_ring_prod__fill_addr(&ptv->umem.fq, idx_fq++) = i * FRAME_SIZE;
    }

    xsk_ring_prod__submit(&ptv->umem.fq, cnt);
static TmEcode WriteLinuxTunables(AFXDPThreadVars *ptv)
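/**
 * \brief Apply SO_PREFER_BUSY_POLL, SO_BUSY_POLL and SO_BUSY_POLL_BUDGET to
 *        the XSK socket when busy polling is enabled and the kernel supports it.
 */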
static TmEcode ConfigureBusyPolling(AFXDPThreadVars *ptv)
{
    if (!ptv->xsk.enable_busy_poll) {

        SCLogWarning("Kernel version older than required: v5.11,"
                     " upgrade kernel version to use 'enable-busy-poll' option.");

#if defined SO_PREFER_BUSY_POLL && defined SO_BUSY_POLL && defined SO_BUSY_POLL_BUDGET
    const int fd = xsk_socket__fd(ptv->xsk.xsk);
    int sock_opt = 1;

    if (setsockopt(fd, SOL_SOCKET, SO_PREFER_BUSY_POLL, (void *)&sock_opt, sizeof(sock_opt)) < 0) {

    sock_opt = ptv->xsk.busy_poll_time;
    if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL, (void *)&sock_opt, sizeof(sock_opt)) < 0) {

    sock_opt = ptv->xsk.busy_poll_budget;
    if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL_BUDGET, (void *)&sock_opt, sizeof(sock_opt)) < 0) {

#else
    SCLogWarning(
            "Kernel does not support busy poll, upgrade kernel or disable \"enable-busy-poll\".");
static void AFXDPSwitchState(AFXDPThreadVars *ptv, int state)
{
    ptv->afxdp_state = state;
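/**
 * \brief Create the AF_XDP socket on the assigned queue, store its fd for
 *        polling and mark the capture state as up.
 */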
static TmEcode OpenXSKSocket(AFXDPThreadVars *ptv)
{
    int ret;

    if ((ret = xsk_socket__create(&ptv->xsk.xsk, ptv->livedev->dev, ptv->xsk.queue.queue_num,
                 ptv->umem.umem, &ptv->xsk.rx, &ptv->xsk.tx, &ptv->xsk.cfg))) {
        SCLogError("Failed to create socket: %s", strerror(-ret));

    SCLogDebug("bind to %s on queue %u", ptv->iface, ptv->xsk.queue.queue_num);

    ptv->xsk.fd.fd = xsk_socket__fd(ptv->xsk.xsk);
    ptv->xsk.fd.events = POLLIN;

    AFXDPSwitchState(ptv, AFXDP_STATE_UP);
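/**
 * \brief Tear down the XSK socket and UMEM and reset the fill/completion rings.
 */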
static void AFXDPCloseSocket(AFXDPThreadVars *ptv)
{
    xsk_socket__delete(ptv->xsk.xsk);

    if (ptv->umem.umem) {
        xsk_umem__delete(ptv->umem.umem);
        ptv->umem.umem = NULL;
    }

    memset(&ptv->umem.fq, 0, sizeof(struct xsk_ring_prod));
    memset(&ptv->umem.cq, 0, sizeof(struct xsk_ring_cons));
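/**
 * \brief Per-thread socket setup: seed the fill ring and record the id of
 *        the XDP program attached to the interface.
 */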
static TmEcode AFXDPSocketCreation(AFXDPThreadVars *ptv)
{
    if (InitFillRing(ptv, NUM_FRAMES * 2) != TM_ECODE_OK) {

                 " performance may be reduced.");

#ifdef HAVE_BPF_XDP_QUERY_ID
    if (bpf_xdp_query_id(ptv->ifindex, ptv->xsk.cfg.xdp_flags, &ptv->prog_id)) {
        SCLogError("Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
#else
    if (bpf_get_link_xdp_id(ptv->ifindex, &ptv->prog_id, ptv->xsk.cfg.xdp_flags)) {
        SCLogError("Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
static TmEcode AFXDPTryReopen(AFXDPThreadVars *ptv)
{
    AFXDPCloseSocket(ptv);
    usleep(RECONNECT_TIMEOUT);

    int if_flags = GetIfaceFlags(ptv->iface);
    if (if_flags == -1) {
        SCLogDebug("Couldn't get flags for interface '%s'", ptv->iface);
    } else if ((if_flags & (IFF_UP | IFF_RUNNING)) == 0) {
        SCLogDebug("Interface '%s' is down", ptv->iface);
    }

    SCLogInfo("Interface '%s' is back", ptv->iface);
static void AFXDPReleasePacket(Packet *p)
{
    *xsk_ring_prod__fill_addr((struct xsk_ring_prod *)p->afxdp_v.fq, p->afxdp_v.fq_idx) =
            p->afxdp_v.orig;
static inline int DumpStatsEverySecond(AFXDPThreadVars *ptv, time_t *last_dump)
{
    int stats_dumped = 0;
    time_t current_time = time(NULL);

    if (current_time != *last_dump) {
        AFXDPDumpCounters(ptv);
        *last_dump = current_time;
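/**
 * \brief Kick the kernel with a zero-length recvfrom() when busy polling is
 *        enabled or the fill ring signals that a wakeup is needed.
 */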
static inline ssize_t WakeupSocket(void *data)
{
    ssize_t res = 0;
    AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;

    if (ptv->xsk.enable_busy_poll || xsk_ring_prod__needs_wakeup(&ptv->umem.fq)) {
        res = recvfrom(xsk_socket__fd(ptv->xsk.xsk), NULL, 0, MSG_DONTWAIT, NULL, NULL);
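/**
 * \brief Thread init: allocate the per-thread vars and apply the AF_XDP
 *        configuration (promisc, thread count, socket and umem config).
 */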
    if (initdata == NULL) {

    AFXDPThreadVars *ptv = SCCalloc(1, sizeof(AFXDPThreadVars));

    ptv->ifindex = if_nametoindex(ptv->iface);

    if (ptv->livedev == NULL) {

    ptv->promisc = afxdpconfig->promisc;
    if (ptv->promisc != 0) {
        if (SetIfaceFlags(ptv->iface, IFF_PROMISC | IFF_UP) != 0) {
            SCLogError("Failed to switch interface (%s) to promiscuous, error %s", ptv->iface,

    ptv->threads = afxdpconfig->threads;

    ptv->xsk.cfg.rx_size = NUM_FRAMES_CONS;
    ptv->xsk.cfg.tx_size = NUM_FRAMES_PROD;
    ptv->xsk.cfg.xdp_flags = afxdpconfig->mode;
    ptv->xsk.cfg.bind_flags = afxdpconfig->bind_flags;

    ptv->umem.cfg.fill_size = NUM_FRAMES_PROD * 2;
    ptv->umem.cfg.comp_size = NUM_FRAMES_CONS;
    ptv->umem.cfg.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE;
    ptv->umem.cfg.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM;

    if (ptv->umem.cfg.flags == XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
        ptv->umem.mmap_alignment_flag = MAP_HUGETLB;

    ptv->capture_afxdp_acquire_pkt_failed =

    ReceiveAFXDPThreadDeinit(tv, ptv);
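/**
 * \brief Main receive loop: poll the socket (unless busy polling), peek RX
 *        descriptors, wrap each frame in a Packet and pass it down the
 *        pipeline, then hand the frames back via the fill ring.
 */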
    time_t last_dump = 0;
    struct timeval ts;
    uint32_t idx_rx = 0, idx_fq = 0, rcvd;

    AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;

    AFXDPAllThreadsRunning(ptv);

        if (unlikely(ptv->afxdp_state == AFXDP_STATE_DOWN)) {

            usleep(RECONNECT_TIMEOUT);

            r = AFXDPTryReopen(ptv);

        AFXDPDumpCounters(ptv);

        if (!ptv->xsk.enable_busy_poll) {
            StatsIncr(ptv->tv, ptv->capture_afxdp_poll);

                StatsIncr(ptv->tv, ptv->capture_afxdp_poll_timeout);

                StatsIncr(ptv->tv, ptv->capture_afxdp_poll_failed);

                AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);

            DumpStatsEverySecond(ptv, &last_dump);

        rcvd = xsk_ring_cons__peek(&ptv->xsk.rx, ptv->xsk.busy_poll_budget, &idx_rx);

            StatsIncr(ptv->tv, ptv->capture_afxdp_empty_reads);
            ssize_t ret = WakeupSocket(ptv);

                AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);

            DumpStatsEverySecond(ptv, &last_dump);

        uint32_t res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
        while (res != rcvd) {
            StatsIncr(ptv->tv, ptv->capture_afxdp_failed_reads);
            ssize_t ret = WakeupSocket(ptv);

                AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);

            res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
        }

        gettimeofday(&ts, NULL);
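        /* Walk the received descriptors: resolve each frame address in the
         * umem, wrap the data in a Packet and record the fill-queue slot
         * used to return the frame later. */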
        for (uint32_t i = 0; i < rcvd; i++) {

                StatsIncr(ptv->tv, ptv->capture_afxdp_acquire_pkt_failed);

            uint64_t addr = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx)->addr;
            uint32_t len = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx++)->len;
            uint64_t orig = xsk_umem__extract_addr(addr);
            addr = xsk_umem__add_offset_to_addr(addr);

            uint8_t *pkt_data = xsk_umem__get_data(ptv->umem.buf, addr);

            p->afxdp_v.fq_idx = idx_fq++;
            p->afxdp_v.orig = orig;
            p->afxdp_v.fq = &ptv->umem.fq;

            if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {

        xsk_ring_prod__submit(&ptv->umem.fq, rcvd);
        xsk_ring_cons__release(&ptv->xsk.rx, rcvd);

        DumpStatsEverySecond(ptv, &last_dump);
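/**
 * \brief Detach any XDP programs still attached to the interface via libxdp.
 */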
static void RunModeAFXDPRemoveProg(char *iface_name)
{
    unsigned int ifindex = if_nametoindex(iface_name);

    struct xdp_multiprog *progs = xdp_multiprog__get_from_ifindex(ifindex);

    enum xdp_attach_mode mode = xdp_multiprog__attach_mode(progs);

    struct xdp_program *prog = NULL;

    /* Detach each program in the multi-prog chain */
    for (prog = xdp_multiprog__next_prog(NULL, progs); prog;
            prog = xdp_multiprog__next_prog(prog, progs)) {
        int ret = xdp_program__detach(prog, ifindex, mode, 0);
        if (ret) {
            SCLogDebug("Error: cannot detach XDP program: %s\n", strerror(errno));
        }
    }

    /* Detach the main program as well, if still attached */
    prog = xdp_multiprog__main_prog(progs);
    if (xdp_program__is_attached(prog, ifindex) != XDP_MODE_UNSPEC) {
        int ret = xdp_program__detach(prog, ifindex, mode, 0);
        if (ret) {
            SCLogDebug("Error: cannot detach XDP program: %s\n", strerror(errno));
        }
    AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;

    RunModeAFXDPRemoveProg(ptv->iface);

    xsk_socket__delete(ptv->xsk.xsk);

    if (ptv->umem.umem) {
        xsk_umem__delete(ptv->umem.umem);
        ptv->umem.umem = NULL;
    }

    munmap(ptv->umem.buf, MEM_BYTES);
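/**
 * \brief Log the final kernel packet, byte and drop counts when the thread exits.
 */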
static void ReceiveAFXDPThreadExitStats(ThreadVars *tv, void *data)
{
    AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;

    AFXDPDumpCounters(ptv);

    SCLogPerf("(%s) Kernel: Packets %" PRIu64 ", bytes %" PRIu64 ", dropped %" PRIu64 "",
            tv->name,