suricata
flow-manager.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2022 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
22  * \author Victor Julien <victor@inliniac.net>
23  */
24 
25 #include "suricata-common.h"
26 #include "suricata.h"
27 #include "decode.h"
28 #include "conf.h"
29 #include "threadvars.h"
30 #include "tm-threads.h"
31 #include "runmodes.h"
32 
33 #include "util-random.h"
34 #include "util-time.h"
35 
36 #include "flow.h"
37 #include "flow-queue.h"
38 #include "flow-hash.h"
39 #include "flow-util.h"
40 #include "flow-var.h"
41 #include "flow-private.h"
42 #include "flow-timeout.h"
43 #include "flow-manager.h"
44 #include "flow-storage.h"
45 #include "flow-spare-pool.h"
46 
47 #include "stream-tcp-private.h"
48 #include "stream-tcp-reassemble.h"
49 #include "stream-tcp.h"
50 
51 #include "util-unittest.h"
52 #include "util-unittest-helper.h"
53 #include "util-byte.h"
54 #include "util-device.h"
55 
56 #include "util-debug.h"
57 #include "util-privs.h"
58 #include "util-signal.h"
59 
60 #include "threads.h"
61 #include "detect.h"
62 #include "detect-engine-state.h"
63 #include "stream.h"
64 
65 #include "app-layer-parser.h"
66 
67 #include "host-timeout.h"
68 #include "defrag-timeout.h"
69 #include "ippair-timeout.h"
70 #include "app-layer-htp-range.h"
71 
72 #include "output-flow.h"
73 #include "util-validate.h"
74 
75 #include "runmode-unix-socket.h"
76 
77 /* Run mode selected at suricata.c */
78 extern int run_mode;
79 
80 /** queue to pass flows to cleanup/log thread(s) */
82 
83 /* multi flow mananger support */
84 static uint32_t flowmgr_number = 1;
85 /* atomic counter for flow managers, to assign instance id */
86 SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
87 
88 /* multi flow recycler support */
89 static uint32_t flowrec_number = 1;
90 /* atomic counter for flow recyclers, to assign instance id */
91 SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
92 SC_ATOMIC_DECLARE(uint32_t, flowrec_busy);
93 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
94 
99 
101 {
102  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
103 }
104 
106 {
107  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
108 }
109 
110 /* 1 seconds */
111 #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
112 #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
113 /* 0.3 seconds */
114 #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
115 #define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000
116 #define NEW_FLOW_COUNT_COND 10
117 
118 typedef struct FlowTimeoutCounters_ {
119  uint32_t rows_checked;
120  uint32_t rows_skipped;
121  uint32_t rows_empty;
122  uint32_t rows_maxlen;
123 
124  uint32_t flows_checked;
125  uint32_t flows_notimeout;
126  uint32_t flows_timeout;
127  uint32_t flows_removed;
128  uint32_t flows_aside;
130 
131  uint32_t bypassed_count;
132  uint64_t bypassed_pkts;
133  uint64_t bypassed_bytes;
135 
136 /**
137  * \brief Used to disable flow manager thread(s).
138  *
139  * \todo Kinda hackish since it uses the tv name to identify flow manager
140  * thread. We need an all weather identification scheme.
141  */
143 {
145  /* flow manager thread(s) is/are a part of mgmt threads */
146  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
147  if (strncasecmp(tv->name, thread_name_flow_mgr,
148  strlen(thread_name_flow_mgr)) == 0)
149  {
151  }
152  }
154 
155  struct timeval start_ts;
156  struct timeval cur_ts;
157  gettimeofday(&start_ts, NULL);
158 
159 again:
160  gettimeofday(&cur_ts, NULL);
161  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
162  FatalError("unable to get all flow manager "
163  "threads to shutdown in time");
164  }
165 
167  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
168  if (strncasecmp(tv->name, thread_name_flow_mgr,
169  strlen(thread_name_flow_mgr)) == 0)
170  {
173  /* sleep outside lock */
174  SleepMsec(1);
175  goto again;
176  }
177  }
178  }
180 
181  /* reset count, so we can kill and respawn (unix socket) */
182  SC_ATOMIC_SET(flowmgr_cnt, 0);
183  return;
184 }
185 
186 /** \internal
187  * \brief check if a flow is timed out
188  *
189  * \param f flow
190  * \param ts timestamp
191  *
192  * \retval 0 not timed out
193  * \retval 1 timed out
194  */
195 static int FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
196 {
197  uint32_t flow_times_out_at = f->timeout_at;
198  if (emerg) {
200  flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
201  }
202  if (*next_ts == 0 || flow_times_out_at < *next_ts)
203  *next_ts = flow_times_out_at;
204 
205  /* do the timeout check */
206  if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) {
207  return 0;
208  }
209 
210  return 1;
211 }
212 
213 /** \internal
214  * \brief check timeout of captured bypassed flow by querying capture method
215  *
216  * \param f Flow
217  * \param ts timestamp
218  * \param counters Flow timeout counters
219  *
220  * \retval 0 not timeout
221  * \retval 1 timeout (or not capture bypassed)
222  */
223 static inline int FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
224 {
225 #ifdef CAPTURE_OFFLOAD
226  if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
227  return 1;
228  }
229 
231  if (fc && fc->BypassUpdate) {
232  /* flow will be possibly updated */
233  uint64_t pkts_tosrc = fc->tosrcpktcnt;
234  uint64_t bytes_tosrc = fc->tosrcbytecnt;
235  uint64_t pkts_todst = fc->todstpktcnt;
236  uint64_t bytes_todst = fc->todstbytecnt;
237  bool update = fc->BypassUpdate(f, fc->bypass_data, SCTIME_SECS(ts));
238  if (update) {
239  SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f));
240  pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
241  bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
242  pkts_todst = fc->todstpktcnt - pkts_todst;
243  bytes_todst = fc->todstbytecnt - bytes_todst;
244  if (f->livedev) {
245  SC_ATOMIC_ADD(f->livedev->bypassed,
246  pkts_tosrc + pkts_todst);
247  }
248  counters->bypassed_pkts += pkts_tosrc + pkts_todst;
249  counters->bypassed_bytes += bytes_tosrc + bytes_todst;
250  return 0;
251  } else {
252  SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
253  if (f->livedev) {
254  if (FLOW_IS_IPV4(f)) {
255  LiveDevSubBypassStats(f->livedev, 1, AF_INET);
256  } else if (FLOW_IS_IPV6(f)) {
257  LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
258  }
259  }
260  counters->bypassed_count++;
261  return 1;
262  }
263  }
264 #endif /* CAPTURE_OFFLOAD */
265  return 1;
266 }
267 
268 typedef struct FlowManagerTimeoutThread {
269  /* used to temporarily store flows that have timed out and are
270  * removed from the hash */
273 
274 static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
275 {
276  FlowQueuePrivate recycle = { NULL, NULL, 0 };
277  counters->flows_aside += td->aside_queue.len;
278 
279  uint32_t cnt = 0;
280  Flow *f;
281  while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
282  /* flow is still locked */
283 
284  if (f->proto == IPPROTO_TCP && !(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
285  !FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) == 1) {
286  /* Send the flow to its thread */
288  FLOWLOCK_UNLOCK(f);
289  /* flow ownership is passed to the worker thread */
290 
291  counters->flows_aside_needs_work++;
292  continue;
293  }
294  FLOWLOCK_UNLOCK(f);
295 
296  FlowQueuePrivateAppendFlow(&recycle, f);
297  if (recycle.len == 100) {
300  }
301  cnt++;
302  }
303  if (recycle.len) {
306  }
307  return cnt;
308 }
309 
310 /**
311  * \internal
312  *
313  * \brief check all flows in a hash row for timing out
314  *
315  * \param f last flow in the hash row
316  * \param ts timestamp
317  * \param emergency bool indicating emergency mode
318  * \param counters ptr to FlowTimeoutCounters structure
319  */
320 static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts,
321  int emergency, FlowTimeoutCounters *counters, uint32_t *next_ts)
322 {
323  uint32_t checked = 0;
324  Flow *prev_f = NULL;
325 
326  do {
327  checked++;
328 
329  /* check flow timeout based on lastts and state. Both can be
330  * accessed w/o Flow lock as we do have the hash row lock (so flow
331  * can't disappear) and flow_state is atomic. lastts can only
332  * be modified when we have both the flow and hash row lock */
333 
334  /* timeout logic goes here */
335  if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {
336 
337  counters->flows_notimeout++;
338 
339  prev_f = f;
340  f = f->next;
341  continue;
342  }
343 
344  FLOWLOCK_WRLOCK(f);
345 
346  Flow *next_flow = f->next;
347 
348  /* never prune a flow that is used by a packet we
349  * are currently processing in one of the threads */
350  if (!FlowBypassedTimeout(f, ts, counters)) {
351  FLOWLOCK_UNLOCK(f);
352  prev_f = f;
353  f = f->next;
354  continue;
355  }
356 
358 
359  counters->flows_timeout++;
360 
361  RemoveFromHash(f, prev_f);
362 
364  /* flow is still locked in the queue */
365 
366  f = next_flow;
367  } while (f != NULL);
368 
369  counters->flows_checked += checked;
370  if (checked > counters->rows_maxlen)
371  counters->rows_maxlen = checked;
372 }
373 
374 static void FlowManagerHashRowClearEvictedList(
376 {
377  do {
378  FLOWLOCK_WRLOCK(f);
379  Flow *next_flow = f->next;
380  f->next = NULL;
381  f->fb = NULL;
382 
384  /* flow is still locked in the queue */
385 
386  f = next_flow;
387  } while (f != NULL);
388 }
389 
390 /**
391  * \brief time out flows from the hash
392  *
393  * \param ts timestamp
394  * \param hash_min min hash index to consider
395  * \param hash_max max hash index to consider
396  * \param counters ptr to FlowTimeoutCounters structure
397  *
398  * \retval cnt number of timed out flow
399  */
400 static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const uint32_t hash_min,
401  const uint32_t hash_max, FlowTimeoutCounters *counters)
402 {
403  uint32_t cnt = 0;
404  const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
405  const uint32_t rows_checked = hash_max - hash_min;
406  uint32_t rows_skipped = 0;
407  uint32_t rows_empty = 0;
408 
409 #if __WORDSIZE==64
410 #define BITS 64
411 #define TYPE uint64_t
412 #else
413 #define BITS 32
414 #define TYPE uint32_t
415 #endif
416 
417  const uint32_t ts_secs = SCTIME_SECS(ts);
418  for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
419  TYPE check_bits = 0;
420  const uint32_t check = MIN(BITS, (hash_max - idx));
421  for (uint32_t i = 0; i < check; i++) {
422  FlowBucket *fb = &flow_hash[idx+i];
423  check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(
424  fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs)
425  << (TYPE)i;
426  }
427  if (check_bits == 0)
428  continue;
429 
430  for (uint32_t i = 0; i < check; i++) {
431  FlowBucket *fb = &flow_hash[idx+i];
432  if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= ts_secs) {
433  FBLOCK_LOCK(fb);
434  Flow *evicted = NULL;
435  if (fb->evicted != NULL || fb->head != NULL) {
436  if (fb->evicted != NULL) {
437  /* transfer out of bucket so we can do additional work outside
438  * of the bucket lock */
439  evicted = fb->evicted;
440  fb->evicted = NULL;
441  }
442  if (fb->head != NULL) {
443  uint32_t next_ts = 0;
444  FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts);
445 
446  if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
447  SC_ATOMIC_SET(fb->next_ts, next_ts);
448  }
449  if (fb->evicted == NULL && fb->head == NULL) {
450  SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
451  }
452  } else {
453  SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
454  rows_empty++;
455  }
456  FBLOCK_UNLOCK(fb);
457  /* processed evicted list */
458  if (evicted) {
459  FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
460  }
461  } else {
462  rows_skipped++;
463  }
464  }
465  if (td->aside_queue.len) {
466  cnt += ProcessAsideQueue(td, counters);
467  }
468  }
469 
470  counters->rows_checked += rows_checked;
471  counters->rows_skipped += rows_skipped;
472  counters->rows_empty += rows_empty;
473 
474  if (td->aside_queue.len) {
475  cnt += ProcessAsideQueue(td, counters);
476  }
477  counters->flows_removed += cnt;
478  /* coverity[missing_unlock : FALSE] */
479  return cnt;
480 }
481 
482 /** \internal
483  * \brief handle timeout for a slice of hash rows
484  * If we wrap around we call FlowTimeoutHash twice */
485 static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t ts,
486  const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
487  const uint32_t rows, uint32_t *pos)
488 {
489  uint32_t start = 0;
490  uint32_t end = 0;
491  uint32_t cnt = 0;
492  uint32_t rows_left = rows;
493 
494 again:
495  start = hash_min + (*pos);
496  if (start >= hash_max) {
497  start = hash_min;
498  }
499  end = start + rows_left;
500  if (end > hash_max) {
501  end = hash_max;
502  }
503  *pos = (end == hash_max) ? hash_min : end;
504  rows_left = rows_left - (end - start);
505 
506  cnt += FlowTimeoutHash(td, ts, start, end, counters);
507  if (rows_left) {
508  goto again;
509  }
510  return cnt;
511 }
512 
513 /**
514  * \internal
515  *
516  * \brief move all flows out of a hash row
517  *
518  * \param f last flow in the hash row
519  *
520  * \retval cnt removed out flows
521  */
522 static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
523 {
524  uint32_t cnt = 0;
525 
526  do {
527  FLOWLOCK_WRLOCK(f);
528 
529  Flow *next_flow = f->next;
530 
531  /* remove from the hash */
532  if (mode == 0) {
533  RemoveFromHash(f, NULL);
534  } else {
535  FlowBucket *fb = f->fb;
536  fb->evicted = f->next;
537  f->next = NULL;
538  f->fb = NULL;
539  }
541 
542  /* no one is referring to this flow, removed from hash
543  * so we can unlock it and move it to the recycle queue. */
544  FLOWLOCK_UNLOCK(f);
545  FlowQueuePrivateAppendFlow(recycle_q, f);
546 
547  cnt++;
548 
549  f = next_flow;
550  } while (f != NULL);
551 
552  return cnt;
553 }
554 
555 /**
556  * \brief remove all flows from the hash
557  *
558  * \retval cnt number of removes out flows
559  */
560 static uint32_t FlowCleanupHash(void)
561 {
562  FlowQueuePrivate local_queue = { NULL, NULL, 0 };
563  uint32_t cnt = 0;
564 
565  for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
566  FlowBucket *fb = &flow_hash[idx];
567 
568  FBLOCK_LOCK(fb);
569 
570  if (fb->head != NULL) {
571  /* we have a flow, or more than one */
572  cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
573  }
574  if (fb->evicted != NULL) {
575  /* we have a flow, or more than one */
576  cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
577  }
578 
579  FBLOCK_UNLOCK(fb);
580  if (local_queue.len >= 25) {
581  FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
583  }
584  }
585  FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
587 
588  return cnt;
589 }
590 
591 typedef struct FlowQueueTimeoutCounters {
592  uint32_t flows_removed;
593  uint32_t flows_timeout;
595 
596 typedef struct FlowCounters_ {
599 
600  uint16_t flow_mgr_spare;
603 
609 
611 
615 
616  uint16_t memcap_pressure;
619 
620 typedef struct FlowManagerThreadData_ {
621  uint32_t instance;
622  uint32_t min;
623  uint32_t max;
624 
626 
629 
630 static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
631 {
632  fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t);
633  fc->flow_mgr_rows_sec = StatsRegisterCounter("flow.mgr.rows_per_sec", t);
634 
635  fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
636  fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
637  fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
638 
639  fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", t);
640  fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", t);
641  fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", t);
642  fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", t);
643  fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", t);
644  fc->flow_mgr_flows_aside_needs_work = StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", t);
645 
646  fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
647  fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
648  fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
649 
650  fc->memcap_pressure = StatsRegisterCounter("memcap_pressure", t);
651  fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap_pressure_max", t);
652 }
653 
654 static void FlowCountersUpdate(
655  ThreadVars *th_v, const FlowManagerThreadData *ftd, const FlowTimeoutCounters *counters)
656 {
657  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_checked, (uint64_t)counters->flows_checked);
658  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters->flows_notimeout);
659 
660  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters->flows_timeout);
661  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters->flows_aside);
663  (uint64_t)counters->flows_aside_needs_work);
664 
665  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_cnt_clo, (uint64_t)counters->bypassed_count);
666  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_pkts, (uint64_t)counters->bypassed_pkts);
667  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters->bypassed_bytes);
668 
669  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters->rows_maxlen);
670 }
671 
672 static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
673 {
675  if (ftd == NULL)
676  return TM_ECODE_FAILED;
677 
678  ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
679  SCLogDebug("flow manager instance %u", ftd->instance);
680 
681  /* set the min and max value used for hash row walking
682  * each thread has it's own section of the flow hash */
683  uint32_t range = flow_config.hash_size / flowmgr_number;
684 
685  ftd->min = ftd->instance * range;
686  ftd->max = (ftd->instance + 1) * range;
687 
688  /* last flow-manager takes on hash_size % flowmgr_number extra rows */
689  if ((ftd->instance + 1) == flowmgr_number) {
690  ftd->max = flow_config.hash_size;
691  }
693 
694  SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
695 
696  /* pass thread data back to caller */
697  *data = ftd;
698 
699  FlowCountersInit(t, &ftd->cnt);
700 
701  PacketPoolInit();
702  return TM_ECODE_OK;
703 }
704 
705 static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
706 {
709  SCFree(data);
710  return TM_ECODE_OK;
711 }
712 
713 /** \internal
714  * \brief calculate number of rows to scan and how much time to sleep
715  * based on the busy score `mp` (0 idle, 100 max busy).
716  *
717  * We try to to make sure we scan the hash once a second. The number size
718  * of the slice of the hash scanned is determined by our busy score 'mp'.
719  * We sleep for the remainder of the second after processing the slice,
720  * or at least an approximation of it.
721  * A minimum busy score of 10 is assumed to avoid a longer than 10 second
722  * full hash pass. This is to avoid burstiness in scanning when there is
723  * a rapid increase of the busy score, which could lead to the flow manager
724  * suddenly scanning a much larger slice of the hash leading to a burst
725  * in scan/eviction work.
726  */
727 static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
728  uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
729 {
730  if (emergency) {
731  *wu_rows = rows;
732  *wu_sleep = 250;
733  return;
734  }
735  /* minimum busy score is 10 */
736  const uint32_t emp = MAX(mp, 10);
737  const uint32_t rows_per_sec = (uint32_t)((float)rows * (float)((float)emp / (float)100));
738  /* calc how much time we estimate the work will take, in ms. We assume
739  * each row takes an average of 1usec. Maxing out at 1sec. */
740  const uint32_t work_per_unit = MIN(rows_per_sec / 1000, 1000);
741  /* calc how much time we need to sleep to get to the per second cadence
742  * but sleeping for at least 250ms. */
743  const uint32_t sleep_per_unit = MAX(250, 1000 - work_per_unit);
744  SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,
745  sleep_per_unit);
746 
747  *wu_sleep = sleep_per_unit;
748  *wu_rows = rows_per_sec;
749  *rows_sec = rows_per_sec;
750 }
751 
752 /** \brief Thread that manages the flow table and times out flows.
753  *
754  * \param td ThreadVars casted to void ptr
755  *
756  * Keeps an eye on the spare list, alloc flows if needed...
757  */
758 static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
759 {
760  FlowManagerThreadData *ftd = thread_data;
761  const uint32_t rows = ftd->max - ftd->min;
762  const bool time_is_live = TimeModeIsLive();
763 
764  uint32_t emerg_over_cnt = 0;
765  uint64_t next_run_ms = 0;
766  uint32_t pos = 0;
767  uint32_t rows_sec = 0;
768  uint32_t rows_per_wu = 0;
769  uint64_t sleep_per_wu = 0;
770  bool emerg = false;
771  bool prev_emerg = false;
772  uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
773  SCTime_t ts;
774 
775  /* don't start our activities until time is setup */
776  while (!TimeModeIsReady()) {
777  if (suricata_ctl_flags != 0)
778  return TM_ECODE_OK;
779  usleep(10);
780  }
781 
782  uint32_t mp = MemcapsGetPressure() * 100;
783  if (ftd->instance == 0) {
784  StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
785  StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
786  }
787  GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
788  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
789 
791 
792  while (1)
793  {
794  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
798  }
799 
800  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
801  emerg = true;
802  }
803  /* Get the time */
804  ts = TimeGet();
805  SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts));
806  uint64_t ts_ms = SCTIME_MSECS(ts);
807  const bool emerge_p = (emerg && !prev_emerg);
808  if (emerge_p) {
809  next_run_ms = 0;
810  prev_emerg = true;
811  SCLogNotice("Flow emergency mode entered...");
812  StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter);
813  }
814  if (ts_ms >= next_run_ms) {
815  if (ftd->instance == 0) {
816  const uint32_t sq_len = FlowSpareGetPoolSize();
817  const uint32_t spare_perc = sq_len * 100 / flow_config.prealloc;
818  /* see if we still have enough spare flows */
819  if (spare_perc < 90 || spare_perc > 110) {
820  FlowSparePoolUpdate(sq_len);
821  }
822  }
823 
824  /* try to time out flows */
825  // clang-format off
826  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
827  // clang-format on
828 
829  if (emerg) {
830  /* in emergency mode, do a full pass of the hash table */
831  FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters);
832  StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
833  } else {
834  SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,
835  rows_per_wu);
836 
837  const uint32_t ppos = pos;
838  FlowTimeoutHashInChunks(
839  &ftd->timeout, ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos);
840  if (ppos > pos) {
841  StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
842  }
843  }
844 
845  const uint32_t spare_pool_len = FlowSpareGetPoolSize();
846  StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)spare_pool_len);
847 
848  FlowCountersUpdate(th_v, ftd, &counters);
849 
850  if (emerg == true) {
851  SCLogDebug("flow_sparse_q.len = %" PRIu32 " prealloc: %" PRIu32
852  "flow_spare_q status: %" PRIu32 "%% flows at the queue",
853  spare_pool_len, flow_config.prealloc,
854  spare_pool_len * 100 / flow_config.prealloc);
855 
856  /* only if we have pruned this "emergency_recovery" percentage
857  * of flows, we will unset the emergency bit */
858  if (spare_pool_len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
859  emerg_over_cnt++;
860  } else {
861  emerg_over_cnt = 0;
862  }
863 
864  if (emerg_over_cnt >= 30) {
865  SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
867 
868  emerg = false;
869  prev_emerg = false;
870  emerg_over_cnt = 0;
871  SCLogNotice("Flow emergency mode over, back to normal... unsetting"
872  " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", "
873  "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32
874  "%% flows at the queue",
875  (uintmax_t)SCTIME_SECS(ts), (uintmax_t)SCTIME_USECS(ts),
876  spare_pool_len * 100 / flow_config.prealloc);
877 
878  StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
879  }
880  }
881 
882  /* update work units */
883  const uint32_t pmp = mp;
884  mp = MemcapsGetPressure() * 100;
885  if (ftd->instance == 0) {
886  StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
887  StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
888  }
889  GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
890  if (pmp != mp) {
891  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
892  }
893 
894  next_run_ms = ts_ms + sleep_per_wu;
895  }
896  if (other_last_sec == 0 || other_last_sec < (uint32_t)SCTIME_SECS(ts)) {
897  if (ftd->instance == 0) {
902  other_last_sec = (uint32_t)SCTIME_SECS(ts);
903  }
904  }
905 
906  if (TmThreadsCheckFlag(th_v, THV_KILL)) {
907  StatsSyncCounters(th_v);
908  break;
909  }
910 
911  if (emerg || !time_is_live) {
912  usleep(250);
913  } else {
914  struct timeval cond_tv;
915  gettimeofday(&cond_tv, NULL);
916  struct timeval add_tv;
917  add_tv.tv_sec = 0;
918  add_tv.tv_usec = (sleep_per_wu * 1000);
919  timeradd(&cond_tv, &add_tv, &cond_tv);
920 
921  struct timespec cond_time = FROM_TIMEVAL(cond_tv);
923  while (1) {
924  int rc = SCCtrlCondTimedwait(
926  if (rc == ETIMEDOUT || rc < 0)
927  break;
928  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
929  break;
930  }
931  }
933  }
934 
935  SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
936 
938  }
939  return TM_ECODE_OK;
940 }
941 
942 /** \brief spawn the flow manager thread */
944 {
945  intmax_t setting = 1;
946  (void)ConfGetInt("flow.managers", &setting);
947 
948  if (setting < 1 || setting > 1024) {
949  FatalError("invalid flow.managers setting %" PRIdMAX, setting);
950  }
951  flowmgr_number = (uint32_t)setting;
952 
955 
956  SCLogConfig("using %u flow manager threads", flowmgr_number);
958 
959  for (uint32_t u = 0; u < flowmgr_number; u++) {
960  char name[TM_THREAD_NAME_MAX];
961  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);
962 
963  ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
964  "FlowManager", 0);
965  BUG_ON(tv_flowmgr == NULL);
966 
967  if (tv_flowmgr == NULL) {
968  FatalError("flow manager thread creation failed");
969  }
970  if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
971  FatalError("flow manager thread spawn failed");
972  }
973  }
974  return;
975 }
976 
977 typedef struct FlowRecyclerThreadData_ {
979 
980  uint16_t counter_flows;
983 
988 
989 static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
990 {
992  if (ftd == NULL)
993  return TM_ECODE_FAILED;
994  if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
995  SCLogError("initializing flow log API for thread failed");
996  SCFree(ftd);
997  return TM_ECODE_FAILED;
998  }
999  SCLogDebug("output_thread_data %p", ftd->output_thread_data);
1000 
1001  ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t);
1002  ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t);
1003  ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t);
1004 
1005  ftd->counter_flow_active = StatsRegisterCounter("flow.active", t);
1006  ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t);
1007 
1008  FlowEndCountersRegister(t, &ftd->fec);
1009 
1010  *data = ftd;
1011  return TM_ECODE_OK;
1012 }
1013 
1014 static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
1015 {
1017 
1019  if (ftd->output_thread_data != NULL)
1021 
1022  SCFree(data);
1023  return TM_ECODE_OK;
1024 }
1025 
1026 static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)
1027 {
1028  FLOWLOCK_WRLOCK(f);
1029 
1030  (void)OutputFlowLog(tv, ftd->output_thread_data, f);
1031 
1032  FlowEndCountersUpdate(tv, &ftd->fec, f);
1033  if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
1035  }
1037 
1038  FlowClearMemory(f, f->protomap);
1039  FLOWLOCK_UNLOCK(f);
1041 }
1042 
1043 /** \brief Thread that manages timed out flows.
1044  *
1045  * \param td ThreadVars casted to void ptr
1046  */
1047 static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
1048 {
1049  FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
1050  BUG_ON(ftd == NULL);
1051  const bool time_is_live = TimeModeIsLive();
1052  uint64_t recycled_cnt = 0;
1053 
1055 
1056  while (1)
1057  {
1058  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
1062  }
1063  SC_ATOMIC_ADD(flowrec_busy,1);
1065 
1066  StatsAddUI64(th_v, ftd->counter_queue_avg, list.len);
1067  StatsSetUI64(th_v, ftd->counter_queue_max, list.len);
1068 
1069  const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));
1070 
1071  /* Get the time */
1072  SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(TimeGet()));
1073 
1074  Flow *f;
1075  while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
1076  Recycler(th_v, ftd, f);
1077  recycled_cnt++;
1078  StatsIncr(th_v, ftd->counter_flows);
1079  }
1080  SC_ATOMIC_SUB(flowrec_busy,1);
1081 
1082  if (bail) {
1083  break;
1084  }
1085 
1086  const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY);
1087  if (emerg || !time_is_live) {
1088  usleep(250);
1089  } else {
1090  struct timeval cond_tv;
1091  gettimeofday(&cond_tv, NULL);
1092  cond_tv.tv_sec += 1;
1093  struct timespec cond_time = FROM_TIMEVAL(cond_tv);
1095  while (1) {
1096  int rc = SCCtrlCondTimedwait(
1098  if (rc == ETIMEDOUT || rc < 0) {
1099  break;
1100  }
1101  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
1102  break;
1103  }
1104  if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) {
1105  break;
1106  }
1107  }
1109  }
1110 
1111  SCLogDebug("woke up...");
1112 
1114  }
1115  StatsSyncCounters(th_v);
1116  SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
1117  return TM_ECODE_OK;
1118 }
1119 
1120 static bool FlowRecyclerReadyToShutdown(void)
1121 {
1122  if (SC_ATOMIC_GET(flowrec_busy) != 0) {
1123  return false;
1124  }
1125  uint32_t len = 0;
1127  len = flow_recycle_q.qlen;
1129 
1130  return ((len == 0));
1131 }
1132 
1133 /** \brief spawn the flow recycler thread */
1135 {
1136  intmax_t setting = 1;
1137  (void)ConfGetInt("flow.recyclers", &setting);
1138 
1139  if (setting < 1 || setting > 1024) {
1140  FatalError("invalid flow.recyclers setting %" PRIdMAX, setting);
1141  }
1142  flowrec_number = (uint32_t)setting;
1143 
1146 
1147  SCLogConfig("using %u flow recycler threads", flowrec_number);
1148 
1149  for (uint32_t u = 0; u < flowrec_number; u++) {
1150  char name[TM_THREAD_NAME_MAX];
1151  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);
1152 
1153  ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name,
1154  "FlowRecycler", 0);
1155 
1156  if (tv_flowrec == NULL) {
1157  FatalError("flow recycler thread creation failed");
1158  }
1159  if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) {
1160  FatalError("flow recycler thread spawn failed");
1161  }
1162  }
1163  return;
1164 }
1165 
1166 /**
1167  * \brief Used to disable flow recycler thread(s).
1168  *
1169  * \note this should only be called when the flow manager is already gone
1170  *
1171  * \todo Kinda hackish since it uses the tv name to identify flow recycler
1172  * thread. We need an all weather identification scheme.
1173  */
1175 {
1176  /* move all flows still in the hash to the recycler queue */
1177 #ifndef DEBUG
1178  (void)FlowCleanupHash();
1179 #else
1180  uint32_t flows = FlowCleanupHash();
1181  SCLogDebug("flows to progress: %u", flows);
1182 #endif
1183 
1184  /* make sure all flows are processed */
1185  do {
1187  usleep(10);
1188  } while (FlowRecyclerReadyToShutdown() == false);
1189 
1191  /* flow recycler thread(s) is/are a part of mgmt threads */
1192  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
1193  if (strncasecmp(tv->name, thread_name_flow_rec,
1194  strlen(thread_name_flow_rec)) == 0)
1195  {
1197  }
1198  }
1200 
1201  struct timeval start_ts;
1202  struct timeval cur_ts;
1203  gettimeofday(&start_ts, NULL);
1204 
1205 again:
1206  gettimeofday(&cur_ts, NULL);
1207  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1208  FatalError("unable to get all flow recycler "
1209  "threads to shutdown in time");
1210  }
1211 
1213  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
1214  if (strncasecmp(tv->name, thread_name_flow_rec,
1215  strlen(thread_name_flow_rec)) == 0)
1216  {
1220  /* sleep outside lock */
1221  SleepMsec(1);
1222  goto again;
1223  }
1224  }
1225  }
1227 
1228  /* reset count, so we can kill and respawn (unix socket) */
1229  SC_ATOMIC_SET(flowrec_cnt, 0);
1230  return;
1231 }
1232 
1234 {
1235  tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
1236  tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
1237  tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
1238  tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
1241  SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
1242 
1243  SC_ATOMIC_INIT(flowmgr_cnt);
1244  SC_ATOMIC_INITPTR(flow_timeouts);
1245 }
1246 
1248 {
1249  tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
1250  tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
1251  tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
1252  tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
1255  SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
1256 
1257  SC_ATOMIC_INIT(flowrec_cnt);
1258  SC_ATOMIC_INIT(flowrec_busy);
1259 }
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:67
FlowTimeoutCounters_::rows_empty
uint32_t rows_empty
Definition: flow-manager.c:121
FlowSparePoolUpdate
void FlowSparePoolUpdate(uint32_t size)
Definition: flow-spare-pool.c:171
FlowManagerThreadSpawn
void FlowManagerThreadSpawn(void)
spawn the flow manager thread
Definition: flow-manager.c:943
util-byte.h
tm-threads.h
ConfGetInt
int ConfGetInt(const char *name, intmax_t *val)
Retrieve a configuration value as an integer.
Definition: conf.c:397
FROM_TIMEVAL
#define FROM_TIMEVAL(timev)
initialize a 'struct timespec' from a 'struct timeval'.
Definition: util-time.h:101
OutputFlowLog
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
Definition: output-flow.c:86
flow_manager_ctrl_mutex
SCCtrlMutex flow_manager_ctrl_mutex
Definition: flow-manager.c:96
FlowRecyclerThreadData_::fec
FlowEndCounters fec
Definition: flow-manager.c:986
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1646
app-layer-htp-range.h
FlowTimeoutCounters
Definition: flow-worker.c:59
FlowForceReassemblyForFlow
void FlowForceReassemblyForFlow(Flow *f)
Definition: flow-timeout.c:349
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1089
run_mode
int run_mode
Definition: suricata.c:167
FlowBucket_::evicted
Flow * evicted
Definition: flow-hash.h:48
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:167
FLOW_IS_IPV6
#define FLOW_IS_IPV6(f)
Definition: flow.h:162
FlowManagerThreadData_::max
uint32_t max
Definition: flow-manager.c:623
FlowManagerTimeoutThread
Definition: flow-manager.c:268
FlowSparePoolReturnFlow
void FlowSparePoolReturnFlow(Flow *f)
Definition: flow-spare-pool.c:100
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:83
FlowSpareGetPoolSize
uint32_t FlowSpareGetPoolSize(void)
Definition: flow-spare-pool.c:47
flow-util.h
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:315
FlowManagerThreadData_::cnt
FlowCounters cnt
Definition: flow-manager.c:625
FBLOCK_LOCK
#define FBLOCK_LOCK(fb)
Definition: flow-hash.h:73
SCTIME_MSECS
#define SCTIME_MSECS(t)
Definition: util-time.h:58
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:92
FlowWakeupFlowRecyclerThread
#define FlowWakeupFlowRecyclerThread()
Definition: flow-manager.h:33
TMM_FLOWRECYCLER
@ TMM_FLOWRECYCLER
Definition: tm-threads-common.h:73
stream-tcp.h
GetFlowBypassInfoID
FlowStorageId GetFlowBypassInfoID(void)
Definition: flow-util.c:218
FlowBypassInfo_
Definition: flow.h:530
FlowCnf_::emergency_recovery
uint32_t emergency_recovery
Definition: flow.h:302
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:387
FlowCnf_::hash_size
uint32_t hash_size
Definition: flow.h:293
FlowManagerThreadData_::instance
uint32_t instance
Definition: flow-manager.c:621
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
IPPairTimeoutHash
uint32_t IPPairTimeoutHash(SCTime_t ts)
time out ippairs from the hash
Definition: ippair-timeout.c:142
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:99
StatsRegisterGlobalCounter
uint16_t StatsRegisterGlobalCounter(const char *name, uint64_t(*Func)(void))
Registers a counter, which represents a global value.
Definition: counters.c:1013
Flow_::proto
uint8_t proto
Definition: flow.h:379
SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt)
flow_manager_ctrl_cond
SCCtrlCondT flow_manager_ctrl_cond
Definition: flow-manager.c:95
threads.h
flow-private.h
FlowCounters_::flow_mgr_flows_notimeout
uint16_t flow_mgr_flows_notimeout
Definition: flow-manager.c:605
Flow_
Flow data structure.
Definition: flow.h:357
TYPE
#define TYPE
flow_recycler_ctrl_cond
SCCtrlCondT flow_recycler_ctrl_cond
Definition: flow-manager.c:97
Flow_::protomap
uint8_t protomap
Definition: flow.h:451
SC_ATOMIC_EXTERN
SC_ATOMIC_EXTERN(unsigned int, flow_flags)
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:333
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:84
FlowProtoTimeout_
Definition: flow.h:519
StatsSetUI64
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type double to the local counter.
Definition: counters.c:210
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:54
flow-hash.h
TmModuleFlowRecyclerRegister
void TmModuleFlowRecyclerRegister(void)
Definition: flow-manager.c:1247
FlowTimeoutCounters_::flows_removed
uint32_t flows_removed
Definition: flow-manager.c:127
FlowBypassInfo_::tosrcbytecnt
uint64_t tosrcbytecnt
Definition: flow.h:535
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
FlowGetMemuse
uint64_t FlowGetMemuse(void)
Definition: flow.c:137
MIN
#define MIN(x, y)
Definition: suricata-common.h:380
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:80
FlowTimeoutCounters_::flows_timeout
uint32_t flows_timeout
Definition: flow-manager.c:126
util-privs.h
defrag-timeout.h
FlowRecyclerThreadData_::counter_flows
uint16_t counter_flows
Definition: flow-manager.c:980
stream-tcp-reassemble.h
StatsSyncCountersIfSignalled
#define StatsSyncCountersIfSignalled(tv)
Definition: counters.h:140
LiveDevSubBypassStats
void LiveDevSubBypassStats(LiveDevice *dev, uint64_t cnt, int family)
Definition: util-device.c:522
MAX
#define MAX(x, y)
Definition: suricata-common.h:384
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:85
FlowQueuePrivate_::len
uint32_t len
Definition: flow-queue.h:44
Flow_::protoctx
void * protoctx
Definition: flow.h:447
FlowCounters_
Definition: flow-manager.c:596
FlowManagerTimeoutThread
struct FlowManagerTimeoutThread FlowManagerTimeoutThread
FlowCounters_::flow_mgr_flows_aside
uint16_t flow_mgr_flows_aside
Definition: flow-manager.c:607
FLOW_TIMEOUT_REASSEMBLY_DONE
#define FLOW_TIMEOUT_REASSEMBLY_DONE
Definition: flow.h:94
runmode-unix-socket.h
util-unittest.h
FlowTimeoutCounters_::bypassed_count
uint32_t bypassed_count
Definition: flow-manager.c:131
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:37
TM_THREAD_NAME_MAX
#define TM_THREAD_NAME_MAX
Definition: tm-threads.h:49
util-unittest-helper.h
FlowCnf_::prealloc
uint32_t prealloc
Definition: flow.h:295
FlowQueueTimeoutCounters::flows_removed
uint32_t flows_removed
Definition: flow-manager.c:592
FLOWLOCK_UNLOCK
#define FLOWLOCK_UNLOCK(fb)
Definition: flow.h:272
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:84
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:284
Flow_::flow_state
FlowStateType flow_state
Definition: flow.h:418
FQLOCK_LOCK
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:73
FlowDisableFlowRecyclerThread
void FlowDisableFlowRecyclerThread(void)
Used to disable flow recycler thread(s).
Definition: flow-manager.c:1174
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
FlowTimeoutCounters_::rows_maxlen
uint32_t rows_maxlen
Definition: flow-manager.c:122
FlowCounters_::flow_bypassed_bytes
uint16_t flow_bypassed_bytes
Definition: flow-manager.c:614
FlowTimeoutCounters_::flows_checked
uint32_t flows_checked
Definition: flow-manager.c:124
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:45
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:107
util-signal.h
flow-spare-pool.h
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
StatsDecr
void StatsDecr(ThreadVars *tv, uint16_t id)
Decrements the local counter.
Definition: counters.c:188
Flow_::fb
struct FlowBucket_ * fb
Definition: flow.h:494
FlowCounters_::flow_mgr_full_pass
uint16_t flow_mgr_full_pass
Definition: flow-manager.c:597
StatsRegisterMaxCounter
uint16_t StatsRegisterMaxCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the maximum of all the values assigned to it.
Definition: counters.c:995
SC_ATOMIC_MEMORY_ORDER_RELAXED
#define SC_ATOMIC_MEMORY_ORDER_RELAXED
Definition: util-atomic.h:166
decode.h
util-device.h
util-debug.h
FlowBypassInfo_::todstbytecnt
uint64_t todstbytecnt
Definition: flow.h:537
FlowBypassInfo_::BypassUpdate
bool(* BypassUpdate)(Flow *f, void *data, time_t tsec)
Definition: flow.h:531
FlowRecyclerThreadData_::counter_queue_avg
uint16_t counter_queue_avg
Definition: flow-manager.c:981
OutputFlowLogThreadInit
TmEcode OutputFlowLogThreadInit(ThreadVars *tv, void *initdata, void **data)
thread init for the flow logger This will run the thread init functions for the individual registered...
Definition: output-flow.c:124
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
FlowCounters_::flow_mgr_rows_maxlen
uint16_t flow_mgr_rows_maxlen
Definition: flow-manager.c:610
FLOWLOCK_WRLOCK
#define FLOWLOCK_WRLOCK(fb)
Definition: flow.h:269
FlowTimeoutsReset
#define FlowTimeoutsReset()
Definition: flow-manager.h:35
FlowForceReassemblyNeedReassembly
int FlowForceReassemblyNeedReassembly(Flow *f)
Check if a flow needs forced reassembly, or any other processing.
Definition: flow-timeout.c:295
FlowDisableFlowManagerThread
void FlowDisableFlowManagerThread(void)
Used to disable flow manager thread(s).
Definition: flow-manager.c:142
SCCtrlCondT
#define SCCtrlCondT
Definition: threads-debug.h:382
detect.h
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1747
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
TimeModeIsReady
bool TimeModeIsReady(void)
Definition: util-time.c:92
Flow_::flow_end_flags
uint8_t flow_end_flags
Definition: flow.h:453
THV_KILL
#define THV_KILL
Definition: threadvars.h:39
MemcapsGetPressure
float MemcapsGetPressure(void)
Definition: runmode-unix-socket.c:134
FlowBypassInfo_::todstpktcnt
uint64_t todstpktcnt
Definition: flow.h:536
util-time.h
FlowQueuePrivateGetFromTop
Flow * FlowQueuePrivateGetFromTop(FlowQueuePrivate *fqc)
Definition: flow-queue.c:151
FlowBypassInfo_::bypass_data
void * bypass_data
Definition: flow.h:533
FlowQueueAppendPrivate
void FlowQueueAppendPrivate(FlowQueue *fq, FlowQueuePrivate *fqc)
Definition: flow-queue.c:119
FlowQueueTimeoutCounters
struct FlowQueueTimeoutCounters FlowQueueTimeoutCounters
FlowCounters_::flow_mgr_flows_aside_needs_work
uint16_t flow_mgr_flows_aside_needs_work
Definition: flow-manager.c:608
app-layer-parser.h
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:124
FlowRecyclerThreadData_
Definition: flow-manager.c:977
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:289
FLOW_IS_IPV4
#define FLOW_IS_IPV4(f)
Definition: flow.h:160
FlowRecyclerThreadData_::counter_tcp_active_sessions
uint16_t counter_tcp_active_sessions
Definition: flow-manager.c:985
FlowCounters_::flow_bypassed_cnt_clo
uint16_t flow_bypassed_cnt_clo
Definition: flow-manager.c:612
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:83
SC_ATOMIC_SUB
#define SC_ATOMIC_SUB(name, val)
sub a value from our atomic variable
Definition: util-atomic.h:342
flow_timeouts_emerg
FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]
Definition: flow.c:95
TimeModeIsLive
bool TimeModeIsLive(void)
Definition: util-time.c:111
stream.h
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:376
FlowTimeoutsEmergency
void FlowTimeoutsEmergency(void)
Definition: flow-manager.c:105
FlowTimeoutCounters
struct FlowTimeoutCounters_ FlowTimeoutCounters
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:33
TimeGet
SCTime_t TimeGet(void)
Definition: util-time.c:152
stream-tcp-private.h
conf.h
FlowRecyclerThreadData_::output_thread_data
void * output_thread_data
Definition: flow-manager.c:978
FlowTimeoutCounters_::rows_checked
uint32_t rows_checked
Definition: flow-manager.c:119
FBLOCK_UNLOCK
#define FBLOCK_UNLOCK(fb)
Definition: flow-hash.h:75
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:83
FlowClearMemory
int FlowClearMemory(Flow *f, uint8_t proto_map)
Function clear the flow memory before queueing it to spare flow queue.
Definition: flow.c:1107
flow_timeouts_delta
FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]
Definition: flow.c:96
FlowRecyclerThreadSpawn
void FlowRecyclerThreadSpawn(void)
spawn the flow recycler thread
Definition: flow-manager.c:1134
output-flow.h
detect-engine-state.h
Data structures and function prototypes for keeping state for the detection engine.
flow-timeout.h
flow-queue.h
TmModule_::name
const char * name
Definition: tm-modules.h:44
FlowRecyclerThreadData
struct FlowRecyclerThreadData_ FlowRecyclerThreadData
FlowBypassInfo_::tosrcpktcnt
uint64_t tosrcpktcnt
Definition: flow.h:534
host-timeout.h
SCCtrlCondTimedwait
#define SCCtrlCondTimedwait
Definition: threads-debug.h:385
StreamTcpThreadCacheCleanup
void StreamTcpThreadCacheCleanup(void)
Definition: stream-tcp-cache.c:134
runmodes.h
FlowTimeoutCounters_::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-manager.c:129
FlowGetStorageById
void * FlowGetStorageById(const Flow *f, FlowStorageId id)
Definition: flow-storage.c:40
Flow_::next
struct Flow_ * next
Definition: flow.h:402
FlowQueue_
Definition: flow-queue.h:49
FlowCounters_::flow_bypassed_pkts
uint16_t flow_bypassed_pkts
Definition: flow-manager.c:613
TMM_FLOWMANAGER
@ TMM_FLOWMANAGER
Definition: tm-threads-common.h:72
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:38
flow_hash
FlowBucket * flow_hash
Definition: flow-hash.c:57
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:378
FlowQueueExtractPrivate
FlowQueuePrivate FlowQueueExtractPrivate(FlowQueue *fq)
Definition: flow-queue.c:140
flow-storage.h
FlowTimeoutCounters_
Definition: flow-manager.c:118
FlowManagerThreadData_
Definition: flow-manager.c:620
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flow-manager.h
suricata-common.h
SCCtrlMutex
#define SCCtrlMutex
Definition: threads-debug.h:373
DefragTimeoutHash
uint32_t DefragTimeoutHash(SCTime_t ts)
time out tracker from the hash
Definition: defrag-timeout.c:121
FlowManagerThreadData
struct FlowManagerThreadData_ FlowManagerThreadData
flow_config
FlowConfig flow_config
Definition: flow.c:99
FlowQueueTimeoutCounters::flows_timeout
uint32_t flows_timeout
Definition: flow-manager.c:593
TmModuleFlowManagerRegister
void TmModuleFlowManagerRegister(void)
Definition: flow-manager.c:1233
FlowCounters_::memcap_pressure
uint16_t memcap_pressure
Definition: flow-manager.c:616
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
FlowTimeoutsInit
void FlowTimeoutsInit(void)
Definition: flow-manager.c:100
SCTIME_SECS
#define SCTIME_SECS(t)
Definition: util-time.h:57
SC_ATOMIC_LOAD_EXPLICIT
#define SC_ATOMIC_LOAD_EXPLICIT(name, order)
Definition: util-atomic.h:379
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
FatalError
#define FatalError(...)
Definition: util-debug.h:502
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
Flow_::livedev
struct LiveDevice_ * livedev
Definition: flow.h:404
threadvars.h
util-validate.h
FlowQueuePrivate_
Definition: flow-queue.h:41
StatsAddUI64
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition: counters.c:146
SCLogConfig
struct SCLogConfig_ SCLogConfig
Holds the config state used by the logging api.
FlowRecyclerThreadData_::counter_queue_max
uint16_t counter_queue_max
Definition: flow-manager.c:982
FlowTimeoutCounters_::bypassed_bytes
uint64_t bypassed_bytes
Definition: flow-manager.c:133
FlowManagerThreadData_::min
uint32_t min
Definition: flow-manager.c:622
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
OutputFlowLogThreadDeinit
TmEcode OutputFlowLogThreadDeinit(ThreadVars *tv, void *thread_data)
Definition: output-flow.c:166
flow_recycle_q
FlowQueue flow_recycle_q
Definition: flow-manager.c:81
SCFree
#define SCFree(p)
Definition: util-mem.h:61
FlowTimeoutCounters_::bypassed_pkts
uint64_t bypassed_pkts
Definition: flow-manager.c:132
FlowCounters_::memcap_pressure_max
uint16_t memcap_pressure_max
Definition: flow-manager.c:617
Flow_::flags
uint32_t flags
Definition: flow.h:427
HostTimeoutHash
uint32_t HostTimeoutHash(SCTime_t ts)
time out hosts from the hash
Definition: host-timeout.c:156
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:318
FlowManagerThreadData_::timeout
FlowManagerTimeoutThread timeout
Definition: flow-manager.c:627
util-random.h
FlowCounters
struct FlowCounters_ FlowCounters
FlowCounters_::flow_emerg_mode_enter
uint16_t flow_emerg_mode_enter
Definition: flow-manager.c:601
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
FLOW_END_FLAG_SHUTDOWN
#define FLOW_END_FLAG_SHUTDOWN
Definition: flow.h:244
FLOW_EMERGENCY
#define FLOW_EMERGENCY
Definition: flow-private.h:37
suricata.h
ippair-timeout.h
FLOW_PROTO_MAX
@ FLOW_PROTO_MAX
Definition: flow-private.h:76
FlowRecyclerThreadData_::counter_flow_active
uint16_t counter_flow_active
Definition: flow-manager.c:984
FlowCounters_::flow_mgr_flows_timeout
uint16_t flow_mgr_flows_timeout
Definition: flow-manager.c:606
FlowManagerTimeoutThread::aside_queue
FlowQueuePrivate aside_queue
Definition: flow-manager.c:271
timeradd
#define timeradd(a, b, r)
Definition: util-time.h:127
FLOW_END_FLAG_TIMEOUT
#define FLOW_END_FLAG_TIMEOUT
Definition: flow.h:242
StatsSyncCounters
#define StatsSyncCounters(tv)
Definition: counters.h:137
FlowTimeoutCounters_::flows_notimeout
uint32_t flows_notimeout
Definition: flow-manager.c:125
flow_timeouts_normal
FlowProtoTimeout flow_timeouts_normal[FLOW_PROTO_MAX]
Definition: flow.c:94
StatsRegisterAvgCounter
uint16_t StatsRegisterAvgCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the average of all the values assigned to it.
Definition: counters.c:975
FlowCounters_::flow_mgr_spare
uint16_t flow_mgr_spare
Definition: flow-manager.c:600
FlowTimeoutCounters_::flows_aside
uint32_t flows_aside
Definition: flow-manager.c:128
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:376
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:316
flow.h
FlowQueuePrivateAppendFlow
void FlowQueuePrivateAppendFlow(FlowQueuePrivate *fqc, Flow *f)
Definition: flow-queue.c:65
FlowEndCountersRegister
void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec)
Definition: flow-util.c:242
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:91
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
evicted
Flow * evicted
Definition: flow-hash.h:4
StatsRegisterCounter
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:955
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
BITS
#define BITS
Flow_::timeout_at
uint32_t timeout_at
Definition: flow.h:397
FlowEndCounters_
Definition: flow-util.h:157
flow_recycler_ctrl_mutex
SCCtrlMutex flow_recycler_ctrl_mutex
Definition: flow-manager.c:98
flow-var.h
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:70
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:360
FlowCounters_::flow_emerg_mode_over
uint16_t flow_emerg_mode_over
Definition: flow-manager.c:602
TM_FLAG_MANAGEMENT_TM
#define TM_FLAG_MANAGEMENT_TM
Definition: tm-modules.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:164
FlowTimeoutCounters_::rows_skipped
uint32_t rows_skipped
Definition: flow-manager.c:120
FlowCounters_::flow_mgr_flows_checked
uint16_t flow_mgr_flows_checked
Definition: flow-manager.c:604
SCTIME_USECS
#define SCTIME_USECS(t)
Definition: util-time.h:56
FlowCounters_::flow_mgr_rows_sec
uint16_t flow_mgr_rows_sec
Definition: flow-manager.c:598
FlowQueueTimeoutCounters
Definition: flow-manager.c:591
FlowTimeoutCounters::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-worker.c:60
FQLOCK_UNLOCK
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:75
HttpRangeContainersTimeoutHash
uint32_t HttpRangeContainersTimeoutHash(const SCTime_t ts)
Definition: app-layer-htp-range.c:188