suricata
tmqh-flow.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2020 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  *
24  * Simple output queue handler that makes sure all packets of the same flow
25  * are sent to the same queue. We support different kind of q handlers. Have
26  * a look at "autofp-scheduler" conf to further undertsand the various q
27  * handlers we provide.
28  */
29 
30 #include "suricata.h"
31 #include "packet-queue.h"
32 #include "decode.h"
33 #include "threads.h"
34 #include "threadvars.h"
35 #include "tmqh-flow.h"
36 
37 #include "tm-queuehandlers.h"
38 
39 #include "conf.h"
40 #include "util-unittest.h"
41 
45 void *TmqhOutputFlowSetupCtx(const char *queue_str);
46 void TmqhOutputFlowFreeCtx(void *ctx);
47 void TmqhFlowRegisterTests(void);
48 
49 void TmqhFlowRegister(void)
50 {
51  tmqh_table[TMQH_FLOW].name = "flow";
56 
57  const char *scheduler = NULL;
58  if (ConfGet("autofp-scheduler", &scheduler) == 1) {
59  if (strcasecmp(scheduler, "round-robin") == 0) {
60  SCLogNotice("using flow hash instead of round robin");
62  } else if (strcasecmp(scheduler, "active-packets") == 0) {
63  SCLogNotice("using flow hash instead of active packets");
65  } else if (strcasecmp(scheduler, "hash") == 0) {
67  } else if (strcasecmp(scheduler, "ippair") == 0) {
69  } else {
70  SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" "
71  "for autofp-scheduler in conf. Killing engine.",
72  scheduler);
73  exit(EXIT_FAILURE);
74  }
75  } else {
77  }
78 
79  return;
80 }
81 
83 {
84 #define PRINT_IF_FUNC(f, msg) \
85  if (tmqh_table[TMQH_FLOW].OutHandler == (f)) \
86  SCLogConfig("AutoFP mode using \"%s\" flow load balancer", (msg))
87 
90 
91 #undef PRINT_IF_FUNC
92 }
93 
94 /* same as 'simple' */
96 {
97  PacketQueue *q = tv->inq->pq;
98 
100 
101  SCMutexLock(&q->mutex_q);
102  if (q->len == 0) {
103  /* if we have no packets in queue, wait... */
104  SCCondWait(&q->cond_q, &q->mutex_q);
105  }
106 
107  if (q->len > 0) {
108  Packet *p = PacketDequeue(q);
109  SCMutexUnlock(&q->mutex_q);
110  return p;
111  } else {
112  /* return NULL if we have no pkt. Should only happen on signals. */
113  SCMutexUnlock(&q->mutex_q);
114  return NULL;
115  }
116 }
117 
118 static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
119 {
120  void *ptmp;
121  Tmq *tmq = TmqGetQueueByName(name);
122  if (tmq == NULL) {
123  tmq = TmqCreateQueue(name);
124  if (tmq == NULL)
125  return -1;
126  }
127  tmq->writer_cnt++;
128 
129  if (ctx->queues == NULL) {
130  ctx->size = 1;
131  ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode));
132  if (ctx->queues == NULL) {
133  return -1;
134  }
135  memset(ctx->queues, 0, ctx->size * sizeof(TmqhFlowMode));
136  } else {
137  ctx->size++;
138  ptmp = SCRealloc(ctx->queues, ctx->size * sizeof(TmqhFlowMode));
139  if (ptmp == NULL) {
140  SCFree(ctx->queues);
141  ctx->queues = NULL;
142  return -1;
143  }
144  ctx->queues = ptmp;
145 
146  memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
147  }
148  ctx->queues[ctx->size - 1].q = tmq->pq;
149 
150  return 0;
151 }
152 
153 /**
154  * \brief setup the queue handlers ctx
155  *
156  * Parses a comma separated string "queuename1,queuename2,etc"
157  * and sets the ctx up to devide flows over these queue's.
158  *
159  * \param queue_str comma separated string with output queue names
160  *
161  * \retval ctx queues handlers ctx or NULL in error
162  */
163 void *TmqhOutputFlowSetupCtx(const char *queue_str)
164 {
165  if (queue_str == NULL || strlen(queue_str) == 0)
166  return NULL;
167 
168  SCLogDebug("queue_str %s", queue_str);
169 
170  TmqhFlowCtx *ctx = SCMalloc(sizeof(TmqhFlowCtx));
171  if (unlikely(ctx == NULL))
172  return NULL;
173  memset(ctx,0x00,sizeof(TmqhFlowCtx));
174 
175  char *str = SCStrdup(queue_str);
176  if (unlikely(str == NULL)) {
177  goto error;
178  }
179  char *tstr = str;
180 
181  /* parse the comma separated string */
182  do {
183  char *comma = strchr(tstr,',');
184  if (comma != NULL) {
185  *comma = '\0';
186  char *qname = tstr;
187  int r = StoreQueueId(ctx,qname);
188  if (r < 0)
189  goto error;
190  } else {
191  char *qname = tstr;
192  int r = StoreQueueId(ctx,qname);
193  if (r < 0)
194  goto error;
195  }
196  tstr = comma ? (comma + 1) : comma;
197  } while (tstr != NULL);
198 
199  SCFree(str);
200  return (void *)ctx;
201 
202 error:
203  SCFree(ctx);
204  if (str != NULL)
205  SCFree(str);
206  return NULL;
207 }
208 
209 void TmqhOutputFlowFreeCtx(void *ctx)
210 {
211  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
212 
213  SCLogPerf("AutoFP - Total flow handler queues - %" PRIu16,
214  fctx->size);
215  SCFree(fctx->queues);
216  SCFree(fctx);
217 
218  return;
219 }
220 
222 {
223  uint32_t qid;
224  TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
225 
226  if (p->flags & PKT_WANTS_FLOW) {
227  uint32_t hash = p->flow_hash;
228  qid = hash % ctx->size;
229  } else {
230  qid = ctx->last++;
231 
232  if (ctx->last == ctx->size)
233  ctx->last = 0;
234  }
235 
236  PacketQueue *q = ctx->queues[qid].q;
237  SCMutexLock(&q->mutex_q);
238  PacketEnqueue(q, p);
239  SCCondSignal(&q->cond_q);
240  SCMutexUnlock(&q->mutex_q);
241 
242  return;
243 }
244 
245 /**
246  * \brief select the queue to output based on IP address pair.
247  *
248  * \param tv thread vars.
249  * \param p packet.
250  */
252 {
253  uint32_t addr_hash = 0;
254 
255  TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
256 
257  if (p->src.family == AF_INET6) {
258  for (int i = 0; i < 4; i++) {
259  addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
260  }
261  } else {
262  addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
263  }
264 
265  uint32_t qid = addr_hash % ctx->size;
266  PacketQueue *q = ctx->queues[qid].q;
267  SCMutexLock(&q->mutex_q);
268  PacketEnqueue(q, p);
269  SCCondSignal(&q->cond_q);
270  SCMutexUnlock(&q->mutex_q);
271 
272  return;
273 }
274 
275 #ifdef UNITTESTS
276 
277 static int TmqhOutputFlowSetupCtxTest01(void)
278 {
279  TmqResetQueues();
280 
281  Tmq *tmq1 = TmqCreateQueue("queue1");
282  FAIL_IF_NULL(tmq1);
283  Tmq *tmq2 = TmqCreateQueue("queue2");
284  FAIL_IF_NULL(tmq2);
285  Tmq *tmq3 = TmqCreateQueue("another");
286  FAIL_IF_NULL(tmq3);
287  Tmq *tmq4 = TmqCreateQueue("yetanother");
288  FAIL_IF_NULL(tmq4);
289 
290  const char *str = "queue1,queue2,another,yetanother";
291  void *ctx = TmqhOutputFlowSetupCtx(str);
292  FAIL_IF_NULL(ctx);
293 
294  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
295 
296  FAIL_IF_NOT(fctx->size == 4);
297 
298  FAIL_IF_NULL(fctx->queues);
299 
300  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
301  FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
302  FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
303  FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
304 
305  TmqhOutputFlowFreeCtx(fctx);
306  TmqResetQueues();
307  PASS;
308 }
309 
310 static int TmqhOutputFlowSetupCtxTest02(void)
311 {
312  TmqResetQueues();
313 
314  Tmq *tmq1 = TmqCreateQueue("queue1");
315  FAIL_IF_NULL(tmq1);
316  Tmq *tmq2 = TmqCreateQueue("queue2");
317  FAIL_IF_NULL(tmq2);
318  Tmq *tmq3 = TmqCreateQueue("another");
319  FAIL_IF_NULL(tmq3);
320  Tmq *tmq4 = TmqCreateQueue("yetanother");
321  FAIL_IF_NULL(tmq4);
322 
323  const char *str = "queue1";
324  void *ctx = TmqhOutputFlowSetupCtx(str);
325  FAIL_IF_NULL(ctx);
326 
327  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
328 
329  FAIL_IF_NOT(fctx->size == 1);
330 
331  FAIL_IF_NULL(fctx->queues);
332 
333  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
334  TmqhOutputFlowFreeCtx(fctx);
335  TmqResetQueues();
336 
337  PASS;
338 }
339 
340 static int TmqhOutputFlowSetupCtxTest03(void)
341 {
342  TmqResetQueues();
343 
344  const char *str = "queue1,queue2,another,yetanother";
345  void *ctx = TmqhOutputFlowSetupCtx(str);
346  FAIL_IF_NULL(ctx);
347 
348  TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
349 
350  FAIL_IF_NOT(fctx->size == 4);
351 
352  FAIL_IF_NULL(fctx->queues);
353 
354  Tmq *tmq1 = TmqGetQueueByName("queue1");
355  FAIL_IF_NULL(tmq1);
356  Tmq *tmq2 = TmqGetQueueByName("queue2");
357  FAIL_IF_NULL(tmq2);
358  Tmq *tmq3 = TmqGetQueueByName("another");
359  FAIL_IF_NULL(tmq3);
360  Tmq *tmq4 = TmqGetQueueByName("yetanother");
361  FAIL_IF_NULL(tmq4);
362 
363  FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
364  FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
365  FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
366  FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
367 
368  TmqhOutputFlowFreeCtx(fctx);
369  TmqResetQueues();
370  PASS;
371 }
372 
373 #endif /* UNITTESTS */
374 
376 {
377 #ifdef UNITTESTS
378  UtRegisterTest("TmqhOutputFlowSetupCtxTest01",
379  TmqhOutputFlowSetupCtxTest01);
380  UtRegisterTest("TmqhOutputFlowSetupCtxTest02",
381  TmqhOutputFlowSetupCtxTest02);
382  UtRegisterTest("TmqhOutputFlowSetupCtxTest03",
383  TmqhOutputFlowSetupCtxTest03);
384 #endif
385 
386  return;
387 }
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
FAIL_IF_NULL
#define FAIL_IF_NULL(expr)
Fail a test if expression evaluates to NULL.
Definition: util-unittest.h:89
TmqhOutputFlowIPPair
void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p)
select the queue to output based on IP address pair.
Definition: tmqh-flow.c:251
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
UtRegisterTest
void UtRegisterTest(const char *name, int(*TestFn)(void))
Register unit test.
Definition: util-unittest.c:103
TmqhFlowPrintAutofpHandler
void TmqhFlowPrintAutofpHandler(void)
Definition: tmqh-flow.c:82
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:105
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:298
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:173
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:47
Packet_::flags
uint32_t flags
Definition: decode.h:449
threads.h
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
TmqhFlowRegisterTests
void TmqhFlowRegisterTests(void)
Definition: tmqh-flow.c:375
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
TmqhFlowMode_
Definition: tmqh-flow.h:27
PKT_WANTS_FLOW
#define PKT_WANTS_FLOW
Definition: decode.h:1130
packet-queue.h
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
StatsSyncCountersIfSignalled
#define StatsSyncCountersIfSignalled(tv)
Definition: counters.h:137
util-unittest.h
FAIL_IF_NOT
#define FAIL_IF_NOT(expr)
Fail a test if expression to true.
Definition: util-unittest.h:82
TmqhFlowRegister
void TmqhFlowRegister(void)
Definition: tmqh-flow.c:49
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmqhOutputFlowFreeCtx
void TmqhOutputFlowFreeCtx(void *ctx)
Definition: tmqh-flow.c:209
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:54
ConfGet
int ConfGet(const char *name, const char **vptr)
Retrieve the value of a configuration node.
Definition: conf.c:330
decode.h
TmqhFlowCtx_::size
uint16_t size
Definition: tmqh-flow.h:35
PASS
#define PASS
Pass the test.
Definition: util-unittest.h:105
TmqhFlowCtx_::queues
TmqhFlowMode * queues
Definition: tmqh-flow.h:38
SCCondWait
#define SCCondWait
Definition: threads-debug.h:141
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
Tmqh_::RegisterTests
void(* RegisterTests)(void)
Definition: tm-queuehandlers.h:43
TmqhOutputFlowSetupCtx
void * TmqhOutputFlowSetupCtx(const char *queue_str)
setup the queue handlers ctx
Definition: tmqh-flow.c:163
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
TmqhFlowMode_::q
PacketQueue * q
Definition: tmqh-flow.h:28
TmqhFlowCtx_::last
uint16_t last
Definition: tmqh-flow.h:36
Packet_
Definition: decode.h:414
Tmqh_::name
const char * name
Definition: tm-queuehandlers.h:37
conf.h
TmqhOutputFlowHash
void TmqhOutputFlowHash(ThreadVars *t, Packet *p)
Definition: tmqh-flow.c:221
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:55
SC_ERR_INVALID_YAML_CONF_ENTRY
@ SC_ERR_INVALID_YAML_CONF_ENTRY
Definition: util-error.h:169
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
tm-queuehandlers.h
PRINT_IF_FUNC
#define PRINT_IF_FUNC(f, msg)
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:90
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:224
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:50
SCLogError
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:257
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
tmqh_table
Tmqh tmqh_table[TMQH_SIZE]
Definition: tm-queuehandlers.c:37
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:29
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:212
threadvars.h
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::flow_hash
uint32_t flow_hash
Definition: decode.h:455
str
#define str(s)
Definition: suricata-common.h:273
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TMQH_FLOW
@ TMQH_FLOW
Definition: tm-queuehandlers.h:31
TmqhInputFlow
Packet * TmqhInputFlow(ThreadVars *t)
Definition: tmqh-flow.c:95
tmqh-flow.h
suricata.h
Address_::family
char family
Definition: decode.h:117
Packet_::dst
Address dst
Definition: decode.h:419
Tmq_
Definition: tm-queues.h:29
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:232
TmqhFlowCtx_
Ctx for the flow queue handler.
Definition: tmqh-flow.h:34
Packet_::src
Address src
Definition: decode.h:418
TmqResetQueues
void TmqResetQueues(void)
Definition: tm-queues.c:80