suricata
tmqh-flow.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2013 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  *
24  * Simple output queue handler that makes sure all packets of the same flow
25  * are sent to the same queue. We support different kind of q handlers. Have
26  * a look at "autofp-scheduler" conf to further undertsand the various q
27  * handlers we provide.
28  */
29 
30 #include "suricata.h"
31 #include "packet-queue.h"
32 #include "decode.h"
33 #include "threads.h"
34 #include "threadvars.h"
35 #include "tmqh-flow.h"
36 
37 #include "tm-queuehandlers.h"
38 
39 #include "conf.h"
40 #include "util-unittest.h"
41 
45 void *TmqhOutputFlowSetupCtx(const char *queue_str);
46 void TmqhOutputFlowFreeCtx(void *ctx);
47 void TmqhFlowRegisterTests(void);
48 
49 void TmqhFlowRegister(void)
50 {
51  tmqh_table[TMQH_FLOW].name = "flow";
56 
57  const char *scheduler = NULL;
58  if (ConfGet("autofp-scheduler", &scheduler) == 1) {
59  if (strcasecmp(scheduler, "round-robin") == 0) {
60  SCLogNotice("using flow hash instead of round robin");
62  } else if (strcasecmp(scheduler, "active-packets") == 0) {
63  SCLogNotice("using flow hash instead of active packets");
65  } else if (strcasecmp(scheduler, "hash") == 0) {
67  } else if (strcasecmp(scheduler, "ippair") == 0) {
69  } else {
70  SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" "
71  "for autofp-scheduler in conf. Killing engine.",
72  scheduler);
73  exit(EXIT_FAILURE);
74  }
75  } else {
77  }
78 
79  return;
80 }
81 
83 {
84 #define PRINT_IF_FUNC(f, msg) \
85  if (tmqh_table[TMQH_FLOW].OutHandler == (f)) \
86  SCLogConfig("AutoFP mode using \"%s\" flow load balancer", (msg))
87 
90 
91 #undef PRINT_IF_FUNC
92 }
93 
94 /* same as 'simple' */
96 {
97  PacketQueue *q = &trans_q[tv->inq->id];
98 
100 
101  SCMutexLock(&q->mutex_q);
102  if (q->len == 0) {
103  /* if we have no packets in queue, wait... */
104  SCCondWait(&q->cond_q, &q->mutex_q);
105  }
106 
107  if (q->len > 0) {
108  Packet *p = PacketDequeue(q);
109  SCMutexUnlock(&q->mutex_q);
110  return p;
111  } else {
112  /* return NULL if we have no pkt. Should only happen on signals. */
113  SCMutexUnlock(&q->mutex_q);
114  return NULL;
115  }
116 }
117 
118 static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
119 {
120  void *ptmp;
121  Tmq *tmq = TmqGetQueueByName(name);
122  if (tmq == NULL) {
123  tmq = TmqCreateQueue(name);
124  if (tmq == NULL)
125  return -1;
126  }
127  tmq->writer_cnt++;
128 
129  uint16_t id = tmq->id;
130 
131  if (ctx->queues == NULL) {
132  ctx->size = 1;
133  ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode));
134  if (ctx->queues == NULL) {
135  return -1;
136  }
137  memset(ctx->queues, 0, ctx->size * sizeof(TmqhFlowMode));
138  } else {
139  ctx->size++;
140  ptmp = SCRealloc(ctx->queues, ctx->size * sizeof(TmqhFlowMode));
141  if (ptmp == NULL) {
142  SCFree(ctx->queues);
143  ctx->queues = NULL;
144  return -1;
145  }
146  ctx->queues = ptmp;
147 
148  memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
149  }
150  ctx->queues[ctx->size - 1].q = &trans_q[id];
151 
152  return 0;
153 }
154 
155 /**
156  * \brief setup the queue handlers ctx
157  *
158  * Parses a comma separated string "queuename1,queuename2,etc"
159  * and sets the ctx up to devide flows over these queue's.
160  *
161  * \param queue_str comma separated string with output queue names
162  *
163  * \retval ctx queues handlers ctx or NULL in error
164  */
165 void *TmqhOutputFlowSetupCtx(const char *queue_str)
166 {
167  if (queue_str == NULL || strlen(queue_str) == 0)
168  return NULL;
169 
170  SCLogDebug("queue_str %s", queue_str);
171 
172  TmqhFlowCtx *ctx = SCMalloc(sizeof(TmqhFlowCtx));
173  if (unlikely(ctx == NULL))
174  return NULL;
175  memset(ctx,0x00,sizeof(TmqhFlowCtx));
176 
177  char *str = SCStrdup(queue_str);
178  if (unlikely(str == NULL)) {
179  goto error;
180  }
181  char *tstr = str;
182 
183  /* parse the comma separated string */
184  do {
185  char *comma = strchr(tstr,',');
186  if (comma != NULL) {
187  *comma = '\0';
188  char *qname = tstr;
189  int r = StoreQueueId(ctx,qname);
190  if (r < 0)
191  goto error;
192  } else {
193  char *qname = tstr;
194  int r = StoreQueueId(ctx,qname);
195  if (r < 0)
196  goto error;
197  }
198  tstr = comma ? (comma + 1) : comma;
199  } while (tstr != NULL);
200 
201  SCFree(str);
202  return (void *)ctx;
203 
204 error:
205  SCFree(ctx);
206  if (str != NULL)
207  SCFree(str);
208  return NULL;
209 }
210 
211 void TmqhOutputFlowFreeCtx(void *ctx)
212 {
213  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
214 
215  SCLogPerf("AutoFP - Total flow handler queues - %" PRIu16,
216  fctx->size);
217  SCFree(fctx->queues);
218  SCFree(fctx);
219 
220  return;
221 }
222 
224 {
225  int16_t qid = 0;
226 
227  TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
228 
229  if (p->flags & PKT_WANTS_FLOW) {
230  uint32_t hash = p->flow_hash;
231  qid = hash % ctx->size;
232  } else {
233  qid = ctx->last++;
234 
235  if (ctx->last == ctx->size)
236  ctx->last = 0;
237  }
238 
239  PacketQueue *q = ctx->queues[qid].q;
240  SCMutexLock(&q->mutex_q);
241  PacketEnqueue(q, p);
242  SCCondSignal(&q->cond_q);
243  SCMutexUnlock(&q->mutex_q);
244 
245  return;
246 }
247 
248 /**
249  * \brief select the queue to output based on IP address pair.
250  *
251  * \param tv thread vars.
252  * \param p packet.
253  */
255 {
256  int16_t qid = 0;
257  uint32_t addr_hash = 0;
258  int i;
259 
260  TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
261 
262  if (p->src.family == AF_INET6) {
263  for (i = 0; i < 4; i++) {
264  addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
265  }
266  } else {
267  addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
268  }
269 
270  /* we don't have to worry about possible overflow, since
271  * ctx->size will be lesser than 2 ** 31 for sure */
272  qid = addr_hash % ctx->size;
273 
274  PacketQueue *q = ctx->queues[qid].q;
275  SCMutexLock(&q->mutex_q);
276  PacketEnqueue(q, p);
277  SCCondSignal(&q->cond_q);
278  SCMutexUnlock(&q->mutex_q);
279 
280  return;
281 }
282 
283 #ifdef UNITTESTS
284 
285 static int TmqhOutputFlowSetupCtxTest01(void)
286 {
287  int retval = 0;
288  Tmq *tmq = NULL;
289  TmqhFlowCtx *fctx = NULL;
290 
291  TmqResetQueues();
292 
293  tmq = TmqCreateQueue("queue1");
294  if (tmq == NULL)
295  goto end;
296  tmq = TmqCreateQueue("queue2");
297  if (tmq == NULL)
298  goto end;
299  tmq = TmqCreateQueue("another");
300  if (tmq == NULL)
301  goto end;
302  tmq = TmqCreateQueue("yetanother");
303  if (tmq == NULL)
304  goto end;
305 
306  const char *str = "queue1,queue2,another,yetanother";
307  void *ctx = TmqhOutputFlowSetupCtx(str);
308 
309  if (ctx == NULL)
310  goto end;
311 
312  fctx = (TmqhFlowCtx *)ctx;
313 
314  if (fctx->size != 4)
315  goto end;
316 
317  if (fctx->queues == NULL)
318  goto end;
319 
320  if (fctx->queues[0].q != &trans_q[0])
321  goto end;
322  if (fctx->queues[1].q != &trans_q[1])
323  goto end;
324  if (fctx->queues[2].q != &trans_q[2])
325  goto end;
326  if (fctx->queues[3].q != &trans_q[3])
327  goto end;
328 
329  retval = 1;
330 end:
331  if (fctx != NULL)
332  TmqhOutputFlowFreeCtx(fctx);
333  TmqResetQueues();
334  return retval;
335 }
336 
337 static int TmqhOutputFlowSetupCtxTest02(void)
338 {
339  int retval = 0;
340  Tmq *tmq = NULL;
341  TmqhFlowCtx *fctx = NULL;
342 
343  TmqResetQueues();
344 
345  tmq = TmqCreateQueue("queue1");
346  if (tmq == NULL)
347  goto end;
348  tmq = TmqCreateQueue("queue2");
349  if (tmq == NULL)
350  goto end;
351  tmq = TmqCreateQueue("another");
352  if (tmq == NULL)
353  goto end;
354  tmq = TmqCreateQueue("yetanother");
355  if (tmq == NULL)
356  goto end;
357 
358  const char *str = "queue1";
359  void *ctx = TmqhOutputFlowSetupCtx(str);
360 
361  if (ctx == NULL)
362  goto end;
363 
364  fctx = (TmqhFlowCtx *)ctx;
365 
366  if (fctx->size != 1)
367  goto end;
368 
369  if (fctx->queues == NULL)
370  goto end;
371 
372  if (fctx->queues[0].q != &trans_q[0])
373  goto end;
374 
375  retval = 1;
376 end:
377  if (fctx != NULL)
378  TmqhOutputFlowFreeCtx(fctx);
379  TmqResetQueues();
380  return retval;
381 }
382 
383 static int TmqhOutputFlowSetupCtxTest03(void)
384 {
385  int retval = 0;
386  TmqhFlowCtx *fctx = NULL;
387 
388  TmqResetQueues();
389 
390  const char *str = "queue1,queue2,another,yetanother";
391  void *ctx = TmqhOutputFlowSetupCtx(str);
392 
393  if (ctx == NULL)
394  goto end;
395 
396  fctx = (TmqhFlowCtx *)ctx;
397 
398  if (fctx->size != 4)
399  goto end;
400 
401  if (fctx->queues == NULL)
402  goto end;
403 
404  if (fctx->queues[0].q != &trans_q[0])
405  goto end;
406  if (fctx->queues[1].q != &trans_q[1])
407  goto end;
408  if (fctx->queues[2].q != &trans_q[2])
409  goto end;
410  if (fctx->queues[3].q != &trans_q[3])
411  goto end;
412 
413  retval = 1;
414 end:
415  if (fctx != NULL)
416  TmqhOutputFlowFreeCtx(fctx);
417  TmqResetQueues();
418  return retval;
419 }
420 
421 #endif /* UNITTESTS */
422 
424 {
425 #ifdef UNITTESTS
426  UtRegisterTest("TmqhOutputFlowSetupCtxTest01",
427  TmqhOutputFlowSetupCtxTest01);
428  UtRegisterTest("TmqhOutputFlowSetupCtxTest02",
429  TmqhOutputFlowSetupCtxTest02);
430  UtRegisterTest("TmqhOutputFlowSetupCtxTest03",
431  TmqhOutputFlowSetupCtxTest03);
432 #endif
433 
434  return;
435 }
Packet *(* InHandler)(ThreadVars *)
const char * name
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:56
void TmqResetQueues(void)
Definition: tm-queues.c:79
Tmqh tmqh_table[TMQH_SIZE]
#define SCLogDebug(...)
Definition: util-debug.h:335
void(* RegisterTests)(void)
void * TmqhOutputFlowSetupCtx(const char *queue_str)
setup the queue handlers ctx
Definition: tmqh-flow.c:165
SCCondT cond_q
Definition: decode.h:629
uint16_t writer_cnt
Definition: tm-queues.h:31
#define unlikely(expr)
Definition: util-optimize.h:35
uint16_t id
Definition: tm-queues.h:29
Address dst
Definition: decode.h:414
#define SCCondWait
#define SCMutexLock(mut)
int ConfGet(const char *name, const char **vptr)
Retrieve the value of a configuration node.
Definition: conf.c:331
uint16_t size
Definition: tmqh-flow.h:35
void TmqhFlowRegisterTests(void)
Definition: tmqh-flow.c:423
void(* OutHandlerCtxFree)(void *)
#define str(s)
#define SCMutexUnlock(mut)
char family
Definition: decode.h:112
Definition: tm-queues.h:27
uint32_t len
Definition: decode.h:624
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:294
void UtRegisterTest(const char *name, int(*TestFn)(void))
Register unit test.
void TmqhOutputFlowFreeCtx(void *ctx)
Definition: tmqh-flow.c:211
void TmqhFlowPrintAutofpHandler(void)
Definition: tmqh-flow.c:82
void TmqhOutputFlowHash(ThreadVars *t, Packet *p)
Definition: tmqh-flow.c:223
TmqhFlowMode * queues
Definition: tmqh-flow.h:38
void(* OutHandler)(ThreadVars *, Packet *)
#define SCRealloc(x, a)
Definition: util-mem.h:238
void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p)
select the queue to output based on IP address pair.
Definition: tmqh-flow.c:254
#define SCMalloc(a)
Definition: util-mem.h:222
ThreadVars * tv
Definition: tm-threads.h:55
#define SCFree(a)
Definition: util-mem.h:322
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:269
#define SCCondSignal
void TmqhFlowRegister(void)
Definition: tmqh-flow.c:49
PacketQueue trans_q[256]
Definition: suricata.h:132
#define SCLogPerf(...)
Definition: util-debug.h:261
#define StatsSyncCountersIfSignalled(tv)
Definition: counters.h:137
PacketQueue * q
Definition: tmqh-flow.h:28
void * outctx
Definition: threadvars.h:74
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:167
#define SCStrdup(a)
Definition: util-mem.h:268
#define PKT_WANTS_FLOW
Definition: decode.h:1115
Ctx for the flow queue handler.
Definition: tmqh-flow.h:34
uint16_t last
Definition: tmqh-flow.h:36
uint32_t flow_hash
Definition: decode.h:450
Per thread variable structure.
Definition: threadvars.h:57
uint32_t flags
Definition: decode.h:444
void *(* OutHandlerCtxSetup)(const char *)
Tmq * inq
Definition: threadvars.h:72
int id
Definition: tm-threads.h:84
Tmq * TmqCreateQueue(const char *name)
Definition: tm-queues.c:36
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:139
SCMutex mutex_q
Definition: decode.h:628
Address src
Definition: decode.h:413
#define PRINT_IF_FUNC(f, msg)
Packet * TmqhInputFlow(ThreadVars *t)
Definition: tmqh-flow.c:95