suricata
flow-manager.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2020 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
22  * \author Victor Julien <victor@inliniac.net>
23  */
24 
25 #include "suricata-common.h"
26 #include "suricata.h"
27 #include "decode.h"
28 #include "conf.h"
29 #include "threadvars.h"
30 #include "tm-threads.h"
31 #include "runmodes.h"
32 
33 #include "util-random.h"
34 #include "util-time.h"
35 
36 #include "flow.h"
37 #include "flow-queue.h"
38 #include "flow-hash.h"
39 #include "flow-util.h"
40 #include "flow-var.h"
41 #include "flow-private.h"
42 #include "flow-timeout.h"
43 #include "flow-manager.h"
44 #include "flow-storage.h"
45 #include "flow-spare-pool.h"
46 
47 #include "stream-tcp-private.h"
48 #include "stream-tcp-reassemble.h"
49 #include "stream-tcp.h"
50 
51 #include "util-unittest.h"
52 #include "util-unittest-helper.h"
53 #include "util-byte.h"
54 
55 #include "util-debug.h"
56 #include "util-privs.h"
57 #include "util-signal.h"
58 
59 #include "threads.h"
60 #include "detect.h"
61 #include "detect-engine-state.h"
62 #include "stream.h"
63 
64 #include "app-layer-parser.h"
65 
66 #include "host-timeout.h"
67 #include "defrag-timeout.h"
68 #include "ippair-timeout.h"
69 #include "app-layer-htp-range.h"
70 
71 #include "output-flow.h"
72 #include "util-validate.h"
73 
74 /* Run mode selected at suricata.c */
75 extern int run_mode;
76 
77 /** queue to pass flows to cleanup/log thread(s) */
79 
80 /* multi flow mananger support */
81 static uint32_t flowmgr_number = 1;
82 /* atomic counter for flow managers, to assign instance id */
83 SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
84 
85 /* multi flow recycler support */
86 static uint32_t flowrec_number = 1;
87 /* atomic counter for flow recyclers, to assign instance id */
88 SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
89 SC_ATOMIC_DECLARE(uint32_t, flowrec_busy);
90 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
91 
92 void FlowTimeoutsInit(void)
93 {
94  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
95 }
96 
98 {
99  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
100 }
101 
102 /* 1 seconds */
103 #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
104 #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
105 /* 0.3 seconds */
106 #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
107 #define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000
108 #define NEW_FLOW_COUNT_COND 10
109 
110 typedef struct FlowTimeoutCounters_ {
111  uint32_t new;
112  uint32_t est;
113  uint32_t clo;
114  uint32_t byp;
115 
116  uint32_t rows_checked;
117  uint32_t rows_skipped;
118  uint32_t rows_empty;
119  uint32_t rows_busy;
120  uint32_t rows_maxlen;
121 
122  uint32_t flows_checked;
123  uint32_t flows_notimeout;
124  uint32_t flows_timeout;
126  uint32_t flows_removed;
127  uint32_t flows_aside;
129 
130  uint32_t bypassed_count;
131  uint64_t bypassed_pkts;
132  uint64_t bypassed_bytes;
134 
135 /**
136  * \brief Used to disable flow manager thread(s).
137  *
138  * \todo Kinda hackish since it uses the tv name to identify flow manager
139  * thread. We need an all weather identification scheme.
140  */
142 {
144  /* flow manager thread(s) is/are a part of mgmt threads */
145  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
146  if (strncasecmp(tv->name, thread_name_flow_mgr,
147  strlen(thread_name_flow_mgr)) == 0)
148  {
150  }
151  }
153 
154  struct timeval start_ts;
155  struct timeval cur_ts;
156  gettimeofday(&start_ts, NULL);
157 
158 again:
159  gettimeofday(&cur_ts, NULL);
160  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
161  FatalError(SC_ERR_SHUTDOWN, "unable to get all flow manager "
162  "threads to shutdown in time");
163  }
164 
166  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
167  if (strncasecmp(tv->name, thread_name_flow_mgr,
168  strlen(thread_name_flow_mgr)) == 0)
169  {
172  /* sleep outside lock */
173  SleepMsec(1);
174  goto again;
175  }
176  }
177  }
179 
180  /* reset count, so we can kill and respawn (unix socket) */
181  SC_ATOMIC_SET(flowmgr_cnt, 0);
182  return;
183 }
184 
185 /** \internal
186  * \brief check if a flow is timed out
187  *
188  * \param f flow
189  * \param ts timestamp
190  *
191  * \retval 0 not timed out
192  * \retval 1 timed out
193  */
194 static int FlowManagerFlowTimeout(Flow *f, struct timeval *ts, int32_t *next_ts, const bool emerg)
195 {
196  int32_t flow_times_out_at = f->timeout_at;
197  if (emerg) {
199  flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
200  }
201  if (*next_ts == 0 || flow_times_out_at < *next_ts)
202  *next_ts = flow_times_out_at;
203 
204  /* do the timeout check */
205  if (flow_times_out_at >= ts->tv_sec) {
206  return 0;
207  }
208 
209  return 1;
210 }
211 
212 /** \internal
213  * \brief check timeout of captured bypassed flow by querying capture method
214  *
215  * \param f Flow
216  * \param ts timestamp
217  * \param counters Flow timeout counters
218  *
219  * \retval 0 not timeout
220  * \retval 1 timeout (or not capture bypassed)
221  */
222 static inline int FlowBypassedTimeout(Flow *f, struct timeval *ts,
223  FlowTimeoutCounters *counters)
224 {
225 #ifdef CAPTURE_OFFLOAD
226  if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
227  return 1;
228  }
229 
231  if (fc && fc->BypassUpdate) {
232  /* flow will be possibly updated */
233  uint64_t pkts_tosrc = fc->tosrcpktcnt;
234  uint64_t bytes_tosrc = fc->tosrcbytecnt;
235  uint64_t pkts_todst = fc->todstpktcnt;
236  uint64_t bytes_todst = fc->todstbytecnt;
237  bool update = fc->BypassUpdate(f, fc->bypass_data, ts->tv_sec);
238  if (update) {
239  SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f));
240  pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
241  bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
242  pkts_todst = fc->todstpktcnt - pkts_todst;
243  bytes_todst = fc->todstbytecnt - bytes_todst;
244  if (f->livedev) {
245  SC_ATOMIC_ADD(f->livedev->bypassed,
246  pkts_tosrc + pkts_todst);
247  }
248  counters->bypassed_pkts += pkts_tosrc + pkts_todst;
249  counters->bypassed_bytes += bytes_tosrc + bytes_todst;
250  return 0;
251  } else {
252  SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
253  if (f->livedev) {
254  if (FLOW_IS_IPV4(f)) {
255  LiveDevSubBypassStats(f->livedev, 1, AF_INET);
256  } else if (FLOW_IS_IPV6(f)) {
257  LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
258  }
259  }
260  counters->bypassed_count++;
261  return 1;
262  }
263  }
264 #endif /* CAPTURE_OFFLOAD */
265  return 1;
266 }
267 
268 /** \internal
269  * \brief See if we can really discard this flow. Check use_cnt reference
270  * counter and force reassembly if necessary.
271  *
272  * \param f flow
273  * \param ts timestamp
274  *
275  * \retval 0 not timed out just yet
276  * \retval 1 fully timed out, lets kill it
277  */
278 #if 0
279 static inline int FlowManagerFlowTimedOut(Flow *f, struct timeval *ts,
280  FlowTimeoutCounters *counters)
281 {
282  /* never prune a flow that is used by a packet we
283  * are currently processing in one of the threads */
284  if (f->use_cnt > 0) {
285  return 0;
286  }
287 
288  if (!FlowBypassedTimeout(f, ts, counters)) {
289  return 0;
290  }
291 
292  int server = 0, client = 0;
293 
294  if (!(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
295 #ifdef CAPTURE_OFFLOAD
296  f->flow_state != FLOW_STATE_CAPTURE_BYPASSED &&
297 #endif
299  FlowForceReassemblyNeedReassembly(f, &server, &client) == 1) {
300  FlowForceReassemblyForFlow(f, server, client);
301  return 0;
302  }
303 #ifdef DEBUG
304  /* this should not be possible */
305  BUG_ON(f->use_cnt > 0);
306 #endif
307 
308  return 1;
309 }
310 #endif
311 
312 static inline int FMTryLockBucket(FlowBucket *fb)
313 {
314  int r = FBLOCK_TRYLOCK(fb);
315  return r;
316 }
317 static inline void FMFlowLock(Flow *f)
318 {
319  FLOWLOCK_WRLOCK(f);
320 }
321 
322 typedef struct FlowManagerTimeoutThread {
323  /* used to temporarily store flows that have timed out and are
324  * removed from the hash */
327 
328 static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
329 {
330  FlowQueuePrivate recycle = { NULL, NULL, 0 };
331  counters->flows_aside += td->aside_queue.len;
332 
333  uint32_t cnt = 0;
334  Flow *f;
335  while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
336  /* flow is still locked */
337 
338  if (f->proto == IPPROTO_TCP && !(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
339 #ifdef CAPTURE_OFFLOAD
340  f->flow_state != FLOW_STATE_CAPTURE_BYPASSED &&
341 #endif
344  /* Send the flow to its thread */
346  FLOWLOCK_UNLOCK(f);
347  /* flow ownership is passed to the worker thread */
348 
349  counters->flows_aside_needs_work++;
350  continue;
351  }
352  FLOWLOCK_UNLOCK(f);
353 
354  FlowQueuePrivateAppendFlow(&recycle, f);
355  if (recycle.len == 100) {
357  }
358  cnt++;
359  }
360  if (recycle.len) {
362  }
363  return cnt;
364 }
365 
366 /**
367  * \internal
368  *
369  * \brief check all flows in a hash row for timing out
370  *
371  * \param f last flow in the hash row
372  * \param ts timestamp
373  * \param emergency bool indicating emergency mode
374  * \param counters ptr to FlowTimeoutCounters structure
375  */
376 static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td,
377  Flow *f, struct timeval *ts,
378  int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
379 {
380  uint32_t checked = 0;
381  Flow *prev_f = NULL;
382 
383  do {
384  checked++;
385 
386  /* check flow timeout based on lastts and state. Both can be
387  * accessed w/o Flow lock as we do have the hash row lock (so flow
388  * can't disappear) and flow_state is atomic. lastts can only
389  * be modified when we have both the flow and hash row lock */
390 
391  /* timeout logic goes here */
392  if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {
393 
394  counters->flows_notimeout++;
395 
396  prev_f = f;
397  f = f->next;
398  continue;
399  }
400 
401  FMFlowLock(f); //FLOWLOCK_WRLOCK(f);
402 
403  Flow *next_flow = f->next;
404 
405  /* never prune a flow that is used by a packet we
406  * are currently processing in one of the threads */
407  if (f->use_cnt > 0 || !FlowBypassedTimeout(f, ts, counters)) {
408  FLOWLOCK_UNLOCK(f);
409  prev_f = f;
410  if (f->use_cnt > 0) {
411  counters->flows_timeout_inuse++;
412  }
413  f = f->next;
414  continue;
415  }
416 
418 
419  counters->flows_timeout++;
420 
421  RemoveFromHash(f, prev_f);
422 
424  /* flow is still locked in the queue */
425 
426  f = next_flow;
427  } while (f != NULL);
428 
429  counters->flows_checked += checked;
430  if (checked > counters->rows_maxlen)
431  counters->rows_maxlen = checked;
432 }
433 
434 static void FlowManagerHashRowClearEvictedList(FlowManagerTimeoutThread *td,
435  Flow *f, struct timeval *ts, FlowTimeoutCounters *counters)
436 {
437  do {
438  FLOWLOCK_WRLOCK(f);
439  Flow *next_flow = f->next;
440  f->next = NULL;
441  f->fb = NULL;
442 
444 
446  /* flow is still locked in the queue */
447 
448  f = next_flow;
449  } while (f != NULL);
450 }
451 
452 /**
453  * \brief time out flows from the hash
454  *
455  * \param ts timestamp
456  * \param hash_min min hash index to consider
457  * \param hash_max max hash index to consider
458  * \param counters ptr to FlowTimeoutCounters structure
459  *
460  * \retval cnt number of timed out flow
461  */
462 static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td,
463  struct timeval *ts,
464  const uint32_t hash_min, const uint32_t hash_max,
465  FlowTimeoutCounters *counters)
466 {
467  uint32_t cnt = 0;
468  const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
469  const uint32_t rows_checked = hash_max - hash_min;
470  uint32_t rows_skipped = 0;
471  uint32_t rows_busy = 0;
472  uint32_t rows_empty = 0;
473 
474 #if __WORDSIZE==64
475 #define BITS 64
476 #define TYPE uint64_t
477 #else
478 #define BITS 32
479 #define TYPE uint32_t
480 #endif
481 
482  for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
483  TYPE check_bits = 0;
484  const uint32_t check = MIN(BITS, (hash_max - idx));
485  for (uint32_t i = 0; i < check; i++) {
486  FlowBucket *fb = &flow_hash[idx+i];
487  check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= (int32_t)ts->tv_sec) << (TYPE)i;
488  }
489  if (check_bits == 0)
490  continue;
491 
492  for (uint32_t i = 0; i < check; i++) {
493  FlowBucket *fb = &flow_hash[idx+i];
494  if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= (int32_t)ts->tv_sec) {
495  if (FMTryLockBucket(fb) == 0) {
496  Flow *evicted = NULL;
497  if (fb->evicted != NULL || fb->head != NULL) {
498  /* if evicted is set, we only process that list right now.
499  * Since its set we've had traffic that touched this row
500  * very recently, and there is a good chance more of it will
501  * come in in the near future. So unlock the row asap and leave
502  * the possible eviction of flows to the packet lookup path. */
503  if (fb->evicted != NULL) {
504  /* transfer out of bucket so we can do additional work outside
505  * of the bucket lock */
506  evicted = fb->evicted;
507  fb->evicted = NULL;
508  } else if (fb->head != NULL) {
509  int32_t next_ts = 0;
510  FlowManagerHashRowTimeout(td,
511  fb->head, ts, emergency, counters, &next_ts);
512 
513  if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
514  SC_ATOMIC_SET(fb->next_ts, next_ts);
515  }
516  if (fb->evicted == NULL && fb->head == NULL) {
517  SC_ATOMIC_SET(fb->next_ts, INT_MAX);
518  } else if (fb->evicted != NULL && fb->head == NULL) {
519  SC_ATOMIC_SET(fb->next_ts, 0);
520  }
521  } else {
522  SC_ATOMIC_SET(fb->next_ts, INT_MAX);
523  rows_empty++;
524  }
525  FBLOCK_UNLOCK(fb);
526  /* processed evicted list */
527  if (evicted) {
528  FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
529  }
530  } else {
531  rows_busy++;
532  }
533  } else {
534  rows_skipped++;
535  }
536  }
537  if (td->aside_queue.len) {
538  cnt += ProcessAsideQueue(td, counters);
539  }
540  }
541 
542  counters->rows_checked += rows_checked;
543  counters->rows_skipped += rows_skipped;
544  counters->rows_busy += rows_busy;
545  counters->rows_empty += rows_empty;
546 
547  if (td->aside_queue.len) {
548  cnt += ProcessAsideQueue(td, counters);
549  }
550  counters->flows_removed += cnt;
551  /* coverity[missing_unlock : FALSE] */
552  return cnt;
553 }
554 
555 static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td,
556  struct timeval *ts,
557  const uint32_t hash_min, const uint32_t hash_max,
558  FlowTimeoutCounters *counters, uint32_t iter, const uint32_t chunks)
559 {
560  const uint32_t rows = hash_max - hash_min;
561  const uint32_t chunk_size = rows / chunks;
562 
563  const uint32_t min = iter * chunk_size + hash_min;
564  uint32_t max = min + chunk_size;
565  /* we start at beginning of hash at next iteration so let's check
566  * hash till the end */
567  if (iter + 1 == chunks) {
568  max = hash_max;
569  }
570  const uint32_t cnt = FlowTimeoutHash(td, ts, min, max, counters);
571  return cnt;
572 }
573 
574 /**
575  * \internal
576  *
577  * \brief move all flows out of a hash row
578  *
579  * \param f last flow in the hash row
580  *
581  * \retval cnt removed out flows
582  */
583 static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
584 {
585  uint32_t cnt = 0;
586 
587  do {
588  FLOWLOCK_WRLOCK(f);
589 
590  Flow *next_flow = f->next;
591 
592  /* remove from the hash */
593  if (mode == 0) {
594  RemoveFromHash(f, NULL);
595  } else {
596  FlowBucket *fb = f->fb;
597  fb->evicted = f->next;
598  f->next = NULL;
599  f->fb = NULL;
600  }
602 
603  /* no one is referring to this flow, use_cnt 0, removed from hash
604  * so we can unlock it and move it to the recycle queue. */
605  FLOWLOCK_UNLOCK(f);
606  FlowQueuePrivateAppendFlow(recycle_q, f);
607 
608  cnt++;
609 
610  f = next_flow;
611  } while (f != NULL);
612 
613  return cnt;
614 }
615 
616 /**
617  * \brief remove all flows from the hash
618  *
619  * \retval cnt number of removes out flows
620  */
621 static uint32_t FlowCleanupHash(void)
622 {
623  FlowQueuePrivate local_queue = { NULL, NULL, 0 };
624  uint32_t cnt = 0;
625 
626  for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
627  FlowBucket *fb = &flow_hash[idx];
628 
629  FBLOCK_LOCK(fb);
630 
631  if (fb->head != NULL) {
632  /* we have a flow, or more than one */
633  cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
634  }
635  if (fb->evicted != NULL) {
636  /* we have a flow, or more than one */
637  cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
638  }
639 
640  FBLOCK_UNLOCK(fb);
641  if (local_queue.len >= 25) {
642  FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
643  }
644  }
645  FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
646 
647  return cnt;
648 }
649 
650 static void Recycler(ThreadVars *tv, void *output_thread_data, Flow *f)
651 {
652  FLOWLOCK_WRLOCK(f);
653 
654  (void)OutputFlowLog(tv, output_thread_data, f);
655 
656  FlowClearMemory (f, f->protomap);
657  FLOWLOCK_UNLOCK(f);
659 }
660 
661 typedef struct FlowQueueTimeoutCounters {
662  uint32_t flows_removed;
663  uint32_t flows_timeout;
665 
666 extern int g_detect_disabled;
667 
668 typedef struct FlowCounters_ {
674  uint16_t flow_mgr_spare;
677 
684 
686 
691 
692 typedef struct FlowManagerThreadData_ {
693  uint32_t instance;
694  uint32_t min;
695  uint32_t max;
696 
698 
701 
702 static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
703 {
704  fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t);
705  fc->flow_mgr_cnt_clo = StatsRegisterCounter("flow.mgr.closed_pruned", t);
706  fc->flow_mgr_cnt_new = StatsRegisterCounter("flow.mgr.new_pruned", t);
707  fc->flow_mgr_cnt_est = StatsRegisterCounter("flow.mgr.est_pruned", t);
708  fc->flow_mgr_cnt_byp = StatsRegisterCounter("flow.mgr.bypassed_pruned", t);
709  fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
710  fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
711  fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
712 
713  fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", t);
714  fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", t);
715  fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", t);
716  fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", t);
717  fc->flow_mgr_flows_timeout_inuse = StatsRegisterCounter("flow.mgr.flows_timeout_inuse", t);
718  fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", t);
719  fc->flow_mgr_flows_aside_needs_work = StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", t);
720 
721  fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
722  fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
723  fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
724 }
725 
726 static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
727 {
729  if (ftd == NULL)
730  return TM_ECODE_FAILED;
731 
732  ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
733  SCLogDebug("flow manager instance %u", ftd->instance);
734 
735  /* set the min and max value used for hash row walking
736  * each thread has it's own section of the flow hash */
737  uint32_t range = flow_config.hash_size / flowmgr_number;
738  if (ftd->instance == 0)
739  ftd->max = range;
740  else if ((ftd->instance + 1) == flowmgr_number) {
741  ftd->min = (range * ftd->instance) + 1;
742  ftd->max = flow_config.hash_size;
743  } else {
744  ftd->min = (range * ftd->instance) + 1;
745  ftd->max = (range * (ftd->instance + 1));
746  }
748 
749  SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
750 
751  /* pass thread data back to caller */
752  *data = ftd;
753 
754  FlowCountersInit(t, &ftd->cnt);
755 
756  PacketPoolInit();
757  return TM_ECODE_OK;
758 }
759 
760 static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
761 {
763  SCFree(data);
764  return TM_ECODE_OK;
765 }
766 
767 static uint32_t FlowTimeoutsMin(void)
768 {
769  FlowProtoTimeoutPtr t = SC_ATOMIC_GET(flow_timeouts);
770  uint32_t m = -1;
771  for (unsigned int i = 0; i < FLOW_PROTO_MAX; i++) {
772  m = MIN(m, t[i].new_timeout);
773  m = MIN(m, t[i].est_timeout);
774 
775  if (i == FLOW_PROTO_TCP) {
776  m = MIN(m, t[i].closed_timeout);
777  }
778  if (i == FLOW_PROTO_TCP || i == FLOW_PROTO_UDP) {
779  m = MIN(m, t[i].bypassed_timeout);
780  }
781  }
782  return m;
783 }
784 
785 //#define FM_PROFILE
786 
787 /** \brief Thread that manages the flow table and times out flows.
788  *
789  * \param td ThreadVars casted to void ptr
790  *
791  * Keeps an eye on the spare list, alloc flows if needed...
792  */
793 static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
794 {
795  FlowManagerThreadData *ftd = thread_data;
796  struct timeval ts;
797  uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
798  bool emerg = false;
799  bool prev_emerg = false;
800  uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
801  uint32_t flow_last_sec = 0;
802 /* VJ leaving disabled for now, as hosts are only used by tags and the numbers
803  * are really low. Might confuse ppl
804  uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v);
805  uint16_t flow_mgr_host_active = StatsRegisterCounter("hosts.active", th_v);
806  uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v);
807 */
808  memset(&ts, 0, sizeof(ts));
809  uint32_t hash_passes = 0;
810 #ifdef FM_PROFILE
811  uint32_t hash_row_checks = 0;
812  uint32_t hash_passes_chunks = 0;
813 #endif
814  uint32_t hash_full_passes = 0;
815 
816  const uint32_t min_timeout = FlowTimeoutsMin();
817  const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;
818 
819  /* don't start our activities until time is setup */
820  while (!TimeModeIsReady()) {
821  if (suricata_ctl_flags != 0)
822  return TM_ECODE_OK;
823  }
824 
825  SCLogDebug("FM %s/%d starting. min_timeout %us. Full hash pass in %us", th_v->name,
826  ftd->instance, min_timeout, pass_in_sec);
827 
828 #ifdef FM_PROFILE
829  struct timeval endts;
830  struct timeval active;
831  struct timeval paused;
832  struct timeval sleeping;
833  memset(&endts, 0, sizeof(endts));
834  memset(&active, 0, sizeof(active));
835  memset(&paused, 0, sizeof(paused));
836  memset(&sleeping, 0, sizeof(sleeping));
837 #endif
838 
839  struct timeval startts;
840  memset(&startts, 0, sizeof(startts));
841  gettimeofday(&startts, NULL);
842 
843  uint32_t hash_pass_iter = 0;
844  uint32_t emerg_over_cnt = 0;
845  uint64_t next_run_ms = 0;
846 
847  while (1)
848  {
849  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
851 #ifdef FM_PROFILE
852  struct timeval pause_startts;
853  memset(&pause_startts, 0, sizeof(pause_startts));
854  gettimeofday(&pause_startts, NULL);
855 #endif
857 #ifdef FM_PROFILE
858  struct timeval pause_endts;
859  memset(&pause_endts, 0, sizeof(pause_endts));
860  gettimeofday(&pause_endts, NULL);
861  struct timeval pause_time;
862  memset(&pause_time, 0, sizeof(pause_time));
863  timersub(&pause_endts, &pause_startts, &pause_time);
864  timeradd(&paused, &pause_time, &paused);
865 #endif
867  }
868 
869  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
870  emerg = true;
871  }
872 #ifdef FM_PROFILE
873  struct timeval run_startts;
874  memset(&run_startts, 0, sizeof(run_startts));
875  gettimeofday(&run_startts, NULL);
876 #endif
877  /* Get the time */
878  memset(&ts, 0, sizeof(ts));
879  TimeGet(&ts);
880  SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
881  const uint64_t ts_ms = ts.tv_sec * 1000 + ts.tv_usec / 1000;
882  const uint32_t rt = (uint32_t)ts.tv_sec;
883  const bool emerge_p = (emerg && !prev_emerg);
884  if (emerge_p) {
885  next_run_ms = 0;
886  prev_emerg = true;
887  SCLogNotice("Flow emergency mode entered...");
888  StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter);
889  }
890  if (ts_ms >= next_run_ms) {
891  if (ftd->instance == 0) {
892  const uint32_t sq_len = FlowSpareGetPoolSize();
893  const uint32_t spare_perc = sq_len * 100 / flow_config.prealloc;
894  /* see if we still have enough spare flows */
895  if (spare_perc < 90 || spare_perc > 110) {
896  FlowSparePoolUpdate(sq_len);
897  }
898  }
899  const uint32_t secs_passed = rt - flow_last_sec;
900 
901  /* try to time out flows */
902  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0};
903 
904  if (emerg) {
905  /* in emergency mode, do a full pass of the hash table */
906  FlowTimeoutHash(&ftd->timeout, &ts, ftd->min, ftd->max, &counters);
907  hash_passes++;
908  hash_full_passes++;
909  hash_passes++;
910 #ifdef FM_PROFILE
911  hash_passes_chunks += 1;
912  hash_row_checks += counters.rows_checked;
913 #endif
914  StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
915  } else {
916  /* non-emergency mode: scan part of the hash */
917  const uint32_t chunks = MIN(secs_passed, pass_in_sec);
918  for (uint32_t i = 0; i < chunks; i++) {
919  FlowTimeoutHashInChunks(&ftd->timeout, &ts, ftd->min, ftd->max,
920  &counters, hash_pass_iter, pass_in_sec);
921  hash_pass_iter++;
922  if (hash_pass_iter == pass_in_sec) {
923  hash_pass_iter = 0;
924  hash_full_passes++;
925  StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
926  }
927  }
928  hash_passes++;
929 #ifdef FM_PROFILE
930  hash_row_checks += counters.rows_checked;
931  hash_passes_chunks += chunks;
932 #endif
933  }
934  flow_last_sec = rt;
935 
936  /*
937  StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned);
938  uint32_t hosts_active = HostGetActiveCount();
939  StatsSetUI64(th_v, flow_mgr_host_active, (uint64_t)hosts_active);
940  uint32_t hosts_spare = HostGetSpareCount();
941  StatsSetUI64(th_v, flow_mgr_host_spare, (uint64_t)hosts_spare);
942  */
943  StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_clo, (uint64_t)counters.clo);
944  StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_new, (uint64_t)counters.new);
945  StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_est, (uint64_t)counters.est);
946  StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_byp, (uint64_t)counters.byp);
947 
948  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_checked, (uint64_t)counters.flows_checked);
949  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters.flows_notimeout);
950 
951  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters.flows_timeout);
952  //StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_removed, (uint64_t)counters.flows_removed);
953  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout_inuse, (uint64_t)counters.flows_timeout_inuse);
954  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters.flows_aside);
956 
957  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_cnt_clo, (uint64_t)counters.bypassed_count);
958  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_pkts, (uint64_t)counters.bypassed_pkts);
959  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters.bypassed_bytes);
960 
961  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters.rows_maxlen);
962  // TODO AVG MAXLEN
963  // TODO LOOKUP STEPS MAXLEN and AVG LEN
964  /* Don't fear, FlowManagerThread is here...
965  * clear emergency bit if we have at least xx flows pruned. */
966  uint32_t len = FlowSpareGetPoolSize();
967  StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)len);
968  if (emerg == true) {
969  SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
970  "flow_spare_q status: %"PRIu32"%% flows at the queue",
972 
973  /* only if we have pruned this "emergency_recovery" percentage
974  * of flows, we will unset the emergency bit */
976  emerg_over_cnt++;
977  } else {
978  emerg_over_cnt = 0;
979  }
980 
981  if (emerg_over_cnt >= 30) {
982  SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
984 
985  emerg = false;
986  prev_emerg = false;
987  emerg_over_cnt = 0;
988  hash_pass_iter = 0;
989  SCLogNotice("Flow emergency mode over, back to normal... unsetting"
990  " FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", "
991  "ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32
992  "%% flows at the queue", (uintmax_t)ts.tv_sec,
993  (uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);
994 
995  StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
996  }
997  }
998  next_run_ms = ts_ms + 667;
999  if (emerg)
1000  next_run_ms = ts_ms + 250;
1001  }
1002  if (flow_last_sec == 0) {
1003  flow_last_sec = rt;
1004  }
1005 
1006  if (ftd->instance == 0 &&
1007  (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec)) {
1009  //uint32_t hosts_pruned =
1010  HostTimeoutHash(&ts);
1013  other_last_sec = (uint32_t)ts.tv_sec;
1014  }
1015 
1016 
1017 #ifdef FM_PROFILE
1018  struct timeval run_endts;
1019  memset(&run_endts, 0, sizeof(run_endts));
1020  gettimeofday(&run_endts, NULL);
1021  struct timeval run_time;
1022  memset(&run_time, 0, sizeof(run_time));
1023  timersub(&run_endts, &run_startts, &run_time);
1024  timeradd(&active, &run_time, &active);
1025 #endif
1026 
1027  if (TmThreadsCheckFlag(th_v, THV_KILL)) {
1028  StatsSyncCounters(th_v);
1029  break;
1030  }
1031 
1032 #ifdef FM_PROFILE
1033  struct timeval sleep_startts;
1034  memset(&sleep_startts, 0, sizeof(sleep_startts));
1035  gettimeofday(&sleep_startts, NULL);
1036 #endif
1037  usleep(250);
1038 
1039 #ifdef FM_PROFILE
1040  struct timeval sleep_endts;
1041  memset(&sleep_endts, 0, sizeof(sleep_endts));
1042  gettimeofday(&sleep_endts, NULL);
1043 
1044  struct timeval sleep_time;
1045  memset(&sleep_time, 0, sizeof(sleep_time));
1046  timersub(&sleep_endts, &sleep_startts, &sleep_time);
1047  timeradd(&sleeping, &sleep_time, &sleeping);
1048 #endif
1049  SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
1050 
1052  }
1053  SCLogPerf("%" PRIu32 " new flows, %" PRIu32 " established flows were "
1054  "timed out, %"PRIu32" flows in closed state", new_cnt,
1055  established_cnt, closing_cnt);
1056 
1057 #ifdef FM_PROFILE
1058  SCLogNotice("hash passes %u avg chunks %u full %u rows %u (rows/s %u)",
1059  hash_passes, hash_passes_chunks / (hash_passes ? hash_passes : 1),
1060  hash_full_passes, hash_row_checks,
1061  hash_row_checks / ((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));
1062 
1063  gettimeofday(&endts, NULL);
1064  struct timeval total_run_time;
1065  timersub(&endts, &startts, &total_run_time);
1066 
1067  SCLogNotice("FM: active %u.%us out of %u.%us; sleeping %u.%us, paused %u.%us",
1068  (uint32_t)active.tv_sec, (uint32_t)active.tv_usec,
1069  (uint32_t)total_run_time.tv_sec, (uint32_t)total_run_time.tv_usec,
1070  (uint32_t)sleeping.tv_sec, (uint32_t)sleeping.tv_usec,
1071  (uint32_t)paused.tv_sec, (uint32_t)paused.tv_usec);
1072 #endif
1073  return TM_ECODE_OK;
1074 }
1075 
1076 /** \brief spawn the flow manager thread */
1078 {
1079  intmax_t setting = 1;
1080  (void)ConfGetInt("flow.managers", &setting);
1081 
1082  if (setting < 1 || setting > 1024) {
1084  "invalid flow.managers setting %"PRIdMAX, setting);
1085  }
1086  flowmgr_number = (uint32_t)setting;
1087 
1088  SCLogConfig("using %u flow manager threads", flowmgr_number);
1089  StatsRegisterGlobalCounter("flow.memuse", FlowGetMemuse);
1090 
1091  for (uint32_t u = 0; u < flowmgr_number; u++) {
1092  char name[TM_THREAD_NAME_MAX];
1093  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);
1094 
1095  ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
1096  "FlowManager", 0);
1097  BUG_ON(tv_flowmgr == NULL);
1098 
1099  if (tv_flowmgr == NULL) {
1100  FatalError(SC_ERR_FATAL, "flow manager thread creation failed");
1101  }
1102  if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
1103  FatalError(SC_ERR_FATAL, "flow manager thread spawn failed");
1104  }
1105  }
1106  return;
1107 }
1108 
1109 typedef struct FlowRecyclerThreadData_ {
1112 
1113 static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
1114 {
1116  if (ftd == NULL)
1117  return TM_ECODE_FAILED;
1118  if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
1119  SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
1120  SCFree(ftd);
1121  return TM_ECODE_FAILED;
1122  }
1123  SCLogDebug("output_thread_data %p", ftd->output_thread_data);
1124 
1125  *data = ftd;
1126  return TM_ECODE_OK;
1127 }
1128 
1129 static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
1130 {
1132  if (ftd->output_thread_data != NULL)
1134 
1135  SCFree(data);
1136  return TM_ECODE_OK;
1137 }
1138 
1139 /** \brief Thread that manages timed out flows.
1140  *
1141  * \param td ThreadVars casted to void ptr
1142  */
1143 static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
1144 {
1145  struct timeval ts;
1146  uint64_t recycled_cnt = 0;
1147  FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
1148  BUG_ON(ftd == NULL);
1149 
1150  memset(&ts, 0, sizeof(ts));
1151  uint32_t fr_passes = 0;
1152 
1153 #ifdef FM_PROFILE
1154  struct timeval endts;
1155  struct timeval active;
1156  struct timeval paused;
1157  struct timeval sleeping;
1158  memset(&endts, 0, sizeof(endts));
1159  memset(&active, 0, sizeof(active));
1160  memset(&paused, 0, sizeof(paused));
1161  memset(&sleeping, 0, sizeof(sleeping));
1162 #endif
1163  struct timeval startts;
1164  memset(&startts, 0, sizeof(startts));
1165  gettimeofday(&startts, NULL);
1166 
1167  while (1)
1168  {
1169  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
1171 #ifdef FM_PROFILE
1172  struct timeval pause_startts;
1173  memset(&pause_startts, 0, sizeof(pause_startts));
1174  gettimeofday(&pause_startts, NULL);
1175 #endif
1177 
1178 #ifdef FM_PROFILE
1179  struct timeval pause_endts;
1180  memset(&pause_endts, 0, sizeof(pause_endts));
1181  gettimeofday(&pause_endts, NULL);
1182 
1183  struct timeval pause_time;
1184  memset(&pause_time, 0, sizeof(pause_time));
1185  timersub(&pause_endts, &pause_startts, &pause_time);
1186  timeradd(&paused, &pause_time, &paused);
1187 #endif
1189  }
1190  fr_passes++;
1191 #ifdef FM_PROFILE
1192  struct timeval run_startts;
1193  memset(&run_startts, 0, sizeof(run_startts));
1194  gettimeofday(&run_startts, NULL);
1195 #endif
1196  SC_ATOMIC_ADD(flowrec_busy,1);
1198 
1199  const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));
1200 
1201  /* Get the time */
1202  memset(&ts, 0, sizeof(ts));
1203  TimeGet(&ts);
1204  SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
1205 
1206  Flow *f;
1207  while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
1208  Recycler(th_v, ftd->output_thread_data, f);
1209  recycled_cnt++;
1210  }
1211  SC_ATOMIC_SUB(flowrec_busy,1);
1212 
1213 #ifdef FM_PROFILE
1214  struct timeval run_endts;
1215  memset(&run_endts, 0, sizeof(run_endts));
1216  gettimeofday(&run_endts, NULL);
1217 
1218  struct timeval run_time;
1219  memset(&run_time, 0, sizeof(run_time));
1220  timersub(&run_endts, &run_startts, &run_time);
1221  timeradd(&active, &run_time, &active);
1222 #endif
1223 
1224  if (bail) {
1225  break;
1226  }
1227 
1228 #ifdef FM_PROFILE
1229  struct timeval sleep_startts;
1230  memset(&sleep_startts, 0, sizeof(sleep_startts));
1231  gettimeofday(&sleep_startts, NULL);
1232 #endif
1233  usleep(250);
1234 #ifdef FM_PROFILE
1235  struct timeval sleep_endts;
1236  memset(&sleep_endts, 0, sizeof(sleep_endts));
1237  gettimeofday(&sleep_endts, NULL);
1238  struct timeval sleep_time;
1239  memset(&sleep_time, 0, sizeof(sleep_time));
1240  timersub(&sleep_endts, &sleep_startts, &sleep_time);
1241  timeradd(&sleeping, &sleep_time, &sleeping);
1242 #endif
1243 
1244  SCLogDebug("woke up...");
1245 
1247  }
1248  StatsSyncCounters(th_v);
1249 #ifdef FM_PROFILE
1250  gettimeofday(&endts, NULL);
1251  struct timeval total_run_time;
1252  timersub(&endts, &startts, &total_run_time);
1253  SCLogNotice("FR: active %u.%us out of %u.%us; sleeping %u.%us, paused %u.%us",
1254  (uint32_t)active.tv_sec, (uint32_t)active.tv_usec,
1255  (uint32_t)total_run_time.tv_sec, (uint32_t)total_run_time.tv_usec,
1256  (uint32_t)sleeping.tv_sec, (uint32_t)sleeping.tv_usec,
1257  (uint32_t)paused.tv_sec, (uint32_t)paused.tv_usec);
1258 
1259  SCLogNotice("FR passes %u passes/s %u", fr_passes,
1260  (uint32_t)fr_passes/((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));
1261 #endif
1262  SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
1263  return TM_ECODE_OK;
1264 }
1265 
1266 static bool FlowRecyclerReadyToShutdown(void)
1267 {
1268  if (SC_ATOMIC_GET(flowrec_busy) != 0) {
1269  return false;
1270  }
1271  uint32_t len = 0;
1273  len = flow_recycle_q.qlen;
1275 
1276  return ((len == 0));
1277 }
1278 
1279 /** \brief spawn the flow recycler thread */
1281 {
1282  intmax_t setting = 1;
1283  (void)ConfGetInt("flow.recyclers", &setting);
1284 
1285  if (setting < 1 || setting > 1024) {
1287  "invalid flow.recyclers setting %"PRIdMAX, setting);
1288  }
1289  flowrec_number = (uint32_t)setting;
1290 
1291  SCLogConfig("using %u flow recycler threads", flowrec_number);
1292 
1293  for (uint32_t u = 0; u < flowrec_number; u++) {
1294  char name[TM_THREAD_NAME_MAX];
1295  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);
1296 
1297  ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name,
1298  "FlowRecycler", 0);
1299 
1300  if (tv_flowrec == NULL) {
1301  FatalError(SC_ERR_FATAL, "flow recycler thread creation failed");
1302  }
1303  if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) {
1304  FatalError(SC_ERR_FATAL, "flow recycler thread spawn failed");
1305  }
1306  }
1307  return;
1308 }
1309 
1310 /**
1311  * \brief Used to disable flow recycler thread(s).
1312  *
1313  * \note this should only be called when the flow manager is already gone
1314  *
1315  * \todo Kinda hackish since it uses the tv name to identify flow recycler
1316  * thread. We need an all weather identification scheme.
1317  */
1319 {
1320  int cnt = 0;
1321 
1322  /* move all flows still in the hash to the recycler queue */
1323 #ifndef DEBUG
1324  (void)FlowCleanupHash();
1325 #else
1326  uint32_t flows = FlowCleanupHash();
1327  SCLogDebug("flows to progress: %u", flows);
1328 #endif
1329 
1330  /* make sure all flows are processed */
1331  do {
1332  usleep(10);
1333  } while (FlowRecyclerReadyToShutdown() == false);
1334 
1336  /* flow recycler thread(s) is/are a part of mgmt threads */
1337  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
1338  if (strncasecmp(tv->name, thread_name_flow_rec,
1339  strlen(thread_name_flow_rec)) == 0)
1340  {
1342  cnt++;
1343  }
1344  }
1346 
1347  struct timeval start_ts;
1348  struct timeval cur_ts;
1349  gettimeofday(&start_ts, NULL);
1350 
1351 again:
1352  gettimeofday(&cur_ts, NULL);
1353  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1354  FatalError(SC_ERR_SHUTDOWN, "unable to get all flow recycler "
1355  "threads to shutdown in time");
1356  }
1357 
1359  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
1360  if (strncasecmp(tv->name, thread_name_flow_rec,
1361  strlen(thread_name_flow_rec)) == 0)
1362  {
1365  /* sleep outside lock */
1366  SleepMsec(1);
1367  goto again;
1368  }
1369  }
1370  }
1372 
1373  /* reset count, so we can kill and respawn (unix socket) */
1374  SC_ATOMIC_SET(flowrec_cnt, 0);
1375  return;
1376 }
1377 
1379 {
1380  tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
1381  tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
1382  tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
1383  tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
1386  SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
1387 
1388  SC_ATOMIC_INIT(flowmgr_cnt);
1389  SC_ATOMIC_INITPTR(flow_timeouts);
1390 }
1391 
1393 {
1394  tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
1395  tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
1396  tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
1397  tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
1400  SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
1401 
1402  SC_ATOMIC_INIT(flowrec_cnt);
1403  SC_ATOMIC_INIT(flowrec_busy);
1404 }
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:67
FlowTimeoutCounters_::rows_empty
uint32_t rows_empty
Definition: flow-manager.c:118
FlowSparePoolUpdate
void FlowSparePoolUpdate(uint32_t size)
Definition: flow-spare-pool.c:172
util-byte.h
tm-threads.h
ConfGetInt
int ConfGetInt(const char *name, intmax_t *val)
Retrieve a configuration value as an integer.
Definition: conf.c:436
OutputFlowLog
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
Definition: output-flow.c:91
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:54
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1720
app-layer-htp-range.h
FlowTimeoutCounters
Definition: flow-worker.c:54
FlowForceReassemblyForFlow
void FlowForceReassemblyForFlow(Flow *f)
Definition: flow-timeout.c:349
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1137
run_mode
int run_mode
Definition: suricata.c:199
FlowBucket_::evicted
Flow * evicted
Definition: flow-hash.h:46
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:169
FLOW_IS_IPV6
#define FLOW_IS_IPV6(f)
Definition: flow.h:160
FlowManagerThreadData_::max
uint32_t max
Definition: flow-manager.c:695
FlowManagerTimeoutThread
Definition: flow-manager.c:322
FlowSparePoolReturnFlow
void FlowSparePoolReturnFlow(Flow *f)
Definition: flow-spare-pool.c:101
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:67
FlowSpareGetPoolSize
uint32_t FlowSpareGetPoolSize(void)
Definition: flow-spare-pool.c:48
flow-util.h
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:315
FlowManagerThreadData_::cnt
FlowCounters cnt
Definition: flow-manager.c:697
FBLOCK_LOCK
#define FBLOCK_LOCK(fb)
Definition: flow-hash.h:71
TMM_FLOWRECYCLER
@ TMM_FLOWRECYCLER
Definition: tm-threads-common.h:69
stream-tcp.h
GetFlowBypassInfoID
FlowStorageId GetFlowBypassInfoID(void)
Definition: flow-util.c:220
FlowBypassInfo_
Definition: flow.h:529
FlowCnf_::emergency_recovery
uint32_t emergency_recovery
Definition: flow.h:300
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:387
FlowCnf_::hash_size
uint32_t hash_size
Definition: flow.h:291
FlowManagerThreadData_::instance
uint32_t instance
Definition: flow-manager.c:693
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:298
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:97
StatsRegisterGlobalCounter
uint16_t StatsRegisterGlobalCounter(const char *name, uint64_t(*Func)(void))
Registers a counter, which represents a global value.
Definition: counters.c:1000
Flow_::proto
uint8_t proto
Definition: flow.h:375
SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt)
threads.h
flow-private.h
FlowCounters_::flow_mgr_flows_notimeout
uint16_t flow_mgr_flows_notimeout
Definition: flow-manager.c:679
Flow_
Flow data structure.
Definition: flow.h:353
TYPE
#define TYPE
Flow_::protomap
uint8_t protomap
Definition: flow.h:455
SC_ATOMIC_EXTERN
SC_ATOMIC_EXTERN(unsigned int, flow_flags)
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:333
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:68
FlowProtoTimeout_
Definition: flow.h:518
StatsSetUI64
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type double to the local counter.
Definition: counters.c:191
flow-hash.h
TmModuleFlowRecyclerRegister
void TmModuleFlowRecyclerRegister(void)
Definition: flow-manager.c:1392
FlowTimeoutCounters_::flows_removed
uint32_t flows_removed
Definition: flow-manager.c:126
Flow_::use_cnt
FlowRefCount use_cnt
Definition: flow.h:383
FlowBypassInfo_::tosrcbytecnt
uint64_t tosrcbytecnt
Definition: flow.h:534
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
FlowGetMemuse
uint64_t FlowGetMemuse(void)
Definition: flow.c:136
MIN
#define MIN(x, y)
Definition: suricata-common.h:368
FBLOCK_TRYLOCK
#define FBLOCK_TRYLOCK(fb)
Definition: flow-hash.h:72
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:78
FlowTimeoutCounters_::flows_timeout
uint32_t flows_timeout
Definition: flow-manager.c:124
util-privs.h
defrag-timeout.h
stream-tcp-reassemble.h
StatsSyncCountersIfSignalled
#define StatsSyncCountersIfSignalled(tv)
Definition: counters.h:137
FlowCounters_::flow_mgr_flows_timeout_inuse
uint16_t flow_mgr_flows_timeout_inuse
Definition: flow-manager.c:681
m
SCMutex m
Definition: flow-hash.h:6
LiveDevSubBypassStats
void LiveDevSubBypassStats(LiveDevice *dev, uint64_t cnt, int family)
Definition: util-device.c:561
SC_ERR_SHUTDOWN
@ SC_ERR_SHUTDOWN
Definition: util-error.h:220
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:81
FlowQueuePrivate_::len
uint32_t len
Definition: flow-queue.h:44
FlowCounters_
Definition: flow-manager.c:668
FlowManagerTimeoutThread
struct FlowManagerTimeoutThread FlowManagerTimeoutThread
SC_ERR_INVALID_ARGUMENTS
@ SC_ERR_INVALID_ARGUMENTS
Definition: util-error.h:82
FlowCounters_::flow_mgr_flows_aside
uint16_t flow_mgr_flows_aside
Definition: flow-manager.c:682
FLOW_TIMEOUT_REASSEMBLY_DONE
#define FLOW_TIMEOUT_REASSEMBLY_DONE
Definition: flow.h:95
util-unittest.h
FlowTimeoutCounters_::bypassed_count
uint32_t bypassed_count
Definition: flow-manager.c:130
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:39
TM_THREAD_NAME_MAX
#define TM_THREAD_NAME_MAX
Definition: tm-threads.h:48
util-unittest-helper.h
FlowCnf_::prealloc
uint32_t prealloc
Definition: flow.h:293
FlowQueueTimeoutCounters::flows_removed
uint32_t flows_removed
Definition: flow-manager.c:662
FLOWLOCK_UNLOCK
#define FLOWLOCK_UNLOCK(fb)
Definition: flow.h:270
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:80
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:302
FlowCounters_::flow_mgr_cnt_new
uint16_t flow_mgr_cnt_new
Definition: flow-manager.c:671
Flow_::flow_state
FlowStateType flow_state
Definition: flow.h:422
FQLOCK_LOCK
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:73
FlowDisableFlowRecyclerThread
void FlowDisableFlowRecyclerThread(void)
Used to disable flow recycler thread(s).
Definition: flow-manager.c:1318
FlowCounters_::flow_mgr_cnt_byp
uint16_t flow_mgr_cnt_byp
Definition: flow-manager.c:673
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
FlowTimeoutCounters_::rows_maxlen
uint32_t rows_maxlen
Definition: flow-manager.c:120
FlowCounters_::flow_bypassed_bytes
uint16_t flow_bypassed_bytes
Definition: flow-manager.c:689
FlowTimeoutCounters_::flows_checked
uint32_t flows_checked
Definition: flow-manager.c:122
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:47
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:105
util-signal.h
SC_ERR_THREAD_INIT
@ SC_ERR_THREAD_INIT
Definition: util-error.h:79
flow-spare-pool.h
FlowTimeoutCounters_::byp
uint32_t byp
Definition: flow-manager.c:114
Flow_::fb
struct FlowBucket_ * fb
Definition: flow.h:498
FlowCounters_::flow_mgr_full_pass
uint16_t flow_mgr_full_pass
Definition: flow-manager.c:669
StatsRegisterMaxCounter
uint16_t StatsRegisterMaxCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the maximum of all the values assigned to it.
Definition: counters.c:982
SC_ATOMIC_MEMORY_ORDER_RELAXED
#define SC_ATOMIC_MEMORY_ORDER_RELAXED
Definition: util-atomic.h:166
FLOW_STATE_LOCAL_BYPASSED
@ FLOW_STATE_LOCAL_BYPASSED
Definition: flow.h:512
decode.h
util-debug.h
FlowBypassInfo_::todstbytecnt
uint64_t todstbytecnt
Definition: flow.h:536
DefragTimeoutHash
uint32_t DefragTimeoutHash(struct timeval *ts)
time out tracker from the hash
Definition: defrag-timeout.c:119
FlowBypassInfo_::BypassUpdate
bool(* BypassUpdate)(Flow *f, void *data, time_t tsec)
Definition: flow.h:530
OutputFlowLogThreadInit
TmEcode OutputFlowLogThreadInit(ThreadVars *tv, void *initdata, void **data)
thread init for the flow logger This will run the thread init functions for the individual registered...
Definition: output-flow.c:131
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
FlowCounters_::flow_mgr_rows_maxlen
uint16_t flow_mgr_rows_maxlen
Definition: flow-manager.c:685
FLOWLOCK_WRLOCK
#define FLOWLOCK_WRLOCK(fb)
Definition: flow.h:267
FlowTimeoutsReset
#define FlowTimeoutsReset()
Definition: flow-manager.h:27
FlowForceReassemblyNeedReassembly
int FlowForceReassemblyNeedReassembly(Flow *f)
Check if a flow needs forced reassembly, or any other processing.
Definition: flow-timeout.c:295
FlowDisableFlowManagerThread
void FlowDisableFlowManagerThread(void)
Used to disable flow manager thread(s).
Definition: flow-manager.c:141
detect.h
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1801
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
TimeModeIsReady
bool TimeModeIsReady(void)
Definition: util-time.c:90
Flow_::flow_end_flags
uint8_t flow_end_flags
Definition: flow.h:457
THV_KILL
#define THV_KILL
Definition: threadvars.h:41
FlowBypassInfo_::todstpktcnt
uint64_t todstpktcnt
Definition: flow.h:535
util-time.h
FlowQueuePrivateGetFromTop
Flow * FlowQueuePrivateGetFromTop(FlowQueuePrivate *fqc)
Definition: flow-queue.c:152
FlowBypassInfo_::bypass_data
void * bypass_data
Definition: flow.h:532
FlowQueueAppendPrivate
void FlowQueueAppendPrivate(FlowQueue *fq, FlowQueuePrivate *fqc)
Definition: flow-queue.c:120
FlowQueueTimeoutCounters
struct FlowQueueTimeoutCounters FlowQueueTimeoutCounters
FlowCounters_::flow_mgr_flows_aside_needs_work
uint16_t flow_mgr_flows_aside_needs_work
Definition: flow-manager.c:683
FLOW_PROTO_MAX
@ FLOW_PROTO_MAX
Definition: flow-private.h:76
app-layer-parser.h
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:123
FlowRecyclerThreadData_
Definition: flow-manager.c:1109
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:277
FLOW_IS_IPV4
#define FLOW_IS_IPV4(f)
Definition: flow.h:158
FlowCounters_::flow_bypassed_cnt_clo
uint16_t flow_bypassed_cnt_clo
Definition: flow-manager.c:687
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:81
SC_ATOMIC_SUB
#define SC_ATOMIC_SUB(name, val)
sub a value from our atomic variable
Definition: util-atomic.h:342
flow_timeouts_emerg
FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]
Definition: flow.c:94
stream.h
FlowTimeoutsEmergency
void FlowTimeoutsEmergency(void)
Definition: flow-manager.c:97
FlowTimeoutCounters
struct FlowTimeoutCounters_ FlowTimeoutCounters
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:33
FlowManagerThreadSpawn
void FlowManagerThreadSpawn()
spawn the flow manager thread
Definition: flow-manager.c:1077
stream-tcp-private.h
conf.h
FlowRecyclerThreadData_::output_thread_data
void * output_thread_data
Definition: flow-manager.c:1110
FlowTimeoutCounters_::rows_checked
uint32_t rows_checked
Definition: flow-manager.c:116
FBLOCK_UNLOCK
#define FBLOCK_UNLOCK(fb)
Definition: flow-hash.h:73
TmEcode
TmEcode
Definition: tm-threads-common.h:79
FlowClearMemory
int FlowClearMemory(Flow *f, uint8_t proto_map)
Function clear the flow memory before queueing it to spare flow queue.
Definition: flow.c:1067
flow_timeouts_delta
FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]
Definition: flow.c:95
output-flow.h
detect-engine-state.h
Data structures and function prototypes for keeping state for the detection engine.
flow-timeout.h
g_detect_disabled
int g_detect_disabled
Definition: suricata.c:213
FlowCounters_::flow_mgr_cnt_est
uint16_t flow_mgr_cnt_est
Definition: flow-manager.c:672
flow-queue.h
TmModule_::name
const char * name
Definition: tm-modules.h:44
FlowRecyclerThreadData
struct FlowRecyclerThreadData_ FlowRecyclerThreadData
FlowGetStorageById
void * FlowGetStorageById(Flow *f, FlowStorageId id)
Definition: flow-storage.c:39
FlowBypassInfo_::tosrcpktcnt
uint64_t tosrcpktcnt
Definition: flow.h:533
host-timeout.h
FlowRecyclerThreadSpawn
void FlowRecyclerThreadSpawn()
spawn the flow recycler thread
Definition: flow-manager.c:1280
runmodes.h
FlowTimeoutCounters_::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-manager.c:128
HttpRangeContainersTimeoutHash
uint32_t HttpRangeContainersTimeoutHash(struct timeval *ts)
Definition: app-layer-htp-range.c:189
FLOW_PROTO_TCP
@ FLOW_PROTO_TCP
Definition: flow-private.h:70
Flow_::next
struct Flow_ * next
Definition: flow.h:404
FlowQueue_
Definition: flow-queue.h:49
FlowCounters_::flow_bypassed_pkts
uint16_t flow_bypassed_pkts
Definition: flow-manager.c:688
TMM_FLOWMANAGER
@ TMM_FLOWMANAGER
Definition: tm-threads-common.h:68
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:40
HostTimeoutHash
uint32_t HostTimeoutHash(struct timeval *ts)
time out hosts from the hash
Definition: host-timeout.c:156
flow_hash
FlowBucket * flow_hash
Definition: flow-hash.c:56
FlowQueueExtractPrivate
FlowQueuePrivate FlowQueueExtractPrivate(FlowQueue *fq)
Definition: flow-queue.c:141
flow-storage.h
FlowTimeoutCounters_::rows_busy
uint32_t rows_busy
Definition: flow-manager.c:119
FlowTimeoutCounters_
Definition: flow-manager.c:110
FlowManagerThreadData_
Definition: flow-manager.c:692
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:44
flow-manager.h
suricata-common.h
FlowManagerThreadData
struct FlowManagerThreadData_ FlowManagerThreadData
flow_config
FlowConfig flow_config
Definition: flow.c:98
FlowQueueTimeoutCounters::flows_timeout
uint32_t flows_timeout
Definition: flow-manager.c:663
TmModuleFlowManagerRegister
void TmModuleFlowManagerRegister(void)
Definition: flow-manager.c:1378
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:224
FlowTimeoutsInit
void FlowTimeoutsInit(void)
Definition: flow-manager.c:92
SC_ATOMIC_LOAD_EXPLICIT
#define SC_ATOMIC_LOAD_EXPLICIT(name, order)
Definition: util-atomic.h:379
SCLogError
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:257
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
FLOW_PROTO_UDP
@ FLOW_PROTO_UDP
Definition: flow-private.h:71
FatalError
#define FatalError(x,...)
Definition: util-debug.h:532
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:29
FlowTimeoutCounters_::est
uint32_t est
Definition: flow-manager.c:112
Flow_::livedev
struct LiveDevice_ * livedev
Definition: flow.h:406
threadvars.h
util-validate.h
FlowQueuePrivate_
Definition: flow-queue.h:41
StatsAddUI64
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition: counters.c:148
SCLogConfig
struct SCLogConfig_ SCLogConfig
Holds the config state used by the logging api.
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:88
FlowTimeoutCounters_::bypassed_bytes
uint64_t bypassed_bytes
Definition: flow-manager.c:132
FlowManagerThreadData_::min
uint32_t min
Definition: flow-manager.c:694
FlowCounters_::flow_mgr_cnt_clo
uint16_t flow_mgr_cnt_clo
Definition: flow-manager.c:670
OutputFlowLogThreadDeinit
TmEcode OutputFlowLogThreadDeinit(ThreadVars *tv, void *thread_data)
Definition: output-flow.c:173
flow_recycle_q
FlowQueue flow_recycle_q
Definition: flow-manager.c:78
SCFree
#define SCFree(p)
Definition: util-mem.h:61
FlowTimeoutCounters_::bypassed_pkts
uint64_t bypassed_pkts
Definition: flow-manager.c:131
SC_ERR_FATAL
@ SC_ERR_FATAL
Definition: util-error.h:203
Flow_::flags
uint32_t flags
Definition: flow.h:431
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:318
FlowManagerThreadData_::timeout
FlowManagerTimeoutThread timeout
Definition: flow-manager.c:699
util-random.h
FlowCounters
struct FlowCounters_ FlowCounters
FlowCounters_::flow_emerg_mode_enter
uint16_t flow_emerg_mode_enter
Definition: flow-manager.c:675
TimeGet
void TimeGet(struct timeval *tv)
Definition: util-time.c:153
FLOW_END_FLAG_SHUTDOWN
#define FLOW_END_FLAG_SHUTDOWN
Definition: flow.h:242
FLOW_EMERGENCY
#define FLOW_EMERGENCY
Definition: flow-private.h:37
IPPairTimeoutHash
uint32_t IPPairTimeoutHash(struct timeval *ts)
time out ippairs from the hash
Definition: ippair-timeout.c:142
suricata.h
ippair-timeout.h
FlowCounters_::flow_mgr_flows_timeout
uint16_t flow_mgr_flows_timeout
Definition: flow-manager.c:680
FlowManagerTimeoutThread::aside_queue
FlowQueuePrivate aside_queue
Definition: flow-manager.c:325
FLOW_END_FLAG_TIMEOUT
#define FLOW_END_FLAG_TIMEOUT
Definition: flow.h:240
StatsSyncCounters
#define StatsSyncCounters(tv)
Definition: counters.h:134
FlowTimeoutCounters_::flows_notimeout
uint32_t flows_notimeout
Definition: flow-manager.c:123
flow_timeouts_normal
FlowProtoTimeout flow_timeouts_normal[FLOW_PROTO_MAX]
Definition: flow.c:93
FlowCounters_::flow_mgr_spare
uint16_t flow_mgr_spare
Definition: flow-manager.c:674
FlowTimeoutCounters_::flows_aside
uint32_t flows_aside
Definition: flow-manager.c:127
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:376
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:335
FlowTimeoutCounters_::clo
uint32_t clo
Definition: flow-manager.c:113
flow.h
FlowQueuePrivateAppendFlow
void FlowQueuePrivateAppendFlow(FlowQueuePrivate *fqc, Flow *f)
Definition: flow-queue.c:66
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:89
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:232
evicted
Flow * evicted
Definition: flow-hash.h:4
StatsRegisterCounter
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:942
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
BITS
#define BITS
FlowTimeoutCounters_::flows_timeout_inuse
uint32_t flows_timeout_inuse
Definition: flow-manager.c:125
Flow_::timeout_at
uint32_t timeout_at
Definition: flow.h:399
flow-var.h
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:111
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:70
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:360
FlowCounters_::flow_emerg_mode_over
uint16_t flow_emerg_mode_over
Definition: flow-manager.c:676
TM_FLAG_MANAGEMENT_TM
#define TM_FLAG_MANAGEMENT_TM
Definition: tm-modules.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:196
FlowTimeoutCounters_::rows_skipped
uint32_t rows_skipped
Definition: flow-manager.c:117
FlowCounters_::flow_mgr_flows_checked
uint16_t flow_mgr_flows_checked
Definition: flow-manager.c:678
FlowQueueTimeoutCounters
Definition: flow-manager.c:661
FlowTimeoutCounters::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-worker.c:55
FQLOCK_UNLOCK
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:75