suricata
tmqh-flow.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2020 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  *
24  * Simple output queue handler that makes sure all packets of the same flow
25  * are sent to the same queue. Several kinds of queue handlers are supported;
26  * see the "autofp-scheduler" configuration option for the schedulers that
27  * are available.
28  */
29 
30 #include "suricata.h"
31 #include "packet-queue.h"
32 #include "decode.h"
33 #include "threads.h"
34 #include "threadvars.h"
35 #include "tmqh-flow.h"
36 #include "flow-hash.h"
37 
38 #include "tm-queuehandlers.h"
39 
40 #include "conf.h"
41 #include "util-unittest.h"
42 
46 static void TmqhOutputFlowFTPHash(ThreadVars *t, Packet *p);
47 void *TmqhOutputFlowSetupCtx(const char *queue_str);
48 void TmqhOutputFlowFreeCtx(void *ctx);
49 void TmqhFlowRegisterTests(void);
50 
51 void TmqhFlowRegister(void)
52 {
53  tmqh_table[TMQH_FLOW].name = "flow";
58 
59  const char *scheduler = NULL;
60  if (ConfGet("autofp-scheduler", &scheduler) == 1) {
61  if (strcasecmp(scheduler, "round-robin") == 0) {
62  SCLogNotice("using flow hash instead of round robin");
64  } else if (strcasecmp(scheduler, "active-packets") == 0) {
65  SCLogNotice("using flow hash instead of active packets");
67  } else if (strcasecmp(scheduler, "hash") == 0) {
69  } else if (strcasecmp(scheduler, "ippair") == 0) {
71  } else if (strcasecmp(scheduler, "ftp-hash") == 0) {
72  tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowFTPHash;
73  } else {
74  SCLogError("Invalid entry \"%s\" "
75  "for autofp-scheduler in conf. Killing engine.",
76  scheduler);
77  exit(EXIT_FAILURE);
78  }
79  } else {
81  }
82 
83  return;
84 }
85 
87 {
88 #define PRINT_IF_FUNC(f, msg) \
89  if (tmqh_table[TMQH_FLOW].OutHandler == (f)) \
90  SCLogConfig("AutoFP mode using \"%s\" flow load balancer", (msg))
91 
94  PRINT_IF_FUNC(TmqhOutputFlowFTPHash, "FTPHash");
95 
96 #undef PRINT_IF_FUNC
97 }
98 
99 /* same as 'simple' */
101 {
102  PacketQueue *q = tv->inq->pq;
103 
105 
106  SCMutexLock(&q->mutex_q);
107  if (q->len == 0) {
108  /* if we have no packets in queue, wait... */
109  SCCondWait(&q->cond_q, &q->mutex_q);
110  }
111 
112  if (q->len > 0) {
113  Packet *p = PacketDequeue(q);
114  SCMutexUnlock(&q->mutex_q);
115  return p;
116  } else {
117  /* return NULL if we have no pkt. Should only happen on signals. */
118  SCMutexUnlock(&q->mutex_q);
119  return NULL;
120  }
121 }
122 
123 static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
124 {
125  void *ptmp;
126  Tmq *tmq = TmqGetQueueByName(name);
127  if (tmq == NULL) {
128  tmq = TmqCreateQueue(name);
129  if (tmq == NULL)
130  return -1;
131  }
132  tmq->writer_cnt++;
133 
134  if (ctx->queues == NULL) {
135  ctx->size = 1;
136  ctx->queues = SCCalloc(1, ctx->size * sizeof(TmqhFlowMode));
137  if (ctx->queues == NULL) {
138  return -1;
139  }
140  } else {
141  ctx->size++;
142  ptmp = SCRealloc(ctx->queues, ctx->size * sizeof(TmqhFlowMode));
143  if (ptmp == NULL) {
144  SCFree(ctx->queues);
145  ctx->queues = NULL;
146  return -1;
147  }
148  ctx->queues = ptmp;
149 
150  memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
151  }
152  ctx->queues[ctx->size - 1].q = tmq->pq;
153 
154  return 0;
155 }
156 
157 /**
158  * \brief setup the queue handlers ctx
159  *
160  * Parses a comma separated string "queuename1,queuename2,etc"
161  * and sets the ctx up to devide flows over these queue's.
162  *
163  * \param queue_str comma separated string with output queue names
164  *
165  * \retval ctx queues handlers ctx or NULL in error
166  */
167 void *TmqhOutputFlowSetupCtx(const char *queue_str)
168 {
169  if (queue_str == NULL || strlen(queue_str) == 0)
170  return NULL;
171 
172  SCLogDebug("queue_str %s", queue_str);
173 
174  TmqhFlowCtx *ctx = SCCalloc(1, sizeof(TmqhFlowCtx));
175  if (unlikely(ctx == NULL))
176  return NULL;
177 
178  char *str = SCStrdup(queue_str);
179  if (unlikely(str == NULL)) {
180  goto error;
181  }
182  char *tstr = str;
183 
184  /* parse the comma separated string */
185  do {
186  char *comma = strchr(tstr,',');
187  if (comma != NULL) {
188  *comma = '\0';
189  char *qname = tstr;
190  int r = StoreQueueId(ctx,qname);
191  if (r < 0)
192  goto error;
193  } else {
194  char *qname = tstr;
195  int r = StoreQueueId(ctx,qname);
196  if (r < 0)
197  goto error;
198  }
199  tstr = comma ? (comma + 1) : comma;
200  } while (tstr != NULL);
201 
202  SCFree(str);
203  return (void *)ctx;
204 
205 error:
206  SCFree(ctx);
207  if (str != NULL)
208  SCFree(str);
209  return NULL;
210 }
211 
212 void TmqhOutputFlowFreeCtx(void *ctx)
213 {
214  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
215 
216  SCLogPerf("AutoFP - Total flow handler queues - %" PRIu16,
217  fctx->size);
218  SCFree(fctx->queues);
219  SCFree(fctx);
220 
221  return;
222 }
223 
225 {
226  uint32_t qid;
227  TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
228 
229  if (p->flags & PKT_WANTS_FLOW) {
230  uint32_t hash = p->flow_hash;
231  qid = hash % ctx->size;
232  } else {
233  qid = ctx->last++;
234 
235  if (ctx->last == ctx->size)
236  ctx->last = 0;
237  }
238 
239  PacketQueue *q = ctx->queues[qid].q;
240  SCMutexLock(&q->mutex_q);
241  PacketEnqueue(q, p);
242  SCCondSignal(&q->cond_q);
243  SCMutexUnlock(&q->mutex_q);
244 
245  return;
246 }
247 
248 /**
249  * \brief select the queue to output based on IP address pair.
250  *
251  * \param tv thread vars.
252  * \param p packet.
253  */
255 {
256  uint32_t addr_hash = 0;
257 
258  TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
259 
260  if (p->src.family == AF_INET6) {
261  for (int i = 0; i < 4; i++) {
262  addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
263  }
264  } else {
265  addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
266  }
267 
268  uint32_t qid = addr_hash % ctx->size;
269  PacketQueue *q = ctx->queues[qid].q;
270  SCMutexLock(&q->mutex_q);
271  PacketEnqueue(q, p);
272  SCCondSignal(&q->cond_q);
273  SCMutexUnlock(&q->mutex_q);
274 
275  return;
276 }
277 
278 static void TmqhOutputFlowFTPHash(ThreadVars *tv, Packet *p)
279 {
280  uint32_t qid;
281  TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
282 
283  if (p->flags & PKT_WANTS_FLOW) {
284  uint32_t hash = p->flow_hash;
285  if (p->tcph != NULL && ((p->sp >= 1024 && p->dp >= 1024) || p->dp == 21 || p->sp == 21 ||
286  p->dp == 20 || p->sp == 20)) {
287  hash = FlowGetIpPairProtoHash(p);
288  }
289  qid = hash % ctx->size;
290  } else {
291  qid = ctx->last++;
292 
293  if (ctx->last == ctx->size)
294  ctx->last = 0;
295  }
296 
297  PacketQueue *q = ctx->queues[qid].q;
298  SCMutexLock(&q->mutex_q);
299  PacketEnqueue(q, p);
300  SCCondSignal(&q->cond_q);
301  SCMutexUnlock(&q->mutex_q);
302 
303  return;
304 }
305 
306 #ifdef UNITTESTS
307 
308 static int TmqhOutputFlowSetupCtxTest01(void)
309 {
310  TmqResetQueues();
311 
312  Tmq *tmq1 = TmqCreateQueue("queue1");
313  FAIL_IF_NULL(tmq1);
314  Tmq *tmq2 = TmqCreateQueue("queue2");
315  FAIL_IF_NULL(tmq2);
316  Tmq *tmq3 = TmqCreateQueue("another");
317  FAIL_IF_NULL(tmq3);
318  Tmq *tmq4 = TmqCreateQueue("yetanother");
319  FAIL_IF_NULL(tmq4);
320 
321  const char *str = "queue1,queue2,another,yetanother";
322  void *ctx = TmqhOutputFlowSetupCtx(str);
323  FAIL_IF_NULL(ctx);
324 
325  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
326 
327  FAIL_IF_NOT(fctx->size == 4);
328 
329  FAIL_IF_NULL(fctx->queues);
330 
331  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
332  FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
333  FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
334  FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
335 
336  TmqhOutputFlowFreeCtx(fctx);
337  TmqResetQueues();
338  PASS;
339 }
340 
341 static int TmqhOutputFlowSetupCtxTest02(void)
342 {
343  TmqResetQueues();
344 
345  Tmq *tmq1 = TmqCreateQueue("queue1");
346  FAIL_IF_NULL(tmq1);
347  Tmq *tmq2 = TmqCreateQueue("queue2");
348  FAIL_IF_NULL(tmq2);
349  Tmq *tmq3 = TmqCreateQueue("another");
350  FAIL_IF_NULL(tmq3);
351  Tmq *tmq4 = TmqCreateQueue("yetanother");
352  FAIL_IF_NULL(tmq4);
353 
354  const char *str = "queue1";
355  void *ctx = TmqhOutputFlowSetupCtx(str);
356  FAIL_IF_NULL(ctx);
357 
358  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
359 
360  FAIL_IF_NOT(fctx->size == 1);
361 
362  FAIL_IF_NULL(fctx->queues);
363 
364  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
365  TmqhOutputFlowFreeCtx(fctx);
366  TmqResetQueues();
367 
368  PASS;
369 }
370 
371 static int TmqhOutputFlowSetupCtxTest03(void)
372 {
373  TmqResetQueues();
374 
375  const char *str = "queue1,queue2,another,yetanother";
376  void *ctx = TmqhOutputFlowSetupCtx(str);
377  FAIL_IF_NULL(ctx);
378 
379  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
380 
381  FAIL_IF_NOT(fctx->size == 4);
382 
383  FAIL_IF_NULL(fctx->queues);
384 
385  Tmq *tmq1 = TmqGetQueueByName("queue1");
386  FAIL_IF_NULL(tmq1);
387  Tmq *tmq2 = TmqGetQueueByName("queue2");
388  FAIL_IF_NULL(tmq2);
389  Tmq *tmq3 = TmqGetQueueByName("another");
390  FAIL_IF_NULL(tmq3);
391  Tmq *tmq4 = TmqGetQueueByName("yetanother");
392  FAIL_IF_NULL(tmq4);
393 
394  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
395  FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
396  FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
397  FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
398 
399  TmqhOutputFlowFreeCtx(fctx);
400  TmqResetQueues();
401  PASS;
402 }
403 
404 #endif /* UNITTESTS */
405 
/**
 * \brief Register the unit tests of the flow queue handler.
 *
 * Does nothing when built without UNITTESTS.
 */
