suricata
tmqh-flow.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2020 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  *
24  * Simple output queue handler that makes sure all packets of the same flow
25  * are sent to the same queue. We support different kinds of q handlers. Have
26  * a look at "autofp-scheduler" conf to further understand the various q
27  * handlers we provide.
28  */
29 
30 #include "suricata.h"
31 #include "packet-queue.h"
32 #include "decode.h"
33 #include "threads.h"
34 #include "threadvars.h"
35 #include "tmqh-flow.h"
36 #include "flow-hash.h"
37 
38 #include "tm-queuehandlers.h"
39 
40 #include "conf.h"
41 #include "util-unittest.h"
42 
46 static void TmqhOutputFlowFTPHash(ThreadVars *t, Packet *p);
47 void *TmqhOutputFlowSetupCtx(const char *queue_str);
48 void TmqhOutputFlowFreeCtx(void *ctx);
49 void TmqhFlowRegisterTests(void);
50 
/**
 * \brief Register the "flow" queue handler in tmqh_table.
 *
 * Sets the handler name and inspects the "autofp-scheduler" configuration
 * value. The values "round-robin", "active-packets", "hash", "ippair" and
 * "ftp-hash" are accepted; any other value is logged as an error and
 * terminates the process with EXIT_FAILURE.
 *
 * NOTE(review): the embedded listing numbers in this block are not
 * contiguous (54-57, 63, 66, 68, 70 and 80 are absent), so some statements
 * are not visible here -- presumably the remaining tmqh_table assignments.
 * Confirm against the original file before relying on this description.
 */
