suricata
tmqh-flow.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2020 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  *
24  * Simple output queue handler that makes sure all packets of the same flow
25  * are sent to the same queue. We support different kind of q handlers. Have
26  * a look at "autofp-scheduler" conf to further understand the various q
27  * handlers we provide.
28  */
29 
30 #include "suricata.h"
31 #include "packet-queue.h"
32 #include "decode.h"
33 #include "threads.h"
34 #include "threadvars.h"
35 #include "tmqh-flow.h"
36 #include "flow-hash.h"
37 
38 #include "tm-queuehandlers.h"
39 
40 #include "conf.h"
41 #include "util-unittest.h"
42 
46 static void TmqhOutputFlowFTPHash(ThreadVars *t, Packet *p);
47 void *TmqhOutputFlowSetupCtx(const char *queue_str);
48 void TmqhOutputFlowFreeCtx(void *ctx);
49 void TmqhFlowRegisterTests(void);
50 
51 void TmqhFlowRegister(void)
52 {
53  tmqh_table[TMQH_FLOW].name = "flow";
58 
59  const char *scheduler = NULL;
60  if (ConfGet("autofp-scheduler", &scheduler) == 1) {
61  if (strcasecmp(scheduler, "round-robin") == 0) {
62  SCLogNotice("using flow hash instead of round robin");
64  } else if (strcasecmp(scheduler, "active-packets") == 0) {
65  SCLogNotice("using flow hash instead of active packets");
67  } else if (strcasecmp(scheduler, "hash") == 0) {
69  } else if (strcasecmp(scheduler, "ippair") == 0) {
71  } else if (strcasecmp(scheduler, "ftp-hash") == 0) {
72  tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowFTPHash;
73  } else {
74  SCLogError("Invalid entry \"%s\" "
75  "for autofp-scheduler in conf. Killing engine.",
76  scheduler);
77  exit(EXIT_FAILURE);
78  }
79  } else {
81  }
82 
83  return;
84 }
85 
87 {
88 #define PRINT_IF_FUNC(f, msg) \
89  if (tmqh_table[TMQH_FLOW].OutHandler == (f)) \
90  SCLogConfig("AutoFP mode using \"%s\" flow load balancer", (msg))
91 
94  PRINT_IF_FUNC(TmqhOutputFlowFTPHash, "FTPHash");
95 
96 #undef PRINT_IF_FUNC
97 }
98 
99 /* same as 'simple' */
101 {
102  PacketQueue *q = tv->inq->pq;
103 
105 
106  SCMutexLock(&q->mutex_q);
107  if (q->len == 0) {
108  /* if we have no packets in queue, wait... */
109  SCCondWait(&q->cond_q, &q->mutex_q);
110  }
111 
112  if (q->len > 0) {
113  Packet *p = PacketDequeue(q);
114  SCMutexUnlock(&q->mutex_q);
115  return p;
116  } else {
117  /* return NULL if we have no pkt. Should only happen on signals. */
118  SCMutexUnlock(&q->mutex_q);
119  return NULL;
120  }
121 }
122 
123 static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
124 {
125  void *ptmp;
126  Tmq *tmq = TmqGetQueueByName(name);
127  if (tmq == NULL) {
128  tmq = TmqCreateQueue(name);
129  if (tmq == NULL)
130  return -1;
131  }
132  tmq->writer_cnt++;
133 
134  if (ctx->queues == NULL) {
135  ctx->size = 1;
136  ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode));
137  if (ctx->queues == NULL) {
138  return -1;
139  }
140  memset(ctx->queues, 0, ctx->size * sizeof(TmqhFlowMode));
141  } else {
142  ctx->size++;
143  ptmp = SCRealloc(ctx->queues, ctx->size * sizeof(TmqhFlowMode));
144  if (ptmp == NULL) {
145  SCFree(ctx->queues);
146  ctx->queues = NULL;
147  return -1;
148  }
149  ctx->queues = ptmp;
150 
151  memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
152  }
153  ctx->queues[ctx->size - 1].q = tmq->pq;
154 
155  return 0;
156 }
157 
158 /**
159  * \brief setup the queue handlers ctx
160  *
161  * Parses a comma separated string "queuename1,queuename2,etc"
162  * and sets the ctx up to devide flows over these queue's.
163  *
164  * \param queue_str comma separated string with output queue names
165  *
166  * \retval ctx queues handlers ctx or NULL in error
167  */
168 void *TmqhOutputFlowSetupCtx(const char *queue_str)
169 {
170  if (queue_str == NULL || strlen(queue_str) == 0)
171  return NULL;
172 
173  SCLogDebug("queue_str %s", queue_str);
174 
175  TmqhFlowCtx *ctx = SCMalloc(sizeof(TmqhFlowCtx));
176  if (unlikely(ctx == NULL))
177  return NULL;
178  memset(ctx,0x00,sizeof(TmqhFlowCtx));
179 
180  char *str = SCStrdup(queue_str);
181  if (unlikely(str == NULL)) {
182  goto error;
183  }
184  char *tstr = str;
185 
186  /* parse the comma separated string */
187  do {
188  char *comma = strchr(tstr,',');
189  if (comma != NULL) {
190  *comma = '\0';
191  char *qname = tstr;
192  int r = StoreQueueId(ctx,qname);
193  if (r < 0)
194  goto error;
195  } else {
196  char *qname = tstr;
197  int r = StoreQueueId(ctx,qname);
198  if (r < 0)
199  goto error;
200  }
201  tstr = comma ? (comma + 1) : comma;
202  } while (tstr != NULL);
203 
204  SCFree(str);
205  return (void *)ctx;
206 
207 error:
208  SCFree(ctx);
209  if (str != NULL)
210  SCFree(str);
211  return NULL;
212 }
213 
214 void TmqhOutputFlowFreeCtx(void *ctx)
215 {
216  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
217 
218  SCLogPerf("AutoFP - Total flow handler queues - %" PRIu16,
219  fctx->size);
220  SCFree(fctx->queues);
221  SCFree(fctx);
222 
223  return;
224 }
225 
227 {
228  uint32_t qid;
229  TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
230 
231  if (p->flags & PKT_WANTS_FLOW) {
232  uint32_t hash = p->flow_hash;
233  qid = hash % ctx->size;
234  } else {
235  qid = ctx->last++;
236 
237  if (ctx->last == ctx->size)
238  ctx->last = 0;
239  }
240 
241  PacketQueue *q = ctx->queues[qid].q;
242  SCMutexLock(&q->mutex_q);
243  PacketEnqueue(q, p);
244  SCCondSignal(&q->cond_q);
245  SCMutexUnlock(&q->mutex_q);
246 
247  return;
248 }
249 
250 /**
251  * \brief select the queue to output based on IP address pair.
252  *
253  * \param tv thread vars.
254  * \param p packet.
255  */
257 {
258  uint32_t addr_hash = 0;
259 
260  TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
261 
262  if (p->src.family == AF_INET6) {
263  for (int i = 0; i < 4; i++) {
264  addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
265  }
266  } else {
267  addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
268  }
269 
270  uint32_t qid = addr_hash % ctx->size;
271  PacketQueue *q = ctx->queues[qid].q;
272  SCMutexLock(&q->mutex_q);
273  PacketEnqueue(q, p);
274  SCCondSignal(&q->cond_q);
275  SCMutexUnlock(&q->mutex_q);
276 
277  return;
278 }
279 
280 static void TmqhOutputFlowFTPHash(ThreadVars *tv, Packet *p)
281 {
282  uint32_t qid;
283  TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
284 
285  if (p->flags & PKT_WANTS_FLOW) {
286  uint32_t hash = p->flow_hash;
287  if (p->tcph != NULL && ((p->sp >= 1024 && p->dp >= 1024) || p->dp == 21 || p->sp == 21 ||
288  p->dp == 20 || p->sp == 20)) {
289  hash = FlowGetIpPairProtoHash(p);
290  }
291  qid = hash % ctx->size;
292  } else {
293  qid = ctx->last++;
294 
295  if (ctx->last == ctx->size)
296  ctx->last = 0;
297  }
298 
299  PacketQueue *q = ctx->queues[qid].q;
300  SCMutexLock(&q->mutex_q);
301  PacketEnqueue(q, p);
302  SCCondSignal(&q->cond_q);
303  SCMutexUnlock(&q->mutex_q);
304 
305  return;
306 }
307 
308 #ifdef UNITTESTS
309 
310 static int TmqhOutputFlowSetupCtxTest01(void)
311 {
312  TmqResetQueues();
313 
314  Tmq *tmq1 = TmqCreateQueue("queue1");
315  FAIL_IF_NULL(tmq1);
316  Tmq *tmq2 = TmqCreateQueue("queue2");
317  FAIL_IF_NULL(tmq2);
318  Tmq *tmq3 = TmqCreateQueue("another");
319  FAIL_IF_NULL(tmq3);
320  Tmq *tmq4 = TmqCreateQueue("yetanother");
321  FAIL_IF_NULL(tmq4);
322 
323  const char *str = "queue1,queue2,another,yetanother";
324  void *ctx = TmqhOutputFlowSetupCtx(str);
325  FAIL_IF_NULL(ctx);
326 
327  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
328 
329  FAIL_IF_NOT(fctx->size == 4);
330 
331  FAIL_IF_NULL(fctx->queues);
332 
333  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
334  FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
335  FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
336  FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
337 
338  TmqhOutputFlowFreeCtx(fctx);
339  TmqResetQueues();
340  PASS;
341 }
342 
343 static int TmqhOutputFlowSetupCtxTest02(void)
344 {
345  TmqResetQueues();
346 
347  Tmq *tmq1 = TmqCreateQueue("queue1");
348  FAIL_IF_NULL(tmq1);
349  Tmq *tmq2 = TmqCreateQueue("queue2");
350  FAIL_IF_NULL(tmq2);
351  Tmq *tmq3 = TmqCreateQueue("another");
352  FAIL_IF_NULL(tmq3);
353  Tmq *tmq4 = TmqCreateQueue("yetanother");
354  FAIL_IF_NULL(tmq4);
355 
356  const char *str = "queue1";
357  void *ctx = TmqhOutputFlowSetupCtx(str);
358  FAIL_IF_NULL(ctx);
359 
360  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
361 
362  FAIL_IF_NOT(fctx->size == 1);
363 
364  FAIL_IF_NULL(fctx->queues);
365 
366  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
367  TmqhOutputFlowFreeCtx(fctx);
368  TmqResetQueues();
369 
370  PASS;
371 }
372 
373 static int TmqhOutputFlowSetupCtxTest03(void)
374 {
375  TmqResetQueues();
376 
377  const char *str = "queue1,queue2,another,yetanother";
378  void *ctx = TmqhOutputFlowSetupCtx(str);
379  FAIL_IF_NULL(ctx);
380 
381  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
382 
383  FAIL_IF_NOT(fctx->size == 4);
384 
385  FAIL_IF_NULL(fctx->queues);
386 
387  Tmq *tmq1 = TmqGetQueueByName("queue1");
388  FAIL_IF_NULL(tmq1);
389  Tmq *tmq2 = TmqGetQueueByName("queue2");
390  FAIL_IF_NULL(tmq2);
391  Tmq *tmq3 = TmqGetQueueByName("another");
392  FAIL_IF_NULL(tmq3);
393  Tmq *tmq4 = TmqGetQueueByName("yetanother");
394  FAIL_IF_NULL(tmq4);
395 
396  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
397  FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
398  FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
399  FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
400 
401  TmqhOutputFlowFreeCtx(fctx);
402  TmqResetQueues();
403  PASS;
404 }
405 
406 #endif /* UNITTESTS */
407 
409 {
410 #ifdef UNITTESTS
411  UtRegisterTest("TmqhOutputFlowSetupCtxTest01",
412  TmqhOutputFlowSetupCtxTest01);
413  UtRegisterTest("TmqhOutputFlowSetupCtxTest02",
414  TmqhOutputFlowSetupCtxTest02);
415  UtRegisterTest("TmqhOutputFlowSetupCtxTest03",
416  TmqhOutputFlowSetupCtxTest03);
417 #endif
418 
419  return;
420 }
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
FAIL_IF_NULL
#define FAIL_IF_NULL(expr)
Fail a test if expression evaluates to NULL.
Definition: util-unittest.h:89
TmqhOutputFlowIPPair
void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p)
select the queue to output based on IP address pair.
Definition: tmqh-flow.c:256
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
UtRegisterTest
void UtRegisterTest(const char *name, int(*TestFn)(void))
Register unit test.
Definition: util-unittest.c:103
TmqhFlowPrintAutofpHandler
void TmqhFlowPrintAutofpHandler(void)
Definition: tmqh-flow.c:86
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:104
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:175
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
Packet_::flags
uint32_t flags
Definition: decode.h:467
threads.h
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
TmqhFlowRegisterTests
void TmqhFlowRegisterTests(void)
Definition: tmqh-flow.c:408
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
TmqhFlowMode_
Definition: tmqh-flow.h:27
PKT_WANTS_FLOW
#define PKT_WANTS_FLOW
Definition: decode.h:1030
flow-hash.h
packet-queue.h
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
StatsSyncCountersIfSignalled
#define StatsSyncCountersIfSignalled(tv)
Definition: counters.h:141
util-unittest.h
FAIL_IF_NOT
#define FAIL_IF_NOT(expr)
Fail a test if expression evaluates to false.
Definition: util-unittest.h:82
TmqhFlowRegister
void TmqhFlowRegister(void)
Definition: tmqh-flow.c:51
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmqhOutputFlowFreeCtx
void TmqhOutputFlowFreeCtx(void *ctx)
Definition: tmqh-flow.c:214
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
ConfGet
int ConfGet(const char *name, const char **vptr)
Retrieve the value of a configuration node.
Definition: conf.c:335
decode.h
TmqhFlowCtx_::size
uint16_t size
Definition: tmqh-flow.h:35
PASS
#define PASS
Pass the test.
Definition: util-unittest.h:105
TmqhFlowCtx_::queues
TmqhFlowMode * queues
Definition: tmqh-flow.h:38
SCCondWait
#define SCCondWait
Definition: threads-debug.h:141
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
Tmqh_::RegisterTests
void(* RegisterTests)(void)
Definition: tm-queuehandlers.h:43
Packet_::sp
Port sp
Definition: decode.h:437
TmqhOutputFlowSetupCtx
void * TmqhOutputFlowSetupCtx(const char *queue_str)
setup the queue handlers ctx
Definition: tmqh-flow.c:168
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
TmqhFlowMode_::q
PacketQueue * q
Definition: tmqh-flow.h:28
TmqhFlowCtx_::last
uint16_t last
Definition: tmqh-flow.h:36
Packet_
Definition: decode.h:430
Tmqh_::name
const char * name
Definition: tm-queuehandlers.h:37
conf.h
TmqhOutputFlowHash
void TmqhOutputFlowHash(ThreadVars *t, Packet *p)
Definition: tmqh-flow.c:226
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
FlowGetIpPairProtoHash
uint32_t FlowGetIpPairProtoHash(const Packet *p)
Definition: flow-hash.c:116
tm-queuehandlers.h
PRINT_IF_FUNC
#define PRINT_IF_FUNC(f, msg)
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:89
Packet_::tcph
TCPHdr * tcph
Definition: decode.h:557
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
tmqh_table
Tmqh tmqh_table[TMQH_SIZE]
Definition: tm-queuehandlers.c:37
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:216
threadvars.h
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::flow_hash
uint32_t flow_hash
Definition: decode.h:473
str
#define str(s)
Definition: suricata-common.h:286
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmqhInputFlow
Packet * TmqhInputFlow(ThreadVars *t)
Definition: tmqh-flow.c:100
tmqh-flow.h
suricata.h
Address_::family
char family
Definition: decode.h:116
Packet_::dst
Address dst
Definition: decode.h:435
Tmq_
Definition: tm-queues.h:29
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
Packet_::dp
Port dp
Definition: decode.h:445
TMQH_FLOW
@ TMQH_FLOW
Definition: tm-queuehandlers.h:31
TmqhFlowCtx_
Ctx for the flow queue handler.
Definition: tmqh-flow.h:34
Packet_::src
Address src
Definition: decode.h:434
TmqResetQueues
void TmqResetQueues(void)
Definition: tm-queues.c:80