suricata
flow-manager.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2013 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
22  * \author Victor Julien <victor@inliniac.net>
23  */
24 
25 #include "suricata-common.h"
26 #include "suricata.h"
27 #include "decode.h"
28 #include "conf.h"
29 #include "threadvars.h"
30 #include "tm-threads.h"
31 #include "runmodes.h"
32 
33 #include "util-random.h"
34 #include "util-time.h"
35 
36 #include "flow.h"
37 #include "flow-queue.h"
38 #include "flow-hash.h"
39 #include "flow-util.h"
40 #include "flow-var.h"
41 #include "flow-private.h"
42 #include "flow-timeout.h"
43 #include "flow-manager.h"
44 #include "flow-storage.h"
45 
46 #include "stream-tcp-private.h"
47 #include "stream-tcp-reassemble.h"
48 #include "stream-tcp.h"
49 
50 #include "util-unittest.h"
51 #include "util-unittest-helper.h"
52 #include "util-byte.h"
53 
54 #include "util-debug.h"
55 #include "util-privs.h"
56 #include "util-signal.h"
57 
58 #include "threads.h"
59 #include "detect.h"
60 #include "detect-engine-state.h"
61 #include "stream.h"
62 
63 #include "app-layer-parser.h"
64 
65 #include "host-timeout.h"
66 #include "defrag-timeout.h"
67 #include "ippair-timeout.h"
68 
69 #include "output-flow.h"
70 
71 /* Run mode selected at suricata.c */
72 extern int run_mode;
73 
74 /* multi flow mananger support */
75 static uint32_t flowmgr_number = 1;
76 /* atomic counter for flow managers, to assign instance id */
77 SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
78 
79 /* multi flow recycler support */
80 static uint32_t flowrec_number = 1;
81 /* atomic counter for flow recyclers, to assign instance id */
82 SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
83 
84 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
85 
86 
89 
90 void FlowTimeoutsInit(void)
91 {
92  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
93 }
94 
96 {
97  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
98 }
99 
100 /* 1 seconds */
101 #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
102 #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
103 /* 0.1 seconds */
104 #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
105 #define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 100000
106 #define NEW_FLOW_COUNT_COND 10
107 
108 typedef struct FlowTimeoutCounters_ {
109  uint32_t new;
110  uint32_t est;
111  uint32_t clo;
112  uint32_t byp;
113  uint32_t tcp_reuse;
114 
115  uint32_t flows_checked;
116  uint32_t flows_notimeout;
117  uint32_t flows_timeout;
119  uint32_t flows_removed;
120 
121  uint32_t rows_checked;
122  uint32_t rows_skipped;
123  uint32_t rows_empty;
124  uint32_t rows_busy;
125  uint32_t rows_maxlen;
126 
127  uint32_t bypassed_count;
128  uint64_t bypassed_pkts;
129  uint64_t bypassed_bytes;
131 
132 /**
133  * \brief Used to disable flow manager thread(s).
134  *
135  * \todo Kinda hackish since it uses the tv name to identify flow manager
136  * thread. We need an all weather identification scheme.
137  */
139 {
140 #ifdef AFLFUZZ_DISABLE_MGTTHREADS
141  return;
142 #endif
143  ThreadVars *tv = NULL;
144 
145  /* wake up threads */
146  uint32_t u;
147  for (u = 0; u < flowmgr_number; u++)
149 
151  /* flow manager thread(s) is/are a part of mgmt threads */
152  tv = tv_root[TVT_MGMT];
153  while (tv != NULL)
154  {
155  if (strncasecmp(tv->name, thread_name_flow_mgr,
156  strlen(thread_name_flow_mgr)) == 0)
157  {
159  }
160  tv = tv->next;
161  }
163 
164  struct timeval start_ts;
165  struct timeval cur_ts;
166  gettimeofday(&start_ts, NULL);
167 
168 again:
169  gettimeofday(&cur_ts, NULL);
170  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
171  FatalError(SC_ERR_SHUTDOWN, "unable to get all flow manager "
172  "threads to shutdown in time");
173  }
174 
176  tv = tv_root[TVT_MGMT];
177  while (tv != NULL)
178  {
179  if (strncasecmp(tv->name, thread_name_flow_mgr,
180  strlen(thread_name_flow_mgr)) == 0)
181  {
184  /* sleep outside lock */
185  SleepMsec(1);
186  goto again;
187  }
188  }
189  tv = tv->next;
190  }
192 
193  /* wake up threads, another try */
194  for (u = 0; u < flowmgr_number; u++)
196 
197  /* reset count, so we can kill and respawn (unix socket) */
198  SC_ATOMIC_SET(flowmgr_cnt, 0);
199  return;
200 }
201 
202 /** \internal
203  * \brief get timeout for flow
204  *
205  * \param f flow
206  * \param state flow state
207  *
208  * \retval timeout timeout in seconds
209  */
210 static inline uint32_t FlowGetFlowTimeout(const Flow *f, enum FlowState state)
211 {
212  uint32_t timeout;
213  FlowProtoTimeoutPtr flow_timeouts = SC_ATOMIC_GET(flow_timeouts);
214  switch(state) {
215  default:
216  case FLOW_STATE_NEW:
217  timeout = flow_timeouts[f->protomap].new_timeout;
218  break;
220  timeout = flow_timeouts[f->protomap].est_timeout;
221  break;
222  case FLOW_STATE_CLOSED:
223  timeout = flow_timeouts[f->protomap].closed_timeout;
224  break;
226  timeout = FLOW_BYPASSED_TIMEOUT;
227  break;
229  timeout = flow_timeouts[f->protomap].bypassed_timeout;
230  break;
231  }
232  return timeout;
233 }
234 
235 /** \internal
236  * \brief check if a flow is timed out
237  *
238  * \param f flow
239  * \param ts timestamp
240  *
241  * \retval 0 not timed out
242  * \retval 1 timed out
243  */
244 static int FlowManagerFlowTimeout(Flow *f, enum FlowState state, struct timeval *ts, int32_t *next_ts)
245 {
246  /* set the timeout value according to the flow operating mode,
247  * flow's state and protocol.*/
248  uint32_t timeout = FlowGetFlowTimeout(f, state);
249 
250  int32_t flow_times_out_at = (int32_t)(f->lastts.tv_sec + timeout);
251  if (*next_ts == 0 || flow_times_out_at < *next_ts)
252  *next_ts = flow_times_out_at;
253 
254  /* do the timeout check */
255  if (flow_times_out_at >= ts->tv_sec) {
256  return 0;
257  }
258 
259  return 1;
260 }
261 
262 static inline int FlowBypassedTimeout(Flow *f, struct timeval *ts,
263  FlowTimeoutCounters *counters)
264 {
265  if (SC_ATOMIC_GET(f->flow_state) != FLOW_STATE_CAPTURE_BYPASSED) {
266  return 1;
267  }
268 
270  if (fc && fc->BypassUpdate) {
271  /* flow will be possibly updated */
272  uint64_t pkts_tosrc = fc->tosrcpktcnt;
273  uint64_t bytes_tosrc = fc->tosrcbytecnt;
274  uint64_t pkts_todst = fc->todstpktcnt;
275  uint64_t bytes_todst = fc->todstbytecnt;
276  bool update = fc->BypassUpdate(f, fc->bypass_data, ts->tv_sec);
277  if (update) {
278  SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f));
279  pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
280  bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
281  pkts_todst = fc->todstpktcnt - pkts_todst;
282  bytes_todst = fc->todstbytecnt - bytes_todst;
283  if (f->livedev) {
284  SC_ATOMIC_ADD(f->livedev->bypassed,
285  pkts_tosrc + pkts_todst);
286  }
287  counters->bypassed_pkts += pkts_tosrc + pkts_todst;
288  counters->bypassed_bytes += bytes_tosrc + bytes_todst;
289  return 0;
290  } else {
291  SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
292  if (f->livedev) {
293  if (FLOW_IS_IPV4(f)) {
294  LiveDevSubBypassStats(f->livedev, 1, AF_INET);
295  } else if (FLOW_IS_IPV6(f)) {
296  LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
297  }
298  }
299  counters->bypassed_count++;
300  return 1;
301  }
302  }
303  return 1;
304 }
305 
306 /** \internal
307  * \brief See if we can really discard this flow. Check use_cnt reference
308  * counter and force reassembly if necessary.
309  *
310  * \param f flow
311  * \param ts timestamp
312  *
313  * \retval 0 not timed out just yet
314  * \retval 1 fully timed out, lets kill it
315  */
316 static inline int FlowManagerFlowTimedOut(Flow *f, struct timeval *ts,
317  FlowTimeoutCounters *counters)
318 {
319  /* never prune a flow that is used by a packet we
320  * are currently processing in one of the threads */
321  if (SC_ATOMIC_GET(f->use_cnt) > 0) {
322  return 0;
323  }
324 
325  if (!FlowBypassedTimeout(f, ts, counters)) {
326  return 0;
327  }
328 
329  int server = 0, client = 0;
330 
331  if (!(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
332  SC_ATOMIC_GET(f->flow_state) != FLOW_STATE_CAPTURE_BYPASSED &&
333  SC_ATOMIC_GET(f->flow_state) != FLOW_STATE_LOCAL_BYPASSED &&
334  FlowForceReassemblyNeedReassembly(f, &server, &client) == 1) {
335  FlowForceReassemblyForFlow(f, server, client);
336  return 0;
337  }
338 #ifdef DEBUG
339  /* this should not be possible */
340  BUG_ON(SC_ATOMIC_GET(f->use_cnt) > 0);
341 #endif
342 
343  return 1;
344 }
345 
346 /**
347  * \internal
348  *
349  * \brief check all flows in a hash row for timing out
350  *
351  * \param f last flow in the hash row
352  * \param ts timestamp
353  * \param emergency bool indicating emergency mode
354  * \param counters ptr to FlowTimeoutCounters structure
355  *
356  * \retval cnt timed out flows
357  */
358 static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
359  int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
360 {
361  uint32_t cnt = 0;
362  uint32_t checked = 0;
363 
364  do {
365  checked++;
366 
367  /* check flow timeout based on lastts and state. Both can be
368  * accessed w/o Flow lock as we do have the hash row lock (so flow
369  * can't disappear) and flow_state is atomic. lastts can only
370  * be modified when we have both the flow and hash row lock */
371 
372  enum FlowState state = SC_ATOMIC_GET(f->flow_state);
373 
374  /* timeout logic goes here */
375  if (FlowManagerFlowTimeout(f, state, ts, next_ts) == 0) {
376 
377  counters->flows_notimeout++;
378 
379  f = f->hprev;
380  continue;
381  }
382 
383  /* before grabbing the flow lock, make sure we have at least
384  * 3 packets in the pool */
386 
387  FLOWLOCK_WRLOCK(f);
388 
389  Flow *next_flow = f->hprev;
390 
391  counters->flows_timeout++;
392 
393  /* check if the flow is fully timed out and
394  * ready to be discarded. */
395  if (FlowManagerFlowTimedOut(f, ts, counters) == 1) {
396  /* remove from the hash */
397  if (f->hprev != NULL)
398  f->hprev->hnext = f->hnext;
399  if (f->hnext != NULL)
400  f->hnext->hprev = f->hprev;
401  if (f->fb->head == f)
402  f->fb->head = f->hnext;
403  if (f->fb->tail == f)
404  f->fb->tail = f->hprev;
405 
406  f->hnext = NULL;
407  f->hprev = NULL;
408 
409  if (f->flags & FLOW_TCP_REUSED)
410  counters->tcp_reuse++;
411 
412  if (state == FLOW_STATE_NEW)
414  else if (state == FLOW_STATE_ESTABLISHED)
416  else if (state == FLOW_STATE_CLOSED)
418  else if (state == FLOW_STATE_LOCAL_BYPASSED)
420  else if (state == FLOW_STATE_CAPTURE_BYPASSED)
422 
423  if (emergency)
426 
427  /* no one is referring to this flow, use_cnt 0, removed from hash
428  * so we can unlock it and pass it to the flow recycler */
429  FLOWLOCK_UNLOCK(f);
431 
432  cnt++;
433 
434  switch (state) {
435  case FLOW_STATE_NEW:
436  default:
437  counters->new++;
438  break;
440  counters->est++;
441  break;
442  case FLOW_STATE_CLOSED:
443  counters->clo++;
444  break;
447  counters->byp++;
448  break;
449  }
450  counters->flows_removed++;
451  } else {
452  counters->flows_timeout_inuse++;
453  FLOWLOCK_UNLOCK(f);
454  }
455 
456  f = next_flow;
457  } while (f != NULL);
458 
459  counters->flows_checked += checked;
460  if (checked > counters->rows_maxlen)
461  counters->rows_maxlen = checked;
462 
463  return cnt;
464 }
465 
466 /**
467  * \brief time out flows from the hash
468  *
469  * \param ts timestamp
470  * \param try_cnt number of flows to time out max (0 is unlimited)
471  * \param hash_min min hash index to consider
472  * \param hash_max max hash index to consider
473  * \param counters ptr to FlowTimeoutCounters structure
474  *
475  * \retval cnt number of timed out flow
476  */
477 static uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt,
478  uint32_t hash_min, uint32_t hash_max,
479  FlowTimeoutCounters *counters)
480 {
481  uint32_t idx = 0;
482  uint32_t cnt = 0;
483  int emergency = 0;
484 
485  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
486  emergency = 1;
487 
488  for (idx = hash_min; idx < hash_max; idx++) {
489  FlowBucket *fb = &flow_hash[idx];
490 
491  counters->rows_checked++;
492 
493  int32_t check_ts = SC_ATOMIC_GET(fb->next_ts);
494  if (check_ts > (int32_t)ts->tv_sec) {
495  counters->rows_skipped++;
496  continue;
497  }
498 
499  /* before grabbing the row lock, make sure we have at least
500  * 9 packets in the pool */
502 
503  if (FBLOCK_TRYLOCK(fb) != 0) {
504  counters->rows_busy++;
505  continue;
506  }
507 
508  /* flow hash bucket is now locked */
509 
510  if (fb->tail == NULL) {
511  SC_ATOMIC_SET(fb->next_ts, INT_MAX);
512  counters->rows_empty++;
513  goto next;
514  }
515 
516  int32_t next_ts = 0;
517 
518  /* we have a flow, or more than one */
519  cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters, &next_ts);
520 
521  SC_ATOMIC_SET(fb->next_ts, next_ts);
522 
523 next:
524  FBLOCK_UNLOCK(fb);
525 
526  if (try_cnt > 0 && cnt >= try_cnt)
527  break;
528  }
529 
530  return cnt;
531 }
532 
533 /**
534  * \internal
535  *
536  * \brief move all flows out of a hash row
537  *
538  * \param f last flow in the hash row
539  *
540  * \retval cnt removed out flows
541  */
542 static uint32_t FlowManagerHashRowCleanup(Flow *f)
543 {
544  uint32_t cnt = 0;
545 
546  do {
547  FLOWLOCK_WRLOCK(f);
548 
549  Flow *next_flow = f->hprev;
550 
551  int state = SC_ATOMIC_GET(f->flow_state);
552 
553  /* remove from the hash */
554  if (f->hprev != NULL)
555  f->hprev->hnext = f->hnext;
556  if (f->hnext != NULL)
557  f->hnext->hprev = f->hprev;
558  if (f->fb->head == f)
559  f->fb->head = f->hnext;
560  if (f->fb->tail == f)
561  f->fb->tail = f->hprev;
562 
563  f->hnext = NULL;
564  f->hprev = NULL;
565 
566  if (state == FLOW_STATE_NEW)
568  else if (state == FLOW_STATE_ESTABLISHED)
570  else if (state == FLOW_STATE_CLOSED)
572 
574 
575  /* no one is referring to this flow, use_cnt 0, removed from hash
576  * so we can unlock it and move it to the recycle queue. */
577  FLOWLOCK_UNLOCK(f);
578 
580 
581  cnt++;
582 
583  f = next_flow;
584  } while (f != NULL);
585 
586  return cnt;
587 }
588 
589 /**
590  * \brief remove all flows from the hash
591  *
592  * \retval cnt number of removes out flows
593  */
594 static uint32_t FlowCleanupHash(void){
595  uint32_t idx = 0;
596  uint32_t cnt = 0;
597 
598  for (idx = 0; idx < flow_config.hash_size; idx++) {
599  FlowBucket *fb = &flow_hash[idx];
600 
601  FBLOCK_LOCK(fb);
602 
603  if (fb->tail != NULL) {
604  /* we have a flow, or more than one */
605  cnt += FlowManagerHashRowCleanup(fb->tail);
606  }
607 
608  FBLOCK_UNLOCK(fb);
609  }
610 
611  return cnt;
612 }
613 
614 extern int g_detect_disabled;
615 
616 typedef struct FlowManagerThreadData_ {
617  uint32_t instance;
618  uint32_t min;
619  uint32_t max;
620 
625  uint16_t flow_mgr_spare;
628  uint16_t flow_tcp_reuse;
629 
635 
641 
645 
647 
648 static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
649 {
651  if (ftd == NULL)
652  return TM_ECODE_FAILED;
653 
654  ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
655  SCLogDebug("flow manager instance %u", ftd->instance);
656 
657  /* set the min and max value used for hash row walking
658  * each thread has it's own section of the flow hash */
659  uint32_t range = flow_config.hash_size / flowmgr_number;
660  if (ftd->instance == 1)
661  ftd->max = range;
662  else if (ftd->instance == flowmgr_number) {
663  ftd->min = (range * (ftd->instance - 1));
664  ftd->max = flow_config.hash_size;
665  } else {
666  ftd->min = (range * (ftd->instance - 1));
667  ftd->max = (range * ftd->instance);
668  }
670 
671  SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
672 
673  /* pass thread data back to caller */
674  *data = ftd;
675 
676  ftd->flow_mgr_cnt_clo = StatsRegisterCounter("flow_mgr.closed_pruned", t);
677  ftd->flow_mgr_cnt_new = StatsRegisterCounter("flow_mgr.new_pruned", t);
678  ftd->flow_mgr_cnt_est = StatsRegisterCounter("flow_mgr.est_pruned", t);
679  ftd->flow_mgr_cnt_byp = StatsRegisterCounter("flow_mgr.bypassed_pruned", t);
680  ftd->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
681  ftd->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
682  ftd->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
683  ftd->flow_tcp_reuse = StatsRegisterCounter("flow.tcp_reuse", t);
684 
685  ftd->flow_mgr_flows_checked = StatsRegisterCounter("flow_mgr.flows_checked", t);
686  ftd->flow_mgr_flows_notimeout = StatsRegisterCounter("flow_mgr.flows_notimeout", t);
687  ftd->flow_mgr_flows_timeout = StatsRegisterCounter("flow_mgr.flows_timeout", t);
688  ftd->flow_mgr_flows_timeout_inuse = StatsRegisterCounter("flow_mgr.flows_timeout_inuse", t);
689  ftd->flow_mgr_flows_removed = StatsRegisterCounter("flow_mgr.flows_removed", t);
690 
691  ftd->flow_mgr_rows_checked = StatsRegisterCounter("flow_mgr.rows_checked", t);
692  ftd->flow_mgr_rows_skipped = StatsRegisterCounter("flow_mgr.rows_skipped", t);
693  ftd->flow_mgr_rows_empty = StatsRegisterCounter("flow_mgr.rows_empty", t);
694  ftd->flow_mgr_rows_busy = StatsRegisterCounter("flow_mgr.rows_busy", t);
695  ftd->flow_mgr_rows_maxlen = StatsRegisterCounter("flow_mgr.rows_maxlen", t);
696 
697  ftd->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
698  ftd->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
699  ftd->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
700 
701  PacketPoolInit();
702  return TM_ECODE_OK;
703 }
704 
705 static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
706 {
708  SCFree(data);
709  return TM_ECODE_OK;
710 }
711 
712 
713 /** \brief Thread that manages the flow table and times out flows.
714  *
715  * \param td ThreadVars casted to void ptr
716  *
717  * Keeps an eye on the spare list, alloc flows if needed...
718  */
719 static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
720 {
721  FlowManagerThreadData *ftd = thread_data;
722  struct timeval ts;
723  uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
724  int emerg = FALSE;
725  int prev_emerg = FALSE;
726  struct timespec cond_time;
727  int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
728  int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
729 /* VJ leaving disabled for now, as hosts are only used by tags and the numbers
730  * are really low. Might confuse ppl
731  uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v);
732  uint16_t flow_mgr_host_active = StatsRegisterCounter("hosts.active", th_v);
733  uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v);
734 */
735  memset(&ts, 0, sizeof(ts));
736 
737  while (1)
738  {
739  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
743  }
744 
745  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
746  emerg = TRUE;
747 
748  if (emerg == TRUE && prev_emerg == FALSE) {
749  prev_emerg = TRUE;
750 
751  SCLogDebug("Flow emergency mode entered...");
752 
753  StatsIncr(th_v, ftd->flow_emerg_mode_enter);
754  }
755  }
756 
757  /* Get the time */
758  memset(&ts, 0, sizeof(ts));
759  TimeGet(&ts);
760  SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
761 
762  /* see if we still have enough spare flows */
763  if (ftd->instance == 1)
765 
766  /* try to time out flows */
767  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0};
768  FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters);
769 
770 
771  if (ftd->instance == 1) {
772  DefragTimeoutHash(&ts);
773  //uint32_t hosts_pruned =
774  HostTimeoutHash(&ts);
775  IPPairTimeoutHash(&ts);
776  }
777 /*
778  StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned);
779  uint32_t hosts_active = HostGetActiveCount();
780  StatsSetUI64(th_v, flow_mgr_host_active, (uint64_t)hosts_active);
781  uint32_t hosts_spare = HostGetSpareCount();
782  StatsSetUI64(th_v, flow_mgr_host_spare, (uint64_t)hosts_spare);
783 */
784  StatsAddUI64(th_v, ftd->flow_mgr_cnt_clo, (uint64_t)counters.clo);
785  StatsAddUI64(th_v, ftd->flow_mgr_cnt_new, (uint64_t)counters.new);
786  StatsAddUI64(th_v, ftd->flow_mgr_cnt_est, (uint64_t)counters.est);
787  StatsAddUI64(th_v, ftd->flow_mgr_cnt_byp, (uint64_t)counters.byp);
788  StatsAddUI64(th_v, ftd->flow_tcp_reuse, (uint64_t)counters.tcp_reuse);
789 
790  StatsSetUI64(th_v, ftd->flow_mgr_flows_checked, (uint64_t)counters.flows_checked);
791  StatsSetUI64(th_v, ftd->flow_mgr_flows_notimeout, (uint64_t)counters.flows_notimeout);
792  StatsSetUI64(th_v, ftd->flow_mgr_flows_timeout, (uint64_t)counters.flows_timeout);
793  StatsSetUI64(th_v, ftd->flow_mgr_flows_removed, (uint64_t)counters.flows_removed);
794  StatsSetUI64(th_v, ftd->flow_mgr_flows_timeout_inuse, (uint64_t)counters.flows_timeout_inuse);
795 
796  StatsSetUI64(th_v, ftd->flow_mgr_rows_checked, (uint64_t)counters.rows_checked);
797  StatsSetUI64(th_v, ftd->flow_mgr_rows_skipped, (uint64_t)counters.rows_skipped);
798  StatsSetUI64(th_v, ftd->flow_mgr_rows_maxlen, (uint64_t)counters.rows_maxlen);
799  StatsSetUI64(th_v, ftd->flow_mgr_rows_busy, (uint64_t)counters.rows_busy);
800  StatsSetUI64(th_v, ftd->flow_mgr_rows_empty, (uint64_t)counters.rows_empty);
801 
802  StatsAddUI64(th_v, ftd->flow_bypassed_cnt_clo, (uint64_t)counters.bypassed_count);
803  StatsAddUI64(th_v, ftd->flow_bypassed_pkts, (uint64_t)counters.bypassed_pkts);
804  StatsAddUI64(th_v, ftd->flow_bypassed_bytes, (uint64_t)counters.bypassed_bytes);
805 
806  uint32_t len = 0;
808  len = flow_spare_q.len;
810  StatsSetUI64(th_v, ftd->flow_mgr_spare, (uint64_t)len);
811 
812  /* Don't fear, FlowManagerThread is here...
813  * clear emergency bit if we have at least xx flows pruned. */
814  if (emerg == TRUE) {
815  SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
816  "flow_spare_q status: %"PRIu32"%% flows at the queue",
817  len, flow_config.prealloc, len * 100 / flow_config.prealloc);
818  /* only if we have pruned this "emergency_recovery" percentage
819  * of flows, we will unset the emergency bit */
821  SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
822 
824 
825  emerg = FALSE;
826  prev_emerg = FALSE;
827 
828  flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
829  flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
830  SCLogInfo("Flow emergency mode over, back to normal... unsetting"
831  " FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", "
832  "ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32
833  "%% flows at the queue", (uintmax_t)ts.tv_sec,
834  (uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);
835 
836  StatsIncr(th_v, ftd->flow_emerg_mode_over);
837  } else {
838  flow_update_delay_sec = FLOW_EMERG_MODE_UPDATE_DELAY_SEC;
839  flow_update_delay_nsec = FLOW_EMERG_MODE_UPDATE_DELAY_NSEC;
840  }
841  }
842 
843  if (TmThreadsCheckFlag(th_v, THV_KILL)) {
844  StatsSyncCounters(th_v);
845  break;
846  }
847 
848  cond_time.tv_sec = time(NULL) + flow_update_delay_sec;
849  cond_time.tv_nsec = flow_update_delay_nsec;
852  &cond_time);
854 
855  SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
856 
858  }
859 
860  SCLogPerf("%" PRIu32 " new flows, %" PRIu32 " established flows were "
861  "timed out, %"PRIu32" flows in closed state", new_cnt,
862  established_cnt, closing_cnt);
863 
864  return TM_ECODE_OK;
865 }
866 
867 /** \brief spawn the flow manager thread */
869 {
870 #ifdef AFLFUZZ_DISABLE_MGTTHREADS
871  return;
872 #endif
873  intmax_t setting = 1;
874  (void)ConfGetInt("flow.managers", &setting);
875 
876  if (setting < 1 || setting > 1024) {
878  "invalid flow.managers setting %"PRIdMAX, setting);
879  exit(EXIT_FAILURE);
880  }
881  flowmgr_number = (uint32_t)setting;
882 
883  SCLogConfig("using %u flow manager threads", flowmgr_number);
886 
888 
889  uint32_t u;
890  for (u = 0; u < flowmgr_number; u++)
891  {
892  ThreadVars *tv_flowmgr = NULL;
893 
894  char name[TM_THREAD_NAME_MAX];
895  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);
896 
897  tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
898  "FlowManager", 0);
899  BUG_ON(tv_flowmgr == NULL);
900 
901  if (tv_flowmgr == NULL) {
902  printf("ERROR: TmThreadsCreate failed\n");
903  exit(1);
904  }
905  if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
906  printf("ERROR: TmThreadSpawn failed\n");
907  exit(1);
908  }
909  }
910  return;
911 }
912 
913 typedef struct FlowRecyclerThreadData_ {
916 
917 static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
918 {
920  if (ftd == NULL)
921  return TM_ECODE_FAILED;
922 
923  if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
924  SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
925  SCFree(ftd);
926  return TM_ECODE_FAILED;
927  }
928  SCLogDebug("output_thread_data %p", ftd->output_thread_data);
929 
930  *data = ftd;
931  return TM_ECODE_OK;
932 }
933 
934 static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
935 {
937  if (ftd->output_thread_data != NULL)
939 
940  SCFree(data);
941  return TM_ECODE_OK;
942 }
943 
944 /** \brief Thread that manages timed out flows.
945  *
946  * \param td ThreadVars casted to void ptr
947  */
948 static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
949 {
950  struct timeval ts;
951  struct timespec cond_time;
952  int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
953  int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
954  uint64_t recycled_cnt = 0;
955  FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
956  BUG_ON(ftd == NULL);
957 
958  memset(&ts, 0, sizeof(ts));
959 
960  while (1)
961  {
962  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
966  }
967 
968  /* Get the time */
969  memset(&ts, 0, sizeof(ts));
970  TimeGet(&ts);
971  SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
972 
973  uint32_t len = 0;
975  len = flow_recycle_q.len;
977 
978  /* Loop through the queue and clean up all flows in it */
979  if (len) {
980  Flow *f;
981 
982  while ((f = FlowDequeue(&flow_recycle_q)) != NULL) {
983  FLOWLOCK_WRLOCK(f);
984 
985  (void)OutputFlowLog(th_v, ftd->output_thread_data, f);
986 
987  FlowClearMemory (f, f->protomap);
988  FLOWLOCK_UNLOCK(f);
989  FlowMoveToSpare(f);
990  recycled_cnt++;
991  }
992  }
993 
994  SCLogDebug("%u flows to recycle", len);
995 
996  if (TmThreadsCheckFlag(th_v, THV_KILL)) {
997  StatsSyncCounters(th_v);
998  break;
999  }
1000 
1001  cond_time.tv_sec = time(NULL) + flow_update_delay_sec;
1002  cond_time.tv_nsec = flow_update_delay_nsec;
1005  &flow_recycler_ctrl_mutex, &cond_time);
1007 
1008  SCLogDebug("woke up...");
1009 
1011  }
1012 
1013  SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
1014 
1015  return TM_ECODE_OK;
1016 }
1017 
1018 static int FlowRecyclerReadyToShutdown(void)
1019 {
1020  uint32_t len = 0;
1022  len = flow_recycle_q.len;
1024 
1025  return ((len == 0));
1026 }
1027 
1028 /** \brief spawn the flow recycler thread */
1030 {
1031 #ifdef AFLFUZZ_DISABLE_MGTTHREADS
1032  return;
1033 #endif
1034  intmax_t setting = 1;
1035  (void)ConfGetInt("flow.recyclers", &setting);
1036 
1037  if (setting < 1 || setting > 1024) {
1039  "invalid flow.recyclers setting %"PRIdMAX, setting);
1040  exit(EXIT_FAILURE);
1041  }
1042  flowrec_number = (uint32_t)setting;
1043 
1044  SCLogConfig("using %u flow recycler threads", flowrec_number);
1045 
1048 
1049 
1050  uint32_t u;
1051  for (u = 0; u < flowrec_number; u++)
1052  {
1053  ThreadVars *tv_flowmgr = NULL;
1054 
1055  char name[TM_THREAD_NAME_MAX];
1056  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);
1057 
1058  tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
1059  "FlowRecycler", 0);
1060  BUG_ON(tv_flowmgr == NULL);
1061 
1062  if (tv_flowmgr == NULL) {
1063  printf("ERROR: TmThreadsCreate failed\n");
1064  exit(1);
1065  }
1066  if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
1067  printf("ERROR: TmThreadSpawn failed\n");
1068  exit(1);
1069  }
1070  }
1071  return;
1072 }
1073 
1074 /**
1075  * \brief Used to disable flow recycler thread(s).
1076  *
1077  * \note this should only be called when the flow manager is already gone
1078  *
1079  * \todo Kinda hackish since it uses the tv name to identify flow recycler
1080  * thread. We need an all weather identification scheme.
1081  */
1083 {
1084 #ifdef AFLFUZZ_DISABLE_MGTTHREADS
1085  return;
1086 #endif
1087  ThreadVars *tv = NULL;
1088  int cnt = 0;
1089 
1090  /* move all flows still in the hash to the recycler queue */
1091  FlowCleanupHash();
1092 
1093  /* make sure all flows are processed */
1094  do {
1096  usleep(10);
1097  } while (FlowRecyclerReadyToShutdown() == 0);
1098 
1099  /* wake up threads */
1100  uint32_t u;
1101  for (u = 0; u < flowrec_number; u++)
1103 
1105  /* flow recycler thread(s) is/are a part of mgmt threads */
1106  tv = tv_root[TVT_MGMT];
1107  while (tv != NULL)
1108  {
1109  if (strncasecmp(tv->name, thread_name_flow_rec,
1110  strlen(thread_name_flow_rec)) == 0)
1111  {
1113  cnt++;
1114  }
1115  tv = tv->next;
1116  }
1118 
1119  struct timeval start_ts;
1120  struct timeval cur_ts;
1121  gettimeofday(&start_ts, NULL);
1122 
1123 again:
1124  gettimeofday(&cur_ts, NULL);
1125  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1126  FatalError(SC_ERR_SHUTDOWN, "unable to get all flow recycler "
1127  "threads to shutdown in time");
1128  }
1129 
1131  tv = tv_root[TVT_MGMT];
1132  while (tv != NULL)
1133  {
1134  if (strncasecmp(tv->name, thread_name_flow_rec,
1135  strlen(thread_name_flow_rec)) == 0)
1136  {
1139  /* sleep outside lock */
1140  SleepMsec(1);
1141  goto again;
1142  }
1143  }
1144  tv = tv->next;
1145  }
1147 
1148  /* wake up threads, another try */
1149  for (u = 0; u < flowrec_number; u++)
1151 
1152  /* reset count, so we can kill and respawn (unix socket) */
1153  SC_ATOMIC_SET(flowrec_cnt, 0);
1154  return;
1155 }
1156 
1158 {
1159  tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
1160  tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
1161  tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
1162 // tmm_modules[TMM_FLOWMANAGER].RegisterTests = FlowManagerRegisterTests;
1163  tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
1166  SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
1167 
1168  SC_ATOMIC_INIT(flowmgr_cnt);
1169  SC_ATOMIC_INIT(flow_timeouts);
1170 }
1171 
1173 {
1174  tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
1175  tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
1176  tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
1177 // tmm_modules[TMM_FLOWRECYCLER].RegisterTests = FlowRecyclerRegisterTests;
1178  tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
1181  SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
1182 
1183  SC_ATOMIC_INIT(flowrec_cnt);
1184 }
1185 
1186 #ifdef UNITTESTS
1187 
1188 /**
1189  * \test Test the timing out of a flow with a fresh TcpSession
1190  * (just initialized, no data segments) in normal mode.
1191  *
1192  * \retval On success it returns 1 and on failure 0.
1193  */
1194 
1195 static int FlowMgrTest01 (void)
1196 {
1197  TcpSession ssn;
1198  Flow f;
1199  FlowBucket fb;
1200  struct timeval ts;
1201 
1203 
1204  memset(&ssn, 0, sizeof(TcpSession));
1205  memset(&f, 0, sizeof(Flow));
1206  memset(&ts, 0, sizeof(ts));
1207  memset(&fb, 0, sizeof(FlowBucket));
1208 
1209  FBLOCK_INIT(&fb);
1210 
1211  FLOW_INITIALIZE(&f);
1213 
1214  TimeGet(&ts);
1215  f.lastts.tv_sec = ts.tv_sec - 5000;
1216  f.protoctx = &ssn;
1217  f.fb = &fb;
1218 
1219  f.proto = IPPROTO_TCP;
1220 
1221  int32_t next_ts = 0;
1222  int state = SC_ATOMIC_GET(f.flow_state);
1223  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0};
1224  if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts, &counters) != 1) {
1225  FBLOCK_DESTROY(&fb);
1226  FLOW_DESTROY(&f);
1228  return 0;
1229  }
1230 
1231  FBLOCK_DESTROY(&fb);
1232  FLOW_DESTROY(&f);
1233 
1235  return 1;
1236 }
1237 
1238 /**
1239  * \test Test the timing out of a flow with a TcpSession
1240  * (with data segments) in normal mode.
1241  *
1242  * \retval On success it returns 1 and on failure 0.
1243  */
1244 
1245 static int FlowMgrTest02 (void)
1246 {
1247  TcpSession ssn;
1248  Flow f;
1249  FlowBucket fb;
1250  struct timeval ts;
1251  TcpSegment seg;
1252  TcpStream client;
1253 
1255 
1256  memset(&ssn, 0, sizeof(TcpSession));
1257  memset(&f, 0, sizeof(Flow));
1258  memset(&fb, 0, sizeof(FlowBucket));
1259  memset(&ts, 0, sizeof(ts));
1260  memset(&seg, 0, sizeof(TcpSegment));
1261  memset(&client, 0, sizeof(TcpStream));
1262 
1263  FBLOCK_INIT(&fb);
1264  FLOW_INITIALIZE(&f);
1266 
1267  TimeGet(&ts);
1268  TCP_SEG_LEN(&seg) = 3;
1269  TCPSEG_RB_INSERT(&client.seg_tree, &seg);
1270  ssn.client = client;
1271  ssn.server = client;
1272  ssn.state = TCP_ESTABLISHED;
1273  f.lastts.tv_sec = ts.tv_sec - 5000;
1274  f.protoctx = &ssn;
1275  f.fb = &fb;
1276  f.proto = IPPROTO_TCP;
1277 
1278  int32_t next_ts = 0;
1279  int state = SC_ATOMIC_GET(f.flow_state);
1280  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0};
1281  if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts, &counters) != 1) {
1282  FBLOCK_DESTROY(&fb);
1283  FLOW_DESTROY(&f);
1285  return 0;
1286  }
1287  FBLOCK_DESTROY(&fb);
1288  FLOW_DESTROY(&f);
1290  return 1;
1291 
1292 }
1293 
1294 /**
1295  * \test Test the timing out of a flow with a fresh TcpSession
1296  * (just initialized, no data segments) in emergency mode.
1297  *
1298  * \retval On success it returns 1 and on failure 0.
1299  */
1300 
1301 static int FlowMgrTest03 (void)
1302 {
1303  TcpSession ssn;
1304  Flow f;
1305  FlowBucket fb;
1306  struct timeval ts;
1307 
1309 
1310  memset(&ssn, 0, sizeof(TcpSession));
1311  memset(&f, 0, sizeof(Flow));
1312  memset(&ts, 0, sizeof(ts));
1313  memset(&fb, 0, sizeof(FlowBucket));
1314 
1315  FBLOCK_INIT(&fb);
1316  FLOW_INITIALIZE(&f);
1318 
1319  TimeGet(&ts);
1320  ssn.state = TCP_SYN_SENT;
1321  f.lastts.tv_sec = ts.tv_sec - 300;
1322  f.protoctx = &ssn;
1323  f.fb = &fb;
1324  f.proto = IPPROTO_TCP;
1325  f.flags |= FLOW_EMERGENCY;
1326 
1327  int next_ts = 0;
1328  int state = SC_ATOMIC_GET(f.flow_state);
1329  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0};
1330  if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts, &counters) != 1) {
1331  FBLOCK_DESTROY(&fb);
1332  FLOW_DESTROY(&f);
1334  return 0;
1335  }
1336 
1337  FBLOCK_DESTROY(&fb);
1338  FLOW_DESTROY(&f);
1340  return 1;
1341 }
1342 
1343 /**
1344  * \test Test the timing out of a flow with a TcpSession
1345  * (with data segments) in emergency mode.
1346  *
1347  * \retval On success it returns 1 and on failure 0.
1348  */
1349 
1350 static int FlowMgrTest04 (void)
1351 {
1352 
1353  TcpSession ssn;
1354  Flow f;
1355  FlowBucket fb;
1356  struct timeval ts;
1357  TcpSegment seg;
1358  TcpStream client;
1359 
1361 
1362  memset(&ssn, 0, sizeof(TcpSession));
1363  memset(&f, 0, sizeof(Flow));
1364  memset(&fb, 0, sizeof(FlowBucket));
1365  memset(&ts, 0, sizeof(ts));
1366  memset(&seg, 0, sizeof(TcpSegment));
1367  memset(&client, 0, sizeof(TcpStream));
1368 
1369  FBLOCK_INIT(&fb);
1370  FLOW_INITIALIZE(&f);
1372 
1373  TimeGet(&ts);
1374  TCP_SEG_LEN(&seg) = 3;
1375  TCPSEG_RB_INSERT(&client.seg_tree, &seg);
1376  ssn.client = client;
1377  ssn.server = client;
1378  ssn.state = TCP_ESTABLISHED;
1379  f.lastts.tv_sec = ts.tv_sec - 5000;
1380  f.protoctx = &ssn;
1381  f.fb = &fb;
1382  f.proto = IPPROTO_TCP;
1383  f.flags |= FLOW_EMERGENCY;
1384 
1385  int next_ts = 0;
1386  int state = SC_ATOMIC_GET(f.flow_state);
1387  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0};
1388  if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts, &counters) != 1) {
1389  FBLOCK_DESTROY(&fb);
1390  FLOW_DESTROY(&f);
1392  return 0;
1393  }
1394 
1395  FBLOCK_DESTROY(&fb);
1396  FLOW_DESTROY(&f);
1398  return 1;
1399 }
1400 
1401 /**
1402  * \test Test flow allocations when it reach memcap
1403  *
1404  *
1405  * \retval On success it returns 1 and on failure 0.
1406  */
1407 
1408 static int FlowMgrTest05 (void)
1409 {
1410  int result = 0;
1411 
1413  FlowConfig backup;
1414  memcpy(&backup, &flow_config, sizeof(FlowConfig));
1415 
1416  uint32_t ini = 0;
1417  uint32_t end = flow_spare_q.len;
1418  SC_ATOMIC_SET(flow_config.memcap, 10000);
1419  flow_config.prealloc = 100;
1420 
1421  /* Let's get the flow_spare_q empty */
1422  UTHBuildPacketOfFlows(ini, end, 0);
1423 
1424  /* And now let's try to reach the memcap val */
1425  while (FLOW_CHECK_MEMCAP(sizeof(Flow))) {
1426  ini = end + 1;
1427  end = end + 2;
1428  UTHBuildPacketOfFlows(ini, end, 0);
1429  }
1430 
1431  /* should time out normal */
1432  TimeSetIncrementTime(2000);
1433  ini = end + 1;
1434  end = end + 2;;
1435  UTHBuildPacketOfFlows(ini, end, 0);
1436 
1437  struct timeval ts;
1438  TimeGet(&ts);
1439  /* try to time out flows */
1440  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0};
1441  FlowTimeoutHash(&ts, 0 /* check all */, 0, flow_config.hash_size, &counters);
1442 
1443  if (flow_recycle_q.len > 0) {
1444  result = 1;
1445  }
1446 
1447  memcpy(&flow_config, &backup, sizeof(FlowConfig));
1448  FlowShutdown();
1449  return result;
1450 }
1451 #endif /* UNITTESTS */
1452 
1453 /**
1454  * \brief Function to register the Flow Unitests.
1455  */
1457 {
1458 #ifdef UNITTESTS
1459  UtRegisterTest("FlowMgrTest01 -- Timeout a flow having fresh TcpSession",
1460  FlowMgrTest01);
1461  UtRegisterTest("FlowMgrTest02 -- Timeout a flow having TcpSession with segments",
1462  FlowMgrTest02);
1463  UtRegisterTest("FlowMgrTest03 -- Timeout a flow in emergency having fresh TcpSession",
1464  FlowMgrTest03);
1465  UtRegisterTest("FlowMgrTest04 -- Timeout a flow in emergency having TcpSession with segments",
1466  FlowMgrTest04);
1467  UtRegisterTest("FlowMgrTest05 -- Test flow Allocations when it reach memcap",
1468  FlowMgrTest05);
1469 #endif /* UNITTESTS */
1470 }
FlowQueue flow_recycle_q
Definition: flow-private.h:94
#define FLOW_TCP_REUSED
Definition: flow.h:51
#define FLOW_IS_IPV4(f)
Definition: flow.h:134
uint16_t flow_mgr_flows_timeout
Definition: flow-manager.c:632
#define FLOW_CHECK_MEMCAP(size)
check if a memory alloc would fit in the memcap
Definition: flow-util.h:135
#define SCLogDebug(...)
Definition: util-debug.h:335
#define FBLOCK_UNLOCK(fb)
Definition: flow-hash.h:70
uint64_t tosrcbytecnt
Definition: flow.h:490
#define FLOW_TIMEOUT_REASSEMBLY_DONE
Definition: flow.h:91
uint16_t flow_mgr_flows_timeout_inuse
Definition: flow-manager.c:633
uint8_t cap_flags
Definition: tm-modules.h:67
uint32_t bypassed_timeout
Definition: flow.h:478
#define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC
Definition: flow-manager.c:101
struct HtpBodyChunk_ * next
struct Flow_ * hnext
Definition: flow.h:451
void FlowDisableFlowManagerThread(void)
Used to disable flow manager thread(s).
Definition: flow-manager.c:138
#define BUG_ON(x)
uint32_t new_timeout
Definition: flow.h:475
uint8_t flags
Definition: tm-modules.h:70
uint8_t proto
Definition: flow.h:344
#define THV_RUNNING_DONE
Definition: threadvars.h:45
#define FALSE
void LiveDevSubBypassStats(LiveDevice *dev, uint64_t cnt, int family)
Definition: util-device.c:522
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:67
#define FLOWLOCK_UNLOCK(fb)
Definition: flow.h:243
#define FLOW_END_FLAG_STATE_ESTABLISHED
Definition: flow.h:210
FlowProtoTimeout flow_timeouts_normal[FLOW_PROTO_MAX]
Definition: flow-private.h:86
void PacketPoolWaitForN(int n)
Wait until we have the requested amount of packets in the pool.
#define FLOW_END_FLAG_TIMEOUT
Definition: flow.h:213
FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]
Definition: flow-private.h:87
FlowProtoTimeout * FlowProtoTimeoutPtr
Definition: flow-manager.c:87
int FlowForceReassemblyNeedReassembly(Flow *f, int *server, int *client)
Check if a flow needs forced reassembly, or any other processing.
Definition: flow-timeout.c:285
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:72
uint16_t flow_emerg_mode_enter
Definition: flow-manager.c:626
#define FlowTimeoutsReset()
Definition: flow-manager.h:27
#define SCCtrlCondInit
uint32_t prealloc
Definition: flow.h:266
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:107
#define FLOW_EMERG_MODE_UPDATE_DELAY_SEC
Definition: flow-manager.c:104
#define FLOW_QUIET
Definition: flow.h:38
#define FLOW_EMERGENCY
Definition: flow-private.h:37
#define FBLOCK_INIT(fb)
Definition: flow-hash.h:66
#define THV_PAUSED
Definition: threadvars.h:38
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value from our atomic variable.
Definition: util-atomic.h:141
#define FLOWLOCK_WRLOCK(fb)
Definition: flow.h:240
SCCtrlMutex flow_recycler_ctrl_mutex
Definition: flow-manager.h:42
void FlowMoveToSpare(Flow *f)
Transfer a flow from a queue to the spare queue.
Definition: flow-queue.c:146
#define FLOW_END_FLAG_STATE_BYPASSED
Definition: flow.h:216
void FlowMgrRegisterTests(void)
Function to register the Flow Unitests.
SCCtrlCondT flow_manager_ctrl_cond
Definition: flow-manager.h:32
#define TRUE
Flow * head
Definition: flow-hash.h:42
#define SCMutexLock(mut)
uint64_t todstpktcnt
Definition: flow.h:491
void * protoctx
Definition: flow.h:400
#define FBLOCK_DESTROY(fb)
Definition: flow-hash.h:67
FlowConfig flow_config
Definition: flow-private.h:97
void FlowQueueDestroy(FlowQueue *q)
Destroy a flow queue.
Definition: flow-queue.c:61
struct Flow_ * hprev
Definition: flow.h:452
uint64_t todstbytecnt
Definition: flow.h:492
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:931
#define SCCtrlCondSignal
#define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC
Definition: flow-manager.c:102
uint32_t HostTimeoutHash(struct timeval *ts)
time out hosts from the hash
Definition: host-timeout.c:156
void TmThreadsSetFlag(ThreadVars *tv, uint16_t flag)
Set a thread flag.
Definition: tm-threads.c:98
#define SC_ATOMIC_INIT(name)
Initialize the previously declared atomic variable and it&#39;s lock.
Definition: util-atomic.h:81
struct FlowManagerThreadData_ FlowManagerThreadData
#define SCCalloc(nm, a)
Definition: util-mem.h:253
#define SCMutexUnlock(mut)
FlowQueue * FlowQueueInit(FlowQueue *q)
Definition: flow-queue.c:47
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type double to the local counter.
Definition: counters.c:185
#define FLOW_END_FLAG_STATE_NEW
Definition: flow.h:209
int run_mode
Definition: suricata.c:204
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt)
int FlowUpdateSpareFlows(void)
Make sure we have enough spare flows.
Definition: flow.c:144
SCMutex tv_root_lock
Definition: tm-threads.c:82
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
void TimeGet(struct timeval *tv)
Definition: util-time.c:146
#define TCP_SEG_LEN(seg)
void TmModuleFlowManagerRegister(void)
SCCtrlMutex flow_manager_ctrl_mutex
Definition: flow-manager.h:33
uint32_t IPPairTimeoutHash(struct timeval *ts)
time out ippairs from the hash
Data structures and function prototypes for keeping state for the detection engine.
SC_ATOMIC_EXTERN(unsigned int, flow_flags)
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:294
#define FLOW_DESTROY(f)
Definition: flow-util.h:119
struct ThreadVars_ * next
Definition: threadvars.h:111
uint32_t flows_timeout_inuse
Definition: flow-manager.c:118
#define TM_FLAG_MANAGEMENT_TM
Definition: tm-modules.h:36
void UtRegisterTest(const char *name, int(*TestFn)(void))
Register unit test.
#define SleepMsec(msec)
Definition: tm-threads.h:44
uint32_t est_timeout
Definition: flow.h:476
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
Definition: output-flow.c:91
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1283
#define FLOW_END_FLAG_STATE_CLOSED
Definition: flow.h:211
uint16_t flow_mgr_rows_checked
Definition: flow-manager.c:636
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:163
Flow * tail
Definition: flow-hash.h:43
void * bypass_data
Definition: flow.h:488
void TmThreadsUnsetFlag(ThreadVars *tv, uint16_t flag)
Unset a thread flag.
Definition: tm-threads.c:106
#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC
Definition: flow-manager.c:105
uint16_t flow_mgr_flows_removed
Definition: flow-manager.c:634
uint16_t StatsRegisterGlobalCounter(const char *name, uint64_t(*Func)(void))
Registers a counter, which represents a global value.
Definition: counters.c:989
SCCtrlCondT flow_recycler_ctrl_cond
Definition: flow-manager.h:41
void PacketPoolDestroy(void)
struct LiveDevice_ * livedev
Definition: flow.h:350
uint32_t UTHBuildPacketOfFlows(uint32_t start, uint32_t end, uint8_t dir)
#define FLOW_BYPASSED_TIMEOUT
Definition: flow-private.h:65
int FlowForceReassemblyForFlow(Flow *f, int server, int client)
Definition: flow-timeout.c:339
#define THV_PAUSE
Definition: threadvars.h:37
struct timeval lastts
Definition: flow.h:358
struct FlowBucket_ * fb
Definition: flow.h:453
TmEcode OutputFlowLogThreadInit(ThreadVars *tv, void *initdata, void **data)
thread init for the flow logger This will run the thread init functions for the individual registered...
Definition: output-flow.c:129
#define THV_KILL
Definition: threadvars.h:39
uint8_t flow_end_flags
Definition: flow.h:406
void PacketPoolInit(void)
const char * name
Definition: tm-modules.h:44
void TimeSetIncrementTime(uint32_t tv_sec)
increment the time in the engine
Definition: util-time.c:175
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:207
uint16_t flow_mgr_rows_skipped
Definition: flow-manager.c:637
struct TCPSEG seg_tree
Flow * FlowDequeue(FlowQueue *q)
remove a flow from the queue
Definition: flow-queue.c:105
FlowBucket * flow_hash
Definition: flow-private.h:96
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:254
#define SCCtrlMutexLock(mut)
#define SCFree(a)
Definition: util-mem.h:322
#define FLOW_IS_IPV6(f)
Definition: flow.h:136
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.h:73
#define FLOW_END_FLAG_EMERGENCY
Definition: flow.h:212
int ConfGetInt(const char *name, intmax_t *val)
Retrieve a configuration value as an integer.
Definition: conf.c:437
uint16_t flow_mgr_flows_checked
Definition: flow-manager.c:630
#define SCCtrlCondTimedwait
uint64_t FlowGetMemuse(void)
Definition: flow.c:119
void FlowShutdown(void)
shutdown the flow engine
Definition: flow.c:667
uint64_t ts
#define FBLOCK_TRYLOCK(fb)
Definition: flow-hash.h:69
struct FlowTimeoutCounters_ FlowTimeoutCounters
#define SCLogPerf(...)
Definition: util-debug.h:261
uint32_t closed_timeout
Definition: flow.h:477
#define FatalError(x,...)
Definition: util-debug.h:539
uint64_t tosrcpktcnt
Definition: flow.h:489
#define StatsSyncCountersIfSignalled(tv)
Definition: counters.h:136
uint16_t flow_bypassed_cnt_clo
Definition: flow-manager.c:642
#define FLOW_INITIALIZE(f)
Definition: flow-util.h:39
struct FlowRecyclerThreadData_ FlowRecyclerThreadData
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
#define TM_THREAD_NAME_MAX
Definition: tm-threads.h:48
TmEcode OutputFlowLogThreadDeinit(ThreadVars *tv, void *thread_data)
Definition: output-flow.c:171
void FlowManagerThreadSpawn()
spawn the flow manager thread
Definition: flow-manager.c:868
int TmThreadsCheckFlag(ThreadVars *tv, uint16_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:90
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:192
void * FlowGetStorageById(Flow *f, int id)
Definition: flow-storage.c:39
#define StatsSyncCounters(tv)
Definition: counters.h:133
#define SCCtrlMutexUnlock(mut)
bool(* BypassUpdate)(Flow *f, void *data, time_t tsec)
Definition: flow.h:486
char name[16]
Definition: threadvars.h:59
int g_detect_disabled
Definition: suricata.c:218
uint32_t len
Definition: flow-queue.h:45
void TmModuleFlowRecyclerRegister(void)
void FlowRecyclerThreadSpawn()
spawn the flow recycler thread
int FlowClearMemory(Flow *f, uint8_t proto_map)
Function clear the flow memory before queueing it to spare flow queue.
Definition: flow.c:1027
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:79
int GetFlowBypassInfoID(void)
Definition: flow-util.c:209
uint32_t DefragTimeoutHash(struct timeval *ts)
time out tracker from the hash
uint8_t len
Per thread variable structure.
Definition: threadvars.h:57
void FlowTimeoutsEmergency(void)
Definition: flow-manager.c:95
#define SCCtrlMutexInit(mut, mutattr)
#define FLOW_END_FLAG_SHUTDOWN
Definition: flow.h:215
struct SCLogConfig_ SCLogConfig
Holds the config state used by the logging api.
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition: counters.c:142
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1855
uint8_t protomap
Definition: flow.h:404
Flow data structure.
Definition: flow.h:325
Definition: flow.h:261
uint32_t flags
Definition: flow.h:379
void FlowTimeoutsInit(void)
Definition: flow-manager.c:90
FlowState
Definition: flow.h:466
#define FBLOCK_LOCK(fb)
Definition: flow-hash.h:68
uint16_t flow_mgr_flows_notimeout
Definition: flow-manager.c:631
const char * thread_name_flow_mgr
Definition: runmodes.c:65
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:69
uint32_t emergency_recovery
Definition: flow.h:273
const char * thread_name_flow_rec
Definition: runmodes.c:66
FlowQueue flow_spare_q
Definition: flow-private.h:91
void FlowDisableFlowRecyclerThread(void)
Used to disable flow recycler thread(s).
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1953
void FlowInitConfig(char quiet)
initialize the configuration
Definition: flow.c:512
uint32_t hash_size
Definition: flow.h:264