51 void TmqhFlowRegister(void)
52 {
53  tmqh_table[TMQH_FLOW].name = "flow";
58 
59  const char *scheduler = NULL;
60  if (ConfGet("autofp-scheduler", &scheduler) == 1) {
61  if (strcasecmp(scheduler, "round-robin") == 0) {
        /* round-robin is no longer a distinct scheduler: only a notice is
         * logged in the visible code */
62  SCLogNotice("using flow hash instead of round robin");
64  } else if (strcasecmp(scheduler, "active-packets") == 0) {
        /* same for active-packets */
65  SCLogNotice("using flow hash instead of active packets");
67  } else if (strcasecmp(scheduler, "hash") == 0) {
69  } else if (strcasecmp(scheduler, "ippair") == 0) {
71  } else if (strcasecmp(scheduler, "ftp-hash") == 0) {
72  tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowFTPHash;
73  } else {
        /* unknown scheduler name: fatal configuration error */
74  SCLogError("Invalid entry \"%s\" "
75  "for autofp-scheduler in conf. Killing engine.",
76  scheduler);
77  exit(EXIT_FAILURE);
78  }
79  } else {
        /* "autofp-scheduler" not set; NOTE(review): the statement for this
         * case is not visible in this listing */
81  }
82 }
83 
85 {
86 #define PRINT_IF_FUNC(f, msg) \
87  if (tmqh_table[TMQH_FLOW].OutHandler == (f)) \
88  SCLogConfig("AutoFP mode using \"%s\" flow load balancer", (msg))
89 
92  PRINT_IF_FUNC(TmqhOutputFlowFTPHash, "FTPHash");
93 
94 #undef PRINT_IF_FUNC
95 }
96 
97 /* same as 'simple' */
99 {
100  PacketQueue *q = tv->inq->pq;
101 
103 
104  SCMutexLock(&q->mutex_q);
105  if (q->len == 0) {
106  /* if we have no packets in queue, wait... */
107  SCCondWait(&q->cond_q, &q->mutex_q);
108  }
109 
110  if (q->len > 0) {
111  Packet *p = PacketDequeue(q);
112  SCMutexUnlock(&q->mutex_q);
113  return p;
114  } else {
115  /* return NULL if we have no pkt. Should only happen on signals. */
116  SCMutexUnlock(&q->mutex_q);
117  return NULL;
118  }
119 }
120 
121 static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
122 {
123  void *ptmp;
124  Tmq *tmq = TmqGetQueueByName(name);
125  if (tmq == NULL) {
126  tmq = TmqCreateQueue(name);
127  if (tmq == NULL)
128  return -1;
129  }
130  tmq->writer_cnt++;
131 
132  if (ctx->queues == NULL) {
133  ctx->size = 1;
134  ctx->queues = SCCalloc(1, ctx->size * sizeof(TmqhFlowMode));
135  if (ctx->queues == NULL) {
136  return -1;
137  }
138  } else {
139  ctx->size++;
140  ptmp = SCRealloc(ctx->queues, ctx->size * sizeof(TmqhFlowMode));
141  if (ptmp == NULL) {
142  SCFree(ctx->queues);
143  ctx->queues = NULL;
144  return -1;
145  }
146  ctx->queues = ptmp;
147 
148  memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
149  }
150  ctx->queues[ctx->size - 1].q = tmq->pq;
151 
152  return 0;
153 }
154 
155 /**
156  * \brief setup the queue handlers ctx
157  *
158  * Parses a comma separated string "queuename1,queuename2,etc"
159  * and sets the ctx up to devide flows over these queue's.
160  *
161  * \param queue_str comma separated string with output queue names
162  *
163  * \retval ctx queues handlers ctx or NULL in error
164  */
165 void *TmqhOutputFlowSetupCtx(const char *queue_str)
166 {
167  if (queue_str == NULL || strlen(queue_str) == 0)
168  return NULL;
169 
170  SCLogDebug("queue_str %s", queue_str);
171 
172  TmqhFlowCtx *ctx = SCCalloc(1, sizeof(TmqhFlowCtx));
173  if (unlikely(ctx == NULL))
174  return NULL;
175 
176  char *str = SCStrdup(queue_str);
177  if (unlikely(str == NULL)) {
178  goto error;
179  }
180  char *tstr = str;
181 
182  /* parse the comma separated string */
183  do {
184  char *comma = strchr(tstr,',');
185  if (comma != NULL) {
186  *comma = '\0';
187  char *qname = tstr;
188  int r = StoreQueueId(ctx,qname);
189  if (r < 0)
190  goto error;
191  } else {
192  char *qname = tstr;
193  int r = StoreQueueId(ctx,qname);
194  if (r < 0)
195  goto error;
196  }
197  tstr = comma ? (comma + 1) : comma;
198  } while (tstr != NULL);
199 
200  SCFree(str);
201  return (void *)ctx;
202 
203 error:
204  SCFree(ctx);
205  if (str != NULL)
206  SCFree(str);
207  return NULL;
208 }
209 
211 {
212  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
213 
214  SCLogPerf("AutoFP - Total flow handler queues - %" PRIu16,
215  fctx->size);
216  SCFree(fctx->queues);
217  SCFree(fctx);
218 }
219 
221 {
222  uint32_t qid;
224 
225  if (p->flags & PKT_WANTS_FLOW) {
226  uint32_t hash = p->flow_hash;
227  qid = hash % ctx->size;
228  } else {
229  qid = ctx->last++;
230 
231  if (ctx->last == ctx->size)
232  ctx->last = 0;
233  }
234 
235  PacketQueue *q = ctx->queues[qid].q;
236  SCMutexLock(&q->mutex_q);
237  PacketEnqueue(q, p);
238  SCCondSignal(&q->cond_q);
239  SCMutexUnlock(&q->mutex_q);
240 }
241 
242 /**
243  * \brief select the queue to output based on IP address pair.
244  *
245  * \param tv thread vars.
246  * \param p packet.
247  */
249 {
250  uint32_t addr_hash = 0;
251 
253 
254  if (p->src.family == AF_INET6) {
255  for (int i = 0; i < 4; i++) {
256  addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
257  }
258  } else {
259  addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
260  }
261 
262  uint32_t qid = addr_hash % ctx->size;
263  PacketQueue *q = ctx->queues[qid].q;
264  SCMutexLock(&q->mutex_q);
265  PacketEnqueue(q, p);
266  SCCondSignal(&q->cond_q);
267  SCMutexUnlock(&q->mutex_q);
268 }
269 
/**
 * \brief select the output queue for a packet, keeping FTP control and
 *        data traffic between the same hosts on one queue.
 *
 * Packets flagged PKT_WANTS_FLOW are mapped to a queue by hash modulo the
 * number of queues. The hash is p->flow_hash, except for TCP packets where
 * both ports are >= 1024 or either port is 20 or 21: those use
 * FlowGetIpPairProtoHash(p) instead. Packets without the flag are spread
 * over the queues in turn using ctx->last.
 *
 * NOTE(review): listing line 273 is absent here, so the declaration of
 * 'ctx' is not visible -- presumably it is taken from tv->outctx; confirm
 * against the original file.
 *
 * \param tv thread vars.
 * \param p packet.
 */
