suricata
flow-manager.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2013 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
22  * \author Victor Julien <victor@inliniac.net>
23  */
24 
25 #include "suricata-common.h"
26 #include "suricata.h"
27 #include "decode.h"
28 #include "conf.h"
29 #include "threadvars.h"
30 #include "tm-threads.h"
31 #include "runmodes.h"
32 
33 #include "util-random.h"
34 #include "util-time.h"
35 
36 #include "flow.h"
37 #include "flow-queue.h"
38 #include "flow-hash.h"
39 #include "flow-util.h"
40 #include "flow-var.h"
41 #include "flow-private.h"
42 #include "flow-timeout.h"
43 #include "flow-manager.h"
44 
45 #include "stream-tcp-private.h"
46 #include "stream-tcp-reassemble.h"
47 #include "stream-tcp.h"
48 
49 #include "util-unittest.h"
50 #include "util-unittest-helper.h"
51 #include "util-byte.h"
52 
53 #include "util-debug.h"
54 #include "util-privs.h"
55 #include "util-signal.h"
56 
57 #include "threads.h"
58 #include "detect.h"
59 #include "detect-engine-state.h"
60 #include "stream.h"
61 
62 #include "app-layer-parser.h"
63 
64 #include "host-timeout.h"
65 #include "defrag-timeout.h"
66 #include "ippair-timeout.h"
67 
68 #include "output-flow.h"
69 
70 /* Run mode selected at suricata.c */
71 extern int run_mode;
72 
73 /* multi flow mananger support */
74 static uint32_t flowmgr_number = 1;
75 /* atomic counter for flow managers, to assign instance id */
76 SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
77 
78 /* multi flow recycler support */
79 static uint32_t flowrec_number = 1;
80 /* atomic counter for flow recyclers, to assign instance id */
81 SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
82 
83 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
84 
85 
88 
89 void FlowTimeoutsInit(void)
90 {
91  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
92 }
93 
95 {
96  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
97 }
98 
99 /* 1 seconds */
100 #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
101 #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
102 /* 0.1 seconds */
103 #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
104 #define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 100000
105 #define NEW_FLOW_COUNT_COND 10
106 
107 typedef struct FlowTimeoutCounters_ {
108  uint32_t new;
109  uint32_t est;
110  uint32_t clo;
111  uint32_t byp;
112  uint32_t tcp_reuse;
113 
114  uint32_t flows_checked;
115  uint32_t flows_notimeout;
116  uint32_t flows_timeout;
118  uint32_t flows_removed;
119 
120  uint32_t rows_checked;
121  uint32_t rows_skipped;
122  uint32_t rows_empty;
123  uint32_t rows_busy;
124  uint32_t rows_maxlen;
126 
127 /**
128  * \brief Used to disable flow manager thread(s).
129  *
130  * \todo Kinda hackish since it uses the tv name to identify flow manager
131  * thread. We need an all weather identification scheme.
132  */
134 {
135 #ifdef AFLFUZZ_DISABLE_MGTTHREADS
136  return;
137 #endif
138  ThreadVars *tv = NULL;
139 
140  /* wake up threads */
141  uint32_t u;
142  for (u = 0; u < flowmgr_number; u++)
144 
146  /* flow manager thread(s) is/are a part of mgmt threads */
147  tv = tv_root[TVT_MGMT];
148  while (tv != NULL)
149  {
150  if (strncasecmp(tv->name, thread_name_flow_mgr,
151  strlen(thread_name_flow_mgr)) == 0)
152  {
154  }
155  tv = tv->next;
156  }
158 
159  double total_wait_time = 0;
160  /* value in seconds */
161  #define THREAD_KILL_MAX_WAIT_TIME 60
162  /* value in microseconds */
163  #define WAIT_TIME 100
164 
165 again:
167 
168  tv = tv_root[TVT_MGMT];
169  while (tv != NULL)
170  {
171  if (strncasecmp(tv->name, thread_name_flow_mgr,
172  strlen(thread_name_flow_mgr)) == 0)
173  {
176 
177  usleep(WAIT_TIME);
178  total_wait_time += WAIT_TIME / 1000000.0;
179  if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) {
180  SCLogError(SC_ERR_FATAL, "Engine unable to "
181  "disable detect thread - \"%s\". "
182  "Killing engine", tv->name);
183  exit(EXIT_FAILURE);
184  }
185  goto again;
186  }
187  }
188  tv = tv->next;
189  }
191 
192  /* wake up threads, another try */
193  for (u = 0; u < flowmgr_number; u++)
195 
196  /* reset count, so we can kill and respawn (unix socket) */
197  SC_ATOMIC_SET(flowmgr_cnt, 0);
198  return;
199 }
200 
201 /** \internal
202  * \brief get timeout for flow
203  *
204  * \param f flow
205  * \param state flow state
206  *
207  * \retval timeout timeout in seconds
208  */
209 static inline uint32_t FlowGetFlowTimeout(const Flow *f, enum FlowState state)
210 {
211  uint32_t timeout;
212  FlowProtoTimeoutPtr flow_timeouts = SC_ATOMIC_GET(flow_timeouts);
213  switch(state) {
214  default:
215  case FLOW_STATE_NEW:
216  timeout = flow_timeouts[f->protomap].new_timeout;
217  break;
219  timeout = flow_timeouts[f->protomap].est_timeout;
220  break;
221  case FLOW_STATE_CLOSED:
222  timeout = flow_timeouts[f->protomap].closed_timeout;
223  break;
225  timeout = FLOW_BYPASSED_TIMEOUT;
226  break;
228  timeout = flow_timeouts[f->protomap].bypassed_timeout;
229  break;
230  }
231  return timeout;
232 }
233 
234 /** \internal
235  * \brief check if a flow is timed out
236  *
237  * \param f flow
238  * \param ts timestamp
239  *
240  * \retval 0 not timed out
241  * \retval 1 timed out
242  */
243 static int FlowManagerFlowTimeout(const Flow *f, enum FlowState state, struct timeval *ts, int32_t *next_ts)
244 {
245  /* set the timeout value according to the flow operating mode,
246  * flow's state and protocol.*/
247  uint32_t timeout = FlowGetFlowTimeout(f, state);
248 
249  int32_t flow_times_out_at = (int32_t)(f->lastts.tv_sec + timeout);
250  if (*next_ts == 0 || flow_times_out_at < *next_ts)
251  *next_ts = flow_times_out_at;
252 
253  /* do the timeout check */
254  if (flow_times_out_at >= ts->tv_sec) {
255  return 0;
256  }
257 
258  return 1;
259 }
260 
261 /** \internal
262  * \brief See if we can really discard this flow. Check use_cnt reference
263  * counter and force reassembly if necessary.
264  *
265  * \param f flow
266  * \param ts timestamp
267  *
268  * \retval 0 not timed out just yet
269  * \retval 1 fully timed out, lets kill it
270  */
271 static int FlowManagerFlowTimedOut(Flow *f, struct timeval *ts)
272 {
273  /* never prune a flow that is used by a packet we
274  * are currently processing in one of the threads */
275  if (SC_ATOMIC_GET(f->use_cnt) > 0) {
276  return 0;
277  }
278 
279  int server = 0, client = 0;
280 
281  if (!(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
282  FlowForceReassemblyNeedReassembly(f, &server, &client) == 1) {
283  FlowForceReassemblyForFlow(f, server, client);
284  return 0;
285  }
286 #ifdef DEBUG
287  /* this should not be possible */
288  BUG_ON(SC_ATOMIC_GET(f->use_cnt) > 0);
289 #endif
290 
291  return 1;
292 }
293 
294 /**
295  * \internal
296  *
297  * \brief check all flows in a hash row for timing out
298  *
299  * \param f last flow in the hash row
300  * \param ts timestamp
301  * \param emergency bool indicating emergency mode
302  * \param counters ptr to FlowTimeoutCounters structure
303  *
304  * \retval cnt timed out flows
305  */
306 static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
307  int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
308 {
309  uint32_t cnt = 0;
310  uint32_t checked = 0;
311 
312  do {
313  checked++;
314 
315  /* check flow timeout based on lastts and state. Both can be
316  * accessed w/o Flow lock as we do have the hash row lock (so flow
317  * can't disappear) and flow_state is atomic. lastts can only
318  * be modified when we have both the flow and hash row lock */
319 
320  enum FlowState state = SC_ATOMIC_GET(f->flow_state);
321 
322  /* timeout logic goes here */
323  if (FlowManagerFlowTimeout(f, state, ts, next_ts) == 0) {
324 
325  counters->flows_notimeout++;
326 
327  f = f->hprev;
328  continue;
329  }
330 
331  /* before grabbing the flow lock, make sure we have at least
332  * 3 packets in the pool */
334 
335  FLOWLOCK_WRLOCK(f);
336 
337  Flow *next_flow = f->hprev;
338 
339  counters->flows_timeout++;
340 
341  /* check if the flow is fully timed out and
342  * ready to be discarded. */
343  if (FlowManagerFlowTimedOut(f, ts) == 1) {
344  /* remove from the hash */
345  if (f->hprev != NULL)
346  f->hprev->hnext = f->hnext;
347  if (f->hnext != NULL)
348  f->hnext->hprev = f->hprev;
349  if (f->fb->head == f)
350  f->fb->head = f->hnext;
351  if (f->fb->tail == f)
352  f->fb->tail = f->hprev;
353 
354  f->hnext = NULL;
355  f->hprev = NULL;
356 
357  if (f->flags & FLOW_TCP_REUSED)
358  counters->tcp_reuse++;
359 
360  if (state == FLOW_STATE_NEW)
362  else if (state == FLOW_STATE_ESTABLISHED)
364  else if (state == FLOW_STATE_CLOSED)
366  else if (state == FLOW_STATE_LOCAL_BYPASSED)
368  else if (state == FLOW_STATE_CAPTURE_BYPASSED)
370 
371  if (emergency)
374 
375  /* no one is referring to this flow, use_cnt 0, removed from hash
376  * so we can unlock it and pass it to the flow recycler */
377  FLOWLOCK_UNLOCK(f);
379 
380  cnt++;
381 
382  switch (state) {
383  case FLOW_STATE_NEW:
384  default:
385  counters->new++;
386  break;
388  counters->est++;
389  break;
390  case FLOW_STATE_CLOSED:
391  counters->clo++;
392  break;
395  counters->byp++;
396  break;
397  }
398  counters->flows_removed++;
399  } else {
400  counters->flows_timeout_inuse++;
401  FLOWLOCK_UNLOCK(f);
402  }
403 
404  f = next_flow;
405  } while (f != NULL);
406 
407  counters->flows_checked += checked;
408  if (checked > counters->rows_maxlen)
409  counters->rows_maxlen = checked;
410 
411  return cnt;
412 }
413 
414 /**
415  * \brief time out flows from the hash
416  *
417  * \param ts timestamp
418  * \param try_cnt number of flows to time out max (0 is unlimited)
419  * \param hash_min min hash index to consider
420  * \param hash_max max hash index to consider
421  * \param counters ptr to FlowTimeoutCounters structure
422  *
423  * \retval cnt number of timed out flow
424  */
425 static uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt,
426  uint32_t hash_min, uint32_t hash_max,
427  FlowTimeoutCounters *counters)
428 {
429  uint32_t idx = 0;
430  uint32_t cnt = 0;
431  int emergency = 0;
432 
433  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
434  emergency = 1;
435 
436  for (idx = hash_min; idx < hash_max; idx++) {
437  FlowBucket *fb = &flow_hash[idx];
438 
439  counters->rows_checked++;
440 
441  int32_t check_ts = SC_ATOMIC_GET(fb->next_ts);
442  if (check_ts > (int32_t)ts->tv_sec) {
443  counters->rows_skipped++;
444  continue;
445  }
446 
447  /* before grabbing the row lock, make sure we have at least
448  * 9 packets in the pool */
450 
451  if (FBLOCK_TRYLOCK(fb) != 0) {
452  counters->rows_busy++;
453  continue;
454  }
455 
456  /* flow hash bucket is now locked */
457 
458  if (fb->tail == NULL) {
459  SC_ATOMIC_SET(fb->next_ts, INT_MAX);
460  counters->rows_empty++;
461  goto next;
462  }
463 
464  int32_t next_ts = 0;
465 
466  /* we have a flow, or more than one */
467  cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters, &next_ts);
468 
469  SC_ATOMIC_SET(fb->next_ts, next_ts);
470 
471 next:
472  FBLOCK_UNLOCK(fb);
473 
474  if (try_cnt > 0 && cnt >= try_cnt)
475  break;
476  }
477 
478  return cnt;
479 }
480 
481 /**
482  * \internal
483  *
484  * \brief move all flows out of a hash row
485  *
486  * \param f last flow in the hash row
487  *
488  * \retval cnt removed out flows
489  */
490 static uint32_t FlowManagerHashRowCleanup(Flow *f)
491 {
492  uint32_t cnt = 0;
493 
494  do {
495  FLOWLOCK_WRLOCK(f);
496 
497  Flow *next_flow = f->hprev;
498 
499  int state = SC_ATOMIC_GET(f->flow_state);
500 
501  /* remove from the hash */
502  if (f->hprev != NULL)
503  f->hprev->hnext = f->hnext;
504  if (f->hnext != NULL)
505  f->hnext->hprev = f->hprev;
506  if (f->fb->head == f)
507  f->fb->head = f->hnext;
508  if (f->fb->tail == f)
509  f->fb->tail = f->hprev;
510 
511  f->hnext = NULL;
512  f->hprev = NULL;
513 
514  if (state == FLOW_STATE_NEW)
516  else if (state == FLOW_STATE_ESTABLISHED)
518  else if (state == FLOW_STATE_CLOSED)
520 
522 
523  /* no one is referring to this flow, use_cnt 0, removed from hash
524  * so we can unlock it and move it to the recycle queue. */
525  FLOWLOCK_UNLOCK(f);
526 
528 
529  cnt++;
530 
531  f = next_flow;
532  } while (f != NULL);
533 
534  return cnt;
535 }
536 
537 /**
538  * \brief remove all flows from the hash
539  *
540  * \retval cnt number of removes out flows
541  */
542 static uint32_t FlowCleanupHash(void){
543  uint32_t idx = 0;
544  uint32_t cnt = 0;
545 
546  for (idx = 0; idx < flow_config.hash_size; idx++) {
547  FlowBucket *fb = &flow_hash[idx];
548 
549  FBLOCK_LOCK(fb);
550 
551  if (fb->tail != NULL) {
552  /* we have a flow, or more than one */
553  cnt += FlowManagerHashRowCleanup(fb->tail);
554  }
555 
556  FBLOCK_UNLOCK(fb);
557  }
558 
559  return cnt;
560 }
561 
562 extern int g_detect_disabled;
563 
564 typedef struct FlowManagerThreadData_ {
565  uint32_t instance;
566  uint32_t min;
567  uint32_t max;
568 
573  uint16_t flow_mgr_spare;
576  uint16_t flow_tcp_reuse;
577 
583 
589 
591 
592 static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
593 {
595  if (ftd == NULL)
596  return TM_ECODE_FAILED;
597 
598  ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
599  SCLogDebug("flow manager instance %u", ftd->instance);
600 
601  /* set the min and max value used for hash row walking
602  * each thread has it's own section of the flow hash */
603  uint32_t range = flow_config.hash_size / flowmgr_number;
604  if (ftd->instance == 1)
605  ftd->max = range;
606  else if (ftd->instance == flowmgr_number) {
607  ftd->min = (range * (ftd->instance - 1));
608  ftd->max = flow_config.hash_size;
609  } else {
610  ftd->min = (range * (ftd->instance - 1));
611  ftd->max = (range * ftd->instance);
612  }
614 
615  SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
616 
617  /* pass thread data back to caller */
618  *data = ftd;
619 
620  ftd->flow_mgr_cnt_clo = StatsRegisterCounter("flow_mgr.closed_pruned", t);
621  ftd->flow_mgr_cnt_new = StatsRegisterCounter("flow_mgr.new_pruned", t);
622  ftd->flow_mgr_cnt_est = StatsRegisterCounter("flow_mgr.est_pruned", t);
623  ftd->flow_mgr_cnt_byp = StatsRegisterCounter("flow_mgr.bypassed_pruned", t);
624  ftd->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
625  ftd->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
626  ftd->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
627  ftd->flow_tcp_reuse = StatsRegisterCounter("flow.tcp_reuse", t);
628 
629  ftd->flow_mgr_flows_checked = StatsRegisterCounter("flow_mgr.flows_checked", t);
630  ftd->flow_mgr_flows_notimeout = StatsRegisterCounter("flow_mgr.flows_notimeout", t);
631  ftd->flow_mgr_flows_timeout = StatsRegisterCounter("flow_mgr.flows_timeout", t);
632  ftd->flow_mgr_flows_timeout_inuse = StatsRegisterCounter("flow_mgr.flows_timeout_inuse", t);
633  ftd->flow_mgr_flows_removed = StatsRegisterCounter("flow_mgr.flows_removed", t);
634 
635  ftd->flow_mgr_rows_checked = StatsRegisterCounter("flow_mgr.rows_checked", t);
636  ftd->flow_mgr_rows_skipped = StatsRegisterCounter("flow_mgr.rows_skipped", t);
637  ftd->flow_mgr_rows_empty = StatsRegisterCounter("flow_mgr.rows_empty", t);
638  ftd->flow_mgr_rows_busy = StatsRegisterCounter("flow_mgr.rows_busy", t);
639  ftd->flow_mgr_rows_maxlen = StatsRegisterCounter("flow_mgr.rows_maxlen", t);
640 
641  PacketPoolInit();
642  return TM_ECODE_OK;
643 }
644 
645 static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
646 {
648  SCFree(data);
649  return TM_ECODE_OK;
650 }
651 
652 
653 /** \brief Thread that manages the flow table and times out flows.
654  *
655  * \param td ThreadVars casted to void ptr
656  *
657  * Keeps an eye on the spare list, alloc flows if needed...
658  */
659 static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
660 {
661  FlowManagerThreadData *ftd = thread_data;
662  struct timeval ts;
663  uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
664  int emerg = FALSE;
665  int prev_emerg = FALSE;
666  struct timespec cond_time;
667  int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
668  int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
669 /* VJ leaving disabled for now, as hosts are only used by tags and the numbers
670  * are really low. Might confuse ppl
671  uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v);
672  uint16_t flow_mgr_host_active = StatsRegisterCounter("hosts.active", th_v);
673  uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v);
674 */
675  memset(&ts, 0, sizeof(ts));
676 
677  while (1)
678  {
679  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
683  }
684 
685  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
686  emerg = TRUE;
687 
688  if (emerg == TRUE && prev_emerg == FALSE) {
689  prev_emerg = TRUE;
690 
691  SCLogDebug("Flow emergency mode entered...");
692 
693  StatsIncr(th_v, ftd->flow_emerg_mode_enter);
694  }
695  }
696 
697  /* Get the time */
698  memset(&ts, 0, sizeof(ts));
699  TimeGet(&ts);
700  SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
701 
702  /* see if we still have enough spare flows */
703  if (ftd->instance == 1)
705 
706  /* try to time out flows */
707  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0};
708  FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters);
709 
710 
711  if (ftd->instance == 1) {
712  DefragTimeoutHash(&ts);
713  //uint32_t hosts_pruned =
714  HostTimeoutHash(&ts);
715  IPPairTimeoutHash(&ts);
716  }
717 /*
718  StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned);
719  uint32_t hosts_active = HostGetActiveCount();
720  StatsSetUI64(th_v, flow_mgr_host_active, (uint64_t)hosts_active);
721  uint32_t hosts_spare = HostGetSpareCount();
722  StatsSetUI64(th_v, flow_mgr_host_spare, (uint64_t)hosts_spare);
723 */
724  StatsAddUI64(th_v, ftd->flow_mgr_cnt_clo, (uint64_t)counters.clo);
725  StatsAddUI64(th_v, ftd->flow_mgr_cnt_new, (uint64_t)counters.new);
726  StatsAddUI64(th_v, ftd->flow_mgr_cnt_est, (uint64_t)counters.est);
727  StatsAddUI64(th_v, ftd->flow_mgr_cnt_byp, (uint64_t)counters.byp);
728  StatsAddUI64(th_v, ftd->flow_tcp_reuse, (uint64_t)counters.tcp_reuse);
729 
730  StatsSetUI64(th_v, ftd->flow_mgr_flows_checked, (uint64_t)counters.flows_checked);
731  StatsSetUI64(th_v, ftd->flow_mgr_flows_notimeout, (uint64_t)counters.flows_notimeout);
732  StatsSetUI64(th_v, ftd->flow_mgr_flows_timeout, (uint64_t)counters.flows_timeout);
733  StatsSetUI64(th_v, ftd->flow_mgr_flows_removed, (uint64_t)counters.flows_removed);
734  StatsSetUI64(th_v, ftd->flow_mgr_flows_timeout_inuse, (uint64_t)counters.flows_timeout_inuse);
735 
736  StatsSetUI64(th_v, ftd->flow_mgr_rows_checked, (uint64_t)counters.rows_checked);
737  StatsSetUI64(th_v, ftd->flow_mgr_rows_skipped, (uint64_t)counters.rows_skipped);
738  StatsSetUI64(th_v, ftd->flow_mgr_rows_maxlen, (uint64_t)counters.rows_maxlen);
739  StatsSetUI64(th_v, ftd->flow_mgr_rows_busy, (uint64_t)counters.rows_busy);
740  StatsSetUI64(th_v, ftd->flow_mgr_rows_empty, (uint64_t)counters.rows_empty);
741 
742  uint32_t len = 0;
744  len = flow_spare_q.len;
746  StatsSetUI64(th_v, ftd->flow_mgr_spare, (uint64_t)len);
747 
748  /* Don't fear, FlowManagerThread is here...
749  * clear emergency bit if we have at least xx flows pruned. */
750  if (emerg == TRUE) {
751  SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
752  "flow_spare_q status: %"PRIu32"%% flows at the queue",
753  len, flow_config.prealloc, len * 100 / flow_config.prealloc);
754  /* only if we have pruned this "emergency_recovery" percentage
755  * of flows, we will unset the emergency bit */
757  SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
758 
760 
761  emerg = FALSE;
762  prev_emerg = FALSE;
763 
764  flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
765  flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
766  SCLogInfo("Flow emergency mode over, back to normal... unsetting"
767  " FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", "
768  "ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32
769  "%% flows at the queue", (uintmax_t)ts.tv_sec,
770  (uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);
771 
772  StatsIncr(th_v, ftd->flow_emerg_mode_over);
773  } else {
774  flow_update_delay_sec = FLOW_EMERG_MODE_UPDATE_DELAY_SEC;
775  flow_update_delay_nsec = FLOW_EMERG_MODE_UPDATE_DELAY_NSEC;
776  }
777  }
778 
779  if (TmThreadsCheckFlag(th_v, THV_KILL)) {
780  StatsSyncCounters(th_v);
781  break;
782  }
783 
784  cond_time.tv_sec = time(NULL) + flow_update_delay_sec;
785  cond_time.tv_nsec = flow_update_delay_nsec;
788  &cond_time);
790 
791  SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
792 
794  }
795 
796  SCLogPerf("%" PRIu32 " new flows, %" PRIu32 " established flows were "
797  "timed out, %"PRIu32" flows in closed state", new_cnt,
798  established_cnt, closing_cnt);
799 
800  return TM_ECODE_OK;
801 }
802 
803 /** \brief spawn the flow manager thread */
805 {
806 #ifdef AFLFUZZ_DISABLE_MGTTHREADS
807  return;
808 #endif
809  intmax_t setting = 1;
810  (void)ConfGetInt("flow.managers", &setting);
811 
812  if (setting < 1 || setting > 1024) {
814  "invalid flow.managers setting %"PRIdMAX, setting);
815  exit(EXIT_FAILURE);
816  }
817  flowmgr_number = (uint32_t)setting;
818 
819  SCLogConfig("using %u flow manager threads", flowmgr_number);
822 
824 
825  uint32_t u;
826  for (u = 0; u < flowmgr_number; u++)
827  {
828  ThreadVars *tv_flowmgr = NULL;
829 
830  char name[TM_THREAD_NAME_MAX];
831  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);
832 
833  tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
834  "FlowManager", 0);
835  BUG_ON(tv_flowmgr == NULL);
836 
837  if (tv_flowmgr == NULL) {
838  printf("ERROR: TmThreadsCreate failed\n");
839  exit(1);
840  }
841  if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
842  printf("ERROR: TmThreadSpawn failed\n");
843  exit(1);
844  }
845  }
846  return;
847 }
848 
849 typedef struct FlowRecyclerThreadData_ {
852 
853 static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
854 {
856  if (ftd == NULL)
857  return TM_ECODE_FAILED;
858 
859  if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
860  SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
861  SCFree(ftd);
862  return TM_ECODE_FAILED;
863  }
864  SCLogDebug("output_thread_data %p", ftd->output_thread_data);
865 
866  *data = ftd;
867  return TM_ECODE_OK;
868 }
869 
870 static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
871 {
873  if (ftd->output_thread_data != NULL)
875 
876  SCFree(data);
877  return TM_ECODE_OK;
878 }
879 
880 /** \brief Thread that manages timed out flows.
881  *
882  * \param td ThreadVars casted to void ptr
883  */
884 static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
885 {
886  struct timeval ts;
887  struct timespec cond_time;
888  int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
889  int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
890  uint64_t recycled_cnt = 0;
891  FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
892  BUG_ON(ftd == NULL);
893 
894  memset(&ts, 0, sizeof(ts));
895 
896  while (1)
897  {
898  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
902  }
903 
904  /* Get the time */
905  memset(&ts, 0, sizeof(ts));
906  TimeGet(&ts);
907  SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
908 
909  uint32_t len = 0;
911  len = flow_recycle_q.len;
913 
914  /* Loop through the queue and clean up all flows in it */
915  if (len) {
916  Flow *f;
917 
918  while ((f = FlowDequeue(&flow_recycle_q)) != NULL) {
919  FLOWLOCK_WRLOCK(f);
920 
921  (void)OutputFlowLog(th_v, ftd->output_thread_data, f);
922 
923  FlowClearMemory (f, f->protomap);
924  FLOWLOCK_UNLOCK(f);
925  FlowMoveToSpare(f);
926  recycled_cnt++;
927  }
928  }
929 
930  SCLogDebug("%u flows to recycle", len);
931 
932  if (TmThreadsCheckFlag(th_v, THV_KILL)) {
933  StatsSyncCounters(th_v);
934  break;
935  }
936 
937  cond_time.tv_sec = time(NULL) + flow_update_delay_sec;
938  cond_time.tv_nsec = flow_update_delay_nsec;
941  &flow_recycler_ctrl_mutex, &cond_time);
943 
944  SCLogDebug("woke up...");
945 
947  }
948 
949  SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
950 
951  return TM_ECODE_OK;
952 }
953 
954 static int FlowRecyclerReadyToShutdown(void)
955 {
956  uint32_t len = 0;
958  len = flow_recycle_q.len;
960 
961  return ((len == 0));
962 }
963 
964 /** \brief spawn the flow recycler thread */
966 {
967 #ifdef AFLFUZZ_DISABLE_MGTTHREADS
968  return;
969 #endif
970  intmax_t setting = 1;
971  (void)ConfGetInt("flow.recyclers", &setting);
972 
973  if (setting < 1 || setting > 1024) {
975  "invalid flow.recyclers setting %"PRIdMAX, setting);
976  exit(EXIT_FAILURE);
977  }
978  flowrec_number = (uint32_t)setting;
979 
980  SCLogConfig("using %u flow recycler threads", flowrec_number);
981 
984 
985 
986  uint32_t u;
987  for (u = 0; u < flowrec_number; u++)
988  {
989  ThreadVars *tv_flowmgr = NULL;
990 
991  char name[TM_THREAD_NAME_MAX];
992  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);
993 
994  tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
995  "FlowRecycler", 0);
996  BUG_ON(tv_flowmgr == NULL);
997 
998  if (tv_flowmgr == NULL) {
999  printf("ERROR: TmThreadsCreate failed\n");
1000  exit(1);
1001  }
1002  if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
1003  printf("ERROR: TmThreadSpawn failed\n");
1004  exit(1);
1005  }
1006  }
1007  return;
1008 }
1009 
1010 /**
1011  * \brief Used to disable flow recycler thread(s).
1012  *
1013  * \note this should only be called when the flow manager is already gone
1014  *
1015  * \todo Kinda hackish since it uses the tv name to identify flow recycler
1016  * thread. We need an all weather identification scheme.
1017  */
1019 {
1020 #ifdef AFLFUZZ_DISABLE_MGTTHREADS
1021  return;
1022 #endif
1023  ThreadVars *tv = NULL;
1024  int cnt = 0;
1025 
1026  /* move all flows still in the hash to the recycler queue */
1027  FlowCleanupHash();
1028 
1029  /* make sure all flows are processed */
1030  do {
1032  usleep(10);
1033  } while (FlowRecyclerReadyToShutdown() == 0);
1034 
1035  /* wake up threads */
1036  uint32_t u;
1037  for (u = 0; u < flowrec_number; u++)
1039 
1041  /* flow recycler thread(s) is/are a part of mgmt threads */
1042  tv = tv_root[TVT_MGMT];
1043  while (tv != NULL)
1044  {
1045  if (strncasecmp(tv->name, thread_name_flow_rec,
1046  strlen(thread_name_flow_rec)) == 0)
1047  {
1049  cnt++;
1050  }
1051  tv = tv->next;
1052  }
1054 
1055  double total_wait_time = 0;
1056  /* value in seconds */
1057 #define THREAD_KILL_MAX_WAIT_TIME 60
1058  /* value in microseconds */
1059 #define WAIT_TIME 100
1060 
1061 again:
1063  tv = tv_root[TVT_MGMT];
1064  while (tv != NULL)
1065  {
1066  if (strncasecmp(tv->name, thread_name_flow_rec,
1067  strlen(thread_name_flow_rec)) == 0)
1068  {
1070  if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) {
1071  SCLogError(SC_ERR_FATAL, "Engine unable to "
1072  "disable detect thread - \"%s\". "
1073  "Killing engine", tv->name);
1074  exit(EXIT_FAILURE);
1075  }
1077  usleep(WAIT_TIME);
1078  total_wait_time += WAIT_TIME / 1000000.0;
1079  goto again;
1080  }
1081  }
1082  tv = tv->next;
1083  }
1084 
1085  /* wake up threads, another try */
1086  for (u = 0; u < flowrec_number; u++)
1088 
1090 
1091  /* reset count, so we can kill and respawn (unix socket) */
1092  SC_ATOMIC_SET(flowrec_cnt, 0);
1093  return;
1094 }
1095 
1097 {
1098  tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
1099  tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
1100  tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
1101 // tmm_modules[TMM_FLOWMANAGER].RegisterTests = FlowManagerRegisterTests;
1102  tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
1105  SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
1106 
1107  SC_ATOMIC_INIT(flowmgr_cnt);
1108  SC_ATOMIC_INIT(flow_timeouts);
1109 }
1110 
1112 {
1113  tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
1114  tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
1115  tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
1116 // tmm_modules[TMM_FLOWRECYCLER].RegisterTests = FlowRecyclerRegisterTests;
1117  tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
1120  SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
1121 
1122  SC_ATOMIC_INIT(flowrec_cnt);
1123 }
1124 
1125 #ifdef UNITTESTS
1126 
1127 /**
1128  * \test Test the timing out of a flow with a fresh TcpSession
1129  * (just initialized, no data segments) in normal mode.
1130  *
1131  * \retval On success it returns 1 and on failure 0.
1132  */
1133 
1134 static int FlowMgrTest01 (void)
1135 {
1136  TcpSession ssn;
1137  Flow f;
1138  FlowBucket fb;
1139  struct timeval ts;
1140 
1142 
1143  memset(&ssn, 0, sizeof(TcpSession));
1144  memset(&f, 0, sizeof(Flow));
1145  memset(&ts, 0, sizeof(ts));
1146  memset(&fb, 0, sizeof(FlowBucket));
1147 
1148  FBLOCK_INIT(&fb);
1149 
1150  FLOW_INITIALIZE(&f);
1152 
1153  TimeGet(&ts);
1154  f.lastts.tv_sec = ts.tv_sec - 5000;
1155  f.protoctx = &ssn;
1156  f.fb = &fb;
1157 
1158  f.proto = IPPROTO_TCP;
1159 
1160  int32_t next_ts = 0;
1161  int state = SC_ATOMIC_GET(f.flow_state);
1162  if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
1163  FBLOCK_DESTROY(&fb);
1164  FLOW_DESTROY(&f);
1166  return 0;
1167  }
1168 
1169  FBLOCK_DESTROY(&fb);
1170  FLOW_DESTROY(&f);
1171 
1173  return 1;
1174 }
1175 
1176 /**
1177  * \test Test the timing out of a flow with a TcpSession
1178  * (with data segments) in normal mode.
1179  *
1180  * \retval On success it returns 1 and on failure 0.
1181  */
1182 
1183 static int FlowMgrTest02 (void)
1184 {
1185  TcpSession ssn;
1186  Flow f;
1187  FlowBucket fb;
1188  struct timeval ts;
1189  TcpSegment seg;
1190  TcpStream client;
1191 
1193 
1194  memset(&ssn, 0, sizeof(TcpSession));
1195  memset(&f, 0, sizeof(Flow));
1196  memset(&fb, 0, sizeof(FlowBucket));
1197  memset(&ts, 0, sizeof(ts));
1198  memset(&seg, 0, sizeof(TcpSegment));
1199  memset(&client, 0, sizeof(TcpStream));
1200 
1201  FBLOCK_INIT(&fb);
1202  FLOW_INITIALIZE(&f);
1204 
1205  TimeGet(&ts);
1206  TCP_SEG_LEN(&seg) = 3;
1207  TCPSEG_RB_INSERT(&client.seg_tree, &seg);
1208  ssn.client = client;
1209  ssn.server = client;
1210  ssn.state = TCP_ESTABLISHED;
1211  f.lastts.tv_sec = ts.tv_sec - 5000;
1212  f.protoctx = &ssn;
1213  f.fb = &fb;
1214  f.proto = IPPROTO_TCP;
1215 
1216  int32_t next_ts = 0;
1217  int state = SC_ATOMIC_GET(f.flow_state);
1218  if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
1219  FBLOCK_DESTROY(&fb);
1220  FLOW_DESTROY(&f);
1222  return 0;
1223  }
1224  FBLOCK_DESTROY(&fb);
1225  FLOW_DESTROY(&f);
1227  return 1;
1228 
1229 }
1230 
1231 /**
1232  * \test Test the timing out of a flow with a fresh TcpSession
1233  * (just initialized, no data segments) in emergency mode.
1234  *
1235  * \retval On success it returns 1 and on failure 0.
1236  */
1237 
1238 static int FlowMgrTest03 (void)
1239 {
1240  TcpSession ssn;
1241  Flow f;
1242  FlowBucket fb;
1243  struct timeval ts;
1244 
1246 
1247  memset(&ssn, 0, sizeof(TcpSession));
1248  memset(&f, 0, sizeof(Flow));
1249  memset(&ts, 0, sizeof(ts));
1250  memset(&fb, 0, sizeof(FlowBucket));
1251 
1252  FBLOCK_INIT(&fb);
1253  FLOW_INITIALIZE(&f);
1255 
1256  TimeGet(&ts);
1257  ssn.state = TCP_SYN_SENT;
1258  f.lastts.tv_sec = ts.tv_sec - 300;
1259  f.protoctx = &ssn;
1260  f.fb = &fb;
1261  f.proto = IPPROTO_TCP;
1262  f.flags |= FLOW_EMERGENCY;
1263 
1264  int next_ts = 0;
1265  int state = SC_ATOMIC_GET(f.flow_state);
1266  if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
1267  FBLOCK_DESTROY(&fb);
1268  FLOW_DESTROY(&f);
1270  return 0;
1271  }
1272 
1273  FBLOCK_DESTROY(&fb);
1274  FLOW_DESTROY(&f);
1276  return 1;
1277 }
1278 
1279 /**
1280  * \test Test the timing out of a flow with a TcpSession
1281  * (with data segments) in emergency mode.
1282  *
1283  * \retval On success it returns 1 and on failure 0.
1284  */
1285 
1286 static int FlowMgrTest04 (void)
1287 {
1288 
1289  TcpSession ssn;
1290  Flow f;
1291  FlowBucket fb;
1292  struct timeval ts;
1293  TcpSegment seg;
1294  TcpStream client;
1295 
1297 
1298  memset(&ssn, 0, sizeof(TcpSession));
1299  memset(&f, 0, sizeof(Flow));
1300  memset(&fb, 0, sizeof(FlowBucket));
1301  memset(&ts, 0, sizeof(ts));
1302  memset(&seg, 0, sizeof(TcpSegment));
1303  memset(&client, 0, sizeof(TcpStream));
1304 
1305  FBLOCK_INIT(&fb);
1306  FLOW_INITIALIZE(&f);
1308 
1309  TimeGet(&ts);
1310  TCP_SEG_LEN(&seg) = 3;
1311  TCPSEG_RB_INSERT(&client.seg_tree, &seg);
1312  ssn.client = client;
1313  ssn.server = client;
1314  ssn.state = TCP_ESTABLISHED;
1315  f.lastts.tv_sec = ts.tv_sec - 5000;
1316  f.protoctx = &ssn;
1317  f.fb = &fb;
1318  f.proto = IPPROTO_TCP;
1319  f.flags |= FLOW_EMERGENCY;
1320 
1321  int next_ts = 0;
1322  int state = SC_ATOMIC_GET(f.flow_state);
1323  if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
1324  FBLOCK_DESTROY(&fb);
1325  FLOW_DESTROY(&f);
1327  return 0;
1328  }
1329 
1330  FBLOCK_DESTROY(&fb);
1331  FLOW_DESTROY(&f);
1333  return 1;
1334 }
1335 
1336 /**
1337  * \test Test flow allocations when it reach memcap
1338  *
1339  *
1340  * \retval On success it returns 1 and on failure 0.
1341  */
1342 
1343 static int FlowMgrTest05 (void)
1344 {
1345  int result = 0;
1346 
1348  FlowConfig backup;
1349  memcpy(&backup, &flow_config, sizeof(FlowConfig));
1350 
1351  uint32_t ini = 0;
1352  uint32_t end = flow_spare_q.len;
1353  SC_ATOMIC_SET(flow_config.memcap, 10000);
1354  flow_config.prealloc = 100;
1355 
1356  /* Let's get the flow_spare_q empty */
1357  UTHBuildPacketOfFlows(ini, end, 0);
1358 
1359  /* And now let's try to reach the memcap val */
1360  while (FLOW_CHECK_MEMCAP(sizeof(Flow))) {
1361  ini = end + 1;
1362  end = end + 2;
1363  UTHBuildPacketOfFlows(ini, end, 0);
1364  }
1365 
1366  /* should time out normal */
1367  TimeSetIncrementTime(2000);
1368  ini = end + 1;
1369  end = end + 2;;
1370  UTHBuildPacketOfFlows(ini, end, 0);
1371 
1372  struct timeval ts;
1373  TimeGet(&ts);
1374  /* try to time out flows */
1375  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0};
1376  FlowTimeoutHash(&ts, 0 /* check all */, 0, flow_config.hash_size, &counters);
1377 
1378  if (flow_recycle_q.len > 0) {
1379  result = 1;
1380  }
1381 
1382  memcpy(&flow_config, &backup, sizeof(FlowConfig));
1383  FlowShutdown();
1384  return result;
1385 }
1386 #endif /* UNITTESTS */
1387 
1388 /**
1389  * \brief Function to register the Flow Unitests.
1390  */
1392 {
1393 #ifdef UNITTESTS
1394  UtRegisterTest("FlowMgrTest01 -- Timeout a flow having fresh TcpSession",
1395  FlowMgrTest01);
1396  UtRegisterTest("FlowMgrTest02 -- Timeout a flow having TcpSession with segments",
1397  FlowMgrTest02);
1398  UtRegisterTest("FlowMgrTest03 -- Timeout a flow in emergency having fresh TcpSession",
1399  FlowMgrTest03);
1400  UtRegisterTest("FlowMgrTest04 -- Timeout a flow in emergency having TcpSession with segments",
1401  FlowMgrTest04);
1402  UtRegisterTest("FlowMgrTest05 -- Test flow Allocations when it reach memcap",
1403  FlowMgrTest05);
1404 #endif /* UNITTESTS */
1405 }
FlowQueue flow_recycle_q
Definition: flow-private.h:94
#define FLOW_TCP_REUSED
Definition: flow.h:50
uint16_t flow_mgr_flows_timeout
Definition: flow-manager.c:580
#define SCCtrlCondSignal
#define FLOW_CHECK_MEMCAP(size)
check if a memory alloc would fit in the memcap
Definition: flow-util.h:131
#define SCLogDebug(...)
Definition: util-debug.h:335
#define FBLOCK_UNLOCK(fb)
Definition: flow-hash.h:70
#define FLOW_TIMEOUT_REASSEMBLY_DONE
Definition: flow.h:90
uint16_t flow_mgr_flows_timeout_inuse
Definition: flow-manager.c:581
uint8_t cap_flags
Definition: tm-modules.h:67
uint32_t bypassed_timeout
Definition: flow.h:476
#define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC
Definition: flow-manager.c:100
struct HtpBodyChunk_ * next
struct Flow_ * hnext
Definition: flow.h:449
void FlowDisableFlowManagerThread(void)
Used to disable flow manager thread(s).
Definition: flow-manager.c:133
#define BUG_ON(x)
uint32_t new_timeout
Definition: flow.h:473
uint8_t flags
Definition: tm-modules.h:70
uint8_t proto
Definition: flow.h:346
#define THV_RUNNING_DONE
Definition: threadvars.h:45
#define FALSE
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:67
#define FLOWLOCK_UNLOCK(fb)
Definition: flow.h:235
#define FLOW_END_FLAG_STATE_ESTABLISHED
Definition: flow.h:202
FlowProtoTimeout flow_timeouts_normal[FLOW_PROTO_MAX]
Definition: flow-private.h:86
void PacketPoolWaitForN(int n)
Wait until we have the requested amount of packets in the pool.
#define SCCtrlMutexUnlock(mut)
#define FLOW_END_FLAG_TIMEOUT
Definition: flow.h:205
FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]
Definition: flow-private.h:87
FlowProtoTimeout * FlowProtoTimeoutPtr
Definition: flow-manager.c:86
int FlowForceReassemblyNeedReassembly(Flow *f, int *server, int *client)
Check if a flow needs forced reassembly, or any other processing.
Definition: flow-timeout.c:278
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:72
uint16_t flow_emerg_mode_enter
Definition: flow-manager.c:574
#define FlowTimeoutsReset()
Definition: flow-manager.h:27
uint32_t prealloc
Definition: flow.h:258
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:108
#define FLOW_EMERG_MODE_UPDATE_DELAY_SEC
Definition: flow-manager.c:103
#define FLOW_QUIET
Definition: flow.h:37
#define FLOW_EMERGENCY
Definition: flow-private.h:37
#define FBLOCK_INIT(fb)
Definition: flow-hash.h:66
#define THV_PAUSED
Definition: threadvars.h:38
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value from our atomic variable.
Definition: util-atomic.h:142
#define FLOWLOCK_WRLOCK(fb)
Definition: flow.h:232
SCCtrlMutex flow_recycler_ctrl_mutex
Definition: flow-manager.h:42
void FlowMoveToSpare(Flow *f)
Transfer a flow from a queue to the spare queue.
Definition: flow-queue.c:146
#define FLOW_END_FLAG_STATE_BYPASSED
Definition: flow.h:208
void FlowMgrRegisterTests(void)
Function to register the Flow Unitests.
SCCtrlCondT flow_manager_ctrl_cond
Definition: flow-manager.h:32
#define TRUE
Flow * head
Definition: flow-hash.h:42
void * protoctx
Definition: flow.h:398
#define FBLOCK_DESTROY(fb)
Definition: flow-hash.h:67
FlowConfig flow_config
Definition: flow-private.h:97
void FlowQueueDestroy(FlowQueue *q)
Destroy a flow queue.
Definition: flow-queue.c:61
struct Flow_ * hprev
Definition: flow.h:450
#define SCCtrlCondInit
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:939
#define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC
Definition: flow-manager.c:101
uint32_t HostTimeoutHash(struct timeval *ts)
time out hosts from the hash
Definition: host-timeout.c:156
void TmThreadsSetFlag(ThreadVars *tv, uint16_t flag)
Set a thread flag.
Definition: tm-threads.c:113
#define SC_ATOMIC_INIT(name)
Initialize the previously declared atomic variable and it&#39;s lock.
Definition: util-atomic.h:82
struct FlowManagerThreadData_ FlowManagerThreadData
#define SCCalloc(nm, a)
Definition: util-mem.h:205
FlowQueue * FlowQueueInit(FlowQueue *q)
Definition: flow-queue.c:47
#define SCMutexUnlock(mut)
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type double to the local counter.
Definition: counters.c:185
#define FLOW_END_FLAG_STATE_NEW
Definition: flow.h:201
int run_mode
Definition: suricata.c:204
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt)
int FlowUpdateSpareFlows(void)
Make sure we have enough spare flows.
Definition: flow.c:144
SCMutex tv_root_lock
Definition: tm-threads.c:97
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
void TimeGet(struct timeval *tv)
Definition: util-time.c:138
#define TCP_SEG_LEN(seg)
void TmModuleFlowManagerRegister(void)
SCCtrlMutex flow_manager_ctrl_mutex
Definition: flow-manager.h:33
uint32_t IPPairTimeoutHash(struct timeval *ts)
time out ippairs from the hash
Data structures and function prototypes for keeping state for the detection engine.
SC_ATOMIC_EXTERN(unsigned int, flow_flags)
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:294
#define FLOW_DESTROY(f)
Definition: flow-util.h:115
#define THREAD_KILL_MAX_WAIT_TIME
struct ThreadVars_ * next
Definition: threadvars.h:111
uint32_t flows_timeout_inuse
Definition: flow-manager.c:117
#define TM_FLAG_MANAGEMENT_TM
Definition: tm-modules.h:36
void UtRegisterTest(const char *name, int(*TestFn)(void))
Register unit test.
uint32_t est_timeout
Definition: flow.h:474
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
Definition: output-flow.c:90
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1310
#define FLOW_END_FLAG_STATE_CLOSED
Definition: flow.h:203
uint16_t flow_mgr_rows_checked
Definition: flow-manager.c:584
#define SCCtrlCondTimedwait
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:163
Flow * tail
Definition: flow-hash.h:43
void TmThreadsUnsetFlag(ThreadVars *tv, uint16_t flag)
Unset a thread flag.
Definition: tm-threads.c:121
#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC
Definition: flow-manager.c:104
uint16_t flow_mgr_flows_removed
Definition: flow-manager.c:582
uint16_t StatsRegisterGlobalCounter(const char *name, uint64_t(*Func)(void))
Registers a counter, which represents a global value.
Definition: counters.c:1000
#define SCCtrlMutexInit(mut, mutattr)
SCCtrlCondT flow_recycler_ctrl_cond
Definition: flow-manager.h:41
void PacketPoolDestroy(void)
uint32_t UTHBuildPacketOfFlows(uint32_t start, uint32_t end, uint8_t dir)
#define FLOW_BYPASSED_TIMEOUT
Definition: flow-private.h:65
int FlowForceReassemblyForFlow(Flow *f, int server, int client)
Definition: flow-timeout.c:334
#define THV_PAUSE
Definition: threadvars.h:37
struct timeval lastts
Definition: flow.h:356
struct FlowBucket_ * fb
Definition: flow.h:451
TmEcode OutputFlowLogThreadInit(ThreadVars *tv, void *initdata, void **data)
thread init for the flow logger This will run the thread init functions for the individual registered...
Definition: output-flow.c:128
#define THV_KILL
Definition: threadvars.h:39
uint8_t flow_end_flags
Definition: flow.h:404
void PacketPoolInit(void)
const char * name
Definition: tm-modules.h:44
void TimeSetIncrementTime(uint32_t tv_sec)
increment the time in the engine
Definition: util-time.c:167
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:208
uint16_t flow_mgr_rows_skipped
Definition: flow-manager.c:585
struct TCPSEG seg_tree
Flow * FlowDequeue(FlowQueue *q)
remove a flow from the queue
Definition: flow-queue.c:105
#define WAIT_TIME
FlowBucket * flow_hash
Definition: flow-private.h:96
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:254
#define SCFree(a)
Definition: util-mem.h:236
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.h:73
#define FLOW_END_FLAG_EMERGENCY
Definition: flow.h:204
int ConfGetInt(const char *name, intmax_t *val)
Retrieve a configuration value as an integer.
Definition: conf.c:437
uint16_t flow_mgr_flows_checked
Definition: flow-manager.c:578
uint64_t FlowGetMemuse(void)
Definition: flow.c:119
void FlowShutdown(void)
shutdown the flow engine
Definition: flow.c:599
uint64_t ts
#define FBLOCK_TRYLOCK(fb)
Definition: flow-hash.h:69
struct FlowTimeoutCounters_ FlowTimeoutCounters
#define SCLogPerf(...)
Definition: util-debug.h:261
uint32_t closed_timeout
Definition: flow.h:475
#define StatsSyncCountersIfSignalled(tv)
Definition: counters.h:136
#define SCMutexLock(mut)
#define FLOW_INITIALIZE(f)
Definition: flow-util.h:39
struct FlowRecyclerThreadData_ FlowRecyclerThreadData
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
#define TM_THREAD_NAME_MAX
Definition: tm-threads.h:33
TmEcode OutputFlowLogThreadDeinit(ThreadVars *tv, void *thread_data)
Definition: output-flow.c:170
void FlowManagerThreadSpawn()
spawn the flow manager thread
Definition: flow-manager.c:804
int TmThreadsCheckFlag(ThreadVars *tv, uint16_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:105
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:193
#define StatsSyncCounters(tv)
Definition: counters.h:133
char name[16]
Definition: threadvars.h:59
int g_detect_disabled
Definition: suricata.c:218
uint32_t len
Definition: flow-queue.h:45
void TmModuleFlowRecyclerRegister(void)
void FlowRecyclerThreadSpawn()
spawn the flow recycler thread
Definition: flow-manager.c:965
int FlowClearMemory(Flow *f, uint8_t proto_map)
Function clear the flow memory before queueing it to spare flow queue.
Definition: flow.c:959
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:94
uint32_t DefragTimeoutHash(struct timeval *ts)
time out tracker from the hash
uint8_t len
Per thread variable structure.
Definition: threadvars.h:57
void FlowTimeoutsEmergency(void)
Definition: flow-manager.c:94
#define FLOW_END_FLAG_SHUTDOWN
Definition: flow.h:207
struct SCLogConfig_ SCLogConfig
Holds the config state used by the logging api.
#define SCCtrlMutexLock(mut)
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition: counters.c:142
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1882
uint8_t protomap
Definition: flow.h:402
Flow data structure.
Definition: flow.h:327
Definition: flow.h:253
uint32_t flags
Definition: flow.h:377
void FlowTimeoutsInit(void)
Definition: flow-manager.c:89
FlowState
Definition: flow.h:464
#define FBLOCK_LOCK(fb)
Definition: flow-hash.h:68
uint16_t flow_mgr_flows_notimeout
Definition: flow-manager.c:579
const char * thread_name_flow_mgr
Definition: runmodes.c:65
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:69
uint32_t emergency_recovery
Definition: flow.h:265
const char * thread_name_flow_rec
Definition: runmodes.c:66
FlowQueue flow_spare_q
Definition: flow-private.h:91
void FlowDisableFlowRecyclerThread(void)
Used to disable flow recycler thread(s).
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1980
void FlowInitConfig(char quiet)
initialize the configuration
Definition: flow.c:444
uint32_t hash_size
Definition: flow.h:256