32 #define PCAP_DONT_INCLUDE_PCAP_BPF_H 1
33 #define SC_PCAP_DONT_INCLUDE_PCAP_H 1
65 #include <bpf/libbpf.h>
67 #include <xdp/libxdp.h>
70 #if HAVE_LINUX_IF_ETHER_H
71 #include <linux/if_ether.h>
108 SCLogError(
"Error creating thread %s: you do not have "
109 "support for AF_XDP enabled, on Linux host please recompile "
110 "with --enable-af-xdp",
/* Sizing and timing constants for the AF_XDP capture path. */
/* NOTE(review): presumably the timeout given to poll(); the call site is not
 * visible in this excerpt -- confirm the unit before changing it. */
117 #define POLL_TIMEOUT 100
/* Descriptor counts taken from the XSK_* defaults; they are used as the
 * socket's tx/rx ring sizes and as the base for the umem fill/completion
 * ring sizes. */
118 #define NUM_FRAMES_PROD XSK_RING_PROD__DEFAULT_NUM_DESCS
119 #define NUM_FRAMES_CONS XSK_RING_CONS__DEFAULT_NUM_DESCS
/* Base frame count; the fill ring is initialised with NUM_FRAMES * 2 entries. */
120 #define NUM_FRAMES NUM_FRAMES_PROD
/* Size of one umem frame; fill-ring addresses are laid out as i * FRAME_SIZE. */
121 #define FRAME_SIZE XSK_UMEM__DEFAULT_FRAME_SIZE
/* Byte size of the umem buffer: the length passed to mmap(),
 * xsk_umem__create() and munmap(). */
122 #define MEM_BYTES (NUM_FRAMES * FRAME_SIZE * 2)
/* Passed to usleep(), so the value is in microseconds; used as the pause
 * while the socket is down and before a reopen attempt. */
123 #define RECONNECT_TIMEOUT 500000
126 enum state { AFXDP_STATE_DOWN, AFXDP_STATE_UP };
128 struct XskInitProtect {
135 struct xsk_umem *umem;
136 struct xsk_ring_prod fq;
137 struct xsk_ring_cons cq;
138 struct xsk_umem_config cfg;
139 int mmap_alignment_flag;
142 struct QueueAssignment {
148 struct xsk_ring_cons rx;
149 struct xsk_ring_prod tx;
150 struct xsk_socket *xsk;
153 struct QueueAssignment queue;
156 struct xsk_socket_config cfg;
157 bool enable_busy_poll;
158 uint32_t busy_poll_time;
159 uint32_t busy_poll_budget;
167 typedef struct AFXDPThreadVars_ {
180 struct UmemInfo umem;
181 struct XskSockInfo xsk;
182 uint32_t gro_flush_timeout;
183 uint32_t napi_defer_hard_irqs;
192 uint16_t capture_afxdp_packets;
193 uint16_t capture_kernel_drops;
194 uint16_t capture_afxdp_poll;
195 uint16_t capture_afxdp_poll_timeout;
196 uint16_t capture_afxdp_poll_failed;
197 uint16_t capture_afxdp_empty_reads;
198 uint16_t capture_afxdp_failed_reads;
199 uint16_t capture_afxdp_acquire_pkt_failed;
203 static void ReceiveAFXDPThreadExitStats(
ThreadVars *,
void *);
243 static inline void AFXDPDumpCounters(AFXDPThreadVars *ptv)
245 struct xdp_statistics stats;
246 socklen_t
len =
sizeof(
struct xdp_statistics);
247 int fd = xsk_socket__fd(ptv->xsk.xsk);
249 if (getsockopt(fd, SOL_XDP, XDP_STATISTICS, &stats, &
len) >= 0) {
250 uint64_t rx_dropped = stats.rx_dropped + stats.rx_invalid_descs + stats.rx_ring_full;
254 StatsAddUI64(ptv->tv, ptv->capture_afxdp_packets, ptv->pkts);
259 SCLogDebug(
"(%s) Kernel: Packets %" PRIu64
", bytes %" PRIu64
", dropped %" PRIu64
"",
286 static TmEcode AFXDPAssignQueueID(AFXDPThreadVars *ptv)
288 if (ptv->xsk.queue.assigned ==
false) {
289 ptv->xsk.queue.queue_num =
SC_ATOMIC_GET(xsk_protect.queue_num);
293 ptv->xsk.queue.assigned =
true;
298 static void AFXDPAllThreadsRunning(AFXDPThreadVars *ptv)
301 if ((ptv->threads - 1) == (
int)ptv->xsk.queue.queue_num) {
302 SCLogDebug(
"All AF_XDP capture threads are running.");
307 static TmEcode AcquireBuffer(AFXDPThreadVars *ptv)
309 int mmap_flags = MAP_PRIVATE | MAP_ANONYMOUS | ptv->umem.mmap_alignment_flag;
310 ptv->umem.buf = mmap(NULL, MEM_BYTES, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
312 if (ptv->umem.buf == MAP_FAILED) {
320 static TmEcode ConfigureXSKUmem(AFXDPThreadVars *ptv)
322 if (xsk_umem__create(&ptv->umem.umem, ptv->umem.buf, MEM_BYTES, &ptv->umem.fq, &ptv->umem.cq,
324 SCLogError(
"failed to create umem: %s", strerror(errno));
331 static TmEcode InitFillRing(AFXDPThreadVars *ptv,
const uint32_t
cnt)
335 uint32_t ret = xsk_ring_prod__reserve(&ptv->umem.fq,
cnt, &idx_fq);
337 SCLogError(
"Failed to initialise the fill ring.");
341 for (uint32_t i = 0; i <
cnt; i++) {
342 *xsk_ring_prod__fill_addr(&ptv->umem.fq, idx_fq++) = i * FRAME_SIZE;
345 xsk_ring_prod__submit(&ptv->umem.fq,
cnt);
354 static TmEcode WriteLinuxTunables(AFXDPThreadVars *ptv)
379 static TmEcode ConfigureBusyPolling(AFXDPThreadVars *ptv)
381 if (!ptv->xsk.enable_busy_poll) {
389 SCLogWarning(
"Kernel version older than required: v5.11,"
390 " upgrade kernel version to use 'enable-busy-poll' option.");
394 #if defined SO_PREFER_BUSY_POLL && defined SO_BUSY_POLL && defined SO_BUSY_POLL_BUDGET
395 const int fd = xsk_socket__fd(ptv->xsk.xsk);
402 if (setsockopt(fd, SOL_SOCKET, SO_PREFER_BUSY_POLL, (
void *)&sock_opt,
sizeof(sock_opt)) < 0) {
406 sock_opt = ptv->xsk.busy_poll_time;
407 if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL, (
void *)&sock_opt,
sizeof(sock_opt)) < 0) {
411 sock_opt = ptv->xsk.busy_poll_budget;
412 if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL_BUDGET, (
void *)&sock_opt,
sizeof(sock_opt)) < 0) {
419 "Kernel does not support busy poll, upgrade kernel or disable \"enable-busy-poll\".");
424 static void AFXDPSwitchState(AFXDPThreadVars *ptv,
int state)
426 ptv->afxdp_state = state;
429 static TmEcode OpenXSKSocket(AFXDPThreadVars *ptv)
440 if ((ret = xsk_socket__create(&ptv->xsk.xsk, ptv->livedev->dev, ptv->xsk.queue.queue_num,
441 ptv->umem.umem, &ptv->xsk.rx, &ptv->xsk.tx, &ptv->xsk.cfg))) {
442 SCLogError(
"Failed to create socket: %s", strerror(-ret));
445 SCLogDebug(
"bind to %s on queue %u", ptv->iface, ptv->xsk.queue.queue_num);
448 ptv->xsk.fd.fd = xsk_socket__fd(ptv->xsk.xsk);
449 ptv->xsk.fd.events = POLLIN;
452 AFXDPSwitchState(ptv, AFXDP_STATE_UP);
458 static void AFXDPCloseSocket(AFXDPThreadVars *ptv)
461 xsk_socket__delete(ptv->xsk.xsk);
465 if (ptv->umem.umem) {
466 xsk_umem__delete(ptv->umem.umem);
467 ptv->umem.umem = NULL;
470 memset(&ptv->umem.fq, 0,
sizeof(
struct xsk_ring_prod));
471 memset(&ptv->umem.cq, 0,
sizeof(
struct xsk_ring_cons));
474 static TmEcode AFXDPSocketCreation(AFXDPThreadVars *ptv)
480 if (InitFillRing(ptv, NUM_FRAMES * 2) !=
TM_ECODE_OK) {
491 " performance may be reduced.");
495 #ifdef HAVE_BPF_XDP_QUERY_ID
496 if (bpf_xdp_query_id(ptv->ifindex, ptv->xsk.cfg.xdp_flags, &ptv->prog_id)) {
497 SCLogError(
"Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
501 if (bpf_get_link_xdp_id(ptv->ifindex, &ptv->prog_id, ptv->xsk.cfg.xdp_flags)) {
502 SCLogError(
"Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
516 static TmEcode AFXDPTryReopen(AFXDPThreadVars *ptv)
518 AFXDPCloseSocket(ptv);
519 usleep(RECONNECT_TIMEOUT);
521 int if_flags = GetIfaceFlags(ptv->iface);
522 if (if_flags == -1) {
523 SCLogDebug(
"Couldn't get flags for interface '%s'", ptv->iface);
525 }
else if ((if_flags & (IFF_UP | IFF_RUNNING)) == 0) {
526 SCLogDebug(
"Interface '%s' is down", ptv->iface);
534 SCLogInfo(
"Interface '%s' is back", ptv->iface);
547 static void AFXDPReleasePacket(
Packet *p)
549 *xsk_ring_prod__fill_addr((
struct xsk_ring_prod *)p->afxdp_v.fq, p->afxdp_v.fq_idx) =
555 static inline int DumpStatsEverySecond(AFXDPThreadVars *ptv, time_t *last_dump)
557 int stats_dumped = 0;
558 time_t current_time = time(NULL);
560 if (current_time != *last_dump) {
561 AFXDPDumpCounters(ptv);
562 *last_dump = current_time;
571 static inline ssize_t WakeupSocket(
void *data)
574 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
577 if (ptv->xsk.enable_busy_poll || xsk_ring_prod__needs_wakeup(&ptv->umem.fq)) {
579 res = recvfrom(xsk_socket__fd(ptv->xsk.xsk), NULL, 0, MSG_DONTWAIT, NULL, NULL);
600 if (initdata == NULL) {
605 AFXDPThreadVars *ptv =
SCCalloc(1,
sizeof(AFXDPThreadVars));
615 ptv->ifindex = if_nametoindex(ptv->iface);
618 if (ptv->livedev == NULL) {
624 ptv->promisc = afxdpconfig->
promisc;
625 if (ptv->promisc != 0) {
627 if (SetIfaceFlags(ptv->iface, IFF_PROMISC | IFF_UP) != 0) {
628 SCLogError(
"Failed to switch interface (%s) to promiscuous, error %s", ptv->iface,
635 ptv->threads = afxdpconfig->
threads;
638 ptv->xsk.cfg.rx_size = NUM_FRAMES_CONS;
639 ptv->xsk.cfg.tx_size = NUM_FRAMES_PROD;
640 ptv->xsk.cfg.xdp_flags = afxdpconfig->
mode;
641 ptv->xsk.cfg.bind_flags = afxdpconfig->
bind_flags;
644 ptv->umem.cfg.fill_size = NUM_FRAMES_PROD * 2;
645 ptv->umem.cfg.comp_size = NUM_FRAMES_CONS;
646 ptv->umem.cfg.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE;
647 ptv->umem.cfg.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM;
651 if (ptv->umem.cfg.flags == XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
652 ptv->umem.mmap_alignment_flag = MAP_HUGETLB;
670 ptv->capture_afxdp_acquire_pkt_failed =
680 ReceiveAFXDPThreadDeinit(
tv, ptv);
697 time_t last_dump = 0;
699 uint32_t idx_rx = 0, idx_fq = 0, rcvd;
701 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
706 AFXDPAllThreadsRunning(ptv);
715 if (
unlikely(ptv->afxdp_state == AFXDP_STATE_DOWN)) {
717 usleep(RECONNECT_TIMEOUT);
721 r = AFXDPTryReopen(ptv);
727 AFXDPDumpCounters(ptv);
734 if (!ptv->xsk.enable_busy_poll) {
735 StatsIncr(ptv->tv, ptv->capture_afxdp_poll);
742 StatsIncr(ptv->tv, ptv->capture_afxdp_poll_timeout);
744 StatsIncr(ptv->tv, ptv->capture_afxdp_poll_failed);
746 AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
749 DumpStatsEverySecond(ptv, &last_dump);
754 rcvd = xsk_ring_cons__peek(&ptv->xsk.rx, ptv->xsk.busy_poll_budget, &idx_rx);
756 StatsIncr(ptv->tv, ptv->capture_afxdp_empty_reads);
757 ssize_t ret = WakeupSocket(ptv);
760 AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
762 DumpStatsEverySecond(ptv, &last_dump);
766 uint32_t res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
767 while (res != rcvd) {
768 StatsIncr(ptv->tv, ptv->capture_afxdp_failed_reads);
769 ssize_t ret = WakeupSocket(ptv);
772 AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
775 res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
778 gettimeofday(&
ts, NULL);
780 for (uint32_t i = 0; i < rcvd; i++) {
783 StatsIncr(ptv->tv, ptv->capture_afxdp_acquire_pkt_failed);
795 uint64_t addr = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx)->addr;
796 uint32_t
len = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx++)->len;
797 uint64_t orig = xsk_umem__extract_addr(addr);
798 addr = xsk_umem__add_offset_to_addr(addr);
800 uint8_t *pkt_data = xsk_umem__get_data(ptv->umem.buf, addr);
804 p->afxdp_v.fq_idx = idx_fq++;
805 p->afxdp_v.orig = orig;
806 p->afxdp_v.fq = &ptv->umem.fq;
810 if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) !=
TM_ECODE_OK) {
816 xsk_ring_prod__submit(&ptv->umem.fq, rcvd);
817 xsk_ring_cons__release(&ptv->xsk.rx, rcvd);
820 DumpStatsEverySecond(ptv, &last_dump);
830 static void RunModeAFXDPRemoveProg(
char *iface_name)
832 unsigned int ifindex = if_nametoindex(iface_name);
834 struct xdp_multiprog *progs = xdp_multiprog__get_from_ifindex(ifindex);
838 enum xdp_attach_mode mode = xdp_multiprog__attach_mode(progs);
840 struct xdp_program *prog = NULL;
843 for (prog = xdp_multiprog__next_prog(NULL, progs); prog;
844 prog = xdp_multiprog__next_prog(prog, progs)) {
845 int ret = xdp_program__detach(prog, ifindex, mode, 0);
847 SCLogDebug(
"Error: cannot detatch XDP program: %s\n", strerror(errno));
851 prog = xdp_multiprog__main_prog(progs);
852 if (xdp_program__is_attached(prog, ifindex) != XDP_MODE_UNSPEC) {
853 int ret = xdp_program__detach(prog, ifindex, mode, 0);
855 SCLogDebug(
"Error: cannot detatch XDP program: %s\n", strerror(errno));
869 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
877 RunModeAFXDPRemoveProg(ptv->iface);
881 xsk_socket__delete(ptv->xsk.xsk);
885 if (ptv->umem.umem) {
886 xsk_umem__delete(ptv->umem.umem);
887 ptv->umem.umem = NULL;
889 munmap(ptv->umem.buf, MEM_BYTES);
900 static void ReceiveAFXDPThreadExitStats(
ThreadVars *
tv,
void *data)
903 AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
905 AFXDPDumpCounters(ptv);
907 SCLogPerf(
"(%s) Kernel: Packets %" PRIu64
", bytes %" PRIu64
", dropped %" PRIu64
"",
tv->
name,