270 static void TmqhOutputFlowFTPHash(ThreadVars *tv, Packet *p)
271 {
272  uint32_t qid;
274 
275  if (p->flags & PKT_WANTS_FLOW) {
276  uint32_t hash = p->flow_hash;
        /* FTP control/data ports, or two unprivileged ports (candidate
         * data connection): switch to the ip pair + proto hash */
277  if (PacketIsTCP(p) && ((p->sp >= 1024 && p->dp >= 1024) || p->dp == 21 || p->sp == 21 ||
278  p->dp == 20 || p->sp == 20)) {
279  hash = FlowGetIpPairProtoHash(p);
280  }
281  qid = hash % ctx->size;
282  } else {
        /* no flow wanted for this packet: take the next queue in turn */
283  qid = ctx->last++;
284 
        /* wrap the counter once it reaches the number of queues */
285  if (ctx->last == ctx->size)
286  ctx->last = 0;
287  }
288 
        /* enqueue under the queue's mutex and signal its condition */
289  PacketQueue *q = ctx->queues[qid].q;
290  SCMutexLock(&q->mutex_q);
291  PacketEnqueue(q, p);
292  SCCondSignal(&q->cond_q);
293  SCMutexUnlock(&q->mutex_q);
294 }
295 
296 #ifdef UNITTESTS
297 
298 static int TmqhOutputFlowSetupCtxTest01(void)
299 {
300  TmqResetQueues();
301 
302  Tmq *tmq1 = TmqCreateQueue("queue1");
303  FAIL_IF_NULL(tmq1);
304  Tmq *tmq2 = TmqCreateQueue("queue2");
305  FAIL_IF_NULL(tmq2);
306  Tmq *tmq3 = TmqCreateQueue("another");
307  FAIL_IF_NULL(tmq3);
308  Tmq *tmq4 = TmqCreateQueue("yetanother");
309  FAIL_IF_NULL(tmq4);
310 
311  const char *str = "queue1,queue2,another,yetanother";
312  void *ctx = TmqhOutputFlowSetupCtx(str);
313  FAIL_IF_NULL(ctx);
314 
315  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
316 
317  FAIL_IF_NOT(fctx->size == 4);
318 
319  FAIL_IF_NULL(fctx->queues);
320 
321  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
322  FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
323  FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
324  FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
325 
326  TmqhOutputFlowFreeCtx(fctx);
327  TmqResetQueues();
328  PASS;
329 }
330 
331 static int TmqhOutputFlowSetupCtxTest02(void)
332 {
333  TmqResetQueues();
334 
335  Tmq *tmq1 = TmqCreateQueue("queue1");
336  FAIL_IF_NULL(tmq1);
337  Tmq *tmq2 = TmqCreateQueue("queue2");
338  FAIL_IF_NULL(tmq2);
339  Tmq *tmq3 = TmqCreateQueue("another");
340  FAIL_IF_NULL(tmq3);
341  Tmq *tmq4 = TmqCreateQueue("yetanother");
342  FAIL_IF_NULL(tmq4);
343 
344  const char *str = "queue1";
345  void *ctx = TmqhOutputFlowSetupCtx(str);
346  FAIL_IF_NULL(ctx);
347 
348  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
349 
350  FAIL_IF_NOT(fctx->size == 1);
351 
352  FAIL_IF_NULL(fctx->queues);
353 
354  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
355  TmqhOutputFlowFreeCtx(fctx);
356  TmqResetQueues();
357 
358  PASS;
359 }
360 
361 static int TmqhOutputFlowSetupCtxTest03(void)
362 {
363  TmqResetQueues();
364 
365  const char *str = "queue1,queue2,another,yetanother";
366  void *ctx = TmqhOutputFlowSetupCtx(str);
367  FAIL_IF_NULL(ctx);
368 
369  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
370 
371  FAIL_IF_NOT(fctx->size == 4);
372 
373  FAIL_IF_NULL(fctx->queues);
374 
375  Tmq *tmq1 = TmqGetQueueByName("queue1");
376  FAIL_IF_NULL(tmq1);
377  Tmq *tmq2 = TmqGetQueueByName("queue2");
378  FAIL_IF_NULL(tmq2);
379  Tmq *tmq3 = TmqGetQueueByName("another");
380  FAIL_IF_NULL(tmq3);
381  Tmq *tmq4 = TmqGetQueueByName("yetanother");
382  FAIL_IF_NULL(tmq4);
383 
384  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
385  FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
386  FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
387  FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
388 
389  TmqhOutputFlowFreeCtx(fctx);
390  TmqResetQueues();
391  PASS;
392 }
393 
394 #endif /* UNITTESTS */
395 
397 {
398 #ifdef UNITTESTS
399  UtRegisterTest("TmqhOutputFlowSetupCtxTest01",
400  TmqhOutputFlowSetupCtxTest01);
401  UtRegisterTest("TmqhOutputFlowSetupCtxTest02",
402  TmqhOutputFlowSetupCtxTest02);
403  UtRegisterTest("TmqhOutputFlowSetupCtxTest03",
404  TmqhOutputFlowSetupCtxTest03);
405 #endif
406 }
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
TMQH_FLOW
@ TMQH_FLOW
Definition: tm-queuehandlers.h:31
FAIL_IF_NULL
#define FAIL_IF_NULL(expr)
Fail a test if expression evaluates to NULL.
Definition: util-unittest.h:89
TmqhOutputFlowIPPair
void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p)
select the queue to output based on IP address pair.
Definition: tmqh-flow.c:248
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
UtRegisterTest
void UtRegisterTest(const char *name, int(*TestFn)(void))
Register unit test.
Definition: util-unittest.c:103
TmqhFlowPrintAutofpHandler
void TmqhFlowPrintAutofpHandler(void)
Definition: tmqh-flow.c:84
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:105
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:175
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
Packet_::flags
uint32_t flags
Definition: decode.h:510
threads.h
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
TmqhFlowRegisterTests
void TmqhFlowRegisterTests(void)
Definition: tmqh-flow.c:396
ctx
struct Thresholds ctx
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
TmqhFlowMode_
Definition: tmqh-flow.h:27
PKT_WANTS_FLOW
#define PKT_WANTS_FLOW
Definition: decode.h:1294
flow-hash.h
packet-queue.h
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
util-unittest.h
FAIL_IF_NOT
#define FAIL_IF_NOT(expr)
Fail a test if expression evaluates to false.
Definition: util-unittest.h:82
TmqhFlowRegister
void TmqhFlowRegister(void)
Definition: tmqh-flow.c:51
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmqhOutputFlowFreeCtx
void TmqhOutputFlowFreeCtx(void *ctx)
Definition: tmqh-flow.c:210
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
ConfGet
int ConfGet(const char *name, const char **vptr)
Retrieve the value of a configuration node.
Definition: conf.c:335
decode.h
TmqhFlowCtx_::size
uint16_t size
Definition: tmqh-flow.h:35
PASS
#define PASS
Pass the test.
Definition: util-unittest.h:105
TmqhFlowCtx_::queues
TmqhFlowMode * queues
Definition: tmqh-flow.h:38
SCCondWait
#define SCCondWait
Definition: threads-debug.h:141
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
Tmqh_::RegisterTests
void(* RegisterTests)(void)
Definition: tm-queuehandlers.h:43
Packet_::sp
Port sp
Definition: decode.h:480
TmqhOutputFlowSetupCtx
void * TmqhOutputFlowSetupCtx(const char *queue_str)
setup the queue handlers ctx
Definition: tmqh-flow.c:165
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
TmqhFlowMode_::q
PacketQueue * q
Definition: tmqh-flow.h:28
Packet_
Definition: decode.h:473
Tmqh_::name
const char * name
Definition: tm-queuehandlers.h:37
conf.h
TmqhOutputFlowHash
void TmqhOutputFlowHash(ThreadVars *t, Packet *p)
Definition: tmqh-flow.c:220
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
FlowGetIpPairProtoHash
uint32_t FlowGetIpPairProtoHash(const Packet *p)
Definition: flow-hash.c:117
tm-queuehandlers.h
PRINT_IF_FUNC
#define PRINT_IF_FUNC(f, msg)
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:90
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
tmqh_table
Tmqh tmqh_table[TMQH_SIZE]
Definition: tm-queuehandlers.c:37
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:216
threadvars.h
Packet_::flow_hash
uint32_t flow_hash
Definition: decode.h:516
str
#define str(s)
Definition: suricata-common.h:291
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmqhInputFlow
Packet * TmqhInputFlow(ThreadVars *t)
Definition: tmqh-flow.c:98
tmqh-flow.h
suricata.h
Address_::family
char family
Definition: decode.h:109
Packet_::dst
Address dst
Definition: decode.h:478
Tmq_
Definition: tm-queues.h:29
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition: counters.c:449
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
Packet_::dp
Port dp
Definition: decode.h:488
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
TmqhFlowCtx_
Ctx for the flow queue handler.
Definition: tmqh-flow.h:34
Packet_::src
Address src
Definition: decode.h:477
TmqResetQueues
void TmqResetQueues(void)
Definition: tm-queues.c:80