suricata
flow-manager.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2013 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
22  * \author Victor Julien <victor@inliniac.net>
23  */
24 
25 #include "suricata-common.h"
26 #include "suricata.h"
27 #include "decode.h"
28 #include "conf.h"
29 #include "threadvars.h"
30 #include "tm-threads.h"
31 #include "runmodes.h"
32 
33 #include "util-random.h"
34 #include "util-time.h"
35 
36 #include "flow.h"
37 #include "flow-queue.h"
38 #include "flow-hash.h"
39 #include "flow-util.h"
40 #include "flow-var.h"
41 #include "flow-private.h"
42 #include "flow-timeout.h"
43 #include "flow-manager.h"
44 #include "flow-storage.h"
45 
46 #include "stream-tcp-private.h"
47 #include "stream-tcp-reassemble.h"
48 #include "stream-tcp.h"
49 
50 #include "util-unittest.h"
51 #include "util-unittest-helper.h"
52 #include "util-byte.h"
53 
54 #include "util-debug.h"
55 #include "util-privs.h"
56 #include "util-signal.h"
57 
58 #include "threads.h"
59 #include "detect.h"
60 #include "detect-engine-state.h"
61 #include "stream.h"
62 
63 #include "app-layer-parser.h"
64 
65 #include "host-timeout.h"
66 #include "defrag-timeout.h"
67 #include "ippair-timeout.h"
68 
69 #include "output-flow.h"
70 
71 /* Run mode selected at suricata.c */
72 extern int run_mode;
73 
74 /* multi flow mananger support */
75 static uint32_t flowmgr_number = 1;
76 /* atomic counter for flow managers, to assign instance id */
77 SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
78 
79 /* multi flow recycler support */
80 static uint32_t flowrec_number = 1;
81 /* atomic counter for flow recyclers, to assign instance id */
82 SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
83 
84 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
85 
86 
89 
90 void FlowTimeoutsInit(void)
91 {
92  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
93 }
94 
96 {
97  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
98 }
99 
100 /* 1 seconds */
101 #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
102 #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
103 /* 0.1 seconds */
104 #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
105 #define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 100000
106 #define NEW_FLOW_COUNT_COND 10
107 
/** \brief counters collected during one flow manager timeout pass */
typedef struct FlowTimeoutCounters_ {
    uint32_t new;           /**< flows timed out while in NEW state */
    uint32_t est;           /**< flows timed out while in ESTABLISHED state */
    uint32_t clo;           /**< flows timed out while in CLOSED state */
    uint32_t byp;           /**< flows timed out while in a bypassed state */
    uint32_t tcp_reuse;     /**< timed out flows flagged FLOW_TCP_REUSED */

    uint32_t flows_checked;     /**< flows inspected this pass */
    uint32_t flows_notimeout;   /**< flows inspected but not yet expired */
    uint32_t flows_timeout;     /**< flows whose timeout had passed */
    /* NOTE(review): field restored — used as counters->flows_timeout_inuse
     * by FlowManagerHashRowTimeout; the original line was lost in extraction */
    uint32_t flows_timeout_inuse; /**< expired flows still in use, kept */
    uint32_t flows_removed;     /**< flows actually removed from the hash */

    uint32_t rows_checked;  /**< hash rows visited */
    uint32_t rows_skipped;  /**< rows skipped via the next_ts shortcut */
    uint32_t rows_empty;    /**< rows found empty */
    uint32_t rows_busy;     /**< rows whose trylock failed */
    uint32_t rows_maxlen;   /**< longest row walked this pass */

    uint32_t bypassed_count;    /**< dead capture-bypassed flows */
    uint64_t bypassed_pkts;     /**< packets accounted to bypassed flows */
    uint64_t bypassed_bytes;    /**< bytes accounted to bypassed flows */
} FlowTimeoutCounters;
132 /**
133  * \brief Used to disable flow manager thread(s).
134  *
135  * \todo Kinda hackish since it uses the tv name to identify flow manager
136  * thread. We need an all weather identification scheme.
137  */
139 {
140 #ifdef AFLFUZZ_DISABLE_MGTTHREADS
141  return;
142 #endif
143  ThreadVars *tv = NULL;
144 
145  /* wake up threads */
146  uint32_t u;
147  for (u = 0; u < flowmgr_number; u++)
149 
151  /* flow manager thread(s) is/are a part of mgmt threads */
152  tv = tv_root[TVT_MGMT];
153  while (tv != NULL)
154  {
155  if (strncasecmp(tv->name, thread_name_flow_mgr,
156  strlen(thread_name_flow_mgr)) == 0)
157  {
159  }
160  tv = tv->next;
161  }
163 
164  struct timeval start_ts;
165  struct timeval cur_ts;
166  gettimeofday(&start_ts, NULL);
167 
168 again:
169  gettimeofday(&cur_ts, NULL);
170  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
171  FatalError(SC_ERR_SHUTDOWN, "unable to get all flow manager "
172  "threads to shutdown in time");
173  }
174 
176  tv = tv_root[TVT_MGMT];
177  while (tv != NULL)
178  {
179  if (strncasecmp(tv->name, thread_name_flow_mgr,
180  strlen(thread_name_flow_mgr)) == 0)
181  {
184  /* sleep outside lock */
185  SleepMsec(1);
186  goto again;
187  }
188  }
189  tv = tv->next;
190  }
192 
193  /* wake up threads, another try */
194  for (u = 0; u < flowmgr_number; u++)
196 
197  /* reset count, so we can kill and respawn (unix socket) */
198  SC_ATOMIC_SET(flowmgr_cnt, 0);
199  return;
200 }
201 
202 /** \internal
203  * \brief get timeout for flow
204  *
205  * \param f flow
206  * \param state flow state
207  *
208  * \retval timeout timeout in seconds
209  */
210 static inline uint32_t FlowGetFlowTimeout(const Flow *f, enum FlowState state)
211 {
212  uint32_t timeout;
213  FlowProtoTimeoutPtr flow_timeouts = SC_ATOMIC_GET(flow_timeouts);
214  switch(state) {
215  default:
216  case FLOW_STATE_NEW:
217  timeout = flow_timeouts[f->protomap].new_timeout;
218  break;
220  timeout = flow_timeouts[f->protomap].est_timeout;
221  break;
222  case FLOW_STATE_CLOSED:
223  timeout = flow_timeouts[f->protomap].closed_timeout;
224  break;
225 #ifdef CAPTURE_OFFLOAD
226  case FLOW_STATE_CAPTURE_BYPASSED:
227  timeout = FLOW_BYPASSED_TIMEOUT;
228  break;
229 #endif
231  timeout = flow_timeouts[f->protomap].bypassed_timeout;
232  break;
233  }
234  return timeout;
235 }
236 
237 /** \internal
238  * \brief check if a flow is timed out
239  *
240  * \param f flow
241  * \param ts timestamp
242  *
243  * \retval 0 not timed out
244  * \retval 1 timed out
245  */
246 static int FlowManagerFlowTimeout(Flow *f, enum FlowState state, struct timeval *ts, int32_t *next_ts)
247 {
248  /* set the timeout value according to the flow operating mode,
249  * flow's state and protocol.*/
250  uint32_t timeout = FlowGetFlowTimeout(f, state);
251 
252  int32_t flow_times_out_at = (int32_t)(f->lastts.tv_sec + timeout);
253  if (*next_ts == 0 || flow_times_out_at < *next_ts)
254  *next_ts = flow_times_out_at;
255 
256  /* do the timeout check */
257  if (flow_times_out_at >= ts->tv_sec) {
258  return 0;
259  }
260 
261  return 1;
262 }
263 
264 static inline int FlowBypassedTimeout(Flow *f, struct timeval *ts,
265  FlowTimeoutCounters *counters)
266 {
267 #ifdef CAPTURE_OFFLOAD
268  if (SC_ATOMIC_GET(f->flow_state) != FLOW_STATE_CAPTURE_BYPASSED) {
269  return 1;
270  }
271 
273  if (fc && fc->BypassUpdate) {
274  /* flow will be possibly updated */
275  uint64_t pkts_tosrc = fc->tosrcpktcnt;
276  uint64_t bytes_tosrc = fc->tosrcbytecnt;
277  uint64_t pkts_todst = fc->todstpktcnt;
278  uint64_t bytes_todst = fc->todstbytecnt;
279  bool update = fc->BypassUpdate(f, fc->bypass_data, ts->tv_sec);
280  if (update) {
281  SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f));
282  pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
283  bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
284  pkts_todst = fc->todstpktcnt - pkts_todst;
285  bytes_todst = fc->todstbytecnt - bytes_todst;
286  if (f->livedev) {
287  SC_ATOMIC_ADD(f->livedev->bypassed,
288  pkts_tosrc + pkts_todst);
289  }
290  counters->bypassed_pkts += pkts_tosrc + pkts_todst;
291  counters->bypassed_bytes += bytes_tosrc + bytes_todst;
292  return 0;
293  } else {
294  SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
295  if (f->livedev) {
296  if (FLOW_IS_IPV4(f)) {
297  LiveDevSubBypassStats(f->livedev, 1, AF_INET);
298  } else if (FLOW_IS_IPV6(f)) {
299  LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
300  }
301  }
302  counters->bypassed_count++;
303  return 1;
304  }
305  }
306 #endif /* CAPTURE_OFFLOAD */
307  return 1;
308 }
309 
310 /** \internal
311  * \brief See if we can really discard this flow. Check use_cnt reference
312  * counter and force reassembly if necessary.
313  *
314  * \param f flow
315  * \param ts timestamp
316  *
317  * \retval 0 not timed out just yet
318  * \retval 1 fully timed out, lets kill it
319  */
320 static inline int FlowManagerFlowTimedOut(Flow *f, struct timeval *ts,
321  FlowTimeoutCounters *counters)
322 {
323  /* never prune a flow that is used by a packet we
324  * are currently processing in one of the threads */
325  if (SC_ATOMIC_GET(f->use_cnt) > 0) {
326  return 0;
327  }
328 
329  if (!FlowBypassedTimeout(f, ts, counters)) {
330  return 0;
331  }
332 
333  int server = 0, client = 0;
334 
335  if (!(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
336 #ifdef CAPTURE_OFFLOAD
337  SC_ATOMIC_GET(f->flow_state) != FLOW_STATE_CAPTURE_BYPASSED &&
338 #endif
339  SC_ATOMIC_GET(f->flow_state) != FLOW_STATE_LOCAL_BYPASSED &&
340  FlowForceReassemblyNeedReassembly(f, &server, &client) == 1) {
341  FlowForceReassemblyForFlow(f, server, client);
342  return 0;
343  }
344 #ifdef DEBUG
345  /* this should not be possible */
346  BUG_ON(SC_ATOMIC_GET(f->use_cnt) > 0);
347 #endif
348 
349  return 1;
350 }
351 
352 /**
353  * \internal
354  *
355  * \brief check all flows in a hash row for timing out
356  *
357  * \param f last flow in the hash row
358  * \param ts timestamp
359  * \param emergency bool indicating emergency mode
360  * \param counters ptr to FlowTimeoutCounters structure
361  *
362  * \retval cnt timed out flows
363  */
364 static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
365  int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
366 {
367  uint32_t cnt = 0;
368  uint32_t checked = 0;
369 
370  do {
371  checked++;
372 
373  /* check flow timeout based on lastts and state. Both can be
374  * accessed w/o Flow lock as we do have the hash row lock (so flow
375  * can't disappear) and flow_state is atomic. lastts can only
376  * be modified when we have both the flow and hash row lock */
377 
378  enum FlowState state = SC_ATOMIC_GET(f->flow_state);
379 
380  /* timeout logic goes here */
381  if (FlowManagerFlowTimeout(f, state, ts, next_ts) == 0) {
382 
383  counters->flows_notimeout++;
384 
385  f = f->hprev;
386  continue;
387  }
388 
389  /* before grabbing the flow lock, make sure we have at least
390  * 3 packets in the pool */
392 
393  FLOWLOCK_WRLOCK(f);
394 
395  Flow *next_flow = f->hprev;
396 
397  counters->flows_timeout++;
398 
399  /* check if the flow is fully timed out and
400  * ready to be discarded. */
401  if (FlowManagerFlowTimedOut(f, ts, counters) == 1) {
402  /* remove from the hash */
403  if (f->hprev != NULL)
404  f->hprev->hnext = f->hnext;
405  if (f->hnext != NULL)
406  f->hnext->hprev = f->hprev;
407  if (f->fb->head == f)
408  f->fb->head = f->hnext;
409  if (f->fb->tail == f)
410  f->fb->tail = f->hprev;
411 
412  f->hnext = NULL;
413  f->hprev = NULL;
414 
415  if (f->flags & FLOW_TCP_REUSED)
416  counters->tcp_reuse++;
417 
418  if (state == FLOW_STATE_NEW)
420  else if (state == FLOW_STATE_ESTABLISHED)
422  else if (state == FLOW_STATE_CLOSED)
424  else if (state == FLOW_STATE_LOCAL_BYPASSED)
426 #ifdef CAPTURE_OFFLOAD
427  else if (state == FLOW_STATE_CAPTURE_BYPASSED)
429 #endif
430 
431  if (emergency)
434 
435  /* no one is referring to this flow, use_cnt 0, removed from hash
436  * so we can unlock it and pass it to the flow recycler */
437  FLOWLOCK_UNLOCK(f);
439 
440  cnt++;
441 
442  switch (state) {
443  case FLOW_STATE_NEW:
444  default:
445  counters->new++;
446  break;
448  counters->est++;
449  break;
450  case FLOW_STATE_CLOSED:
451  counters->clo++;
452  break;
454 #ifdef CAPTURE_OFFLOAD
455  case FLOW_STATE_CAPTURE_BYPASSED:
456 #endif
457  counters->byp++;
458  break;
459  }
460  counters->flows_removed++;
461  } else {
462  counters->flows_timeout_inuse++;
463  FLOWLOCK_UNLOCK(f);
464  }
465 
466  f = next_flow;
467  } while (f != NULL);
468 
469  counters->flows_checked += checked;
470  if (checked > counters->rows_maxlen)
471  counters->rows_maxlen = checked;
472 
473  return cnt;
474 }
475 
476 /**
477  * \brief time out flows from the hash
478  *
479  * \param ts timestamp
480  * \param try_cnt number of flows to time out max (0 is unlimited)
481  * \param hash_min min hash index to consider
482  * \param hash_max max hash index to consider
483  * \param counters ptr to FlowTimeoutCounters structure
484  *
485  * \retval cnt number of timed out flow
486  */
487 static uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt,
488  uint32_t hash_min, uint32_t hash_max,
489  FlowTimeoutCounters *counters)
490 {
491  uint32_t idx = 0;
492  uint32_t cnt = 0;
493  int emergency = 0;
494 
495  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
496  emergency = 1;
497 
498  for (idx = hash_min; idx < hash_max; idx++) {
499  FlowBucket *fb = &flow_hash[idx];
500 
501  counters->rows_checked++;
502 
503  int32_t check_ts = SC_ATOMIC_GET(fb->next_ts);
504  if (check_ts > (int32_t)ts->tv_sec) {
505  counters->rows_skipped++;
506  continue;
507  }
508 
509  /* before grabbing the row lock, make sure we have at least
510  * 9 packets in the pool */
512 
513  if (FBLOCK_TRYLOCK(fb) != 0) {
514  counters->rows_busy++;
515  continue;
516  }
517 
518  /* flow hash bucket is now locked */
519 
520  if (fb->tail == NULL) {
521  SC_ATOMIC_SET(fb->next_ts, INT_MAX);
522  counters->rows_empty++;
523  goto next;
524  }
525 
526  int32_t next_ts = 0;
527 
528  /* we have a flow, or more than one */
529  cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters, &next_ts);
530 
531  SC_ATOMIC_SET(fb->next_ts, next_ts);
532 
533 next:
534  FBLOCK_UNLOCK(fb);
535 
536  if (try_cnt > 0 && cnt >= try_cnt)
537  break;
538  }
539 
540  return cnt;
541 }
542 
543 /**
544  * \internal
545  *
546  * \brief move all flows out of a hash row
547  *
548  * \param f last flow in the hash row
549  *
550  * \retval cnt removed out flows
551  */
552 static uint32_t FlowManagerHashRowCleanup(Flow *f)
553 {
554  uint32_t cnt = 0;
555 
556  do {
557  FLOWLOCK_WRLOCK(f);
558 
559  Flow *next_flow = f->hprev;
560 
561  int state = SC_ATOMIC_GET(f->flow_state);
562 
563  /* remove from the hash */
564  if (f->hprev != NULL)
565  f->hprev->hnext = f->hnext;
566  if (f->hnext != NULL)
567  f->hnext->hprev = f->hprev;
568  if (f->fb->head == f)
569  f->fb->head = f->hnext;
570  if (f->fb->tail == f)
571  f->fb->tail = f->hprev;
572 
573  f->hnext = NULL;
574  f->hprev = NULL;
575 
576  if (state == FLOW_STATE_NEW)
578  else if (state == FLOW_STATE_ESTABLISHED)
580  else if (state == FLOW_STATE_CLOSED)
582 
584 
585  /* no one is referring to this flow, use_cnt 0, removed from hash
586  * so we can unlock it and move it to the recycle queue. */
587  FLOWLOCK_UNLOCK(f);
588 
590 
591  cnt++;
592 
593  f = next_flow;
594  } while (f != NULL);
595 
596  return cnt;
597 }
598 
599 /**
600  * \brief remove all flows from the hash
601  *
602  * \retval cnt number of removes out flows
603  */
604 static uint32_t FlowCleanupHash(void){
605  uint32_t idx = 0;
606  uint32_t cnt = 0;
607 
608  for (idx = 0; idx < flow_config.hash_size; idx++) {
609  FlowBucket *fb = &flow_hash[idx];
610 
611  FBLOCK_LOCK(fb);
612 
613  if (fb->tail != NULL) {
614  /* we have a flow, or more than one */
615  cnt += FlowManagerHashRowCleanup(fb->tail);
616  }
617 
618  FBLOCK_UNLOCK(fb);
619  }
620 
621  return cnt;
622 }
623 
624 extern int g_detect_disabled;
625 
626 typedef struct FlowManagerThreadData_ {
627  uint32_t instance;
628  uint32_t min;
629  uint32_t max;
630 
635  uint16_t flow_mgr_spare;
638  uint16_t flow_tcp_reuse;
639 
645 
651 
655 
657 
658 static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
659 {
661  if (ftd == NULL)
662  return TM_ECODE_FAILED;
663 
664  ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
665  SCLogDebug("flow manager instance %u", ftd->instance);
666 
667  /* set the min and max value used for hash row walking
668  * each thread has it's own section of the flow hash */
669  uint32_t range = flow_config.hash_size / flowmgr_number;
670  if (ftd->instance == 1)
671  ftd->max = range;
672  else if (ftd->instance == flowmgr_number) {
673  ftd->min = (range * (ftd->instance - 1));
674  ftd->max = flow_config.hash_size;
675  } else {
676  ftd->min = (range * (ftd->instance - 1));
677  ftd->max = (range * ftd->instance);
678  }
680 
681  SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
682 
683  /* pass thread data back to caller */
684  *data = ftd;
685 
686  ftd->flow_mgr_cnt_clo = StatsRegisterCounter("flow_mgr.closed_pruned", t);
687  ftd->flow_mgr_cnt_new = StatsRegisterCounter("flow_mgr.new_pruned", t);
688  ftd->flow_mgr_cnt_est = StatsRegisterCounter("flow_mgr.est_pruned", t);
689  ftd->flow_mgr_cnt_byp = StatsRegisterCounter("flow_mgr.bypassed_pruned", t);
690  ftd->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
691  ftd->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
692  ftd->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
693  ftd->flow_tcp_reuse = StatsRegisterCounter("flow.tcp_reuse", t);
694 
695  ftd->flow_mgr_flows_checked = StatsRegisterCounter("flow_mgr.flows_checked", t);
696  ftd->flow_mgr_flows_notimeout = StatsRegisterCounter("flow_mgr.flows_notimeout", t);
697  ftd->flow_mgr_flows_timeout = StatsRegisterCounter("flow_mgr.flows_timeout", t);
698  ftd->flow_mgr_flows_timeout_inuse = StatsRegisterCounter("flow_mgr.flows_timeout_inuse", t);
699  ftd->flow_mgr_flows_removed = StatsRegisterCounter("flow_mgr.flows_removed", t);
700 
701  ftd->flow_mgr_rows_checked = StatsRegisterCounter("flow_mgr.rows_checked", t);
702  ftd->flow_mgr_rows_skipped = StatsRegisterCounter("flow_mgr.rows_skipped", t);
703  ftd->flow_mgr_rows_empty = StatsRegisterCounter("flow_mgr.rows_empty", t);
704  ftd->flow_mgr_rows_busy = StatsRegisterCounter("flow_mgr.rows_busy", t);
705  ftd->flow_mgr_rows_maxlen = StatsRegisterCounter("flow_mgr.rows_maxlen", t);
706 
707  ftd->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
708  ftd->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
709  ftd->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
710 
711  PacketPoolInit();
712  return TM_ECODE_OK;
713 }
714 
715 static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
716 {
718  SCFree(data);
719  return TM_ECODE_OK;
720 }
721 
722 
723 /** \brief Thread that manages the flow table and times out flows.
724  *
725  * \param td ThreadVars casted to void ptr
726  *
727  * Keeps an eye on the spare list, alloc flows if needed...
728  */
729 static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
730 {
731  FlowManagerThreadData *ftd = thread_data;
732  struct timeval ts;
733  uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
734  int emerg = FALSE;
735  int prev_emerg = FALSE;
736  struct timespec cond_time;
737  int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
738  int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
739 /* VJ leaving disabled for now, as hosts are only used by tags and the numbers
740  * are really low. Might confuse ppl
741  uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v);
742  uint16_t flow_mgr_host_active = StatsRegisterCounter("hosts.active", th_v);
743  uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v);
744 */
745  memset(&ts, 0, sizeof(ts));
746 
747  while (1)
748  {
749  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
753  }
754 
755  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
756  emerg = TRUE;
757 
758  if (emerg == TRUE && prev_emerg == FALSE) {
759  prev_emerg = TRUE;
760 
761  SCLogDebug("Flow emergency mode entered...");
762 
763  StatsIncr(th_v, ftd->flow_emerg_mode_enter);
764  }
765  }
766 
767  /* Get the time */
768  memset(&ts, 0, sizeof(ts));
769  TimeGet(&ts);
770  SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
771 
772  /* see if we still have enough spare flows */
773  if (ftd->instance == 1)
775 
776  /* try to time out flows */
777  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0};
778  FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters);
779 
780 
781  if (ftd->instance == 1) {
782  DefragTimeoutHash(&ts);
783  //uint32_t hosts_pruned =
784  HostTimeoutHash(&ts);
785  IPPairTimeoutHash(&ts);
786  }
787 /*
788  StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned);
789  uint32_t hosts_active = HostGetActiveCount();
790  StatsSetUI64(th_v, flow_mgr_host_active, (uint64_t)hosts_active);
791  uint32_t hosts_spare = HostGetSpareCount();
792  StatsSetUI64(th_v, flow_mgr_host_spare, (uint64_t)hosts_spare);
793 */
794  StatsAddUI64(th_v, ftd->flow_mgr_cnt_clo, (uint64_t)counters.clo);
795  StatsAddUI64(th_v, ftd->flow_mgr_cnt_new, (uint64_t)counters.new);
796  StatsAddUI64(th_v, ftd->flow_mgr_cnt_est, (uint64_t)counters.est);
797  StatsAddUI64(th_v, ftd->flow_mgr_cnt_byp, (uint64_t)counters.byp);
798  StatsAddUI64(th_v, ftd->flow_tcp_reuse, (uint64_t)counters.tcp_reuse);
799 
800  StatsSetUI64(th_v, ftd->flow_mgr_flows_checked, (uint64_t)counters.flows_checked);
801  StatsSetUI64(th_v, ftd->flow_mgr_flows_notimeout, (uint64_t)counters.flows_notimeout);
802  StatsSetUI64(th_v, ftd->flow_mgr_flows_timeout, (uint64_t)counters.flows_timeout);
803  StatsSetUI64(th_v, ftd->flow_mgr_flows_removed, (uint64_t)counters.flows_removed);
804  StatsSetUI64(th_v, ftd->flow_mgr_flows_timeout_inuse, (uint64_t)counters.flows_timeout_inuse);
805 
806  StatsSetUI64(th_v, ftd->flow_mgr_rows_checked, (uint64_t)counters.rows_checked);
807  StatsSetUI64(th_v, ftd->flow_mgr_rows_skipped, (uint64_t)counters.rows_skipped);
808  StatsSetUI64(th_v, ftd->flow_mgr_rows_maxlen, (uint64_t)counters.rows_maxlen);
809  StatsSetUI64(th_v, ftd->flow_mgr_rows_busy, (uint64_t)counters.rows_busy);
810  StatsSetUI64(th_v, ftd->flow_mgr_rows_empty, (uint64_t)counters.rows_empty);
811 
812  StatsAddUI64(th_v, ftd->flow_bypassed_cnt_clo, (uint64_t)counters.bypassed_count);
813  StatsAddUI64(th_v, ftd->flow_bypassed_pkts, (uint64_t)counters.bypassed_pkts);
814  StatsAddUI64(th_v, ftd->flow_bypassed_bytes, (uint64_t)counters.bypassed_bytes);
815 
816  uint32_t len = 0;
818  len = flow_spare_q.len;
820  StatsSetUI64(th_v, ftd->flow_mgr_spare, (uint64_t)len);
821 
822  /* Don't fear, FlowManagerThread is here...
823  * clear emergency bit if we have at least xx flows pruned. */
824  if (emerg == TRUE) {
825  SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
826  "flow_spare_q status: %"PRIu32"%% flows at the queue",
827  len, flow_config.prealloc, len * 100 / flow_config.prealloc);
828  /* only if we have pruned this "emergency_recovery" percentage
829  * of flows, we will unset the emergency bit */
831  SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
832 
834 
835  emerg = FALSE;
836  prev_emerg = FALSE;
837 
838  flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
839  flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
840  SCLogInfo("Flow emergency mode over, back to normal... unsetting"
841  " FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", "
842  "ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32
843  "%% flows at the queue", (uintmax_t)ts.tv_sec,
844  (uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);
845 
846  StatsIncr(th_v, ftd->flow_emerg_mode_over);
847  } else {
848  flow_update_delay_sec = FLOW_EMERG_MODE_UPDATE_DELAY_SEC;
849  flow_update_delay_nsec = FLOW_EMERG_MODE_UPDATE_DELAY_NSEC;
850  }
851  }
852 
853  if (TmThreadsCheckFlag(th_v, THV_KILL)) {
854  StatsSyncCounters(th_v);
855  break;
856  }
857 
858  cond_time.tv_sec = time(NULL) + flow_update_delay_sec;
859  cond_time.tv_nsec = flow_update_delay_nsec;
862  &cond_time);
864 
865  SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
866 
868  }
869 
870  SCLogPerf("%" PRIu32 " new flows, %" PRIu32 " established flows were "
871  "timed out, %"PRIu32" flows in closed state", new_cnt,
872  established_cnt, closing_cnt);
873 
874  return TM_ECODE_OK;
875 }
876 
877 /** \brief spawn the flow manager thread */
879 {
880 #ifdef AFLFUZZ_DISABLE_MGTTHREADS
881  return;
882 #endif
883  intmax_t setting = 1;
884  (void)ConfGetInt("flow.managers", &setting);
885 
886  if (setting < 1 || setting > 1024) {
888  "invalid flow.managers setting %"PRIdMAX, setting);
889  exit(EXIT_FAILURE);
890  }
891  flowmgr_number = (uint32_t)setting;
892 
893  SCLogConfig("using %u flow manager threads", flowmgr_number);
896 
898 
899  uint32_t u;
900  for (u = 0; u < flowmgr_number; u++)
901  {
902  ThreadVars *tv_flowmgr = NULL;
903 
904  char name[TM_THREAD_NAME_MAX];
905  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);
906 
907  tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
908  "FlowManager", 0);
909  BUG_ON(tv_flowmgr == NULL);
910 
911  if (tv_flowmgr == NULL) {
912  printf("ERROR: TmThreadsCreate failed\n");
913  exit(1);
914  }
915  if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
916  printf("ERROR: TmThreadSpawn failed\n");
917  exit(1);
918  }
919  }
920  return;
921 }
922 
923 typedef struct FlowRecyclerThreadData_ {
926 
927 static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
928 {
930  if (ftd == NULL)
931  return TM_ECODE_FAILED;
932 
933  if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
934  SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
935  SCFree(ftd);
936  return TM_ECODE_FAILED;
937  }
938  SCLogDebug("output_thread_data %p", ftd->output_thread_data);
939 
940  *data = ftd;
941  return TM_ECODE_OK;
942 }
943 
944 static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
945 {
947  if (ftd->output_thread_data != NULL)
949 
950  SCFree(data);
951  return TM_ECODE_OK;
952 }
953 
954 /** \brief Thread that manages timed out flows.
955  *
956  * \param td ThreadVars casted to void ptr
957  */
958 static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
959 {
960  struct timeval ts;
961  struct timespec cond_time;
962  int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
963  int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
964  uint64_t recycled_cnt = 0;
965  FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
966  BUG_ON(ftd == NULL);
967 
968  memset(&ts, 0, sizeof(ts));
969 
970  while (1)
971  {
972  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
976  }
977 
978  /* Get the time */
979  memset(&ts, 0, sizeof(ts));
980  TimeGet(&ts);
981  SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
982 
983  uint32_t len = 0;
985  len = flow_recycle_q.len;
987 
988  /* Loop through the queue and clean up all flows in it */
989  if (len) {
990  Flow *f;
991 
992  while ((f = FlowDequeue(&flow_recycle_q)) != NULL) {
993  FLOWLOCK_WRLOCK(f);
994 
995  (void)OutputFlowLog(th_v, ftd->output_thread_data, f);
996 
997  FlowClearMemory (f, f->protomap);
998  FLOWLOCK_UNLOCK(f);
999  FlowMoveToSpare(f);
1000  recycled_cnt++;
1001  }
1002  }
1003 
1004  SCLogDebug("%u flows to recycle", len);
1005 
1006  if (TmThreadsCheckFlag(th_v, THV_KILL)) {
1007  StatsSyncCounters(th_v);
1008  break;
1009  }
1010 
1011  cond_time.tv_sec = time(NULL) + flow_update_delay_sec;
1012  cond_time.tv_nsec = flow_update_delay_nsec;
1015  &flow_recycler_ctrl_mutex, &cond_time);
1017 
1018  SCLogDebug("woke up...");
1019 
1021  }
1022 
1023  SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
1024 
1025  return TM_ECODE_OK;
1026 }
1027 
1028 static int FlowRecyclerReadyToShutdown(void)
1029 {
1030  uint32_t len = 0;
1032  len = flow_recycle_q.len;
1034 
1035  return ((len == 0));
1036 }
1037 
1038 /** \brief spawn the flow recycler thread */
1040 {
1041 #ifdef AFLFUZZ_DISABLE_MGTTHREADS
1042  return;
1043 #endif
1044  intmax_t setting = 1;
1045  (void)ConfGetInt("flow.recyclers", &setting);
1046 
1047  if (setting < 1 || setting > 1024) {
1049  "invalid flow.recyclers setting %"PRIdMAX, setting);
1050  exit(EXIT_FAILURE);
1051  }
1052  flowrec_number = (uint32_t)setting;
1053 
1054  SCLogConfig("using %u flow recycler threads", flowrec_number);
1055 
1058 
1059 
1060  uint32_t u;
1061  for (u = 0; u < flowrec_number; u++)
1062  {
1063  ThreadVars *tv_flowmgr = NULL;
1064 
1065  char name[TM_THREAD_NAME_MAX];
1066  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);
1067 
1068  tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
1069  "FlowRecycler", 0);
1070  BUG_ON(tv_flowmgr == NULL);
1071 
1072  if (tv_flowmgr == NULL) {
1073  printf("ERROR: TmThreadsCreate failed\n");
1074  exit(1);
1075  }
1076  if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
1077  printf("ERROR: TmThreadSpawn failed\n");
1078  exit(1);
1079  }
1080  }
1081  return;
1082 }
1083 
1084 /**
1085  * \brief Used to disable flow recycler thread(s).
1086  *
1087  * \note this should only be called when the flow manager is already gone
1088  *
1089  * \todo Kinda hackish since it uses the tv name to identify flow recycler
1090  * thread. We need an all weather identification scheme.
1091  */
1093 {
1094 #ifdef AFLFUZZ_DISABLE_MGTTHREADS
1095  return;
1096 #endif
1097  ThreadVars *tv = NULL;
1098  int cnt = 0;
1099 
1100  /* move all flows still in the hash to the recycler queue */
1101  FlowCleanupHash();
1102 
1103  /* make sure all flows are processed */
1104  do {
1106  usleep(10);
1107  } while (FlowRecyclerReadyToShutdown() == 0);
1108 
1109  /* wake up threads */
1110  uint32_t u;
1111  for (u = 0; u < flowrec_number; u++)
1113 
1115  /* flow recycler thread(s) is/are a part of mgmt threads */
1116  tv = tv_root[TVT_MGMT];
1117  while (tv != NULL)
1118  {
1119  if (strncasecmp(tv->name, thread_name_flow_rec,
1120  strlen(thread_name_flow_rec)) == 0)
1121  {
1123  cnt++;
1124  }
1125  tv = tv->next;
1126  }
1128 
1129  struct timeval start_ts;
1130  struct timeval cur_ts;
1131  gettimeofday(&start_ts, NULL);
1132 
1133 again:
1134  gettimeofday(&cur_ts, NULL);
1135  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1136  FatalError(SC_ERR_SHUTDOWN, "unable to get all flow recycler "
1137  "threads to shutdown in time");
1138  }
1139 
1141  tv = tv_root[TVT_MGMT];
1142  while (tv != NULL)
1143  {
1144  if (strncasecmp(tv->name, thread_name_flow_rec,
1145  strlen(thread_name_flow_rec)) == 0)
1146  {
1149  /* sleep outside lock */
1150  SleepMsec(1);
1151  goto again;
1152  }
1153  }
1154  tv = tv->next;
1155  }
1157 
1158  /* wake up threads, another try */
1159  for (u = 0; u < flowrec_number; u++)
1161 
1162  /* reset count, so we can kill and respawn (unix socket) */
1163  SC_ATOMIC_SET(flowrec_cnt, 0);
1164  return;
1165 }
1166 
1168 {
1169  tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
1170  tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
1171  tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
1172 // tmm_modules[TMM_FLOWMANAGER].RegisterTests = FlowManagerRegisterTests;
1173  tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
1176  SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
1177 
1178  SC_ATOMIC_INIT(flowmgr_cnt);
1179  SC_ATOMIC_INIT(flow_timeouts);
1180 }
1181 
1183 {
1184  tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
1185  tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
1186  tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
1187 // tmm_modules[TMM_FLOWRECYCLER].RegisterTests = FlowRecyclerRegisterTests;
1188  tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
1191  SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
1192 
1193  SC_ATOMIC_INIT(flowrec_cnt);
1194 }
1195 
1196 #ifdef UNITTESTS
1197 
1198 /**
1199  * \test Test the timing out of a flow with a fresh TcpSession
1200  * (just initialized, no data segments) in normal mode.
1201  *
1202  * \retval On success it returns 1 and on failure 0.
1203  */
1204 
1205 static int FlowMgrTest01 (void)
1206 {
1207  TcpSession ssn;
1208  Flow f;
1209  FlowBucket fb;
1210  struct timeval ts;
1211 
1213 
1214  memset(&ssn, 0, sizeof(TcpSession));
1215  memset(&f, 0, sizeof(Flow));
1216  memset(&ts, 0, sizeof(ts));
1217  memset(&fb, 0, sizeof(FlowBucket));
1218 
1219  FBLOCK_INIT(&fb);
1220 
1221  FLOW_INITIALIZE(&f);
1223 
1224  TimeGet(&ts);
1225  f.lastts.tv_sec = ts.tv_sec - 5000;
1226  f.protoctx = &ssn;
1227  f.fb = &fb;
1228 
1229  f.proto = IPPROTO_TCP;
1230 
1231  int32_t next_ts = 0;
1232  int state = SC_ATOMIC_GET(f.flow_state);
1233  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0};
1234  if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts, &counters) != 1) {
1235  FBLOCK_DESTROY(&fb);
1236  FLOW_DESTROY(&f);
1238  return 0;
1239  }
1240 
1241  FBLOCK_DESTROY(&fb);
1242  FLOW_DESTROY(&f);
1243 
1245  return 1;
1246 }
1247 
1248 /**
1249  * \test Test the timing out of a flow with a TcpSession
1250  * (with data segments) in normal mode.
1251  *
1252  * \retval On success it returns 1 and on failure 0.
1253  */
1254 
1255 static int FlowMgrTest02 (void)
1256 {
1257  TcpSession ssn;
1258  Flow f;
1259  FlowBucket fb;
1260  struct timeval ts;
1261  TcpSegment seg;
1262  TcpStream client;
1263 
1265 
1266  memset(&ssn, 0, sizeof(TcpSession));
1267  memset(&f, 0, sizeof(Flow));
1268  memset(&fb, 0, sizeof(FlowBucket));
1269  memset(&ts, 0, sizeof(ts));
1270  memset(&seg, 0, sizeof(TcpSegment));
1271  memset(&client, 0, sizeof(TcpStream));
1272 
1273  FBLOCK_INIT(&fb);
1274  FLOW_INITIALIZE(&f);
1276 
1277  TimeGet(&ts);
1278  TCP_SEG_LEN(&seg) = 3;
1279  TCPSEG_RB_INSERT(&client.seg_tree, &seg);
1280  ssn.client = client;
1281  ssn.server = client;
1282  ssn.state = TCP_ESTABLISHED;
1283  f.lastts.tv_sec = ts.tv_sec - 5000;
1284  f.protoctx = &ssn;
1285  f.fb = &fb;
1286  f.proto = IPPROTO_TCP;
1287 
1288  int32_t next_ts = 0;
1289  int state = SC_ATOMIC_GET(f.flow_state);
1290  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0};
1291  if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts, &counters) != 1) {
1292  FBLOCK_DESTROY(&fb);
1293  FLOW_DESTROY(&f);
1295  return 0;
1296  }
1297  FBLOCK_DESTROY(&fb);
1298  FLOW_DESTROY(&f);
1300  return 1;
1301 
1302 }
1303 
1304 /**
1305  * \test Test the timing out of a flow with a fresh TcpSession
1306  * (just initialized, no data segments) in emergency mode.
1307  *
1308  * \retval On success it returns 1 and on failure 0.
1309  */
1310 
1311 static int FlowMgrTest03 (void)
1312 {
1313  TcpSession ssn;
1314  Flow f;
1315  FlowBucket fb;
1316  struct timeval ts;
1317 
1319 
1320  memset(&ssn, 0, sizeof(TcpSession));
1321  memset(&f, 0, sizeof(Flow));
1322  memset(&ts, 0, sizeof(ts));
1323  memset(&fb, 0, sizeof(FlowBucket));
1324 
1325  FBLOCK_INIT(&fb);
1326  FLOW_INITIALIZE(&f);
1328 
1329  TimeGet(&ts);
1330  ssn.state = TCP_SYN_SENT;
1331  f.lastts.tv_sec = ts.tv_sec - 300;
1332  f.protoctx = &ssn;
1333  f.fb = &fb;
1334  f.proto = IPPROTO_TCP;
1335  f.flags |= FLOW_EMERGENCY;
1336 
1337  int next_ts = 0;
1338  int state = SC_ATOMIC_GET(f.flow_state);
1339  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0};
1340  if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts, &counters) != 1) {
1341  FBLOCK_DESTROY(&fb);
1342  FLOW_DESTROY(&f);
1344  return 0;
1345  }
1346 
1347  FBLOCK_DESTROY(&fb);
1348  FLOW_DESTROY(&f);
1350  return 1;
1351 }
1352 
1353 /**
1354  * \test Test the timing out of a flow with a TcpSession
1355  * (with data segments) in emergency mode.
1356  *
1357  * \retval On success it returns 1 and on failure 0.
1358  */
1359 
1360 static int FlowMgrTest04 (void)
1361 {
1362 
1363  TcpSession ssn;
1364  Flow f;
1365  FlowBucket fb;
1366  struct timeval ts;
1367  TcpSegment seg;
1368  TcpStream client;
1369 
1371 
1372  memset(&ssn, 0, sizeof(TcpSession));
1373  memset(&f, 0, sizeof(Flow));
1374  memset(&fb, 0, sizeof(FlowBucket));
1375  memset(&ts, 0, sizeof(ts));
1376  memset(&seg, 0, sizeof(TcpSegment));
1377  memset(&client, 0, sizeof(TcpStream));
1378 
1379  FBLOCK_INIT(&fb);
1380  FLOW_INITIALIZE(&f);
1382 
1383  TimeGet(&ts);
1384  TCP_SEG_LEN(&seg) = 3;
1385  TCPSEG_RB_INSERT(&client.seg_tree, &seg);
1386  ssn.client = client;
1387  ssn.server = client;
1388  ssn.state = TCP_ESTABLISHED;
1389  f.lastts.tv_sec = ts.tv_sec - 5000;
1390  f.protoctx = &ssn;
1391  f.fb = &fb;
1392  f.proto = IPPROTO_TCP;
1393  f.flags |= FLOW_EMERGENCY;
1394 
1395  int next_ts = 0;
1396  int state = SC_ATOMIC_GET(f.flow_state);
1397  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0};
1398  if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts, &counters) != 1) {
1399  FBLOCK_DESTROY(&fb);
1400  FLOW_DESTROY(&f);
1402  return 0;
1403  }
1404 
1405  FBLOCK_DESTROY(&fb);
1406  FLOW_DESTROY(&f);
1408  return 1;
1409 }
1410 
1411 /**
1412  * \test Test flow allocations when it reach memcap
1413  *
1414  *
1415  * \retval On success it returns 1 and on failure 0.
1416  */
1417 
1418 static int FlowMgrTest05 (void)
1419 {
1420  int result = 0;
1421 
1423  FlowConfig backup;
1424  memcpy(&backup, &flow_config, sizeof(FlowConfig));
1425 
1426  uint32_t ini = 0;
1427  uint32_t end = flow_spare_q.len;
1428  SC_ATOMIC_SET(flow_config.memcap, 10000);
1429  flow_config.prealloc = 100;
1430 
1431  /* Let's get the flow_spare_q empty */
1432  UTHBuildPacketOfFlows(ini, end, 0);
1433 
1434  /* And now let's try to reach the memcap val */
1435  while (FLOW_CHECK_MEMCAP(sizeof(Flow))) {
1436  ini = end + 1;
1437  end = end + 2;
1438  UTHBuildPacketOfFlows(ini, end, 0);
1439  }
1440 
1441  /* should time out normal */
1442  TimeSetIncrementTime(2000);
1443  ini = end + 1;
1444  end = end + 2;;
1445  UTHBuildPacketOfFlows(ini, end, 0);
1446 
1447  struct timeval ts;
1448  TimeGet(&ts);
1449  /* try to time out flows */
1450  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0};
1451  FlowTimeoutHash(&ts, 0 /* check all */, 0, flow_config.hash_size, &counters);
1452 
1453  if (flow_recycle_q.len > 0) {
1454  result = 1;
1455  }
1456 
1457  memcpy(&flow_config, &backup, sizeof(FlowConfig));
1458  FlowShutdown();
1459  return result;
1460 }
1461 #endif /* UNITTESTS */
1462 
/**
 * \brief Function to register the Flow Unittests.
 */
