suricata
flow-manager.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2023 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
22  * \author Victor Julien <victor@inliniac.net>
23  */
24 
25 #include "suricata-common.h"
26 #include "conf.h"
27 #include "threadvars.h"
28 #include "tm-threads.h"
29 #include "runmodes.h"
30 
31 #include "util-random.h"
32 #include "util-time.h"
33 
34 #include "flow.h"
35 #include "flow-queue.h"
36 #include "flow-hash.h"
37 #include "flow-util.h"
38 #include "flow-private.h"
39 #include "flow-timeout.h"
40 #include "flow-manager.h"
41 #include "flow-storage.h"
42 #include "flow-spare-pool.h"
43 
44 #include "stream-tcp-reassemble.h"
45 #include "stream-tcp.h"
46 
47 #include "util-unittest.h"
48 #include "util-unittest-helper.h"
49 #include "util-device.h"
50 
51 #include "util-debug.h"
52 
53 #include "threads.h"
54 #include "detect.h"
55 #include "detect-engine-state.h"
56 #include "stream.h"
57 
58 #include "app-layer-parser.h"
59 
60 #include "host-timeout.h"
61 #include "defrag-timeout.h"
62 #include "ippair-timeout.h"
63 #include "app-layer-htp-range.h"
64 
65 #include "output-flow.h"
66 
67 #include "runmode-unix-socket.h"
68 
69 /* Run mode selected at suricata.c */
70 extern int run_mode;
71 
72 /** queue to pass flows to cleanup/log thread(s) */
74 
75 /* multi flow manager support */
76 static uint32_t flowmgr_number = 1;
77 /* atomic counter for flow managers, to assign instance id */
78 SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
79 
80 /* multi flow recycler support */
81 static uint32_t flowrec_number = 1;
82 /* atomic counter for flow recyclers, to assign instance id */
83 SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
84 SC_ATOMIC_DECLARE(uint32_t, flowrec_busy);
85 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
86 
91 
92 void FlowTimeoutsInit(void)
93 {
94  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
95 }
96 
98 {
99  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
100 }
101 
102 /* 1 second */
103 #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
104 #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
105 /* nominally 0.3 seconds; NOTE(review): 300000 nsec is only 0.3 msec - confirm intended unit */
106 #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
107 #define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000
108 #define NEW_FLOW_COUNT_COND 10
109 
110 typedef struct FlowTimeoutCounters_ {
111  uint32_t rows_checked;
112  uint32_t rows_skipped;
113  uint32_t rows_empty;
114  uint32_t rows_maxlen;
115 
116  uint32_t flows_checked;
117  uint32_t flows_notimeout;
118  uint32_t flows_timeout;
119  uint32_t flows_removed;
120  uint32_t flows_aside;
122 
123  uint32_t bypassed_count;
124  uint64_t bypassed_pkts;
125  uint64_t bypassed_bytes;
127 
128 /**
129  * \brief Used to disable flow manager thread(s).
130  *
131  * \todo Kinda hackish since it uses the tv name to identify flow manager
132  * thread. We need an all weather identification scheme.
133  */
135 {
137  /* flow manager thread(s) is/are a part of mgmt threads */
138  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
139  if (strncasecmp(tv->name, thread_name_flow_mgr,
140  strlen(thread_name_flow_mgr)) == 0)
141  {
143  }
144  }
146 
147  struct timeval start_ts;
148  struct timeval cur_ts;
149  gettimeofday(&start_ts, NULL);
150 
151 again:
152  gettimeofday(&cur_ts, NULL);
153  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
154  FatalError("unable to get all flow manager "
155  "threads to shutdown in time");
156  }
157 
159  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
160  if (strncasecmp(tv->name, thread_name_flow_mgr,
161  strlen(thread_name_flow_mgr)) == 0)
162  {
165  /* sleep outside lock */
166  SleepMsec(1);
167  goto again;
168  }
169  }
170  }
172 
173  /* reset count, so we can kill and respawn (unix socket) */
174  SC_ATOMIC_SET(flowmgr_cnt, 0);
175  return;
176 }
177 
178 /** \internal
179  * \brief check if a flow is timed out
180  *
181  * \param f flow
182  * \param ts timestamp
183  *
184  * \retval 0 not timed out
185  * \retval 1 timed out
186  */
187 static int FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
188 {
189  uint32_t flow_times_out_at = f->timeout_at;
190  if (emerg) {
192  flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
193  }
194  if (*next_ts == 0 || flow_times_out_at < *next_ts)
195  *next_ts = flow_times_out_at;
196 
197  /* do the timeout check */
198  if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) {
199  return 0;
200  }
201 
202  return 1;
203 }
204 
205 /** \internal
206  * \brief check timeout of captured bypassed flow by querying capture method
207  *
208  * \param f Flow
209  * \param ts timestamp
210  * \param counters Flow timeout counters
211  *
212  * \retval 0 not timeout
213  * \retval 1 timeout (or not capture bypassed)
214  */
215 static inline int FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
216 {
217 #ifdef CAPTURE_OFFLOAD
218  if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
219  return 1;
220  }
221 
223  if (fc && fc->BypassUpdate) {
224  /* flow will be possibly updated */
225  uint64_t pkts_tosrc = fc->tosrcpktcnt;
226  uint64_t bytes_tosrc = fc->tosrcbytecnt;
227  uint64_t pkts_todst = fc->todstpktcnt;
228  uint64_t bytes_todst = fc->todstbytecnt;
229  bool update = fc->BypassUpdate(f, fc->bypass_data, SCTIME_SECS(ts));
230  if (update) {
231  SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f));
232  pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
233  bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
234  pkts_todst = fc->todstpktcnt - pkts_todst;
235  bytes_todst = fc->todstbytecnt - bytes_todst;
236  if (f->livedev) {
237  SC_ATOMIC_ADD(f->livedev->bypassed,
238  pkts_tosrc + pkts_todst);
239  }
240  counters->bypassed_pkts += pkts_tosrc + pkts_todst;
241  counters->bypassed_bytes += bytes_tosrc + bytes_todst;
242  return 0;
243  } else {
244  SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
245  if (f->livedev) {
246  if (FLOW_IS_IPV4(f)) {
247  LiveDevSubBypassStats(f->livedev, 1, AF_INET);
248  } else if (FLOW_IS_IPV6(f)) {
249  LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
250  }
251  }
252  counters->bypassed_count++;
253  return 1;
254  }
255  }
256 #endif /* CAPTURE_OFFLOAD */
257  return 1;
258 }
259 
260 typedef struct FlowManagerTimeoutThread {
261  /* used to temporarily store flows that have timed out and are
262  * removed from the hash */
265 
266 static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
267 {
268  FlowQueuePrivate recycle = { NULL, NULL, 0 };
269  counters->flows_aside += td->aside_queue.len;
270 
271  uint32_t cnt = 0;
272  Flow *f;
273  while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
274  /* flow is still locked */
275 
276  if (f->proto == IPPROTO_TCP &&
278  !FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) == 1) {
279  /* Send the flow to its thread */
281  FLOWLOCK_UNLOCK(f);
282  /* flow ownership is passed to the worker thread */
283 
284  counters->flows_aside_needs_work++;
285  continue;
286  }
287  FLOWLOCK_UNLOCK(f);
288 
289  FlowQueuePrivateAppendFlow(&recycle, f);
290  if (recycle.len == 100) {
293  }
294  cnt++;
295  }
296  if (recycle.len) {
299  }
300  return cnt;
301 }
302 
303 /**
304  * \internal
305  *
306  * \brief check all flows in a hash row for timing out
307  *
308  * \param f last flow in the hash row
309  * \param ts timestamp
310  * \param emergency bool indicating emergency mode
311  * \param counters ptr to FlowTimeoutCounters structure
312  */
313 static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts,
314  int emergency, FlowTimeoutCounters *counters, uint32_t *next_ts)
315 {
316  uint32_t checked = 0;
317  Flow *prev_f = NULL;
318 
319  do {
320  checked++;
321 
322  /* check flow timeout based on lastts and state. Both can be
323  * accessed w/o Flow lock as we do have the hash row lock (so flow
324  * can't disappear) and flow_state is atomic. lastts can only
325  * be modified when we have both the flow and hash row lock */
326 
327  /* timeout logic goes here */
328  if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {
329 
330  counters->flows_notimeout++;
331 
332  prev_f = f;
333  f = f->next;
334  continue;
335  }
336 
337  FLOWLOCK_WRLOCK(f);
338 
339  Flow *next_flow = f->next;
340 
341  /* never prune a flow that is used by a packet we
342  * are currently processing in one of the threads */
343  if (!FlowBypassedTimeout(f, ts, counters)) {
344  FLOWLOCK_UNLOCK(f);
345  prev_f = f;
346  f = f->next;
347  continue;
348  }
349 
351 
352  counters->flows_timeout++;
353 
354  RemoveFromHash(f, prev_f);
355 
357  /* flow is still locked in the queue */
358 
359  f = next_flow;
360  } while (f != NULL);
361 
362  counters->flows_checked += checked;
363  if (checked > counters->rows_maxlen)
364  counters->rows_maxlen = checked;
365 }
366 
367 static void FlowManagerHashRowClearEvictedList(
369 {
370  do {
371  FLOWLOCK_WRLOCK(f);
372  Flow *next_flow = f->next;
373  f->next = NULL;
374  f->fb = NULL;
375 
377  /* flow is still locked in the queue */
378 
379  f = next_flow;
380  } while (f != NULL);
381 }
382 
383 /**
384  * \brief time out flows from the hash
385  *
386  * \param ts timestamp
387  * \param hash_min min hash index to consider
388  * \param hash_max max hash index to consider
389  * \param counters ptr to FlowTimeoutCounters structure
390  *
391  * \retval cnt number of timed out flow
392  */
393 static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const uint32_t hash_min,
394  const uint32_t hash_max, FlowTimeoutCounters *counters)
395 {
396  uint32_t cnt = 0;
397  const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
398  const uint32_t rows_checked = hash_max - hash_min;
399  uint32_t rows_skipped = 0;
400  uint32_t rows_empty = 0;
401 
402 #if __WORDSIZE==64
403 #define BITS 64
404 #define TYPE uint64_t
405 #else
406 #define BITS 32
407 #define TYPE uint32_t
408 #endif
409 
410  const uint32_t ts_secs = SCTIME_SECS(ts);
411  for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
412  TYPE check_bits = 0;
413  const uint32_t check = MIN(BITS, (hash_max - idx));
414  for (uint32_t i = 0; i < check; i++) {
415  FlowBucket *fb = &flow_hash[idx+i];
416  check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(
417  fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs)
418  << (TYPE)i;
419  }
420  if (check_bits == 0)
421  continue;
422 
423  for (uint32_t i = 0; i < check; i++) {
424  FlowBucket *fb = &flow_hash[idx+i];
425  if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= ts_secs) {
426  FBLOCK_LOCK(fb);
427  Flow *evicted = NULL;
428  if (fb->evicted != NULL || fb->head != NULL) {
429  if (fb->evicted != NULL) {
430  /* transfer out of bucket so we can do additional work outside
431  * of the bucket lock */
432  evicted = fb->evicted;
433  fb->evicted = NULL;
434  }
435  if (fb->head != NULL) {
436  uint32_t next_ts = 0;
437  FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts);
438 
439  if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
440  SC_ATOMIC_SET(fb->next_ts, next_ts);
441  }
442  if (fb->evicted == NULL && fb->head == NULL) {
443  SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
444  }
445  } else {
446  SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
447  rows_empty++;
448  }
449  FBLOCK_UNLOCK(fb);
450  /* processed evicted list */
451  if (evicted) {
452  FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
453  }
454  } else {
455  rows_skipped++;
456  }
457  }
458  if (td->aside_queue.len) {
459  cnt += ProcessAsideQueue(td, counters);
460  }
461  }
462 
463  counters->rows_checked += rows_checked;
464  counters->rows_skipped += rows_skipped;
465  counters->rows_empty += rows_empty;
466 
467  if (td->aside_queue.len) {
468  cnt += ProcessAsideQueue(td, counters);
469  }
470  counters->flows_removed += cnt;
471  /* coverity[missing_unlock : FALSE] */
472  return cnt;
473 }
474 
475 /** \internal
476  * \brief handle timeout for a slice of hash rows
477  * If we wrap around we call FlowTimeoutHash twice */
478 static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t ts,
479  const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
480  const uint32_t rows, uint32_t *pos)
481 {
482  uint32_t start = 0;
483  uint32_t end = 0;
484  uint32_t cnt = 0;
485  uint32_t rows_left = rows;
486 
487 again:
488  start = hash_min + (*pos);
489  if (start >= hash_max) {
490  start = hash_min;
491  }
492  end = start + rows_left;
493  if (end > hash_max) {
494  end = hash_max;
495  }
496  *pos = (end == hash_max) ? hash_min : end;
497  rows_left = rows_left - (end - start);
498 
499  cnt += FlowTimeoutHash(td, ts, start, end, counters);
500  if (rows_left) {
501  goto again;
502  }
503  return cnt;
504 }
505 
506 /**
507  * \internal
508  *
509  * \brief move all flows out of a hash row
510  *
511  * \param f last flow in the hash row
512  *
513  * \retval cnt removed out flows
514  */
515 static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
516 {
517  uint32_t cnt = 0;
518 
519  do {
520  FLOWLOCK_WRLOCK(f);
521 
522  Flow *next_flow = f->next;
523 
524  /* remove from the hash */
525  if (mode == 0) {
526  RemoveFromHash(f, NULL);
527  } else {
528  FlowBucket *fb = f->fb;
529  fb->evicted = f->next;
530  f->next = NULL;
531  f->fb = NULL;
532  }
534 
535  /* no one is referring to this flow, removed from hash
536  * so we can unlock it and move it to the recycle queue. */
537  FLOWLOCK_UNLOCK(f);
538  FlowQueuePrivateAppendFlow(recycle_q, f);
539 
540  cnt++;
541 
542  f = next_flow;
543  } while (f != NULL);
544 
545  return cnt;
546 }
547 
548 /**
549  * \brief remove all flows from the hash
550  *
551  * \retval cnt number of removes out flows
552  */
553 static uint32_t FlowCleanupHash(void)
554 {
555  FlowQueuePrivate local_queue = { NULL, NULL, 0 };
556  uint32_t cnt = 0;
557 
558  for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
559  FlowBucket *fb = &flow_hash[idx];
560 
561  FBLOCK_LOCK(fb);
562 
563  if (fb->head != NULL) {
564  /* we have a flow, or more than one */
565  cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
566  }
567  if (fb->evicted != NULL) {
568  /* we have a flow, or more than one */
569  cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
570  }
571 
572  FBLOCK_UNLOCK(fb);
573  if (local_queue.len >= 25) {
574  FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
576  }
577  }
578  FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
580 
581  return cnt;
582 }
583 
584 typedef struct FlowQueueTimeoutCounters {
585  uint32_t flows_removed;
586  uint32_t flows_timeout;
588 
589 typedef struct FlowCounters_ {
592 
593  uint16_t flow_mgr_spare;
596 
602 
604 
608 
609  uint16_t memcap_pressure;
612 
613 typedef struct FlowManagerThreadData_ {
614  uint32_t instance;
615  uint32_t min;
616  uint32_t max;
617 
619 
622 
623 static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
624 {
625  fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t);
626  fc->flow_mgr_rows_sec = StatsRegisterCounter("flow.mgr.rows_per_sec", t);
627 
628  fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
629  fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
630  fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
631 
632  fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", t);
633  fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", t);
634  fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", t);
635  fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", t);
636  fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", t);
637  fc->flow_mgr_flows_aside_needs_work = StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", t);
638 
639  fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
640  fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
641  fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
642 
643  fc->memcap_pressure = StatsRegisterCounter("memcap_pressure", t);
644  fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap_pressure_max", t);
645 }
646 
647 static void FlowCountersUpdate(
648  ThreadVars *th_v, const FlowManagerThreadData *ftd, const FlowTimeoutCounters *counters)
649 {
650  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_checked, (uint64_t)counters->flows_checked);
651  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters->flows_notimeout);
652 
653  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters->flows_timeout);
654  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters->flows_aside);
656  (uint64_t)counters->flows_aside_needs_work);
657 
658  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_cnt_clo, (uint64_t)counters->bypassed_count);
659  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_pkts, (uint64_t)counters->bypassed_pkts);
660  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters->bypassed_bytes);
661 
662  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters->rows_maxlen);
663 }
664 
665 static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
666 {
668  if (ftd == NULL)
669  return TM_ECODE_FAILED;
670 
671  ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
672  SCLogDebug("flow manager instance %u", ftd->instance);
673 
674  /* set the min and max value used for hash row walking
675  * each thread has its own section of the flow hash */
676  uint32_t range = flow_config.hash_size / flowmgr_number;
677 
678  ftd->min = ftd->instance * range;
679  ftd->max = (ftd->instance + 1) * range;
680 
681  /* last flow-manager takes on hash_size % flowmgr_number extra rows */
682  if ((ftd->instance + 1) == flowmgr_number) {
683  ftd->max = flow_config.hash_size;
684  }
686 
687  SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
688 
689  /* pass thread data back to caller */
690  *data = ftd;
691 
692  FlowCountersInit(t, &ftd->cnt);
693 
694  PacketPoolInit();
695  return TM_ECODE_OK;
696 }
697 
698 static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
699 {
702  SCFree(data);
703  return TM_ECODE_OK;
704 }
705 
/** \internal
 *  \brief calculate number of rows to scan and how much time to sleep
 *         based on the busy score `mp` (0 idle, 100 max busy).
 *
 *  We try to make sure we scan the hash once a second. The size of the
 *  slice of the hash scanned is determined by our busy score 'mp'.
 *  We sleep for the remainder of the second after processing the slice,
 *  or at least an approximation of it.
 *  A minimum busy score of 10 is assumed to avoid a longer than 10 second
 *  full hash pass. This is to avoid burstiness in scanning when there is
 *  a rapid increase of the busy score, which could lead to the flow manager
 *  suddenly scanning a much larger slice of the hash leading to a burst
 *  in scan/eviction work.
 *
 *  \param rows number of hash rows managed by the caller
 *  \param mp busy score
 *  \param emergency true if in emergency mode
 *  \param wu_sleep out: time to sleep per work unit, in ms
 *  \param wu_rows out: number of rows to scan per work unit
 *  \param rows_sec out: rows scanned per second. Not written when
 *                  `emergency` is true.
 */
