suricata
tmqh-flow.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2013 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  *
24  * Simple output queue handler that makes sure all packets of the same flow
25  * are sent to the same queue. We support different kinds of q handlers. Have
26  * a look at "autofp-scheduler" conf to further understand the various q
27  * handlers we provide.
28  */
29 
30 #include "suricata.h"
31 #include "packet-queue.h"
32 #include "decode.h"
33 #include "threads.h"
34 #include "threadvars.h"
35 #include "tmqh-flow.h"
36 
37 #include "tm-queuehandlers.h"
38 
39 #include "conf.h"
40 #include "util-unittest.h"
41 
45 void *TmqhOutputFlowSetupCtx(const char *queue_str);
46 void TmqhOutputFlowFreeCtx(void *ctx);
47 void TmqhFlowRegisterTests(void);
48 
49 void TmqhFlowRegister(void)
50 {
51  tmqh_table[TMQH_FLOW].name = "flow";
56 
57  const char *scheduler = NULL;
58  if (ConfGet("autofp-scheduler", &scheduler) == 1) {
59  if (strcasecmp(scheduler, "round-robin") == 0) {
60  SCLogNotice("using flow hash instead of round robin");
62  } else if (strcasecmp(scheduler, "active-packets") == 0) {
63  SCLogNotice("using flow hash instead of active packets");
65  } else if (strcasecmp(scheduler, "hash") == 0) {
67  } else if (strcasecmp(scheduler, "ippair") == 0) {
69  } else {
70  SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" "
71  "for autofp-scheduler in conf. Killing engine.",
72  scheduler);
73  exit(EXIT_FAILURE);
74  }
75  } else {
77  }
78 
79  return;
80 }
81 
83 {
84 #define PRINT_IF_FUNC(f, msg) \
85  if (tmqh_table[TMQH_FLOW].OutHandler == (f)) \
86  SCLogConfig("AutoFP mode using \"%s\" flow load balancer", (msg))
87 
90 
91 #undef PRINT_IF_FUNC
92 }
93 
94 /* same as 'simple' */
96 {
97  PacketQueue *q = tv->inq->pq;
98 
100 
101  SCMutexLock(&q->mutex_q);
102  if (q->len == 0) {
103  /* if we have no packets in queue, wait... */
104  SCCondWait(&q->cond_q, &q->mutex_q);
105  }
106 
107  if (q->len > 0) {
108  Packet *p = PacketDequeue(q);
109  SCMutexUnlock(&q->mutex_q);
110  return p;
111  } else {
112  /* return NULL if we have no pkt. Should only happen on signals. */
113  SCMutexUnlock(&q->mutex_q);
114  return NULL;
115  }
116 }
117 
118 static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
119 {
120  void *ptmp;
121  Tmq *tmq = TmqGetQueueByName(name);
122  if (tmq == NULL) {
123  tmq = TmqCreateQueue(name);
124  if (tmq == NULL)
125  return -1;
126  }
127  tmq->writer_cnt++;
128 
129  if (ctx->queues == NULL) {
130  ctx->size = 1;
131  ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode));
132  if (ctx->queues == NULL) {
133  return -1;
134  }
135  memset(ctx->queues, 0, ctx->size * sizeof(TmqhFlowMode));
136  } else {
137  ctx->size++;
138  ptmp = SCRealloc(ctx->queues, ctx->size * sizeof(TmqhFlowMode));
139  if (ptmp == NULL) {
140  SCFree(ctx->queues);
141  ctx->queues = NULL;
142  return -1;
143  }
144  ctx->queues = ptmp;
145 
146  memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
147  }
148  ctx->queues[ctx->size - 1].q = tmq->pq;
149 
150  return 0;
151 }
152 
153 /**
154  * \brief setup the queue handlers ctx
155  *
156  * Parses a comma separated string "queuename1,queuename2,etc"
157  * and sets the ctx up to devide flows over these queue's.
158  *
159  * \param queue_str comma separated string with output queue names
160  *
161  * \retval ctx queues handlers ctx or NULL in error
162  */
163 void *TmqhOutputFlowSetupCtx(const char *queue_str)
164 {
165  if (queue_str == NULL || strlen(queue_str) == 0)
166  return NULL;
167 
168  SCLogDebug("queue_str %s", queue_str);
169 
170  TmqhFlowCtx *ctx = SCMalloc(sizeof(TmqhFlowCtx));
171  if (unlikely(ctx == NULL))
172  return NULL;
173  memset(ctx,0x00,sizeof(TmqhFlowCtx));
174 
175  char *str = SCStrdup(queue_str);
176  if (unlikely(str == NULL)) {
177  goto error;
178  }
179  char *tstr = str;
180 
181  /* parse the comma separated string */
182  do {
183  char *comma = strchr(tstr,',');
184  if (comma != NULL) {
185  *comma = '\0';
186  char *qname = tstr;
187  int r = StoreQueueId(ctx,qname);
188  if (r < 0)
189  goto error;
190  } else {
191  char *qname = tstr;
192  int r = StoreQueueId(ctx,qname);
193  if (r < 0)
194  goto error;
195  }
196  tstr = comma ? (comma + 1) : comma;
197  } while (tstr != NULL);
198 
199  SCFree(str);
200  return (void *)ctx;
201 
202 error:
203  SCFree(ctx);
204  if (str != NULL)
205  SCFree(str);
206  return NULL;
207 }
208 
209 void TmqhOutputFlowFreeCtx(void *ctx)
210 {
211  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
212 
213  SCLogPerf("AutoFP - Total flow handler queues - %" PRIu16,
214  fctx->size);
215  SCFree(fctx->queues);
216  SCFree(fctx);
217 
218  return;
219 }
220 
222 {
223  int16_t qid = 0;
224 
225  TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
226 
227  if (p->flags & PKT_WANTS_FLOW) {
228  uint32_t hash = p->flow_hash;
229  qid = hash % ctx->size;
230  } else {
231  qid = ctx->last++;
232 
233  if (ctx->last == ctx->size)
234  ctx->last = 0;
235  }
236 
237  PacketQueue *q = ctx->queues[qid].q;
238  SCMutexLock(&q->mutex_q);
239  PacketEnqueue(q, p);
240  SCCondSignal(&q->cond_q);
241  SCMutexUnlock(&q->mutex_q);
242 
243  return;
244 }
245 
246 /**
247  * \brief select the queue to output based on IP address pair.
248  *
249  * \param tv thread vars.
250  * \param p packet.
251  */
253 {
254  int16_t qid = 0;
255  uint32_t addr_hash = 0;
256  int i;
257 
258  TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
259 
260  if (p->src.family == AF_INET6) {
261  for (i = 0; i < 4; i++) {
262  addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
263  }
264  } else {
265  addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
266  }
267 
268  /* we don't have to worry about possible overflow, since
269  * ctx->size will be lesser than 2 ** 31 for sure */
270  qid = addr_hash % ctx->size;
271 
272  PacketQueue *q = ctx->queues[qid].q;
273  SCMutexLock(&q->mutex_q);
274  PacketEnqueue(q, p);
275  SCCondSignal(&q->cond_q);
276  SCMutexUnlock(&q->mutex_q);
277 
278  return;
279 }
280 
281 #ifdef UNITTESTS
282 
283 static int TmqhOutputFlowSetupCtxTest01(void)
284 {
285  TmqResetQueues();
286 
287  Tmq *tmq1 = TmqCreateQueue("queue1");
288  FAIL_IF_NULL(tmq1);
289  Tmq *tmq2 = TmqCreateQueue("queue2");
290  FAIL_IF_NULL(tmq2);
291  Tmq *tmq3 = TmqCreateQueue("another");
292  FAIL_IF_NULL(tmq3);
293  Tmq *tmq4 = TmqCreateQueue("yetanother");
294  FAIL_IF_NULL(tmq4);
295 
296  const char *str = "queue1,queue2,another,yetanother";
297  void *ctx = TmqhOutputFlowSetupCtx(str);
298  FAIL_IF_NULL(ctx);
299 
300  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
301 
302  FAIL_IF_NOT(fctx->size == 4);
303 
304  FAIL_IF_NULL(fctx->queues);
305 
306  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
307  FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
308  FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
309  FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
310 
311  TmqhOutputFlowFreeCtx(fctx);
312  TmqResetQueues();
313  PASS;
314 }
315 
316 static int TmqhOutputFlowSetupCtxTest02(void)
317 {
318  TmqResetQueues();
319 
320  Tmq *tmq1 = TmqCreateQueue("queue1");
321  FAIL_IF_NULL(tmq1);
322  Tmq *tmq2 = TmqCreateQueue("queue2");
323  FAIL_IF_NULL(tmq2);
324  Tmq *tmq3 = TmqCreateQueue("another");
325  FAIL_IF_NULL(tmq3);
326  Tmq *tmq4 = TmqCreateQueue("yetanother");
327  FAIL_IF_NULL(tmq4);
328 
329  const char *str = "queue1";
330  void *ctx = TmqhOutputFlowSetupCtx(str);
331  FAIL_IF_NULL(ctx);
332 
333  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
334 
335  FAIL_IF_NOT(fctx->size == 1);
336 
337  FAIL_IF_NULL(fctx->queues);
338 
339  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
340  TmqhOutputFlowFreeCtx(fctx);
341  TmqResetQueues();
342 
343  PASS;
344 }
345 
346 static int TmqhOutputFlowSetupCtxTest03(void)
347 {
348  TmqResetQueues();
349 
350  const char *str = "queue1,queue2,another,yetanother";
351  void *ctx = TmqhOutputFlowSetupCtx(str);
352  FAIL_IF_NULL(ctx);
353 
354  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
355 
356  FAIL_IF_NOT(fctx->size == 4);
357 
358  FAIL_IF_NULL(fctx->queues);
359 
360  Tmq *tmq1 = TmqGetQueueByName("queue1");
361  FAIL_IF_NULL(tmq1);
362  Tmq *tmq2 = TmqGetQueueByName("queue2");
363  FAIL_IF_NULL(tmq2);
364  Tmq *tmq3 = TmqGetQueueByName("another");
365  FAIL_IF_NULL(tmq3);
366  Tmq *tmq4 = TmqGetQueueByName("yetanother");
367  FAIL_IF_NULL(tmq4);
368 
369  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
370  FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
371  FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
372  FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
373 
374  TmqhOutputFlowFreeCtx(fctx);
375  TmqResetQueues();
376  PASS;
377 }
378 
379 #endif /* UNITTESTS */
380 
382 {
383 #ifdef UNITTESTS
384  UtRegisterTest("TmqhOutputFlowSetupCtxTest01",
385  TmqhOutputFlowSetupCtxTest01);
386  UtRegisterTest("TmqhOutputFlowSetupCtxTest02",
387  TmqhOutputFlowSetupCtxTest02);
388  UtRegisterTest("TmqhOutputFlowSetupCtxTest03",
389  TmqhOutputFlowSetupCtxTest03);
390 #endif
391 
392  return;
393 }
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
TMQH_FLOW
@ TMQH_FLOW
Definition: tm-queuehandlers.h:31
FAIL_IF_NULL
#define FAIL_IF_NULL(expr)
Fail a test if expression evaluates to NULL.
Definition: util-unittest.h:89
TmqhOutputFlowIPPair
void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p)
select the queue to output based on IP address pair.
Definition: tmqh-flow.c:252
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
UtRegisterTest
void UtRegisterTest(const char *name, int(*TestFn)(void))
Register unit test.
Definition: util-unittest.c:103
TmqhFlowPrintAutofpHandler
void TmqhFlowPrintAutofpHandler(void)
Definition: tmqh-flow.c:82
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:105
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:298
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:173
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:47
Packet_::flags
uint32_t flags
Definition: decode.h:446
threads.h
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
TmqhFlowRegisterTests
void TmqhFlowRegisterTests(void)
Definition: tmqh-flow.c:381
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
TmqhFlowMode_
Definition: tmqh-flow.h:27
PKT_WANTS_FLOW
#define PKT_WANTS_FLOW
Definition: decode.h:1108
packet-queue.h
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
StatsSyncCountersIfSignalled
#define StatsSyncCountersIfSignalled(tv)
Definition: counters.h:137
util-unittest.h
FAIL_IF_NOT
#define FAIL_IF_NOT(expr)
Fail a test if expression to true.
Definition: util-unittest.h:82
TmqhFlowRegister
void TmqhFlowRegister(void)
Definition: tmqh-flow.c:49
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmqhOutputFlowFreeCtx
void TmqhOutputFlowFreeCtx(void *ctx)
Definition: tmqh-flow.c:209
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:54
ConfGet
int ConfGet(const char *name, const char **vptr)
Retrieve the value of a configuration node.
Definition: conf.c:331
decode.h
TmqhFlowCtx_::size
uint16_t size
Definition: tmqh-flow.h:35
PASS
#define PASS
Pass the test.
Definition: util-unittest.h:105
TmqhFlowCtx_::queues
TmqhFlowMode * queues
Definition: tmqh-flow.h:38
SCCondWait
#define SCCondWait
Definition: threads-debug.h:141
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
Tmqh_::RegisterTests
void(* RegisterTests)(void)
Definition: tm-queuehandlers.h:43
TmqhOutputFlowSetupCtx
void * TmqhOutputFlowSetupCtx(const char *queue_str)
setup the queue handlers ctx
Definition: tmqh-flow.c:163
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
TmqhFlowMode_::q
PacketQueue * q
Definition: tmqh-flow.h:28
TmqhFlowCtx_::last
uint16_t last
Definition: tmqh-flow.h:36
Packet_
Definition: decode.h:411
Tmqh_::name
const char * name
Definition: tm-queuehandlers.h:37
conf.h
TmqhOutputFlowHash
void TmqhOutputFlowHash(ThreadVars *t, Packet *p)
Definition: tmqh-flow.c:221
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:55
SC_ERR_INVALID_YAML_CONF_ENTRY
@ SC_ERR_INVALID_YAML_CONF_ENTRY
Definition: util-error.h:169
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
tm-queuehandlers.h
PRINT_IF_FUNC
#define PRINT_IF_FUNC(f, msg)
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:90
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:224
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:50
SCLogError
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:257
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
tmqh_table
Tmqh tmqh_table[TMQH_SIZE]
Definition: tm-queuehandlers.c:37
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:29
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:212
threadvars.h
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::flow_hash
uint32_t flow_hash
Definition: decode.h:452
str
#define str(s)
Definition: suricata-common.h:273
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmqhInputFlow
Packet * TmqhInputFlow(ThreadVars *t)
Definition: tmqh-flow.c:95
tmqh-flow.h
suricata.h
Address_::family
char family
Definition: decode.h:114
Packet_::dst
Address dst
Definition: decode.h:416
Tmq_
Definition: tm-queues.h:29
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:232
TmqhFlowCtx_
Ctx for the flow queue handler.
Definition: tmqh-flow.h:34
Packet_::src
Address src
Definition: decode.h:415
TmqResetQueues
void TmqResetQueues(void)
Definition: tm-queues.c:80