void TmqhFlowRegisterTests(void)
{
#ifdef UNITTESTS
    UtRegisterTest("TmqhOutputFlowSetupCtxTest01",
            TmqhOutputFlowSetupCtxTest01);
    UtRegisterTest("TmqhOutputFlowSetupCtxTest02",
            TmqhOutputFlowSetupCtxTest02);
    UtRegisterTest("TmqhOutputFlowSetupCtxTest03",
            TmqhOutputFlowSetupCtxTest03);
#endif
}
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
FAIL_IF_NULL
#define FAIL_IF_NULL(expr)
Fail a test if expression evaluates to NULL.
Definition: util-unittest.h:89
TmqhOutputFlowIPPair
void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p)
select the queue to output based on IP address pair.
Definition: tmqh-flow.c:254
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
UtRegisterTest
void UtRegisterTest(const char *name, int(*TestFn)(void))
Register unit test.
Definition: util-unittest.c:103
TmqhFlowPrintAutofpHandler
void TmqhFlowPrintAutofpHandler(void)
Definition: tmqh-flow.c:86
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:104
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:175
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
Packet_::flags
uint32_t flags
Definition: decode.h:473
threads.h
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
TmqhFlowRegisterTests
void TmqhFlowRegisterTests(void)
Definition: tmqh-flow.c:406
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
TmqhFlowMode_
Definition: tmqh-flow.h:27
PKT_WANTS_FLOW
#define PKT_WANTS_FLOW
Definition: decode.h:1048
flow-hash.h
packet-queue.h
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
util-unittest.h
FAIL_IF_NOT
#define FAIL_IF_NOT(expr)
Fail a test if expression evaluates to false.
Definition: util-unittest.h:82
TmqhFlowRegister
void TmqhFlowRegister(void)
Definition: tmqh-flow.c:51
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmqhOutputFlowFreeCtx
void TmqhOutputFlowFreeCtx(void *ctx)
Definition: tmqh-flow.c:212
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
ConfGet
int ConfGet(const char *name, const char **vptr)
Retrieve the value of a configuration node.
Definition: conf.c:335
decode.h
TmqhFlowCtx_::size
uint16_t size
Definition: tmqh-flow.h:35
PASS
#define PASS
Pass the test.
Definition: util-unittest.h:105
TmqhFlowCtx_::queues
TmqhFlowMode * queues
Definition: tmqh-flow.h:38
SCCondWait
#define SCCondWait
Definition: threads-debug.h:141
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
Tmqh_::RegisterTests
void(* RegisterTests)(void)
Definition: tm-queuehandlers.h:43
Packet_::sp
Port sp
Definition: decode.h:443
TmqhOutputFlowSetupCtx
void * TmqhOutputFlowSetupCtx(const char *queue_str)
setup the queue handlers ctx
Definition: tmqh-flow.c:167
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
TmqhFlowMode_::q
PacketQueue * q
Definition: tmqh-flow.h:28
TmqhFlowCtx_::last
uint16_t last
Definition: tmqh-flow.h:36
Packet_
Definition: decode.h:436
Tmqh_::name
const char * name
Definition: tm-queuehandlers.h:37
conf.h
TmqhOutputFlowHash
void TmqhOutputFlowHash(ThreadVars *t, Packet *p)
Definition: tmqh-flow.c:224
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
FlowGetIpPairProtoHash
uint32_t FlowGetIpPairProtoHash(const Packet *p)
Definition: flow-hash.c:116
tm-queuehandlers.h
PRINT_IF_FUNC
#define PRINT_IF_FUNC(f, msg)
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:89
Packet_::tcph
TCPHdr * tcph
Definition: decode.h:566
TMQH_FLOW
@ TMQH_FLOW
Definition: tm-queuehandlers.h:31
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
tmqh_table
Tmqh tmqh_table[TMQH_SIZE]
Definition: tm-queuehandlers.c:37
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:216
threadvars.h
Packet_::flow_hash
uint32_t flow_hash
Definition: decode.h:479
str
#define str(s)
Definition: suricata-common.h:291
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmqhInputFlow
Packet * TmqhInputFlow(ThreadVars *t)
Definition: tmqh-flow.c:100
tmqh-flow.h
suricata.h
Address_::family
char family
Definition: decode.h:116
Packet_::dst
Address dst
Definition: decode.h:441
Tmq_
Definition: tm-queues.h:29
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition: counters.c:461
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
Packet_::dp
Port dp
Definition: decode.h:451
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
TmqhFlowCtx_
Ctx for the flow queue handler.
Definition: tmqh-flow.h:34
Packet_::src
Address src
Definition: decode.h:440
TmqResetQueues
void TmqResetQueues(void)
Definition: tm-queues.c:80