void FlowMgrRegisterTests (void)
{
#ifdef UNITTESTS
    UtRegisterTest("FlowMgrTest01 -- Timeout a flow having fresh TcpSession",
                   FlowMgrTest01);
    UtRegisterTest("FlowMgrTest02 -- Timeout a flow having TcpSession with segments",
                   FlowMgrTest02);
    UtRegisterTest("FlowMgrTest03 -- Timeout a flow in emergency having fresh TcpSession",
                   FlowMgrTest03);
    UtRegisterTest("FlowMgrTest04 -- Timeout a flow in emergency having TcpSession with segments",
                   FlowMgrTest04);
    UtRegisterTest("FlowMgrTest05 -- Test flow Allocations when it reach memcap",
                   FlowMgrTest05);
#endif /* UNITTESTS */
}
FlowQueue flow_recycle_q
Definition: flow-private.h:94
#define FLOW_TCP_REUSED
Definition: flow.h:51
#define FLOW_IS_IPV4(f)
Definition: flow.h:134
uint16_t flow_mgr_flows_timeout
Definition: flow-manager.c:642
#define FLOW_CHECK_MEMCAP(size)
check if a memory alloc would fit in the memcap
Definition: flow-util.h:137
#define SCLogDebug(...)
Definition: util-debug.h:335
#define FBLOCK_UNLOCK(fb)
Definition: flow-hash.h:70
uint64_t tosrcbytecnt
Definition: flow.h:492
#define FLOW_TIMEOUT_REASSEMBLY_DONE
Definition: flow.h:91
uint16_t flow_mgr_flows_timeout_inuse
Definition: flow-manager.c:643
uint8_t cap_flags
Definition: tm-modules.h:67
uint32_t bypassed_timeout
Definition: flow.h:480
#define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC
Definition: flow-manager.c:101
struct HtpBodyChunk_ * next
struct Flow_ * hnext
Definition: flow.h:451
void FlowDisableFlowManagerThread(void)
Used to disable flow manager thread(s).
Definition: flow-manager.c:138
#define BUG_ON(x)
uint32_t new_timeout
Definition: flow.h:477
uint8_t flags
Definition: tm-modules.h:70
uint8_t proto
Definition: flow.h:344
#define THV_RUNNING_DONE
Definition: threadvars.h:45
#define FALSE
void LiveDevSubBypassStats(LiveDevice *dev, uint64_t cnt, int family)
Definition: util-device.c:565
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:67
#define FLOWLOCK_UNLOCK(fb)
Definition: flow.h:243
#define FLOW_END_FLAG_STATE_ESTABLISHED
Definition: flow.h:210
FlowProtoTimeout flow_timeouts_normal[FLOW_PROTO_MAX]
Definition: flow-private.h:86
void PacketPoolWaitForN(int n)
Wait until we have the requested amount of packets in the pool.
#define FLOW_END_FLAG_TIMEOUT
Definition: flow.h:213
FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]
Definition: flow-private.h:87
FlowProtoTimeout * FlowProtoTimeoutPtr
Definition: flow-manager.c:87
int FlowForceReassemblyNeedReassembly(Flow *f, int *server, int *client)
Check if a flow needs forced reassembly, or any other processing.
Definition: flow-timeout.c:285
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:72
uint16_t flow_emerg_mode_enter
Definition: flow-manager.c:636
#define FlowTimeoutsReset()
Definition: flow-manager.h:27
#define SCCtrlCondInit
uint32_t prealloc
Definition: flow.h:266
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:107
#define FLOW_EMERG_MODE_UPDATE_DELAY_SEC
Definition: flow-manager.c:104
#define FLOW_QUIET
Definition: flow.h:38
#define FLOW_EMERGENCY
Definition: flow-private.h:37
#define FBLOCK_INIT(fb)
Definition: flow-hash.h:66
#define THV_PAUSED
Definition: threadvars.h:38
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value from our atomic variable.
Definition: util-atomic.h:141
#define FLOWLOCK_WRLOCK(fb)
Definition: flow.h:240
SCCtrlMutex flow_recycler_ctrl_mutex
Definition: flow-manager.h:42
void FlowMoveToSpare(Flow *f)
Transfer a flow from a queue to the spare queue.
Definition: flow-queue.c:146
#define FLOW_END_FLAG_STATE_BYPASSED
Definition: flow.h:216
void FlowMgrRegisterTests(void)
Function to register the Flow Unittests.
SCCtrlCondT flow_manager_ctrl_cond
Definition: flow-manager.h:32
#define TRUE
Flow * head
Definition: flow-hash.h:42
#define SCMutexLock(mut)
uint64_t todstpktcnt
Definition: flow.h:493
void * protoctx
Definition: flow.h:400
#define FBLOCK_DESTROY(fb)
Definition: flow-hash.h:67
FlowConfig flow_config
Definition: flow-private.h:97
void FlowQueueDestroy(FlowQueue *q)
Destroy a flow queue.
Definition: flow-queue.c:61
struct Flow_ * hprev
Definition: flow.h:452
uint64_t todstbytecnt
Definition: flow.h:494
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:943
#define SCCtrlCondSignal
#define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC
Definition: flow-manager.c:102
uint32_t HostTimeoutHash(struct timeval *ts)
time out hosts from the hash
Definition: host-timeout.c:156
void TmThreadsSetFlag(ThreadVars *tv, uint16_t flag)
Set a thread flag.
Definition: tm-threads.c:97
#define SC_ATOMIC_INIT(name)
Initialize the previously declared atomic variable and its lock.
Definition: util-atomic.h:81
struct FlowManagerThreadData_ FlowManagerThreadData
#define SCCalloc(nm, a)
Definition: util-mem.h:253
#define SCMutexUnlock(mut)
FlowQueue * FlowQueueInit(FlowQueue *q)
Definition: flow-queue.c:47
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type double to the local counter.
Definition: counters.c:190
#define FLOW_END_FLAG_STATE_NEW
Definition: flow.h:209
int run_mode
Definition: suricata.c:204
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt)
int FlowUpdateSpareFlows(void)
Make sure we have enough spare flows.
Definition: flow.c:144
SCMutex tv_root_lock
Definition: tm-threads.c:81
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
void TimeGet(struct timeval *tv)
Definition: util-time.c:146
#define TCP_SEG_LEN(seg)
void TmModuleFlowManagerRegister(void)
SCCtrlMutex flow_manager_ctrl_mutex
Definition: flow-manager.h:33
uint32_t IPPairTimeoutHash(struct timeval *ts)
time out ippairs from the hash
Data structures and function prototypes for keeping state for the detection engine.
SC_ATOMIC_EXTERN(unsigned int, flow_flags)
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:294
#define FLOW_DESTROY(f)
Definition: flow-util.h:121
struct ThreadVars_ * next
Definition: threadvars.h:111
uint32_t flows_timeout_inuse
Definition: flow-manager.c:118
#define TM_FLAG_MANAGEMENT_TM
Definition: tm-modules.h:36
void UtRegisterTest(const char *name, int(*TestFn)(void))
Register unit test.
#define SleepMsec(msec)
Definition: tm-threads.h:44
uint32_t est_timeout
Definition: flow.h:478
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
Definition: output-flow.c:91
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1299
#define FLOW_END_FLAG_STATE_CLOSED
Definition: flow.h:211
uint16_t flow_mgr_rows_checked
Definition: flow-manager.c:646
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:168
Flow * tail
Definition: flow-hash.h:43
void * bypass_data
Definition: flow.h:490
void TmThreadsUnsetFlag(ThreadVars *tv, uint16_t flag)
Unset a thread flag.
Definition: tm-threads.c:105
#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC
Definition: flow-manager.c:105
uint16_t flow_mgr_flows_removed
Definition: flow-manager.c:644
uint16_t StatsRegisterGlobalCounter(const char *name, uint64_t(*Func)(void))
Registers a counter, which represents a global value.
Definition: counters.c:1001
SCCtrlCondT flow_recycler_ctrl_cond
Definition: flow-manager.h:41
void PacketPoolDestroy(void)
struct LiveDevice_ * livedev
Definition: flow.h:350
uint32_t UTHBuildPacketOfFlows(uint32_t start, uint32_t end, uint8_t dir)
#define FLOW_BYPASSED_TIMEOUT
Definition: flow-private.h:65
int FlowForceReassemblyForFlow(Flow *f, int server, int client)
Definition: flow-timeout.c:339
#define THV_PAUSE
Definition: threadvars.h:37
struct timeval lastts
Definition: flow.h:358
struct FlowBucket_ * fb
Definition: flow.h:453
TmEcode OutputFlowLogThreadInit(ThreadVars *tv, void *initdata, void **data)
thread init for the flow logger This will run the thread init functions for the individual registered...
Definition: output-flow.c:129
#define THV_KILL
Definition: threadvars.h:39
uint8_t flow_end_flags
Definition: flow.h:406
void PacketPoolInit(void)
const char * name
Definition: tm-modules.h:44
void TimeSetIncrementTime(uint32_t tv_sec)
increment the time in the engine
Definition: util-time.c:175
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:207
uint16_t flow_mgr_rows_skipped
Definition: flow-manager.c:647
struct TCPSEG seg_tree
Flow * FlowDequeue(FlowQueue *q)
remove a flow from the queue
Definition: flow-queue.c:105
FlowBucket * flow_hash
Definition: flow-private.h:96
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:254
#define SCCtrlMutexLock(mut)
#define SCFree(a)
Definition: util-mem.h:322
#define FLOW_IS_IPV6(f)
Definition: flow.h:136
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.h:73
#define FLOW_END_FLAG_EMERGENCY
Definition: flow.h:212
int ConfGetInt(const char *name, intmax_t *val)
Retrieve a configuration value as an integer.
Definition: conf.c:437
uint16_t flow_mgr_flows_checked
Definition: flow-manager.c:640
#define SCCtrlCondTimedwait
uint64_t FlowGetMemuse(void)
Definition: flow.c:119
void FlowShutdown(void)
shutdown the flow engine
Definition: flow.c:670
uint64_t ts
#define FBLOCK_TRYLOCK(fb)
Definition: flow-hash.h:69
struct FlowTimeoutCounters_ FlowTimeoutCounters
#define SCLogPerf(...)
Definition: util-debug.h:261
uint32_t closed_timeout
Definition: flow.h:479
#define FatalError(x,...)
Definition: util-debug.h:559
uint64_t tosrcpktcnt
Definition: flow.h:491
#define StatsSyncCountersIfSignalled(tv)
Definition: counters.h:137
uint16_t flow_bypassed_cnt_clo
Definition: flow-manager.c:652
#define FLOW_INITIALIZE(f)
Definition: flow-util.h:39
struct FlowRecyclerThreadData_ FlowRecyclerThreadData
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
#define TM_THREAD_NAME_MAX
Definition: tm-threads.h:48
TmEcode OutputFlowLogThreadDeinit(ThreadVars *tv, void *thread_data)
Definition: output-flow.c:171
void FlowManagerThreadSpawn()
spawn the flow manager thread
Definition: flow-manager.c:878
int TmThreadsCheckFlag(ThreadVars *tv, uint16_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:89
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:192
void * FlowGetStorageById(Flow *f, int id)
Definition: flow-storage.c:39
#define StatsSyncCounters(tv)
Definition: counters.h:134
#define SCCtrlMutexUnlock(mut)
bool(* BypassUpdate)(Flow *f, void *data, time_t tsec)
Definition: flow.h:488
char name[16]
Definition: threadvars.h:59
int g_detect_disabled
Definition: suricata.c:218
uint32_t len
Definition: flow-queue.h:45
void TmModuleFlowRecyclerRegister(void)
void FlowRecyclerThreadSpawn()
spawn the flow recycler thread
int FlowClearMemory(Flow *f, uint8_t proto_map)
Function clear the flow memory before queueing it to spare flow queue.
Definition: flow.c:1030
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:78
int GetFlowBypassInfoID(void)
Definition: flow-util.c:209
uint32_t DefragTimeoutHash(struct timeval *ts)
time out tracker from the hash
uint8_t len
Per thread variable structure.
Definition: threadvars.h:57
void FlowTimeoutsEmergency(void)
Definition: flow-manager.c:95
#define SCCtrlMutexInit(mut, mutattr)
#define FLOW_END_FLAG_SHUTDOWN
Definition: flow.h:215
struct SCLogConfig_ SCLogConfig
Holds the config state used by the logging api.
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition: counters.c:147
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1875
uint8_t protomap
Definition: flow.h:404
Flow data structure.
Definition: flow.h:325
Definition: flow.h:261
uint32_t flags
Definition: flow.h:379
void FlowTimeoutsInit(void)
Definition: flow-manager.c:90
FlowState
Definition: flow.h:466
#define FBLOCK_LOCK(fb)
Definition: flow-hash.h:68
uint16_t flow_mgr_flows_notimeout
Definition: flow-manager.c:641
const char * thread_name_flow_mgr
Definition: runmodes.c:65
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:69
uint32_t emergency_recovery
Definition: flow.h:273
const char * thread_name_flow_rec
Definition: runmodes.c:66
FlowQueue flow_spare_q
Definition: flow-private.h:91
void FlowDisableFlowRecyclerThread(void)
Used to disable flow recycler thread(s).
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1973
void FlowInitConfig(char quiet)
initialize the configuration
Definition: flow.c:515
uint32_t hash_size
Definition: flow.h:264