source-mpipe.c
1 /* Copyright (C) 2011-2014 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Tom DeCanio <decanio.tom@gmail.com>
22  * \author Ken Steele, Tilera Corporation <suricata@tilera.com>
23  *
24  * Tilera TILE-Gx mpipe ingress packet support.
25  */
26 
27 #include "suricata-common.h"
28 #include "suricata.h"
29 #include "host.h"
30 #include "decode.h"
31 #include "packet-queue.h"
32 #include "threads.h"
33 #include "threadvars.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tm-threads-common.h"
37 #include "runmode-tile.h"
38 #include "source-mpipe.h"
39 #include "conf.h"
40 #include "util-debug.h"
41 #include "util-error.h"
42 #include "util-privs.h"
43 #include "util-device.h"
44 #include "util-mem.h"
45 #include "util-profiling.h"
46 #include "tmqh-packetpool.h"
47 #include "pkt-var.h"
48 
49 #ifdef HAVE_MPIPE
50 
51 #include <mde-version.h>
52 #include <tmc/alloc.h>
53 #include <arch/sim.h>
54 #include <arch/atomic.h>
55 #include <arch/cycle.h>
56 #include <gxio/mpipe.h>
57 #include <gxio/trio.h>
58 #include <tmc/cpus.h>
59 #include <tmc/spin.h>
60 #include <tmc/sync.h>
61 #include <tmc/task.h>
62 #include <tmc/perf.h>
63 
64 /* Align "p" mod "align", assuming "p" is a "void*". */
65 #define ALIGN(p, align) do { (p) += -(long)(p) & ((align) - 1); } while(0)
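/* Editorial sketch (not part of the upstream file): ALIGN() rounds a pointer
 * up to the next multiple of a power-of-two "align" by adding the distance to
 * that boundary. A minimal standalone equivalent, assuming <stdint.h> is
 * available and using a hypothetical AlignUp() name:
 */
static inline void *AlignUp(void *p, uintptr_t align)
{
    uintptr_t v = (uintptr_t)p;
    v += -v & (align - 1);   /* adds 0 if already aligned */
    return (void *)v;
}
/* e.g. AlignUp((void *)0x10001, 0x10000) == (void *)0x20000 */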
66 
67 #define VERIFY(VAL, WHAT) \
68  do { \
69  int __val = (VAL); \
70  if (__val < 0) { \
71  SCLogError(SC_ERR_INVALID_ARGUMENT,(WHAT)); \
72  SCReturnInt(TM_ECODE_FAILED); \
73  } \
74  } while (0)
75 
76 #define min(a,b) (((a) < (b)) ? (a) : (b))
77 
78 /** storage for mpipe device names */
79 typedef struct MpipeDevice_ {
80  char *dev; /**< the device (e.g. "xgbe1") */
81  TAILQ_ENTRY(MpipeDevice_) next;
82 } MpipeDevice;
83 
84 
85 /** private device list */
86 static TAILQ_HEAD(, MpipeDevice_) mpipe_devices =
87  TAILQ_HEAD_INITIALIZER(mpipe_devices);
88 
89 static int first_stack;
90 static uint32_t headroom = 2;
91 
92 /**
93  * \brief Structure to hold thread specific variables.
94  */
95 typedef struct MpipeThreadVars_
96 {
97  ChecksumValidationMode checksum_mode;
98 
99  /* counters */
100  uint32_t pkts;
101  uint64_t bytes;
102  uint32_t errs;
103 
104  ThreadVars *tv;
105  TmSlot *slot;
106 
107  Packet *in_p;
108 
109  /** stats/counters */
110  uint16_t max_mpipe_depth;
111  uint16_t mpipe_drop;
112  uint16_t counter_no_buffers_0;
113  uint16_t counter_no_buffers_1;
114  uint16_t counter_no_buffers_2;
115  uint16_t counter_no_buffers_3;
116  uint16_t counter_no_buffers_4;
117  uint16_t counter_no_buffers_5;
118  uint16_t counter_no_buffers_6;
119  uint16_t counter_no_buffers_7;
120 
121 } MpipeThreadVars;
122 
123 TmEcode ReceiveMpipeLoop(ThreadVars *tv, void *data, void *slot);
124 TmEcode ReceiveMpipeThreadInit(ThreadVars *, void *, void **);
125 void ReceiveMpipeThreadExitStats(ThreadVars *, void *);
126 
127 TmEcode DecodeMpipeThreadInit(ThreadVars *, void *, void **);
128 TmEcode DecodeMpipeThreadDeinit(ThreadVars *tv, void *data);
129 TmEcode DecodeMpipe(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
130 static int MpipeReceiveOpenIqueue(int rank);
131 
132 #define MAX_CHANNELS 32 /* can probably find this in the MDE */
133 
134 /*
135  * mpipe configuration.
136  */
137 
138 /* The mpipe context (shared by all workers) */
139 static gxio_mpipe_context_t context_body;
140 static gxio_mpipe_context_t* context = &context_body;
141 
142 /* First allocated Notification ring for iQueues. */
143 static int first_notif_ring;
144 
145 /* The ingress queue for this worker thread */
146 static __thread gxio_mpipe_iqueue_t* thread_iqueue;
147 
148 /* The egress queues (one per port) */
149 static gxio_mpipe_equeue_t equeue[MAX_CHANNELS];
150 
151 /* the number of entries in an equeue ring */
152 static const int equeue_entries = 8192;
153 
154 /* Array of mpipe links */
155 static gxio_mpipe_link_t mpipe_link[MAX_CHANNELS];
156 
157 /* Per interface configuration data */
158 static MpipeIfaceConfig *mpipe_conf[MAX_CHANNELS];
159 
160 /* Per interface TAP/IPS configuration */
161 
162 /* egress equeue associated with each ingress channel */
163 static MpipePeerVars channel_to_equeue[MAX_CHANNELS];
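/* Editorial note: the rough ingress data flow implied by the state above is
 * NIC channel -> classifier bucket -> NotifRing -> per-thread iqueue, with
 * channel_to_equeue[] additionally mapping an ingress channel to the peer
 * interface's egress equeue when a TAP/IPS copy mode is configured. */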
164 
165 /**
166  * \brief Registration Function for ReceiveMpipe.
167  * \todo Unit tests are needed for this module.
168  */
169 void TmModuleReceiveMpipeRegister (void)
170 {
171  tmm_modules[TMM_RECEIVEMPIPE].name = "ReceiveMpipe";
172  tmm_modules[TMM_RECEIVEMPIPE].ThreadInit = ReceiveMpipeThreadInit;
 173  tmm_modules[TMM_RECEIVEMPIPE].Func = NULL;
 174  tmm_modules[TMM_RECEIVEMPIPE].PktAcqLoop = ReceiveMpipeLoop;
 175  tmm_modules[TMM_RECEIVEMPIPE].PktAcqBreakLoop = NULL;
 176  tmm_modules[TMM_RECEIVEMPIPE].ThreadExitPrintStats = ReceiveMpipeThreadExitStats;
 177  tmm_modules[TMM_RECEIVEMPIPE].ThreadDeinit = NULL;
 178  tmm_modules[TMM_RECEIVEMPIPE].RegisterTests = NULL;
 179  tmm_modules[TMM_RECEIVEMPIPE].cap_flags = SC_CAP_NET_RAW;
 180  tmm_modules[TMM_RECEIVEMPIPE].flags = TM_FLAG_RECEIVE_TM;
 181 }
182 
183 /**
 184  * \brief Registration Function for DecodeMpipe.
185  * \todo Unit tests are needed for this module.
186  */
187 void TmModuleDecodeMpipeRegister (void)
188 {
189  tmm_modules[TMM_DECODEMPIPE].name = "DecodeMpipe";
190  tmm_modules[TMM_DECODEMPIPE].ThreadInit = DecodeMpipeThreadInit;
191  tmm_modules[TMM_DECODEMPIPE].Func = DecodeMpipe;
 192  tmm_modules[TMM_DECODEMPIPE].ThreadExitPrintStats = NULL;
 193  tmm_modules[TMM_DECODEMPIPE].ThreadDeinit = DecodeMpipeThreadDeinit;
 194  tmm_modules[TMM_DECODEMPIPE].RegisterTests = NULL;
 195  tmm_modules[TMM_DECODEMPIPE].cap_flags = 0;
 196  tmm_modules[TMM_DECODEMPIPE].flags = TM_FLAG_DECODE_TM;
 197 }
198 
199 /* Release Packet without sending. */
200 void MpipeReleasePacket(Packet *p)
201 {
202  /* Use this thread's context to free the packet. */
203  // TODO: Check for dual mPipes.
204  gxio_mpipe_iqueue_t* iqueue = thread_iqueue;
205  int bucket = p->mpipe_v.idesc.bucket_id;
206  gxio_mpipe_credit(iqueue->context, iqueue->ring, bucket, 1);
207 
208  gxio_mpipe_push_buffer(iqueue->context,
209  p->mpipe_v.idesc.stack_idx,
210  (void*)(intptr_t)p->mpipe_v.idesc.va);
211 }
212 
213 /* Unconditionally send packet, then release packet buffer. */
214 void MpipeReleasePacketCopyTap(Packet *p)
215 {
216  gxio_mpipe_iqueue_t* iqueue = thread_iqueue;
217  int bucket = p->mpipe_v.idesc.bucket_id;
218  gxio_mpipe_credit(iqueue->context, iqueue->ring, bucket, 1);
219  gxio_mpipe_edesc_t edesc;
220  edesc.words[0] = 0;
221  edesc.words[1] = 0;
222  edesc.bound = 1;
223  edesc.xfer_size = p->mpipe_v.idesc.l2_size;
224  edesc.va = p->mpipe_v.idesc.va;
225  edesc.stack_idx = p->mpipe_v.idesc.stack_idx;
226  edesc.hwb = 1; /* mPIPE will return packet buffer to proper stack. */
227  edesc.size = p->mpipe_v.idesc.size;
228  int channel = p->mpipe_v.idesc.channel;
229  /* Tell mPIPE to egress the packet. */
230  gxio_mpipe_equeue_put(channel_to_equeue[channel].peer_equeue, edesc);
231 }
232 
233 /* Release Packet and send copy if action is not DROP. */
234 void MpipeReleasePacketCopyIPS(Packet *p)
235 {
237  /* Return packet buffer without sending the packet. */
238  MpipeReleasePacket(p);
239  } else {
240  /* Send packet */
241  MpipeReleasePacketCopyTap(p);
242  }
243 }
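/* Editorial note: MpipeProcessPacket() below selects one of these release
 * callbacks per packet: MpipeReleasePacket() when no copy mode is configured,
 * MpipeReleasePacketCopyTap() to always egress a copy, and
 * MpipeReleasePacketCopyIPS() to egress only packets not marked DROP. */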
244 
245 /**
246  * \brief Mpipe Packet Process function.
247  *
248  * This function fills in our packet structure from mpipe.
249  * From here the packets are picked up by the DecodeMpipe thread.
250  *
251  * \param user pointer to MpipeThreadVars passed from pcap_dispatch
252  * \param h pointer to gxio packet header
253  * \param pkt pointer to current packet
254  */
255 static inline
256 Packet *MpipeProcessPacket(MpipeThreadVars *ptv, gxio_mpipe_idesc_t *idesc)
257 {
258  int caplen = idesc->l2_size;
259  u_char *pkt = gxio_mpipe_idesc_get_va(idesc);
260  Packet *p = (Packet *)(pkt - sizeof(Packet) - headroom/*2*/);
261 
 262  PACKET_RECYCLE(p);
 263  PKT_SET_SRC(p, PKT_SRC_WIRE);
264 
265  ptv->bytes += caplen;
266  ptv->pkts++;
267 
268  gettimeofday(&p->ts, NULL);
 269 
 270  p->datalink = LINKTYPE_ETHERNET;
271  /* No need to check return value, since the only error is pkt == NULL which can't happen here. */
272  PacketSetData(p, pkt, caplen);
273 
274  /* copy only the fields we use later */
275  p->mpipe_v.idesc.bucket_id = idesc->bucket_id;
276  p->mpipe_v.idesc.nr = idesc->nr;
277  p->mpipe_v.idesc.cs = idesc->cs;
278  p->mpipe_v.idesc.va = idesc->va;
279  p->mpipe_v.idesc.stack_idx = idesc->stack_idx;
280  MpipePeerVars *equeue_info = &channel_to_equeue[idesc->channel];
281  if (equeue_info->copy_mode != MPIPE_COPY_MODE_NONE) {
282  p->mpipe_v.idesc.size = idesc->size;
283  p->mpipe_v.idesc.l2_size = idesc->l2_size;
284  p->mpipe_v.idesc.channel = idesc->channel;
285  p->ReleasePacket = equeue_info->ReleasePacket;
286  } else {
287  p->ReleasePacket = MpipeReleasePacket;
288  }
289 
 290  if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE)
 291  p->flags |= PKT_IGNORE_CHECKSUM;
 292 
293  return p;
294 }
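/* Editorial note: each buffer carved out in ReceiveMpipeAllocatePacketBuffers()
 * is laid out as [ Packet | 2-byte headroom | packet data ], which is why the
 * Packet header is recovered above by stepping back sizeof(Packet) + headroom
 * bytes from the data address mPIPE reports in the idesc. */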
295 
296 static uint16_t XlateStack(MpipeThreadVars *ptv, int stack_idx)
297 {
298  switch(stack_idx - first_stack) {
299  case 0:
300  return ptv->counter_no_buffers_0;
301  case 1:
302  return ptv->counter_no_buffers_1;
303  case 2:
304  return ptv->counter_no_buffers_2;
305  case 3:
306  return ptv->counter_no_buffers_3;
307  case 4:
308  return ptv->counter_no_buffers_4;
309  case 5:
310  return ptv->counter_no_buffers_5;
311  case 6:
312  return ptv->counter_no_buffers_6;
313  case 7:
314  return ptv->counter_no_buffers_7;
315  default:
316  return ptv->counter_no_buffers_7;
317  }
318 }
319 
320 static void SendNoOpPacket(ThreadVars *tv, TmSlot *slot)
321 {
 322  Packet *p = PacketPoolGetPacket();
 323  if (p == NULL) {
324  return;
325  }
326 
327  p->datalink = DLT_RAW;
328  p->proto = IPPROTO_TCP;
329 
 330  /* So that DecodeMpipe ignores it. */
 331  p->flags |= PKT_PSEUDO_STREAM_END;
332 
333  p->flow = NULL;
334 
335  TmThreadsSlotProcessPkt(tv, slot, p);
336 }
337 
338 /**
339  * \brief Receives packets from an interface via gxio mpipe.
340  */
341 TmEcode ReceiveMpipeLoop(ThreadVars *tv, void *data, void *slot)
342 {
343  SCEnter();
344 
345  MpipeThreadVars *ptv = (MpipeThreadVars *)data;
346  TmSlot *s = (TmSlot *)slot;
347  ptv->slot = s->slot_next;
348  Packet *p = NULL;
349  int rank = tv->rank;
350  int max_queued = 0;
351  char *ctype;
352 
353  ptv->checksum_mode = CHECKSUM_VALIDATION_DISABLE;
354  if (ConfGet("mpipe.checksum-checks", &ctype) == 1) {
355  if (ConfValIsTrue(ctype)) {
356  ptv->checksum_mode = CHECKSUM_VALIDATION_ENABLE;
357  } else if (ConfValIsFalse(ctype)) {
358  ptv->checksum_mode = CHECKSUM_VALIDATION_DISABLE;
 359  } else {
 360  SCLogWarning(SC_ERR_INVALID_ARGUMENT,
 361  "Invalid value for checksum-check for mpipe");
362  }
363  }
364 
365  /* Open Ingress Queue for this worker thread. */
366  MpipeReceiveOpenIqueue(rank);
367  gxio_mpipe_iqueue_t* iqueue = thread_iqueue;
368  int update_counter = 0;
369  uint64_t last_packet_time = get_cycle_count();
370 
371  for (;;) {
372 
373  /* Check to see how many packets are available to process. */
374  gxio_mpipe_idesc_t *idesc;
375  int n = gxio_mpipe_iqueue_try_peek(iqueue, &idesc);
376  if (likely(n > 0)) {
377  int i;
378  int m = min(n, 16);
379 
380  /* Prefetch the idescs (64 bytes each). */
381  for (i = 0; i < m; i++) {
382  __insn_prefetch(&idesc[i]);
383  }
384  if (unlikely(n > max_queued)) {
385  StatsSetUI64(tv, ptv->max_mpipe_depth,
386  (uint64_t)n);
387  max_queued = n;
388  }
389  for (i = 0; i < m; i++, idesc++) {
390  if (likely(!gxio_mpipe_idesc_has_error(idesc))) {
391  p = MpipeProcessPacket(ptv, idesc);
392  p->mpipe_v.rank = rank;
393  if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
 394  TmqhOutputPacketpool(ptv->tv, p);
 395  SCReturnInt(TM_ECODE_FAILED);
 396  }
397  } else {
398  if (idesc->be) {
399  /* Buffer Error - No buffer available, so mPipe
400  * dropped the packet. */
401  StatsIncr(tv, XlateStack(ptv, idesc->stack_idx));
402  } else {
403  /* Bad packet. CRC error */
404  StatsIncr(tv, ptv->mpipe_drop);
405  gxio_mpipe_iqueue_drop(iqueue, idesc);
406  }
407  gxio_mpipe_iqueue_release(iqueue, idesc);
408  }
409  }
410  /* Move forward M packets in ingress ring. */
411  gxio_mpipe_iqueue_advance(iqueue, m);
412 
413  last_packet_time = get_cycle_count();
414  }
415  if (update_counter-- <= 0) {
 416  /* Only periodically update and check for termination. */
 417  StatsSyncCountersIfSignalled(tv);
 418  update_counter = 10000;
419 
420  if (suricata_ctl_flags != 0) {
421  break;
422  }
423 
424  // If no packet has been received for some period of time, process a NOP packet
425  // just to make sure that pseudo packets from the Flow manager get processed.
426  uint64_t now = get_cycle_count();
427  if (now - last_packet_time > 100000000) {
428  SendNoOpPacket(ptv->tv, ptv->slot);
429  last_packet_time = now;
430  }
431  }
432  }
 433  SCReturnInt(TM_ECODE_OK);
 434 }
435 
436 static void MpipeRegisterPerfCounters(MpipeThreadVars *ptv, ThreadVars *tv)
437 {
438  /* register counters */
439  ptv->max_mpipe_depth = StatsRegisterCounter("mpipe.max_mpipe_depth", tv);
440  ptv->mpipe_drop = StatsRegisterCounter("mpipe.drop", tv);
441  ptv->counter_no_buffers_0 = StatsRegisterCounter("mpipe.no_buf0", tv);
442  ptv->counter_no_buffers_1 = StatsRegisterCounter("mpipe.no_buf1", tv);
443  ptv->counter_no_buffers_2 = StatsRegisterCounter("mpipe.no_buf2", tv);
444  ptv->counter_no_buffers_3 = StatsRegisterCounter("mpipe.no_buf3", tv);
445  ptv->counter_no_buffers_4 = StatsRegisterCounter("mpipe.no_buf4", tv);
446  ptv->counter_no_buffers_5 = StatsRegisterCounter("mpipe.no_buf5", tv);
447  ptv->counter_no_buffers_6 = StatsRegisterCounter("mpipe.no_buf6", tv);
448  ptv->counter_no_buffers_7 = StatsRegisterCounter("mpipe.no_buf7", tv);
449 }
450 
451 static const gxio_mpipe_buffer_size_enum_t gxio_buffer_sizes[] = {
452  GXIO_MPIPE_BUFFER_SIZE_128,
453  GXIO_MPIPE_BUFFER_SIZE_256,
454  GXIO_MPIPE_BUFFER_SIZE_512,
455  GXIO_MPIPE_BUFFER_SIZE_1024,
456  GXIO_MPIPE_BUFFER_SIZE_1664,
457  GXIO_MPIPE_BUFFER_SIZE_4096,
458  GXIO_MPIPE_BUFFER_SIZE_10368,
459  GXIO_MPIPE_BUFFER_SIZE_16384
460 };
461 
462 static const int buffer_sizes[] = {
463  128,
464  256,
465  512,
466  1024,
467  1664,
468  4096,
469  10368,
470  16384
471 };
472 
473 static int NormalizeBufferWeights(float buffer_weights[], int num_weights)
474 {
475  int stack_count = 0;
476  /* Count required buffer stacks and normalize weights to sum to 1.0. */
477  float total_weight = 0;
478  for (int i = 0; i < num_weights; i++) {
479  if (buffer_weights[i] != 0) {
480  ++stack_count;
481  total_weight += buffer_weights[i];
482  }
483  }
 484  /* Convert each weight to a value between 0 and 1, inclusive. */
485  for (int i = 0; i < num_weights; i++) {
486  if (buffer_weights[i] != 0) {
487  buffer_weights[i] /= total_weight;
488  }
489  }
490 
491  SCLogInfo("DEBUG: %u non-zero sized stacks", stack_count);
492  return stack_count;
493 }
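/* Editorial example: with the default weights used below (4 for the 256-byte
 * stack, 4 for the 1664-byte stack, all others 0), NormalizeBufferWeights()
 * returns stack_count == 2 and rescales both weights to 0.5, so each stack is
 * carved from half of the very huge packet page. */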
494 
495 static TmEcode ReceiveMpipeAllocatePacketBuffers(void)
496 {
497  SCEnter();
498  int num_buffers;
499  int result;
500  int total_buffers = 0;
501 
502  /* Relative weighting for the number of buffers of each size.
503  */
504  float buffer_weight[] = {
505  0 , /* 128 */
506  4 , /* 256 */
507  0 , /* 512 */
508  0 , /* 1024 */
509  4 , /* 1664 */
510  0 , /* 4096 */
 511  0 , /* 10368 */
512  0 /* 16384 */
513  };
514 
515  int num_weights = sizeof(buffer_weight)/sizeof(buffer_weight[0]);
516  if (ConfGetNode("mpipe.stack") != NULL) {
517  float weight;
518  for (int i = 0; i < num_weights; i++)
519  buffer_weight[i] = 0;
520  if (ConfGetFloat("mpipe.stack.size128", &weight))
521  buffer_weight[0] = weight;
522  if (ConfGetFloat("mpipe.stack.size256", &weight))
523  buffer_weight[1] = weight;
524  if (ConfGetFloat("mpipe.stack.size512", &weight))
525  buffer_weight[2] = weight;
526  if (ConfGetFloat("mpipe.stack.size1024", &weight))
527  buffer_weight[3] = weight;
528  if (ConfGetFloat("mpipe.stack.size1664", &weight))
529  buffer_weight[4] = weight;
530  if (ConfGetFloat("mpipe.stack.size4096", &weight))
531  buffer_weight[5] = weight;
532  if (ConfGetFloat("mpipe.stack.size10386", &weight))
533  buffer_weight[6] = weight;
534  if (ConfGetFloat("mpipe.stack.size16384", &weight))
535  buffer_weight[7] = weight;
536  }
537 
538  int stack_count = NormalizeBufferWeights(buffer_weight, num_weights);
539 
540  /* Allocate one of the largest pages to hold our buffer stack,
541  * notif ring, and packets. First get a bit map of the
542  * available page sizes. */
543  unsigned long available_pagesizes = tmc_alloc_get_pagesizes();
544 
545  void *packet_page = NULL;
546  size_t tile_vhuge_size = 64 * 1024;
547 
548  /* Try the largest available page size first to see if any
549  * pages of that size can be allocated. */
550  for (int i = sizeof(available_pagesizes) * 8 - 1; i; i--) {
551  unsigned long size = 1UL<<i;
552  if (available_pagesizes & size) {
553  tile_vhuge_size = (size_t)size;
554 
555  tmc_alloc_t alloc = TMC_ALLOC_INIT;
556  tmc_alloc_set_huge(&alloc);
557  tmc_alloc_set_home(&alloc, TMC_ALLOC_HOME_HASH);
558  if (tmc_alloc_set_pagesize_exact(&alloc, tile_vhuge_size) == NULL)
559  continue; // Couldn't get the page size requested
560  packet_page = tmc_alloc_map(&alloc, tile_vhuge_size);
561  if (packet_page)
562  break;
563  }
564  }
565  assert(packet_page);
566  void* packet_mem = packet_page;
567  SCLogInfo("DEBUG: tile_vhuge_size %"PRIuMAX, (uintmax_t)tile_vhuge_size);
568  /* Allocate one Huge page just to store buffer stacks, since they are
569  * only ever accessed by mPipe.
570  */
571  size_t stack_page_size = tmc_alloc_get_huge_pagesize();
572  tmc_alloc_t alloc = TMC_ALLOC_INIT;
573  tmc_alloc_set_huge(&alloc);
574  void *buffer_stack_page = tmc_alloc_map(&alloc, stack_page_size);
575  void *buffer_stack_mem = buffer_stack_page;
576  void *buffer_stack_mem_end = buffer_stack_mem + stack_page_size;
577  assert(buffer_stack_mem);
578 
579  /* Allocate buffer stacks. */
580  result = gxio_mpipe_alloc_buffer_stacks(context, stack_count, 0, 0);
581  VERIFY(result, "gxio_mpipe_alloc_buffer_stacks()");
582  int stack = result;
583  first_stack = stack;
584 
585  /* Divide up the Very Huge page into packet buffers. */
586  int i = 0;
587  for (int ss = 0; ss < stack_count; i++, ss++) {
588  /* Skip empty buffer stacks. */
589  for (;buffer_weight[i] == 0; i++) ;
590 
591  int stackidx = first_stack + ss;
592  /* Bytes from the Huge page used for this buffer stack. */
593  size_t packet_buffer_slice = tile_vhuge_size * buffer_weight[i];
594  int buffer_size = buffer_sizes[i];
595  num_buffers = packet_buffer_slice / (buffer_size + sizeof(Packet));
596 
597  /* Initialize the buffer stack. Must be aligned mod 64K. */
598  size_t stack_bytes = gxio_mpipe_calc_buffer_stack_bytes(num_buffers);
599  gxio_mpipe_buffer_size_enum_t buf_size = gxio_buffer_sizes[i];
600  result = gxio_mpipe_init_buffer_stack(context, stackidx, buf_size,
601  buffer_stack_mem, stack_bytes, 0);
602  VERIFY(result, "gxio_mpipe_init_buffer_stack()");
603  buffer_stack_mem += stack_bytes;
604 
605  /* Buffer stack must be aligned to 64KB page boundary. */
606  ALIGN(buffer_stack_mem, 0x10000);
607  assert(buffer_stack_mem < buffer_stack_mem_end);
608 
609  /* Register the entire huge page of memory which contains all
610  * the buffers.
611  */
612  result = gxio_mpipe_register_page(context, stackidx, packet_page,
613  tile_vhuge_size, 0);
614  VERIFY(result, "gxio_mpipe_register_page()");
615 
616  /* And register the memory holding the buffer stack. */
617  result = gxio_mpipe_register_page(context, stackidx,
618  buffer_stack_page,
619  stack_page_size, 0);
620  VERIFY(result, "gxio_mpipe_register_page()");
621 
622  total_buffers += num_buffers;
623 
624  SCLogInfo("Adding %d %d byte packet buffers",
625  num_buffers, buffer_size);
626 
627  /* Push some buffers onto the stack. */
628  for (int j = 0; j < num_buffers; j++) {
629  Packet *p = (Packet *)packet_mem;
 630  memset(p, 0, sizeof(Packet));
 631  PACKET_INITIALIZE(p);
632 
633  gxio_mpipe_push_buffer(context, stackidx,
634  packet_mem + sizeof(Packet));
635  packet_mem += (sizeof(Packet) + buffer_size);
636  }
637 
638  /* Paranoia. */
639  assert(packet_mem <= packet_page + tile_vhuge_size);
640  }
641  SCLogInfo("%d total packet buffers", total_buffers);
 642  SCReturnInt(TM_ECODE_OK);
 643 }
644 
645 static TmEcode ReceiveMpipeCreateBuckets(int ring, int num_workers,
646  int *first_bucket, int *num_buckets)
647 {
648  SCEnter();
649  int result;
650  int min_buckets = 256;
651 
652  /* Allocate a NotifGroup. */
653  int group = gxio_mpipe_alloc_notif_groups(context, 1, 0, 0);
654  VERIFY(group, "gxio_mpipe_alloc_notif_groups()");
655 
656  intmax_t value = 0;
657  if (ConfGetInt("mpipe.buckets", &value) == 1) {
658  /* range check */
659  if ((value >= 1) && (value <= 4096)) {
660  /* Must be a power of 2, so round up to next power of 2. */
 661  int ceiling_log2 = 64 - __builtin_clzl((int64_t)value - 1);
662  *num_buckets = 1 << (ceiling_log2);
 663  } else {
 664  SCLogError(SC_ERR_INVALID_ARGUMENT,
 665  "Illegal mpipe.buckets value (%ld). must be between 1 and 4096.", value);
666  }
667  }
668  if (ConfGetInt("mpipe.min-buckets", &value) == 1) {
669  /* range check */
670  if ((value >= 1) && (value <= 4096)) {
671  /* Must be a power of 2, so round up to next power of 2. */
 672  int ceiling_log2 = 64 - __builtin_clzl((int64_t)value - 1);
673  min_buckets = 1 << (ceiling_log2);
 674  } else {
 675  SCLogError(SC_ERR_INVALID_ARGUMENT,
 676  "Illegal mpipe.min-buckets value (%ld). must be between 1 and 4096.", value);
677  }
678  }
679 
680  /* Allocate buckets. Keep trying half the number of requested buckets until min-bucket is reached. */
681  while (1) {
682  *first_bucket = gxio_mpipe_alloc_buckets(context, *num_buckets, 0, 0);
683  if (*first_bucket != GXIO_MPIPE_ERR_NO_BUCKET)
684  break;
685  /* Failed to allocate the requested number of buckets. Keep
686  * trying less buckets until min-buckets is reached.
687  */
 688  if (*num_buckets <= min_buckets) {
 689  SCLogError(SC_ERR_INVALID_ARGUMENT,
 690  "Could not allocate (%d) mpipe buckets. "
 691  "Try a smaller mpipe.buckets value in suricata.yaml", *num_buckets);
 692  SCReturnInt(TM_ECODE_FAILED);
 693  }
694  /* Cut the number of requested buckets in half and try again. */
695  *num_buckets /= 2;
696  }
697 
698  /* Init group and buckets, preserving packet order among flows. */
699  gxio_mpipe_bucket_mode_t mode = GXIO_MPIPE_BUCKET_STATIC_FLOW_AFFINITY;
700  char *balance;
701  if (ConfGet("mpipe.load-balance", &balance) == 1) {
702  if (balance) {
703  if (strcmp(balance, "static") == 0) {
704  mode = GXIO_MPIPE_BUCKET_STATIC_FLOW_AFFINITY;
705  SCLogInfo("Using \"static\" flow affinity load balancing with %d buckets.", *num_buckets);
706  } else if (strcmp(balance, "dynamic") == 0) {
707  mode = GXIO_MPIPE_BUCKET_DYNAMIC_FLOW_AFFINITY;
708  SCLogInfo("Using \"dynamic\" flow affinity load balancing with %d buckets.", *num_buckets);
709  } else if (strcmp(balance, "sticky") == 0) {
710  mode = GXIO_MPIPE_BUCKET_STICKY_FLOW_LOCALITY;
711  SCLogInfo("Using \"sticky\" load balancing with %d buckets.", *num_buckets);
712  } else if (strcmp(balance, "round-robin") == 0) {
713  mode = GXIO_MPIPE_BUCKET_ROUND_ROBIN;
 714  } else {
 715  SCLogWarning(SC_ERR_INVALID_ARGUMENT,
 716  "Illegal load balancing mode \"%s\"", balance);
717  balance = "static";
718  }
719  }
720  } else {
721  balance = "static";
722  }
723  SCLogInfo("Using \"%s\" load balancing with %d buckets.", balance, *num_buckets);
724 
725  result = gxio_mpipe_init_notif_group_and_buckets(context, group,
726  ring, num_workers,
727  *first_bucket,
728  *num_buckets,
729  mode);
730  VERIFY(result, "gxio_mpipe_init_notif_group_and_buckets()");
731 
 732  SCReturnInt(TM_ECODE_OK);
 733 }
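/* Editorial sketch (not part of the upstream file): the power-of-two rounding
 * applied to mpipe.buckets and mpipe.min-buckets above, pulled out for
 * illustration. Note that value == 1 passes the range check but would hand 0
 * to __builtin_clzl(), which is undefined.
 */
static inline int MpipeRoundUpPow2(int64_t value)
{
    int ceiling_log2 = 64 - __builtin_clzl(value - 1);  /* bits needed for value-1 */
    return 1 << ceiling_log2;   /* e.g. 1500 -> 2048, 2048 -> 2048 */
}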
734 
735 /* \brief Register mPIPE classifier rules to start receiving packets.
736  *
737  * \param Index of the first classifier bucket
738  * \param Number of classifier buckets.
739  *
740  * \return result code where <0 is an error.
741  */
742 static int ReceiveMpipeRegisterRules(int bucket, int num_buckets)
743 {
744  /* Register for packets. */
745  gxio_mpipe_rules_t rules;
746  gxio_mpipe_rules_init(&rules, context);
747  gxio_mpipe_rules_begin(&rules, bucket, num_buckets, NULL);
748  /* Give Suricata priority over Linux to receive packets. */
749  gxio_mpipe_rules_set_priority(&rules, -100);
750  return gxio_mpipe_rules_commit(&rules);
751 }
752 
753 /* \brief Initialize mPIPE ingress ring
754  *
 755  * \param rank Index of the worker thread; selects which NotifRing
 756  * to attach this thread's iqueue to.
 757  *
 758  * \return TM_ECODE_OK on success, TM_ECODE_FAILED on error
759  */
760 static int MpipeReceiveOpenIqueue(int rank)
761 {
762  /* Initialize the NotifRings. */
763  size_t notif_ring_entries = 2048;
764  intmax_t value = 0;
765  if (ConfGetInt("mpipe.iqueue-packets", &value) == 1) {
766  /* range check */
767  if (value == 128 || value == 512 || value == 2048 || value == (64 * 1024)) {
768  notif_ring_entries = value;
769  } else {
 770  SCLogError(SC_ERR_INVALID_ARGUMENT, "Illegal mpipe.iqueue-packets value. must be 128, 512, 2048 or 65536.");
771  }
772  }
773 
774  size_t notif_ring_size = notif_ring_entries * sizeof(gxio_mpipe_idesc_t);
775 
776  tmc_alloc_t alloc = TMC_ALLOC_INIT;
777  /* Allocate the memory locally on this thread's CPU. */
778  tmc_alloc_set_home(&alloc, TMC_ALLOC_HOME_TASK);
 779  /* Allocate all the memory on one page, which is required for the
 780  notif ring, not the iqueue. */
781  if (notif_ring_size > (size_t)getpagesize())
782  tmc_alloc_set_huge(&alloc);
783  int needed = notif_ring_size + sizeof(gxio_mpipe_iqueue_t);
784  // TODO - Save the rest of the Huge Page for other allocations.
785  void *iqueue_mem = tmc_alloc_map(&alloc, needed);
786  if (iqueue_mem == NULL) {
787  SCLogError(SC_ERR_FATAL, "Failed to allocate memory for mPIPE iQueue");
788  return TM_ECODE_FAILED;
789  }
790 
791  thread_iqueue = iqueue_mem + notif_ring_size;
792  int result = gxio_mpipe_iqueue_init(thread_iqueue, context, first_notif_ring + rank,
793  iqueue_mem, notif_ring_size, 0);
794  if (result < 0) {
795  VERIFY(result, "gxio_mpipe_iqueue_init()");
796  }
797 
798  return TM_ECODE_OK;
799 }
800 
 801 /* \brief Initialize one mPIPE egress port
802  *
803  * Initialize one mPIPE egress port for use in IPS mode.
804  * The port must be one of the input ports.
805  *
806  * \param name of interface to open
 807  * \param Array of port configurations
808  *
809  * \return Output port channel number, or -1 on error
810  */
811 static int MpipeReceiveOpenEgress(char *out_iface, int iface_idx,
812  int copy_mode,
813  MpipeIfaceConfig *mpipe_conf[])
814 {
815  int channel;
816  int nlive = LiveGetDeviceCount();
817  int result;
818 
819  /* Initialize an equeue */
820  result = gxio_mpipe_alloc_edma_rings(context, 1, 0, 0);
821  if (result < 0) {
822  SCLogError(SC_ERR_FATAL, "Failed to allocate mPIPE egress ring");
823  return result;
824  }
825  uint32_t ering = result;
826  size_t edescs_size = equeue_entries * sizeof(gxio_mpipe_edesc_t);
827  tmc_alloc_t edescs_alloc = TMC_ALLOC_INIT;
828  tmc_alloc_set_pagesize(&edescs_alloc, edescs_size);
829  void *edescs = tmc_alloc_map(&edescs_alloc, edescs_size);
830  if (edescs == NULL) {
832  "Failed to allocate egress descriptors");
833  return -1;
834  }
835  /* retrieve channel of outbound interface */
836  for (int j = 0; j < nlive; j++) {
837  if (strcmp(out_iface, mpipe_conf[j]->iface) == 0) {
838  channel = gxio_mpipe_link_channel(&mpipe_link[j]);
839  SCLogInfo("egress link: %s is channel: %d",
840  out_iface, channel);
841  result = gxio_mpipe_equeue_init(&equeue[iface_idx],
842  context,
843  ering,
844  channel,
845  edescs,
846  edescs_size,
847  0);
848  if (result < 0) {
850  "mPIPE Failed to initialize egress queue");
851  return -1;
852  }
853  /* Record the mapping from ingress port to egress port.
854  * The egress information is stored indexed by ingress channel.
855  */
856  channel = gxio_mpipe_link_channel(&mpipe_link[iface_idx]);
857  channel_to_equeue[channel].peer_equeue = &equeue[iface_idx];
858  channel_to_equeue[channel].copy_mode = copy_mode;
859  if (copy_mode == MPIPE_COPY_MODE_IPS)
860  channel_to_equeue[channel].ReleasePacket = MpipeReleasePacketCopyIPS;
861  else
862  channel_to_equeue[channel].ReleasePacket = MpipeReleasePacketCopyTap;
863 
864  SCLogInfo("ingress link: %s is channel: %d copy_mode: %d",
865  out_iface, channel, copy_mode);
866 
867  return channel;
868  }
869  }
870 
871  /* Did not find matching interface name */
872  SCLogError(SC_ERR_INVALID_ARGUMENT, "Could not find egress interface: %s",
873  out_iface);
874  return -1;
875 }
876 
877 TmEcode ReceiveMpipeThreadInit(ThreadVars *tv, void *initdata, void **data)
878 {
879  SCEnter();
880  int rank = tv->rank;
881  int num_buckets = 4096;
882  int num_workers = tile_num_pipelines;
883 
884  if (initdata == NULL) {
885  SCLogError(SC_ERR_INVALID_ARGUMENT, "initdata == NULL");
 886  SCReturnInt(TM_ECODE_FAILED);
 887  }
888 
889  MpipeThreadVars *ptv = SCMalloc(sizeof(MpipeThreadVars));
890  if (unlikely(ptv == NULL))
 891  SCReturnInt(TM_ECODE_FAILED);
 892 
893  memset(ptv, 0, sizeof(MpipeThreadVars));
894 
895  ptv->tv = tv;
896 
897  int result;
898  const char *link_name = (char *)initdata;
899 
900  MpipeRegisterPerfCounters(ptv, tv);
901 
902  *data = (void *)ptv;
903 
904  /* Only rank 0 does initialization of mpipe */
905  if (rank != 0)
 906  SCReturnInt(TM_ECODE_OK);
 907 
908  /* Initialize and configure mPIPE, which is only done by one core. */
909 
910  if (strcmp(link_name, "multi") == 0) {
911  int nlive = LiveGetDeviceCount();
912  int instance = gxio_mpipe_link_instance(LiveGetDeviceName(0));
913  for (int i = 1; i < nlive; i++) {
914  link_name = LiveGetDeviceName(i);
915  if (gxio_mpipe_link_instance(link_name) != instance) {
917  "All interfaces not on same mpipe instance");
919  }
920  }
921  result = gxio_mpipe_init(context, instance);
922  VERIFY(result, "gxio_mpipe_init()");
923  /* open ingress interfaces */
924  for (int i = 0; i < nlive; i++) {
925  link_name = LiveGetDeviceName(i);
926  SCLogInfo("opening interface %s", link_name);
927  result = gxio_mpipe_link_open(&mpipe_link[i], context,
928  link_name, 0);
929  VERIFY(result, "gxio_mpipe_link_open()");
930  mpipe_conf[i] = ParseMpipeConfig(link_name);
931  }
932  /* find and open egress interfaces for IPS modes */
933  for (int i = 0; i < nlive; i++) {
934  MpipeIfaceConfig *aconf = mpipe_conf[i];
935  if (aconf != NULL) {
936  if (aconf->copy_mode != MPIPE_COPY_MODE_NONE) {
937  int channel = MpipeReceiveOpenEgress(aconf->out_iface,
938  i, aconf->copy_mode,
939  mpipe_conf);
940  if (channel < 0) {
 941  SCReturnInt(TM_ECODE_FAILED);
 942  }
943  }
944  }
945  }
946  } else {
947  SCLogInfo("using single interface %s", (char *)initdata);
948 
949  /* Start the driver. */
950  result = gxio_mpipe_init(context, gxio_mpipe_link_instance(link_name));
951  VERIFY(result, "gxio_mpipe_init()");
952 
953  gxio_mpipe_link_t link;
954  result = gxio_mpipe_link_open(&link, context, link_name, 0);
955  VERIFY(result, "gxio_mpipe_link_open()");
956  }
957  /* Allocate some NotifRings. */
958  result = gxio_mpipe_alloc_notif_rings(context,
959  num_workers,
960  0, 0);
961  VERIFY(result, "gxio_mpipe_alloc_notif_rings()");
962  first_notif_ring = result;
963 
964  int first_bucket = 0;
965  int rc;
966  rc = ReceiveMpipeCreateBuckets(first_notif_ring, num_workers,
967  &first_bucket, &num_buckets);
968  if (rc != TM_ECODE_OK)
969  SCReturnInt(rc);
970 
971  rc = ReceiveMpipeAllocatePacketBuffers();
972  if (rc != TM_ECODE_OK)
973  SCReturnInt(rc);
974 
975  result = ReceiveMpipeRegisterRules(first_bucket, num_buckets);
976  if (result < 0) {
978  "Registering mPIPE classifier rules, %s",
979  gxio_strerror(result));
981  }
982 
 983  SCReturnInt(TM_ECODE_OK);
 984 }
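/* Editorial note: only the rank 0 thread performs the shared mPIPE setup above
 * (gxio_mpipe_init(), link open, NotifRing/bucket allocation, packet buffer
 * allocation, classifier rules); the other workers only register their
 * counters here and later attach their own iqueue via MpipeReceiveOpenIqueue()
 * at the start of ReceiveMpipeLoop(). */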
985 
986 TmEcode ReceiveMpipeInit(void)
987 {
988  SCEnter();
989 
990  SCLogInfo("tile_num_pipelines: %d", tile_num_pipelines);
991 
 992  SCReturnInt(TM_ECODE_OK);
 993 }
994 
995 /**
996  * \brief This function prints stats to the screen at exit.
997  * \param tv pointer to ThreadVars
 999  * \param data pointer that gets cast into MpipeThreadVars for ptv
999  */
1000 void ReceiveMpipeThreadExitStats(ThreadVars *tv, void *data)
1001 {
1002  SCEnter();
1003  SCReturn;
1004 }
1005 
1006 TmEcode DecodeMpipeThreadInit(ThreadVars *tv, void *initdata, void **data)
1007 {
1008  SCEnter();
1009  DecodeThreadVars *dtv = NULL;
1010 
1011  dtv = DecodeThreadVarsAlloc(tv);
1012 
1013  if (dtv == NULL)
 1014  SCReturnInt(TM_ECODE_FAILED);
 1015 
1016  DecodeRegisterPerfCounters(dtv, tv);
1017 
1018  *data = (void *)dtv;
1019 
 1020  SCReturnInt(TM_ECODE_OK);
 1021 }
1022 
1023 TmEcode DecodeMpipeThreadDeinit(ThreadVars *tv, void *data)
1024 {
1025  if (data != NULL)
1026  DecodeThreadVarsFree(tv, data);
 1027  SCReturnInt(TM_ECODE_OK);
 1028 }
1029 
1030 TmEcode DecodeMpipe(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
1031  PacketQueue *postq)
1032 {
1033  SCEnter();
1034  DecodeThreadVars *dtv = (DecodeThreadVars *)data;
1035 
1036  /* XXX HACK: flow timeout can call us for injected pseudo packets
1037  * see bug: https://redmine.openinfosecfoundation.org/issues/1107 */
1038  if (p->flags & PKT_PSEUDO_STREAM_END)
1039  return TM_ECODE_OK;
1040 
1041  /* update counters */
1042  DecodeUpdatePacketCounters(tv, dtv, p);
1043 
1044  /* call the decoder */
1045  DecodeEthernet(tv, dtv, p, GET_PKT_DATA(p), GET_PKT_LEN(p), pq);
1046 
1047  PacketDecodeFinalize(tv, dtv, p);
1048 
 1049  SCReturnInt(TM_ECODE_OK);
 1050 }
1051 
1052 /**
1053  * \brief Add a mpipe device for monitoring
1054  *
1055  * \param dev string with the device name
1056  *
1057  * \retval 0 on success.
1058  * \retval -1 on failure.
1059  */
1060 int MpipeLiveRegisterDevice(char *dev)
1061 {
1062  MpipeDevice *nd = SCMalloc(sizeof(MpipeDevice));
1063  if (unlikely(nd == NULL)) {
1064  return -1;
1065  }
1066 
1067  nd->dev = SCStrdup(dev);
1068  if (unlikely(nd->dev == NULL)) {
1069  SCFree(nd);
1070  return -1;
1071  }
1072  TAILQ_INSERT_TAIL(&mpipe_devices, nd, next);
1073 
1074  SCLogDebug("Mpipe device \"%s\" registered.", dev);
1075  return 0;
1076 }
1077 
1078 /**
1079  * \brief Get the number of registered devices
1080  *
1081  * \retval cnt the number of registered devices
1082  */
1083 int MpipeLiveGetDeviceCount(void)
1084 {
1085  int i = 0;
1086  MpipeDevice *nd;
1087 
1088  TAILQ_FOREACH(nd, &mpipe_devices, next) {
1089  i++;
1090  }
1091 
1092  return i;
1093 }
1094 
1095 #endif // HAVE_MPIPE