static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
        uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
{
    if (emergency) {
        *wu_rows = rows;
        *wu_sleep = 250;
        return;
    }
    /* minimum busy score is 10 */
    const uint32_t emp = (mp < 10) ? 10 : mp;
    /* rows * emp / 100 in exact 64 bit integer math: float math loses
     * precision for row counts above 2^24 and converting an out of range
     * float back to uint32_t is undefined. Saturate instead. */
    uint64_t scaled = ((uint64_t)rows * (uint64_t)emp) / 100;
    if (scaled > UINT32_MAX) {
        scaled = UINT32_MAX;
    }
    const uint32_t rows_per_sec = (uint32_t)scaled;
    /* calc how much time we estimate the work will take, in ms. We assume
     * each row takes an average of 1usec. Maxing out at 1sec. */
    uint32_t work_per_unit = rows_per_sec / 1000;
    if (work_per_unit > 1000) {
        work_per_unit = 1000;
    }
    /* calc how much time we need to sleep to get to the per second cadence
     * but sleeping for at least 250ms. */
    uint32_t sleep_per_unit = 1000 - work_per_unit;
    if (sleep_per_unit < 250) {
        sleep_per_unit = 250;
    }
    SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,
            sleep_per_unit);

    *wu_sleep = sleep_per_unit;
    *wu_rows = rows_per_sec;
    *rows_sec = rows_per_sec;
}
744 
745 /** \brief Thread that manages the flow table and times out flows.
746  *
747  * \param td ThreadVars cast to void ptr
748  *
749  * Keeps an eye on the spare list, alloc flows if needed...
750  */
751 static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
752 {
753  FlowManagerThreadData *ftd = thread_data;
754  const uint32_t rows = ftd->max - ftd->min;
755  const bool time_is_live = TimeModeIsLive();
756 
757  uint32_t emerg_over_cnt = 0;
758  uint64_t next_run_ms = 0;
759  uint32_t pos = 0;
760  uint32_t rows_sec = 0;
761  uint32_t rows_per_wu = 0;
762  uint64_t sleep_per_wu = 0;
763  bool prev_emerg = false;
764  uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
765  SCTime_t ts;
766 
767  /* don't start our activities until time is setup */
768  while (!TimeModeIsReady()) {
769  if (suricata_ctl_flags != 0)
770  return TM_ECODE_OK;
771  usleep(10);
772  }
773 
774  uint32_t mp = MemcapsGetPressure() * 100;
775  if (ftd->instance == 0) {
776  StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
777  StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
778  }
779  GetWorkUnitSizing(rows, mp, false, &sleep_per_wu, &rows_per_wu, &rows_sec);
780  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
781 
783 
784  while (1)
785  {
786  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
790  }
791 
792  bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);
793 
794  /* Get the time */
795  ts = TimeGet();
796  SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts));
797  uint64_t ts_ms = SCTIME_MSECS(ts);
798  const bool emerge_p = (emerg && !prev_emerg);
799  if (emerge_p) {
800  next_run_ms = 0;
801  prev_emerg = true;
802  SCLogNotice("Flow emergency mode entered...");
803  StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter);
804  }
805  if (ts_ms >= next_run_ms) {
806  if (ftd->instance == 0) {
807  const uint32_t sq_len = FlowSpareGetPoolSize();
808  const uint32_t spare_perc = sq_len * 100 / MAX(flow_config.prealloc, 1);
809  /* see if we still have enough spare flows */
810  if (spare_perc < 90 || spare_perc > 110) {
811  FlowSparePoolUpdate(sq_len);
812  }
813  }
814 
815  /* try to time out flows */
816  // clang-format off
817  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
818  // clang-format on
819 
820  if (emerg) {
821  /* in emergency mode, do a full pass of the hash table */
822  FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters);
823  StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
824  } else {
825  SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,
826  rows_per_wu);
827 
828  const uint32_t ppos = pos;
829  FlowTimeoutHashInChunks(
830  &ftd->timeout, ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos);
831  if (ppos > pos) {
832  StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
833  }
834  }
835 
836  const uint32_t spare_pool_len = FlowSpareGetPoolSize();
837  StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)spare_pool_len);
838 
839  FlowCountersUpdate(th_v, ftd, &counters);
840 
841  if (emerg == true) {
842  SCLogDebug("flow_sparse_q.len = %" PRIu32 " prealloc: %" PRIu32
843  "flow_spare_q status: %" PRIu32 "%% flows at the queue",
844  spare_pool_len, flow_config.prealloc,
845  spare_pool_len * 100 / MAX(flow_config.prealloc, 1));
846 
847  /* only if we have pruned this "emergency_recovery" percentage
848  * of flows, we will unset the emergency bit */
849  if ((spare_pool_len * 100 / MAX(flow_config.prealloc, 1)) >
851  emerg_over_cnt++;
852  } else {
853  emerg_over_cnt = 0;
854  }
855 
856  if (emerg_over_cnt >= 30) {
857  SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
859 
860  emerg = false;
861  prev_emerg = false;
862  emerg_over_cnt = 0;
863  SCLogNotice("Flow emergency mode over, back to normal... unsetting"
864  " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", "
865  "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32
866  "%% flows at the queue",
867  (uintmax_t)SCTIME_SECS(ts), (uintmax_t)SCTIME_USECS(ts),
868  spare_pool_len * 100 / MAX(flow_config.prealloc, 1));
869 
870  StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
871  }
872  }
873 
874  /* update work units */
875  const uint32_t pmp = mp;
876  mp = MemcapsGetPressure() * 100;
877  if (ftd->instance == 0) {
878  StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
879  StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
880  }
881  GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
882  if (pmp != mp) {
883  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
884  }
885 
886  next_run_ms = ts_ms + sleep_per_wu;
887  }
888  if (other_last_sec == 0 || other_last_sec < (uint32_t)SCTIME_SECS(ts)) {
889  if (ftd->instance == 0) {
894  other_last_sec = (uint32_t)SCTIME_SECS(ts);
895  }
896  }
897 
898  if (TmThreadsCheckFlag(th_v, THV_KILL)) {
899  StatsSyncCounters(th_v);
900  break;
901  }
902 
903  if (emerg || !time_is_live) {
904  usleep(250);
905  } else {
906  struct timeval cond_tv;
907  gettimeofday(&cond_tv, NULL);
908  struct timeval add_tv;
909  add_tv.tv_sec = 0;
910  add_tv.tv_usec = (sleep_per_wu * 1000);
911  timeradd(&cond_tv, &add_tv, &cond_tv);
912 
913  struct timespec cond_time = FROM_TIMEVAL(cond_tv);
915  while (1) {
916  int rc = SCCtrlCondTimedwait(
918  if (rc == ETIMEDOUT || rc < 0)
919  break;
920  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
921  break;
922  }
923  }
925  }
926 
927  SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
928 
930  }
931  return TM_ECODE_OK;
932 }
933 
934 /** \brief spawn the flow manager thread */
936 {
937  intmax_t setting = 1;
938  (void)ConfGetInt("flow.managers", &setting);
939 
940  if (setting < 1 || setting > 1024) {
941  FatalError("invalid flow.managers setting %" PRIdMAX, setting);
942  }
943  flowmgr_number = (uint32_t)setting;
944 
947 
948  SCLogConfig("using %u flow manager threads", flowmgr_number);
950 
951  for (uint32_t u = 0; u < flowmgr_number; u++) {
952  char name[TM_THREAD_NAME_MAX];
953  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);
954 
955  ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
956  "FlowManager", 0);
957  BUG_ON(tv_flowmgr == NULL);
958 
959  if (tv_flowmgr == NULL) {
960  FatalError("flow manager thread creation failed");
961  }
962  if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
963  FatalError("flow manager thread spawn failed");
964  }
965  }
966  return;
967 }
968 
969 typedef struct FlowRecyclerThreadData_ {
971 
972  uint16_t counter_flows;
975 
980 
981 static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
982 {
984  if (ftd == NULL)
985  return TM_ECODE_FAILED;
986  if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
987  SCLogError("initializing flow log API for thread failed");
988  SCFree(ftd);
989  return TM_ECODE_FAILED;
990  }
991  SCLogDebug("output_thread_data %p", ftd->output_thread_data);
992 
993  ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t);
994  ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t);
995  ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t);
996 
997  ftd->counter_flow_active = StatsRegisterCounter("flow.active", t);
998  ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t);
999 
1000  FlowEndCountersRegister(t, &ftd->fec);
1001 
1002  *data = ftd;
1003  return TM_ECODE_OK;
1004 }
1005 
1006 static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
1007 {
1009 
1011  if (ftd->output_thread_data != NULL)
1013 
1014  SCFree(data);
1015  return TM_ECODE_OK;
1016 }
1017 
1018 static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)
1019 {
1020  FLOWLOCK_WRLOCK(f);
1021 
1022  (void)OutputFlowLog(tv, ftd->output_thread_data, f);
1023 
1024  FlowEndCountersUpdate(tv, &ftd->fec, f);
1025  if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
1027  }
1029 
1030  FlowClearMemory(f, f->protomap);
1031  FLOWLOCK_UNLOCK(f);
1032 }
1033 
1034 extern uint32_t flow_spare_pool_block_size;
1035 
1036 /** \brief Thread that manages timed out flows.
1037  *
1038  * \param td ThreadVars cast to void ptr
1039  */
1040 static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
1041 {
1042  FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
1043  BUG_ON(ftd == NULL);
1044  const bool time_is_live = TimeModeIsLive();
1045  uint64_t recycled_cnt = 0;
1046  FlowQueuePrivate ret_queue = { NULL, NULL, 0 };
1047 
1049 
1050  while (1)
1051  {
1052  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
1056  }
1057  SC_ATOMIC_ADD(flowrec_busy,1);
1059 
1060  StatsAddUI64(th_v, ftd->counter_queue_avg, list.len);
1061  StatsSetUI64(th_v, ftd->counter_queue_max, list.len);
1062 
1063  const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));
1064 
1065  /* Get the time */
1066  SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(TimeGet()));
1067 
1068  uint64_t cnt = 0;
1069  Flow *f;
1070  while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
1071  Recycler(th_v, ftd, f);
1072  cnt++;
1073 
1074  /* for every full sized block, add it to the spare pool */
1075  FlowQueuePrivateAppendFlow(&ret_queue, f);
1076  if (ret_queue.len == flow_spare_pool_block_size) {
1077  FlowSparePoolReturnFlows(&ret_queue);
1078  }
1079  }
1080  if (ret_queue.len > 0) {
1081  FlowSparePoolReturnFlows(&ret_queue);
1082  }
1083  if (cnt > 0) {
1084  recycled_cnt += cnt;
1085  StatsAddUI64(th_v, ftd->counter_flows, cnt);
1086  }
1087  SC_ATOMIC_SUB(flowrec_busy,1);
1088 
1089  if (bail) {
1090  break;
1091  }
1092 
1093  const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY);
1094  if (emerg || !time_is_live) {
1095  usleep(250);
1096  } else {
1097  struct timeval cond_tv;
1098  gettimeofday(&cond_tv, NULL);
1099  cond_tv.tv_sec += 1;
1100  struct timespec cond_time = FROM_TIMEVAL(cond_tv);
1102  while (1) {
1103  int rc = SCCtrlCondTimedwait(
1105  if (rc == ETIMEDOUT || rc < 0) {
1106  break;
1107  }
1108  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
1109  break;
1110  }
1111  if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) {
1112  break;
1113  }
1114  }
1116  }
1117 
1118  SCLogDebug("woke up...");
1119 
1121  }
1122  StatsSyncCounters(th_v);
1123  SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
1124  return TM_ECODE_OK;
1125 }
1126 
1127 static bool FlowRecyclerReadyToShutdown(void)
1128 {
1129  if (SC_ATOMIC_GET(flowrec_busy) != 0) {
1130  return false;
1131  }
1132  uint32_t len = 0;
1134  len = flow_recycle_q.qlen;
1136 
1137  return ((len == 0));
1138 }
1139 
1140 /** \brief spawn the flow recycler thread */
1142 {
1143  intmax_t setting = 1;
1144  (void)ConfGetInt("flow.recyclers", &setting);
1145 
1146  if (setting < 1 || setting > 1024) {
1147  FatalError("invalid flow.recyclers setting %" PRIdMAX, setting);
1148  }
1149  flowrec_number = (uint32_t)setting;
1150 
1153 
1154  SCLogConfig("using %u flow recycler threads", flowrec_number);
1155 
1156  for (uint32_t u = 0; u < flowrec_number; u++) {
1157  char name[TM_THREAD_NAME_MAX];
1158  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);
1159 
1160  ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name,
1161  "FlowRecycler", 0);
1162 
1163  if (tv_flowrec == NULL) {
1164  FatalError("flow recycler thread creation failed");
1165  }
1166  if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) {
1167  FatalError("flow recycler thread spawn failed");
1168  }
1169  }
1170  return;
1171 }
1172 
1173 /**
1174  * \brief Used to disable flow recycler thread(s).
1175  *
1176  * \note this should only be called when the flow manager is already gone
1177  *
1178  * \todo Kinda hackish since it uses the tv name to identify flow recycler
1179  * thread. We need an all weather identification scheme.
1180  */
1182 {
1183  /* move all flows still in the hash to the recycler queue */
1184 #ifndef DEBUG
1185  (void)FlowCleanupHash();
1186 #else
1187  uint32_t flows = FlowCleanupHash();
1188  SCLogDebug("flows to progress: %u", flows);
1189 #endif
1190 
1191  /* make sure all flows are processed */
1192  do {
1194  usleep(10);
1195  } while (FlowRecyclerReadyToShutdown() == false);
1196 
1198  /* flow recycler thread(s) is/are a part of mgmt threads */
1199  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
1200  if (strncasecmp(tv->name, thread_name_flow_rec,
1201  strlen(thread_name_flow_rec)) == 0)
1202  {
1204  }
1205  }
1207 
1208  struct timeval start_ts;
1209  struct timeval cur_ts;
1210  gettimeofday(&start_ts, NULL);
1211 
1212 again:
1213  gettimeofday(&cur_ts, NULL);
1214  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1215  FatalError("unable to get all flow recycler "
1216  "threads to shutdown in time");
1217  }
1218 
1220  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
1221  if (strncasecmp(tv->name, thread_name_flow_rec,
1222  strlen(thread_name_flow_rec)) == 0)
1223  {
1227  /* sleep outside lock */
1228  SleepMsec(1);
1229  goto again;
1230  }
1231  }
1232  }
1234 
1235  /* reset count, so we can kill and respawn (unix socket) */
1236  SC_ATOMIC_SET(flowrec_cnt, 0);
1237  return;
1238 }
1239 
1241 {
1242  tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
1243  tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
1244  tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
1245  tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
1248  SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
1249 
1250  SC_ATOMIC_INIT(flowmgr_cnt);
1251  SC_ATOMIC_INITPTR(flow_timeouts);
1252 }
1253 
1255 {
1256  tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
1257  tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
1258  tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
1259  tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
1262  SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
1263 
1264  SC_ATOMIC_INIT(flowrec_cnt);
1265  SC_ATOMIC_INIT(flowrec_busy);
1266 }
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:73
FlowTimeoutCounters_::rows_empty
uint32_t rows_empty
Definition: flow-manager.c:113
FlowSparePoolUpdate
void FlowSparePoolUpdate(uint32_t size)
Definition: flow-spare-pool.c:219
FlowManagerThreadSpawn
void FlowManagerThreadSpawn(void)
spawn the flow manager thread
Definition: flow-manager.c:935
tm-threads.h
ConfGetInt
int ConfGetInt(const char *name, intmax_t *val)
Retrieve a configuration value as an integer.
Definition: conf.c:399
FROM_TIMEVAL
#define FROM_TIMEVAL(timev)
initialize a 'struct timespec' from a 'struct timeval'.
Definition: util-time.h:108
OutputFlowLog
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
Definition: output-flow.c:86
flow_manager_ctrl_mutex
SCCtrlMutex flow_manager_ctrl_mutex
Definition: flow-manager.c:88
FlowRecyclerThreadData_::fec
FlowEndCounters fec
Definition: flow-manager.c:978
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1651
app-layer-htp-range.h
FlowTimeoutCounters
Definition: flow-worker.c:61
FlowForceReassemblyForFlow
void FlowForceReassemblyForFlow(Flow *f)
Definition: flow-timeout.c:348
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1092
run_mode
int run_mode
Definition: suricata.c:175
FlowBucket_::evicted
Flow * evicted
Definition: flow-hash.h:48
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:167
FLOW_IS_IPV6
#define FLOW_IS_IPV6(f)
Definition: flow.h:164
FlowManagerThreadData_::max
uint32_t max
Definition: flow-manager.c:616
FlowManagerTimeoutThread
Definition: flow-manager.c:260
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:83
FlowSpareGetPoolSize
uint32_t FlowSpareGetPoolSize(void)
Definition: flow-spare-pool.c:47
flow-util.h
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:315
FlowManagerThreadData_::cnt
FlowCounters cnt
Definition: flow-manager.c:618
FBLOCK_LOCK
#define FBLOCK_LOCK(fb)
Definition: flow-hash.h:73
SCTIME_MSECS
#define SCTIME_MSECS(t)
Definition: util-time.h:58
FlowWakeupFlowRecyclerThread
#define FlowWakeupFlowRecyclerThread()
Definition: flow-manager.h:33
TMM_FLOWRECYCLER
@ TMM_FLOWRECYCLER
Definition: tm-threads-common.h:73
stream-tcp.h
GetFlowBypassInfoID
FlowStorageId GetFlowBypassInfoID(void)
Definition: flow-util.c:217
FlowBypassInfo_
Definition: flow.h:520
FlowCnf_::emergency_recovery
uint32_t emergency_recovery
Definition: flow.h:293
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:387
FlowCnf_::hash_size
uint32_t hash_size
Definition: flow.h:287
FlowManagerThreadData_::instance
uint32_t instance
Definition: flow-manager.c:614
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
IPPairTimeoutHash
uint32_t IPPairTimeoutHash(SCTime_t ts)
time out ippairs from the hash
Definition: ippair-timeout.c:142
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:99
StatsRegisterGlobalCounter
uint16_t StatsRegisterGlobalCounter(const char *name, uint64_t(*Func)(void))
Registers a counter, which represents a global value.
Definition: counters.c:1019
Flow_::proto
uint8_t proto
Definition: flow.h:369
SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt)
flow_manager_ctrl_cond
SCCtrlCondT flow_manager_ctrl_cond
Definition: flow-manager.c:87
threads.h
flow-private.h
FlowCounters_::flow_mgr_flows_notimeout
uint16_t flow_mgr_flows_notimeout
Definition: flow-manager.c:598
Flow_
Flow data structure.
Definition: flow.h:347
TYPE
#define TYPE
flow_recycler_ctrl_cond
SCCtrlCondT flow_recycler_ctrl_cond
Definition: flow-manager.c:89
Flow_::protomap
uint8_t protomap
Definition: flow.h:441
SC_ATOMIC_EXTERN
SC_ATOMIC_EXTERN(unsigned int, flow_flags)
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:333
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:84
FlowProtoTimeout_
Definition: flow.h:509
StatsSetUI64
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type uint64_t to the local counter.
Definition: counters.c:210
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:54
flow-hash.h
TmModuleFlowRecyclerRegister
void TmModuleFlowRecyclerRegister(void)
Definition: flow-manager.c:1254
FlowTimeoutCounters_::flows_removed
uint32_t flows_removed
Definition: flow-manager.c:119
flow_spare_pool_block_size
uint32_t flow_spare_pool_block_size
Definition: flow-spare-pool.c:43
FlowBypassInfo_::tosrcbytecnt
uint64_t tosrcbytecnt
Definition: flow.h:525
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
FlowGetMemuse
uint64_t FlowGetMemuse(void)
Definition: flow.c:140
MIN
#define MIN(x, y)
Definition: suricata-common.h:386
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:80
FlowTimeoutCounters_::flows_timeout
uint32_t flows_timeout
Definition: flow-manager.c:118
defrag-timeout.h
FlowRecyclerThreadData_::counter_flows
uint16_t counter_flows
Definition: flow-manager.c:972
stream-tcp-reassemble.h
FLOW_ACTION_DROP
#define FLOW_ACTION_DROP
Definition: flow.h:66
StatsSyncCountersIfSignalled
#define StatsSyncCountersIfSignalled(tv)
Definition: counters.h:141
LiveDevSubBypassStats
void LiveDevSubBypassStats(LiveDevice *dev, uint64_t cnt, int family)
Definition: util-device.c:534
MAX
#define MAX(x, y)
Definition: suricata-common.h:390
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:85
FlowQueuePrivate_::len
uint32_t len
Definition: flow-queue.h:44
Flow_::protoctx
void * protoctx
Definition: flow.h:437
FlowCounters_
Definition: flow-manager.c:589
FlowManagerTimeoutThread
struct FlowManagerTimeoutThread FlowManagerTimeoutThread
FlowCounters_::flow_mgr_flows_aside
uint16_t flow_mgr_flows_aside
Definition: flow-manager.c:600
FLOW_TIMEOUT_REASSEMBLY_DONE
#define FLOW_TIMEOUT_REASSEMBLY_DONE
Definition: flow.h:93
runmode-unix-socket.h
util-unittest.h
FlowTimeoutCounters_::bypassed_count
uint32_t bypassed_count
Definition: flow-manager.c:123
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:37
FlowSparePoolReturnFlows
void FlowSparePoolReturnFlows(FlowQueuePrivate *fqp)
Definition: flow-spare-pool.c:122
TM_THREAD_NAME_MAX
#define TM_THREAD_NAME_MAX
Definition: tm-threads.h:49
util-unittest-helper.h
FlowCnf_::prealloc
uint32_t prealloc
Definition: flow.h:288
FlowQueueTimeoutCounters::flows_removed
uint32_t flows_removed
Definition: flow-manager.c:585
FLOWLOCK_UNLOCK
#define FLOWLOCK_UNLOCK(fb)
Definition: flow.h:266
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:84
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:284
Flow_::flow_state
FlowStateType flow_state
Definition: flow.h:408
FQLOCK_LOCK
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:73
FlowDisableFlowRecyclerThread
void FlowDisableFlowRecyclerThread(void)
Used to disable flow recycler thread(s).
Definition: flow-manager.c:1181
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
FlowTimeoutCounters_::rows_maxlen
uint32_t rows_maxlen
Definition: flow-manager.c:114
FlowCounters_::flow_bypassed_bytes
uint16_t flow_bypassed_bytes
Definition: flow-manager.c:607
FlowTimeoutCounters_::flows_checked
uint32_t flows_checked
Definition: flow-manager.c:116
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:45
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:107
flow-spare-pool.h
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
StatsDecr
void StatsDecr(ThreadVars *tv, uint16_t id)
Decrements the local counter.
Definition: counters.c:188
Flow_::fb
struct FlowBucket_ * fb
Definition: flow.h:484
FlowCounters_::flow_mgr_full_pass
uint16_t flow_mgr_full_pass
Definition: flow-manager.c:590
StatsRegisterMaxCounter
uint16_t StatsRegisterMaxCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the maximum of all the values assigned to it.
Definition: counters.c:1001
SC_ATOMIC_MEMORY_ORDER_RELAXED
#define SC_ATOMIC_MEMORY_ORDER_RELAXED
Definition: util-atomic.h:166
util-device.h
util-debug.h
FlowBypassInfo_::todstbytecnt
uint64_t todstbytecnt
Definition: flow.h:527
FlowBypassInfo_::BypassUpdate
bool(* BypassUpdate)(Flow *f, void *data, time_t tsec)
Definition: flow.h:521
FlowRecyclerThreadData_::counter_queue_avg
uint16_t counter_queue_avg
Definition: flow-manager.c:973
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:92
OutputFlowLogThreadInit
TmEcode OutputFlowLogThreadInit(ThreadVars *tv, void *initdata, void **data)
Thread init for the flow logger. This will run the thread init functions for the individual registered...
Definition: output-flow.c:124
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
FlowCounters_::flow_mgr_rows_maxlen
uint16_t flow_mgr_rows_maxlen
Definition: flow-manager.c:603
FLOWLOCK_WRLOCK
#define FLOWLOCK_WRLOCK(fb)
Definition: flow.h:263
FlowTimeoutsReset
#define FlowTimeoutsReset()
Definition: flow-manager.h:35
FlowForceReassemblyNeedReassembly
int FlowForceReassemblyNeedReassembly(Flow *f)
Check if a flow needs forced reassembly, or any other processing.
Definition: flow-timeout.c:294
FlowDisableFlowManagerThread
void FlowDisableFlowManagerThread(void)
Used to disable flow manager thread(s).
Definition: flow-manager.c:134
SCCtrlCondT
#define SCCtrlCondT
Definition: threads-debug.h:382
detect.h
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1753
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:65
TimeModeIsReady
bool TimeModeIsReady(void)
Definition: util-time.c:92
Flow_::flow_end_flags
uint8_t flow_end_flags
Definition: flow.h:443
THV_KILL
#define THV_KILL
Definition: threadvars.h:39
MemcapsGetPressure
float MemcapsGetPressure(void)
Definition: runmode-unix-socket.c:135
FlowBypassInfo_::todstpktcnt
uint64_t todstpktcnt
Definition: flow.h:526
util-time.h
FlowQueuePrivateGetFromTop
Flow * FlowQueuePrivateGetFromTop(FlowQueuePrivate *fqc)
Definition: flow-queue.c:151
FlowBypassInfo_::bypass_data
void * bypass_data
Definition: flow.h:523
FlowQueueAppendPrivate
void FlowQueueAppendPrivate(FlowQueue *fq, FlowQueuePrivate *fqc)
Definition: flow-queue.c:119
FlowQueueTimeoutCounters
struct FlowQueueTimeoutCounters FlowQueueTimeoutCounters
FlowCounters_::flow_mgr_flows_aside_needs_work
uint16_t flow_mgr_flows_aside_needs_work
Definition: flow-manager.c:601
app-layer-parser.h
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:124
FlowRecyclerThreadData_
Definition: flow-manager.c:969
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:295
FLOW_IS_IPV4
#define FLOW_IS_IPV4(f)
Definition: flow.h:162
FlowRecyclerThreadData_::counter_tcp_active_sessions
uint16_t counter_tcp_active_sessions
Definition: flow-manager.c:977
FlowCounters_::flow_bypassed_cnt_clo
uint16_t flow_bypassed_cnt_clo
Definition: flow-manager.c:605
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:83
SC_ATOMIC_SUB
#define SC_ATOMIC_SUB(name, val)
sub a value from our atomic variable
Definition: util-atomic.h:342
flow_timeouts_emerg
FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]
Definition: flow.c:98
TimeModeIsLive
bool TimeModeIsLive(void)
Definition: util-time.c:111
stream.h
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:376
FlowTimeoutsEmergency
void FlowTimeoutsEmergency(void)
Definition: flow-manager.c:97
FlowTimeoutCounters
struct FlowTimeoutCounters_ FlowTimeoutCounters
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:33
TimeGet
SCTime_t TimeGet(void)
Definition: util-time.c:152
conf.h
FlowRecyclerThreadData_::output_thread_data
void * output_thread_data
Definition: flow-manager.c:970
FlowTimeoutCounters_::rows_checked
uint32_t rows_checked
Definition: flow-manager.c:111
FBLOCK_UNLOCK
#define FBLOCK_UNLOCK(fb)
Definition: flow-hash.h:75
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:83
FlowClearMemory
int FlowClearMemory(Flow *f, uint8_t proto_map)
Function clear the flow memory before queueing it to spare flow queue.
Definition: flow.c:1115
flow_timeouts_delta
FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]
Definition: flow.c:99
FlowRecyclerThreadSpawn
void FlowRecyclerThreadSpawn(void)
spawn the flow recycler thread
Definition: flow-manager.c:1141
output-flow.h
detect-engine-state.h
Data structures and function prototypes for keeping state for the detection engine.
flow-timeout.h
flow-queue.h
TmModule_::name
const char * name
Definition: tm-modules.h:44
FlowRecyclerThreadData
struct FlowRecyclerThreadData_ FlowRecyclerThreadData
FlowBypassInfo_::tosrcpktcnt
uint64_t tosrcpktcnt
Definition: flow.h:524
host-timeout.h
SCCtrlCondTimedwait
#define SCCtrlCondTimedwait
Definition: threads-debug.h:385
StreamTcpThreadCacheCleanup
void StreamTcpThreadCacheCleanup(void)
Definition: stream-tcp-cache.c:134
runmodes.h
FlowTimeoutCounters_::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-manager.c:121
FlowGetStorageById
void * FlowGetStorageById(const Flow *f, FlowStorageId id)
Definition: flow-storage.c:40
Flow_::next
struct Flow_ * next
Definition: flow.h:392
FlowQueue_
Definition: flow-queue.h:49
FlowCounters_::flow_bypassed_pkts
uint16_t flow_bypassed_pkts
Definition: flow-manager.c:606
TMM_FLOWMANAGER
@ TMM_FLOWMANAGER
Definition: tm-threads-common.h:72
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:38
flow_hash
FlowBucket * flow_hash
Definition: flow-hash.c:58
FLOW_PROTO_MAX
@ FLOW_PROTO_MAX
Definition: flow-private.h:74
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:378
FlowQueueExtractPrivate
FlowQueuePrivate FlowQueueExtractPrivate(FlowQueue *fq)
Definition: flow-queue.c:140
flow-storage.h
FlowTimeoutCounters_
Definition: flow-manager.c:110
FlowManagerThreadData_
Definition: flow-manager.c:613
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flow-manager.h
suricata-common.h
SCCtrlMutex
#define SCCtrlMutex
Definition: threads-debug.h:373
DefragTimeoutHash
uint32_t DefragTimeoutHash(SCTime_t ts)
time out trackers from the hash
Definition: defrag-timeout.c:121
FlowManagerThreadData
struct FlowManagerThreadData_ FlowManagerThreadData
flow_config
FlowConfig flow_config
Definition: flow.c:102
FlowQueueTimeoutCounters::flows_timeout
uint32_t flows_timeout
Definition: flow-manager.c:586
TmModuleFlowManagerRegister
void TmModuleFlowManagerRegister(void)
Definition: flow-manager.c:1240
FlowCounters_::memcap_pressure
uint16_t memcap_pressure
Definition: flow-manager.c:609
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
FlowTimeoutsInit
void FlowTimeoutsInit(void)
Definition: flow-manager.c:92
SCTIME_SECS
#define SCTIME_SECS(t)
Definition: util-time.h:57
SC_ATOMIC_LOAD_EXPLICIT
#define SC_ATOMIC_LOAD_EXPLICIT(name, order)
Definition: util-atomic.h:379
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
FatalError
#define FatalError(...)
Definition: util-debug.h:502
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
Flow_::livedev
struct LiveDevice_ * livedev
Definition: flow.h:394
threadvars.h
FlowQueuePrivate_
Definition: flow-queue.h:41
StatsAddUI64
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition: counters.c:146
SCLogConfig
struct SCLogConfig_ SCLogConfig
Holds the config state used by the logging api.
FlowRecyclerThreadData_::counter_queue_max
uint16_t counter_queue_max
Definition: flow-manager.c:974
FlowTimeoutCounters_::bypassed_bytes
uint64_t bypassed_bytes
Definition: flow-manager.c:125
FlowManagerThreadData_::min
uint32_t min
Definition: flow-manager.c:615
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
OutputFlowLogThreadDeinit
TmEcode OutputFlowLogThreadDeinit(ThreadVars *tv, void *thread_data)
Definition: output-flow.c:166
flow_recycle_q
FlowQueue flow_recycle_q
Definition: flow-manager.c:73
SCFree
#define SCFree(p)
Definition: util-mem.h:61
FlowTimeoutCounters_::bypassed_pkts
uint64_t bypassed_pkts
Definition: flow-manager.c:124
FlowCounters_::memcap_pressure_max
uint16_t memcap_pressure_max
Definition: flow-manager.c:610
Flow_::flags
uint32_t flags
Definition: flow.h:417
HostTimeoutHash
uint32_t HostTimeoutHash(SCTime_t ts)
time out hosts from the hash
Definition: host-timeout.c:156
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:318
FlowManagerThreadData_::timeout
FlowManagerTimeoutThread timeout
Definition: flow-manager.c:620
util-random.h
FlowCounters
struct FlowCounters_ FlowCounters
FlowCounters_::flow_emerg_mode_enter
uint16_t flow_emerg_mode_enter
Definition: flow-manager.c:594
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
FLOW_END_FLAG_SHUTDOWN
#define FLOW_END_FLAG_SHUTDOWN
Definition: flow.h:238
FLOW_EMERGENCY
#define FLOW_EMERGENCY
Definition: flow-private.h:37
ippair-timeout.h
FlowRecyclerThreadData_::counter_flow_active
uint16_t counter_flow_active
Definition: flow-manager.c:976
FlowCounters_::flow_mgr_flows_timeout
uint16_t flow_mgr_flows_timeout
Definition: flow-manager.c:599
FlowManagerTimeoutThread::aside_queue
FlowQueuePrivate aside_queue
Definition: flow-manager.c:263
timeradd
#define timeradd(a, b, r)
Definition: util-time.h:120
FLOW_END_FLAG_TIMEOUT
#define FLOW_END_FLAG_TIMEOUT
Definition: flow.h:236
StatsSyncCounters
#define StatsSyncCounters(tv)
Definition: counters.h:138
FlowTimeoutCounters_::flows_notimeout
uint32_t flows_notimeout
Definition: flow-manager.c:117
flow_timeouts_normal
FlowProtoTimeout flow_timeouts_normal[FLOW_PROTO_MAX]
Definition: flow.c:97
StatsRegisterAvgCounter
uint16_t StatsRegisterAvgCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the average of all the values assigned to it.
Definition: counters.c:981
FlowCounters_::flow_mgr_spare
uint16_t flow_mgr_spare
Definition: flow-manager.c:593
FlowTimeoutCounters_::flows_aside
uint32_t flows_aside
Definition: flow-manager.c:120
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:376
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:316
flow.h
FlowQueuePrivateAppendFlow
void FlowQueuePrivateAppendFlow(FlowQueuePrivate *fqc, Flow *f)
Definition: flow-queue.c:65
FlowEndCountersRegister
void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec)
Definition: flow-util.c:241
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:91
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
evicted
Flow * evicted
Definition: flow-hash.h:4
StatsRegisterCounter
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:961
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
BITS
#define BITS
Flow_::timeout_at
uint32_t timeout_at
Definition: flow.h:387
FlowEndCounters_
Definition: flow-util.h:154
flow_recycler_ctrl_mutex
SCCtrlMutex flow_recycler_ctrl_mutex
Definition: flow-manager.c:90
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:76
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:360
FlowCounters_::flow_emerg_mode_over
uint16_t flow_emerg_mode_over
Definition: flow-manager.c:595
TM_FLAG_MANAGEMENT_TM
#define TM_FLAG_MANAGEMENT_TM
Definition: tm-modules.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:172
FlowTimeoutCounters_::rows_skipped
uint32_t rows_skipped
Definition: flow-manager.c:112
FlowCounters_::flow_mgr_flows_checked
uint16_t flow_mgr_flows_checked
Definition: flow-manager.c:597
SCTIME_USECS
#define SCTIME_USECS(t)
Definition: util-time.h:56
FlowCounters_::flow_mgr_rows_sec
uint16_t flow_mgr_rows_sec
Definition: flow-manager.c:591
FlowQueueTimeoutCounters
Definition: flow-manager.c:584
FlowTimeoutCounters::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-worker.c:62
FQLOCK_UNLOCK
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:75
HttpRangeContainersTimeoutHash
uint32_t HttpRangeContainersTimeoutHash(const SCTime_t ts)
Definition: app-layer-htp-range.c:188