suricata
flow-manager.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2023 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
22  * \author Victor Julien <victor@inliniac.net>
23  */
24 
25 #include "suricata-common.h"
26 #include "conf.h"
27 #include "threadvars.h"
28 #include "tm-threads.h"
29 #include "runmodes.h"
30 
31 #include "util-random.h"
32 #include "util-time.h"
33 
34 #include "flow.h"
35 #include "flow-queue.h"
36 #include "flow-hash.h"
37 #include "flow-util.h"
38 #include "flow-private.h"
39 #include "flow-timeout.h"
40 #include "flow-manager.h"
41 #include "flow-storage.h"
42 #include "flow-spare-pool.h"
43 
44 #include "stream-tcp-reassemble.h"
45 #include "stream-tcp.h"
46 
47 #include "util-unittest.h"
48 #include "util-unittest-helper.h"
49 #include "util-device.h"
50 
51 #include "util-debug.h"
52 
53 #include "threads.h"
54 #include "detect.h"
55 #include "detect-engine-state.h"
56 #include "stream.h"
57 
58 #include "app-layer-parser.h"
59 
60 #include "host-timeout.h"
61 #include "defrag-timeout.h"
62 #include "ippair-timeout.h"
63 #include "app-layer-htp-range.h"
64 
65 #include "output-flow.h"
66 
67 #include "runmode-unix-socket.h"
68 
69 /* Run mode selected at suricata.c */
70 extern int run_mode;
71 
72 /** queue to pass flows to cleanup/log thread(s) */
74 
75 /* multi flow manager support */
76 static uint32_t flowmgr_number = 1;
77 /* atomic counter for flow managers, to assign instance id */
78 SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
79 
80 /* multi flow recycler support */
81 static uint32_t flowrec_number = 1;
82 /* atomic counter for flow recyclers, to assign instance id */
83 SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
84 SC_ATOMIC_DECLARE(uint32_t, flowrec_busy);
85 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
86 
87 static SCCtrlCondT flow_manager_ctrl_cond = PTHREAD_COND_INITIALIZER;
88 static SCCtrlMutex flow_manager_ctrl_mutex = PTHREAD_MUTEX_INITIALIZER;
89 static SCCtrlCondT flow_recycler_ctrl_cond = PTHREAD_COND_INITIALIZER;
90 static SCCtrlMutex flow_recycler_ctrl_mutex = PTHREAD_MUTEX_INITIALIZER;
91 
93 {
94  SCCtrlMutexLock(&flow_manager_ctrl_mutex);
95  SCCtrlCondSignal(&flow_manager_ctrl_cond);
96  SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
97 }
98 
100 {
101  SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
102  SCCtrlCondSignal(&flow_recycler_ctrl_cond);
103  SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
104 }
105 
107 {
108  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
109 }
110 
112 {
113  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
114 }
115 
116 /* 1 seconds */
117 #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
118 #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
119 /* 0.3 seconds */
120 #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
121 #define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000
122 #define NEW_FLOW_COUNT_COND 10
123 
124 typedef struct FlowTimeoutCounters_ {
125  uint32_t rows_checked;
126  uint32_t rows_skipped;
127  uint32_t rows_empty;
128  uint32_t rows_maxlen;
129 
130  uint32_t flows_checked;
131  uint32_t flows_notimeout;
132  uint32_t flows_timeout;
133  uint32_t flows_removed;
134  uint32_t flows_aside;
136 
137  uint32_t bypassed_count;
138  uint64_t bypassed_pkts;
139  uint64_t bypassed_bytes;
141 
142 /**
143  * \brief Used to disable flow manager thread(s).
144  *
145  * \todo Kinda hackish since it uses the tv name to identify flow manager
146  * thread. We need an all weather identification scheme.
147  */
149 {
151  /* flow manager thread(s) is/are a part of mgmt threads */
152  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
153  if (strncasecmp(tv->name, thread_name_flow_mgr,
154  strlen(thread_name_flow_mgr)) == 0)
155  {
157  }
158  }
160 
161  struct timeval start_ts;
162  struct timeval cur_ts;
163  gettimeofday(&start_ts, NULL);
164 
165 again:
166  gettimeofday(&cur_ts, NULL);
167  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
168  FatalError("unable to get all flow manager "
169  "threads to shutdown in time");
170  }
171 
173  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
174  if (strncasecmp(tv->name, thread_name_flow_mgr,
175  strlen(thread_name_flow_mgr)) == 0)
176  {
179  /* sleep outside lock */
180  SleepMsec(1);
181  goto again;
182  }
183  }
184  }
186 
187  /* reset count, so we can kill and respawn (unix socket) */
188  SC_ATOMIC_SET(flowmgr_cnt, 0);
189  return;
190 }
191 
192 /** \internal
193  * \brief check if a flow is timed out
194  *
195  * \param f flow
196  * \param ts timestamp
197  *
198  * \retval 0 not timed out
199  * \retval 1 timed out
200  */
201 static int FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
202 {
203  uint32_t flow_times_out_at = f->timeout_at;
204  if (emerg) {
206  flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
207  }
208  if (*next_ts == 0 || flow_times_out_at < *next_ts)
209  *next_ts = flow_times_out_at;
210 
211  /* do the timeout check */
212  if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) {
213  return 0;
214  }
215 
216  return 1;
217 }
218 
219 /** \internal
220  * \brief check timeout of captured bypassed flow by querying capture method
221  *
222  * \param f Flow
223  * \param ts timestamp
224  * \param counters Flow timeout counters
225  *
226  * \retval 0 not timeout
227  * \retval 1 timeout (or not capture bypassed)
228  */
229 static inline int FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
230 {
231 #ifdef CAPTURE_OFFLOAD
232  if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
233  return 1;
234  }
235 
237  if (fc && fc->BypassUpdate) {
238  /* flow will be possibly updated */
239  uint64_t pkts_tosrc = fc->tosrcpktcnt;
240  uint64_t bytes_tosrc = fc->tosrcbytecnt;
241  uint64_t pkts_todst = fc->todstpktcnt;
242  uint64_t bytes_todst = fc->todstbytecnt;
243  bool update = fc->BypassUpdate(f, fc->bypass_data, SCTIME_SECS(ts));
244  if (update) {
245  SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f));
246  pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
247  bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
248  pkts_todst = fc->todstpktcnt - pkts_todst;
249  bytes_todst = fc->todstbytecnt - bytes_todst;
250  if (f->livedev) {
251  SC_ATOMIC_ADD(f->livedev->bypassed,
252  pkts_tosrc + pkts_todst);
253  }
254  counters->bypassed_pkts += pkts_tosrc + pkts_todst;
255  counters->bypassed_bytes += bytes_tosrc + bytes_todst;
256  return 0;
257  } else {
258  SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
259  if (f->livedev) {
260  if (FLOW_IS_IPV4(f)) {
261  LiveDevSubBypassStats(f->livedev, 1, AF_INET);
262  } else if (FLOW_IS_IPV6(f)) {
263  LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
264  }
265  }
266  counters->bypassed_count++;
267  return 1;
268  }
269  }
270 #endif /* CAPTURE_OFFLOAD */
271  return 1;
272 }
273 
274 typedef struct FlowManagerTimeoutThread {
275  /* used to temporarily store flows that have timed out and are
276  * removed from the hash */
279 
280 static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
281 {
282  FlowQueuePrivate recycle = { NULL, NULL, 0 };
283  counters->flows_aside += td->aside_queue.len;
284 
285  uint32_t cnt = 0;
286  Flow *f;
287  while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
288  /* flow is still locked */
289 
290  if (f->proto == IPPROTO_TCP &&
292  !FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) == 1) {
293  /* Send the flow to its thread */
295  FLOWLOCK_UNLOCK(f);
296  /* flow ownership is passed to the worker thread */
297 
298  counters->flows_aside_needs_work++;
299  continue;
300  }
301  FLOWLOCK_UNLOCK(f);
302 
303  FlowQueuePrivateAppendFlow(&recycle, f);
304  if (recycle.len == 100) {
307  }
308  cnt++;
309  }
310  if (recycle.len) {
313  }
314  return cnt;
315 }
316 
317 /**
318  * \internal
319  *
320  * \brief check all flows in a hash row for timing out
321  *
322  * \param f last flow in the hash row
323  * \param ts timestamp
324  * \param emergency bool indicating emergency mode
325  * \param counters ptr to FlowTimeoutCounters structure
326  */
327 static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts,
328  int emergency, FlowTimeoutCounters *counters, uint32_t *next_ts)
329 {
330  uint32_t checked = 0;
331  Flow *prev_f = NULL;
332 
333  do {
334  checked++;
335 
336  /* check flow timeout based on lastts and state. Both can be
337  * accessed w/o Flow lock as we do have the hash row lock (so flow
338  * can't disappear) and flow_state is atomic. lastts can only
339  * be modified when we have both the flow and hash row lock */
340 
341  /* timeout logic goes here */
342  if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {
343 
344  counters->flows_notimeout++;
345 
346  prev_f = f;
347  f = f->next;
348  continue;
349  }
350 
351  FLOWLOCK_WRLOCK(f);
352 
353  Flow *next_flow = f->next;
354 
355  /* never prune a flow that is used by a packet we
356  * are currently processing in one of the threads */
357  if (!FlowBypassedTimeout(f, ts, counters)) {
358  FLOWLOCK_UNLOCK(f);
359  prev_f = f;
360  f = f->next;
361  continue;
362  }
363 
365 
366  counters->flows_timeout++;
367 
368  RemoveFromHash(f, prev_f);
369 
371  /* flow is still locked in the queue */
372 
373  f = next_flow;
374  } while (f != NULL);
375 
376  counters->flows_checked += checked;
377  if (checked > counters->rows_maxlen)
378  counters->rows_maxlen = checked;
379 }
380 
381 static void FlowManagerHashRowClearEvictedList(
383 {
384  do {
385  FLOWLOCK_WRLOCK(f);
386  Flow *next_flow = f->next;
387  f->next = NULL;
388  f->fb = NULL;
389 
391  /* flow is still locked in the queue */
392 
393  f = next_flow;
394  } while (f != NULL);
395 }
396 
397 /**
398  * \brief time out flows from the hash
399  *
400  * \param ts timestamp
401  * \param hash_min min hash index to consider
402  * \param hash_max max hash index to consider
403  * \param counters ptr to FlowTimeoutCounters structure
404  *
405  * \retval cnt number of timed out flow
406  */
407 static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const uint32_t hash_min,
408  const uint32_t hash_max, FlowTimeoutCounters *counters)
409 {
410  uint32_t cnt = 0;
411  const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
412  const uint32_t rows_checked = hash_max - hash_min;
413  uint32_t rows_skipped = 0;
414  uint32_t rows_empty = 0;
415 
416 #if __WORDSIZE==64
417 #define BITS 64
418 #define TYPE uint64_t
419 #else
420 #define BITS 32
421 #define TYPE uint32_t
422 #endif
423 
424  const uint32_t ts_secs = SCTIME_SECS(ts);
425  for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
426  TYPE check_bits = 0;
427  const uint32_t check = MIN(BITS, (hash_max - idx));
428  for (uint32_t i = 0; i < check; i++) {
429  FlowBucket *fb = &flow_hash[idx+i];
430  check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(
431  fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs)
432  << (TYPE)i;
433  }
434  if (check_bits == 0)
435  continue;
436 
437  for (uint32_t i = 0; i < check; i++) {
438  FlowBucket *fb = &flow_hash[idx+i];
439  if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= ts_secs) {
440  FBLOCK_LOCK(fb);
441  Flow *evicted = NULL;
442  if (fb->evicted != NULL || fb->head != NULL) {
443  if (fb->evicted != NULL) {
444  /* transfer out of bucket so we can do additional work outside
445  * of the bucket lock */
446  evicted = fb->evicted;
447  fb->evicted = NULL;
448  }
449  if (fb->head != NULL) {
450  uint32_t next_ts = 0;
451  FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts);
452 
453  if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
454  SC_ATOMIC_SET(fb->next_ts, next_ts);
455  }
456  if (fb->evicted == NULL && fb->head == NULL) {
457  SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
458  }
459  } else {
460  SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
461  rows_empty++;
462  }
463  FBLOCK_UNLOCK(fb);
464  /* processed evicted list */
465  if (evicted) {
466  FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
467  }
468  } else {
469  rows_skipped++;
470  }
471  }
472  if (td->aside_queue.len) {
473  cnt += ProcessAsideQueue(td, counters);
474  }
475  }
476 
477  counters->rows_checked += rows_checked;
478  counters->rows_skipped += rows_skipped;
479  counters->rows_empty += rows_empty;
480 
481  if (td->aside_queue.len) {
482  cnt += ProcessAsideQueue(td, counters);
483  }
484  counters->flows_removed += cnt;
485  /* coverity[missing_unlock : FALSE] */
486  return cnt;
487 }
488 
489 /** \internal
490  * \brief handle timeout for a slice of hash rows
491  * If we wrap around we call FlowTimeoutHash twice */
492 static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t ts,
493  const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
494  const uint32_t rows, uint32_t *pos)
495 {
496  uint32_t start = 0;
497  uint32_t end = 0;
498  uint32_t cnt = 0;
499  uint32_t rows_left = rows;
500 
501 again:
502  start = hash_min + (*pos);
503  if (start >= hash_max) {
504  start = hash_min;
505  }
506  end = start + rows_left;
507  if (end > hash_max) {
508  end = hash_max;
509  }
510  *pos = (end == hash_max) ? hash_min : end;
511  rows_left = rows_left - (end - start);
512 
513  cnt += FlowTimeoutHash(td, ts, start, end, counters);
514  if (rows_left) {
515  goto again;
516  }
517  return cnt;
518 }
519 
520 /**
521  * \internal
522  *
523  * \brief move all flows out of a hash row
524  *
525  * \param f last flow in the hash row
526  *
527  * \retval cnt removed out flows
528  */
529 static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
530 {
531  uint32_t cnt = 0;
532 
533  do {
534  FLOWLOCK_WRLOCK(f);
535 
536  Flow *next_flow = f->next;
537 
538  /* remove from the hash */
539  if (mode == 0) {
540  RemoveFromHash(f, NULL);
541  } else {
542  FlowBucket *fb = f->fb;
543  fb->evicted = f->next;
544  f->next = NULL;
545  f->fb = NULL;
546  }
548 
549  /* no one is referring to this flow, removed from hash
550  * so we can unlock it and move it to the recycle queue. */
551  FLOWLOCK_UNLOCK(f);
552  FlowQueuePrivateAppendFlow(recycle_q, f);
553 
554  cnt++;
555 
556  f = next_flow;
557  } while (f != NULL);
558 
559  return cnt;
560 }
561 
562 /**
563  * \brief remove all flows from the hash
564  *
565  * \retval cnt number of removes out flows
566  */
567 static uint32_t FlowCleanupHash(void)
568 {
569  FlowQueuePrivate local_queue = { NULL, NULL, 0 };
570  uint32_t cnt = 0;
571 
572  for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
573  FlowBucket *fb = &flow_hash[idx];
574 
575  FBLOCK_LOCK(fb);
576 
577  if (fb->head != NULL) {
578  /* we have a flow, or more than one */
579  cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
580  }
581  if (fb->evicted != NULL) {
582  /* we have a flow, or more than one */
583  cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
584  }
585 
586  FBLOCK_UNLOCK(fb);
587  if (local_queue.len >= 25) {
588  FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
590  }
591  }
592  FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
594 
595  return cnt;
596 }
597 
598 typedef struct FlowQueueTimeoutCounters {
599  uint32_t flows_removed;
600  uint32_t flows_timeout;
602 
603 typedef struct FlowCounters_ {
606 
607  uint16_t flow_mgr_spare;
610 
616 
618 
622 
623  uint16_t memcap_pressure;
626 
627 typedef struct FlowManagerThreadData_ {
628  uint32_t instance;
629  uint32_t min;
630  uint32_t max;
631 
633 
636 
637 static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
638 {
639  fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t);
640  fc->flow_mgr_rows_sec = StatsRegisterCounter("flow.mgr.rows_per_sec", t);
641 
642  fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
643  fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
644  fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
645 
646  fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", t);
647  fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", t);
648  fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", t);
649  fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", t);
650  fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", t);
651  fc->flow_mgr_flows_aside_needs_work = StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", t);
652 
653  fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
654  fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
655  fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
656 
657  fc->memcap_pressure = StatsRegisterCounter("memcap.pressure", t);
658  fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap.pressure_max", t);
659 }
660 
661 static void FlowCountersUpdate(
662  ThreadVars *th_v, const FlowManagerThreadData *ftd, const FlowTimeoutCounters *counters)
663 {
664  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_checked, (uint64_t)counters->flows_checked);
665  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters->flows_notimeout);
666 
667  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters->flows_timeout);
668  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters->flows_aside);
670  (uint64_t)counters->flows_aside_needs_work);
671 
672  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_cnt_clo, (uint64_t)counters->bypassed_count);
673  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_pkts, (uint64_t)counters->bypassed_pkts);
674  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters->bypassed_bytes);
675 
676  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters->rows_maxlen);
677 }
678 
679 static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
680 {
682  if (ftd == NULL)
683  return TM_ECODE_FAILED;
684 
685  ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
686  SCLogDebug("flow manager instance %u", ftd->instance);
687 
688  /* set the min and max value used for hash row walking
689  * each thread has it's own section of the flow hash */
690  uint32_t range = flow_config.hash_size / flowmgr_number;
691 
692  ftd->min = ftd->instance * range;
693  ftd->max = (ftd->instance + 1) * range;
694 
695  /* last flow-manager takes on hash_size % flowmgr_number extra rows */
696  if ((ftd->instance + 1) == flowmgr_number) {
697  ftd->max = flow_config.hash_size;
698  }
700 
701  SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
702 
703  /* pass thread data back to caller */
704  *data = ftd;
705 
706  FlowCountersInit(t, &ftd->cnt);
707 
708  PacketPoolInit();
709  return TM_ECODE_OK;
710 }
711 
712 static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
713 {
716  SCFree(data);
717  return TM_ECODE_OK;
718 }
719 
720 /** \internal
721  * \brief calculate number of rows to scan and how much time to sleep
722  * based on the busy score `mp` (0 idle, 100 max busy).
723  *
724  * We try to to make sure we scan the hash once a second. The number size
725  * of the slice of the hash scanned is determined by our busy score 'mp'.
726  * We sleep for the remainder of the second after processing the slice,
727  * or at least an approximation of it.
728  * A minimum busy score of 10 is assumed to avoid a longer than 10 second
729  * full hash pass. This is to avoid burstiness in scanning when there is
730  * a rapid increase of the busy score, which could lead to the flow manager
731  * suddenly scanning a much larger slice of the hash leading to a burst
732  * in scan/eviction work.
733  */
734 static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
735  uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
736 {
737  if (emergency) {
738  *wu_rows = rows;
739  *wu_sleep = 250;
740  return;
741  }
742  /* minimum busy score is 10 */
743  const uint32_t emp = MAX(mp, 10);
744  const uint32_t rows_per_sec = (uint32_t)((float)rows * (float)((float)emp / (float)100));
745  /* calc how much time we estimate the work will take, in ms. We assume
746  * each row takes an average of 1usec. Maxing out at 1sec. */
747  const uint32_t work_per_unit = MIN(rows_per_sec / 1000, 1000);
748  /* calc how much time we need to sleep to get to the per second cadence
749  * but sleeping for at least 250ms. */
750  const uint32_t sleep_per_unit = MAX(250, 1000 - work_per_unit);
751  SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,
752  sleep_per_unit);
753 
754  *wu_sleep = sleep_per_unit;
755  *wu_rows = rows_per_sec;
756  *rows_sec = rows_per_sec;
757 }
758 
759 /** \brief Thread that manages the flow table and times out flows.
760  *
761  * \param td ThreadVars cast to void ptr
762  *
763  * Keeps an eye on the spare list, alloc flows if needed...
764  */
765 static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
766 {
767  FlowManagerThreadData *ftd = thread_data;
768  const uint32_t rows = ftd->max - ftd->min;
769  const bool time_is_live = TimeModeIsLive();
770 
771  uint32_t emerg_over_cnt = 0;
772  uint64_t next_run_ms = 0;
773  uint32_t pos = 0;
774  uint32_t rows_sec = 0;
775  uint32_t rows_per_wu = 0;
776  uint64_t sleep_per_wu = 0;
777  bool prev_emerg = false;
778  uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
779  SCTime_t ts;
780 
781  /* don't start our activities until time is setup */
782  while (!TimeModeIsReady()) {
783  if (suricata_ctl_flags != 0)
784  return TM_ECODE_OK;
785  usleep(10);
786  }
787 
788  uint32_t mp = MemcapsGetPressure() * 100;
789  if (ftd->instance == 0) {
790  StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
791  StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
792  }
793  GetWorkUnitSizing(rows, mp, false, &sleep_per_wu, &rows_per_wu, &rows_sec);
794  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
795 
797 
798  while (1)
799  {
800  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
804  }
805 
806  bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);
807 
808  /* Get the time */
809  ts = TimeGet();
810  SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts));
811  uint64_t ts_ms = SCTIME_MSECS(ts);
812  const bool emerge_p = (emerg && !prev_emerg);
813  if (emerge_p) {
814  next_run_ms = 0;
815  prev_emerg = true;
816  SCLogNotice("Flow emergency mode entered...");
817  StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter);
818  }
819  if (ts_ms >= next_run_ms) {
820  if (ftd->instance == 0) {
821  const uint32_t sq_len = FlowSpareGetPoolSize();
822  const uint32_t spare_perc = sq_len * 100 / MAX(flow_config.prealloc, 1);
823  /* see if we still have enough spare flows */
824  if (spare_perc < 90 || spare_perc > 110) {
825  FlowSparePoolUpdate(sq_len);
826  }
827  }
828 
829  /* try to time out flows */
830  // clang-format off
831  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
832  // clang-format on
833 
834  if (emerg) {
835  /* in emergency mode, do a full pass of the hash table */
836  FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters);
837  StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
838  } else {
839  SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,
840  rows_per_wu);
841 
842  const uint32_t ppos = pos;
843  FlowTimeoutHashInChunks(
844  &ftd->timeout, ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos);
845  if (ppos > pos) {
846  StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
847  }
848  }
849 
850  const uint32_t spare_pool_len = FlowSpareGetPoolSize();
851  StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)spare_pool_len);
852 
853  FlowCountersUpdate(th_v, ftd, &counters);
854 
855  if (emerg == true) {
856  SCLogDebug("flow_sparse_q.len = %" PRIu32 " prealloc: %" PRIu32
857  "flow_spare_q status: %" PRIu32 "%% flows at the queue",
858  spare_pool_len, flow_config.prealloc,
859  spare_pool_len * 100 / MAX(flow_config.prealloc, 1));
860 
861  /* only if we have pruned this "emergency_recovery" percentage
862  * of flows, we will unset the emergency bit */
863  if ((spare_pool_len * 100 / MAX(flow_config.prealloc, 1)) >
865  emerg_over_cnt++;
866  } else {
867  emerg_over_cnt = 0;
868  }
869 
870  if (emerg_over_cnt >= 30) {
871  SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
873 
874  emerg = false;
875  prev_emerg = false;
876  emerg_over_cnt = 0;
877  SCLogNotice("Flow emergency mode over, back to normal... unsetting"
878  " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", "
879  "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32
880  "%% flows at the queue",
881  (uintmax_t)SCTIME_SECS(ts), (uintmax_t)SCTIME_USECS(ts),
882  spare_pool_len * 100 / MAX(flow_config.prealloc, 1));
883 
884  StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
885  }
886  }
887 
888  /* update work units */
889  const uint32_t pmp = mp;
890  mp = MemcapsGetPressure() * 100;
891  if (ftd->instance == 0) {
892  StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
893  StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
894  }
895  GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
896  if (pmp != mp) {
897  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
898  }
899 
900  next_run_ms = ts_ms + sleep_per_wu;
901  }
902  if (other_last_sec == 0 || other_last_sec < (uint32_t)SCTIME_SECS(ts)) {
903  if (ftd->instance == 0) {
908  other_last_sec = (uint32_t)SCTIME_SECS(ts);
909  }
910  }
911 
912  if (TmThreadsCheckFlag(th_v, THV_KILL)) {
913  StatsSyncCounters(th_v);
914  break;
915  }
916 
917  if (emerg || !time_is_live) {
918  usleep(250);
919  } else {
920  struct timeval cond_tv;
921  gettimeofday(&cond_tv, NULL);
922  struct timeval add_tv;
923  add_tv.tv_sec = 0;
924  add_tv.tv_usec = (sleep_per_wu * 1000);
925  timeradd(&cond_tv, &add_tv, &cond_tv);
926 
927  struct timespec cond_time = FROM_TIMEVAL(cond_tv);
928  SCCtrlMutexLock(&flow_manager_ctrl_mutex);
929  while (1) {
930  int rc = SCCtrlCondTimedwait(
931  &flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, &cond_time);
932  if (rc == ETIMEDOUT || rc < 0)
933  break;
934  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
935  break;
936  }
937  }
938  SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
939  }
940 
941  SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
942 
944  }
945  return TM_ECODE_OK;
946 }
947 
948 /** \brief spawn the flow manager thread */
950 {
951  intmax_t setting = 1;
952  (void)ConfGetInt("flow.managers", &setting);
953 
954  if (setting < 1 || setting > 1024) {
955  FatalError("invalid flow.managers setting %" PRIdMAX, setting);
956  }
957  flowmgr_number = (uint32_t)setting;
958 
959  SCLogConfig("using %u flow manager threads", flowmgr_number);
961 
962  for (uint32_t u = 0; u < flowmgr_number; u++) {
963  char name[TM_THREAD_NAME_MAX];
964  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);
965 
966  ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
967  "FlowManager", 0);
968  BUG_ON(tv_flowmgr == NULL);
969 
970  if (tv_flowmgr == NULL) {
971  FatalError("flow manager thread creation failed");
972  }
973  if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
974  FatalError("flow manager thread spawn failed");
975  }
976  }
977  return;
978 }
979 
980 typedef struct FlowRecyclerThreadData_ {
982 
983  uint16_t counter_flows;
986 
991 
992 static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
993 {
995  if (ftd == NULL)
996  return TM_ECODE_FAILED;
997  if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
998  SCLogError("initializing flow log API for thread failed");
999  SCFree(ftd);
1000  return TM_ECODE_FAILED;
1001  }
1002  SCLogDebug("output_thread_data %p", ftd->output_thread_data);
1003 
1004  ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t);
1005  ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t);
1006  ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t);
1007 
1008  ftd->counter_flow_active = StatsRegisterCounter("flow.active", t);
1009  ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t);
1010 
1011  FlowEndCountersRegister(t, &ftd->fec);
1012 
1013  *data = ftd;
1014  return TM_ECODE_OK;
1015 }
1016 
1017 static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
1018 {
1020 
1022  if (ftd->output_thread_data != NULL)
1024 
1025  SCFree(data);
1026  return TM_ECODE_OK;
1027 }
1028 
1029 static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)
1030 {
1031  FLOWLOCK_WRLOCK(f);
1032 
1033  (void)OutputFlowLog(tv, ftd->output_thread_data, f);
1034 
1035  FlowEndCountersUpdate(tv, &ftd->fec, f);
1036  if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
1038  }
1040 
1041  FlowClearMemory(f, f->protomap);
1042  FLOWLOCK_UNLOCK(f);
1043 }
1044 
1045 extern uint32_t flow_spare_pool_block_size;
1046 
1047 /** \brief Thread that manages timed out flows.
1048  *
1049  * \param td ThreadVars cast to void ptr
1050  */
1051 static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
1052 {
1053  FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
1054  BUG_ON(ftd == NULL);
1055  const bool time_is_live = TimeModeIsLive();
1056  uint64_t recycled_cnt = 0;
1057  FlowQueuePrivate ret_queue = { NULL, NULL, 0 };
1058 
1060 
1061  while (1)
1062  {
1063  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
1067  }
1068  SC_ATOMIC_ADD(flowrec_busy,1);
1070 
1071  StatsAddUI64(th_v, ftd->counter_queue_avg, list.len);
1072  StatsSetUI64(th_v, ftd->counter_queue_max, list.len);
1073 
1074  const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));
1075 
1076  /* Get the time */
1077  SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(TimeGet()));
1078 
1079  uint64_t cnt = 0;
1080  Flow *f;
1081  while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
1082  Recycler(th_v, ftd, f);
1083  cnt++;
1084 
1085  /* for every full sized block, add it to the spare pool */
1086  FlowQueuePrivateAppendFlow(&ret_queue, f);
1087  if (ret_queue.len == flow_spare_pool_block_size) {
1088  FlowSparePoolReturnFlows(&ret_queue);
1089  }
1090  }
1091  if (ret_queue.len > 0) {
1092  FlowSparePoolReturnFlows(&ret_queue);
1093  }
1094  if (cnt > 0) {
1095  recycled_cnt += cnt;
1096  StatsAddUI64(th_v, ftd->counter_flows, cnt);
1097  }
1098  SC_ATOMIC_SUB(flowrec_busy,1);
1099 
1100  if (bail) {
1101  break;
1102  }
1103 
1104  const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY);
1105  if (emerg || !time_is_live) {
1106  usleep(250);
1107  } else {
1108  struct timeval cond_tv;
1109  gettimeofday(&cond_tv, NULL);
1110  cond_tv.tv_sec += 1;
1111  struct timespec cond_time = FROM_TIMEVAL(cond_tv);
1112  SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
1113  while (1) {
1114  int rc = SCCtrlCondTimedwait(
1115  &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time);
1116  if (rc == ETIMEDOUT || rc < 0) {
1117  break;
1118  }
1119  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
1120  break;
1121  }
1122  if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) {
1123  break;
1124  }
1125  }
1126  SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
1127  }
1128 
1129  SCLogDebug("woke up...");
1130 
1132  }
1133  StatsSyncCounters(th_v);
1134  SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
1135  return TM_ECODE_OK;
1136 }
1137 
1138 static bool FlowRecyclerReadyToShutdown(void)
1139 {
1140  if (SC_ATOMIC_GET(flowrec_busy) != 0) {
1141  return false;
1142  }
1143  uint32_t len = 0;
1145  len = flow_recycle_q.qlen;
1147 
1148  return ((len == 0));
1149 }
1150 
1151 /** \brief spawn the flow recycler thread */
1153 {
1154  intmax_t setting = 1;
1155  (void)ConfGetInt("flow.recyclers", &setting);
1156 
1157  if (setting < 1 || setting > 1024) {
1158  FatalError("invalid flow.recyclers setting %" PRIdMAX, setting);
1159  }
1160  flowrec_number = (uint32_t)setting;
1161 
1162  SCLogConfig("using %u flow recycler threads", flowrec_number);
1163 
1164  for (uint32_t u = 0; u < flowrec_number; u++) {
1165  char name[TM_THREAD_NAME_MAX];
1166  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);
1167 
1168  ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name,
1169  "FlowRecycler", 0);
1170 
1171  if (tv_flowrec == NULL) {
1172  FatalError("flow recycler thread creation failed");
1173  }
1174  if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) {
1175  FatalError("flow recycler thread spawn failed");
1176  }
1177  }
1178  return;
1179 }
1180 
1181 /**
1182  * \brief Used to disable flow recycler thread(s).
1183  *
1184  * \note this should only be called when the flow manager is already gone
1185  *
1186  * \todo Kinda hackish since it uses the tv name to identify flow recycler
1187  * thread. We need an all weather identification scheme.
1188  */
1190 {
1191  /* move all flows still in the hash to the recycler queue */
1192 #ifndef DEBUG
1193  (void)FlowCleanupHash();
1194 #else
1195  uint32_t flows = FlowCleanupHash();
1196  SCLogDebug("flows to progress: %u", flows);
1197 #endif
1198 
1199  /* make sure all flows are processed */
1200  do {
1202  usleep(10);
1203  } while (FlowRecyclerReadyToShutdown() == false);
1204 
1206  /* flow recycler thread(s) is/are a part of mgmt threads */
1207  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
1208  if (strncasecmp(tv->name, thread_name_flow_rec,
1209  strlen(thread_name_flow_rec)) == 0)
1210  {
1212  }
1213  }
1215 
1216  struct timeval start_ts;
1217  struct timeval cur_ts;
1218  gettimeofday(&start_ts, NULL);
1219 
1220 again:
1221  gettimeofday(&cur_ts, NULL);
1222  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1223  FatalError("unable to get all flow recycler "
1224  "threads to shutdown in time");
1225  }
1226 
1228  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
1229  if (strncasecmp(tv->name, thread_name_flow_rec,
1230  strlen(thread_name_flow_rec)) == 0)
1231  {
1235  /* sleep outside lock */
1236  SleepMsec(1);
1237  goto again;
1238  }
1239  }
1240  }
1242 
1243  /* reset count, so we can kill and respawn (unix socket) */
1244  SC_ATOMIC_SET(flowrec_cnt, 0);
1245  return;
1246 }
1247 
1249 {
1250  tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
1251  tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
1252  tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
1253  tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
1256  SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
1257 
1258  SC_ATOMIC_INIT(flowmgr_cnt);
1259  SC_ATOMIC_INITPTR(flow_timeouts);
1260 }
1261 
1263 {
1264  tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
1265  tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
1266  tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
1267  tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
1270  SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
1271 
1272  SC_ATOMIC_INIT(flowrec_cnt);
1273  SC_ATOMIC_INIT(flowrec_busy);
1274 }
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:74
FlowTimeoutCounters_::rows_empty
uint32_t rows_empty
Definition: flow-manager.c:127
FlowSparePoolUpdate
void FlowSparePoolUpdate(uint32_t size)
Definition: flow-spare-pool.c:219
FlowManagerThreadSpawn
void FlowManagerThreadSpawn(void)
spawn the flow manager thread
Definition: flow-manager.c:949
tm-threads.h
ConfGetInt
int ConfGetInt(const char *name, intmax_t *val)
Retrieve a configuration value as an integer.
Definition: conf.c:399
FROM_TIMEVAL
#define FROM_TIMEVAL(timev)
initialize a 'struct timespec' from a 'struct timeval'.
Definition: util-time.h:116
OutputFlowLog
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
Definition: output-flow.c:85
FlowRecyclerThreadData_::fec
FlowEndCounters fec
Definition: flow-manager.c:989
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1660
app-layer-htp-range.h
FlowTimeoutCounters
Definition: flow-worker.c:61
FlowForceReassemblyForFlow
void FlowForceReassemblyForFlow(Flow *f)
Definition: flow-timeout.c:345
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1093
run_mode
int run_mode
Definition: suricata.c:175
FlowBucket_::evicted
Flow * evicted
Definition: flow-hash.h:48
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:167
FLOW_IS_IPV6
#define FLOW_IS_IPV6(f)
Definition: flow.h:166
FlowManagerThreadData_::max
uint32_t max
Definition: flow-manager.c:630
FlowManagerTimeoutThread
Definition: flow-manager.c:274
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:84
FlowSpareGetPoolSize
uint32_t FlowSpareGetPoolSize(void)
Definition: flow-spare-pool.c:47
flow-util.h
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:314
FlowManagerThreadData_::cnt
FlowCounters cnt
Definition: flow-manager.c:632
FBLOCK_LOCK
#define FBLOCK_LOCK(fb)
Definition: flow-hash.h:73
SCTIME_MSECS
#define SCTIME_MSECS(t)
Definition: util-time.h:58
TMM_FLOWRECYCLER
@ TMM_FLOWRECYCLER
Definition: tm-threads-common.h:73
stream-tcp.h
GetFlowBypassInfoID
FlowStorageId GetFlowBypassInfoID(void)
Definition: flow-util.c:212
FlowBypassInfo_
Definition: flow.h:526
FlowCnf_::emergency_recovery
uint32_t emergency_recovery
Definition: flow.h:295
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
FlowCnf_::hash_size
uint32_t hash_size
Definition: flow.h:289
FlowManagerThreadData_::instance
uint32_t instance
Definition: flow-manager.c:628
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
IPPairTimeoutHash
uint32_t IPPairTimeoutHash(SCTime_t ts)
time out ippairs from the hash
Definition: ippair-timeout.c:132
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:99
StatsRegisterGlobalCounter
uint16_t StatsRegisterGlobalCounter(const char *name, uint64_t(*Func)(void))
Registers a counter, which represents a global value.
Definition: counters.c:1029
Flow_::proto
uint8_t proto
Definition: flow.h:373
SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt)
threads.h
flow-private.h
FlowCounters_::flow_mgr_flows_notimeout
uint16_t flow_mgr_flows_notimeout
Definition: flow-manager.c:612
Flow_
Flow data structure.
Definition: flow.h:351
TYPE
#define TYPE
Flow_::protomap
uint8_t protomap
Definition: flow.h:445
SC_ATOMIC_EXTERN
SC_ATOMIC_EXTERN(unsigned int, flow_flags)
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:332
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:85
FlowProtoTimeout_
Definition: flow.h:515
StatsSetUI64
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type double to the local counter.
Definition: counters.c:210
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:54
flow-hash.h
TmModuleFlowRecyclerRegister
void TmModuleFlowRecyclerRegister(void)
Definition: flow-manager.c:1262
FlowTimeoutCounters_::flows_removed
uint32_t flows_removed
Definition: flow-manager.c:133
flow_spare_pool_block_size
uint32_t flow_spare_pool_block_size
Definition: flow-spare-pool.c:43
FlowBypassInfo_::tosrcbytecnt
uint64_t tosrcbytecnt
Definition: flow.h:531
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
FlowGetMemuse
uint64_t FlowGetMemuse(void)
Definition: flow.c:140
MIN
#define MIN(x, y)
Definition: suricata-common.h:391
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:80
FlowTimeoutCounters_::flows_timeout
uint32_t flows_timeout
Definition: flow-manager.c:132
defrag-timeout.h
FlowRecyclerThreadData_::counter_flows
uint16_t counter_flows
Definition: flow-manager.c:983
stream-tcp-reassemble.h
FLOW_ACTION_DROP
#define FLOW_ACTION_DROP
Definition: flow.h:67
LiveDevSubBypassStats
void LiveDevSubBypassStats(LiveDevice *dev, uint64_t cnt, int family)
Definition: util-device.c:515
MAX
#define MAX(x, y)
Definition: suricata-common.h:395
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:85
FlowQueuePrivate_::len
uint32_t len
Definition: flow-queue.h:44
Flow_::protoctx
void * protoctx
Definition: flow.h:441
FlowCounters_
Definition: flow-manager.c:603
FlowManagerTimeoutThread
struct FlowManagerTimeoutThread FlowManagerTimeoutThread
SCCtrlCondSignal
#define SCCtrlCondSignal
Definition: threads-debug.h:384
FlowCounters_::flow_mgr_flows_aside
uint16_t flow_mgr_flows_aside
Definition: flow-manager.c:614
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:92
FLOW_TIMEOUT_REASSEMBLY_DONE
#define FLOW_TIMEOUT_REASSEMBLY_DONE
Definition: flow.h:94
runmode-unix-socket.h
util-unittest.h
FlowTimeoutCounters_::bypassed_count
uint32_t bypassed_count
Definition: flow-manager.c:137
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:37
FlowSparePoolReturnFlows
void FlowSparePoolReturnFlows(FlowQueuePrivate *fqp)
Definition: flow-spare-pool.c:122
TM_THREAD_NAME_MAX
#define TM_THREAD_NAME_MAX
Definition: tm-threads.h:49
util-unittest-helper.h
FlowCnf_::prealloc
uint32_t prealloc
Definition: flow.h:290
FlowQueueTimeoutCounters::flows_removed
uint32_t flows_removed
Definition: flow-manager.c:599
FLOWLOCK_UNLOCK
#define FLOWLOCK_UNLOCK(fb)
Definition: flow.h:268
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:84
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:246
Flow_::flow_state
FlowStateType flow_state
Definition: flow.h:412
FQLOCK_LOCK
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:73
FlowWakeupFlowManagerThread
void FlowWakeupFlowManagerThread(void)
Definition: flow-manager.c:92
FlowDisableFlowRecyclerThread
void FlowDisableFlowRecyclerThread(void)
Used to disable flow recycler thread(s).
Definition: flow-manager.c:1189
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:50
FlowTimeoutCounters_::rows_maxlen
uint32_t rows_maxlen
Definition: flow-manager.c:128
FlowCounters_::flow_bypassed_bytes
uint16_t flow_bypassed_bytes
Definition: flow-manager.c:621
FlowTimeoutCounters_::flows_checked
uint32_t flows_checked
Definition: flow-manager.c:130
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:45
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:107
flow-spare-pool.h
StatsDecr
void StatsDecr(ThreadVars *tv, uint16_t id)
Decrements the local counter.
Definition: counters.c:188
Flow_::fb
struct FlowBucket_ * fb
Definition: flow.h:488
FlowCounters_::flow_mgr_full_pass
uint16_t flow_mgr_full_pass
Definition: flow-manager.c:604
StatsRegisterMaxCounter
uint16_t StatsRegisterMaxCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the maximum of all the values assigned to it.
Definition: counters.c:1011
SC_ATOMIC_MEMORY_ORDER_RELAXED
#define SC_ATOMIC_MEMORY_ORDER_RELAXED
Definition: util-atomic.h:165
util-device.h
util-debug.h
FlowWakeupFlowRecyclerThread
void FlowWakeupFlowRecyclerThread(void)
Definition: flow-manager.c:99
FlowBypassInfo_::todstbytecnt
uint64_t todstbytecnt
Definition: flow.h:533
FlowBypassInfo_::BypassUpdate
bool(* BypassUpdate)(Flow *f, void *data, time_t tsec)
Definition: flow.h:527
FLOW_PROTO_MAX
@ FLOW_PROTO_MAX
Definition: flow-private.h:74
FlowRecyclerThreadData_::counter_queue_avg
uint16_t counter_queue_avg
Definition: flow-manager.c:984
OutputFlowLogThreadInit
TmEcode OutputFlowLogThreadInit(ThreadVars *tv, void *initdata, void **data)
thread init for the flow logger This will run the thread init functions for the individual registered...
Definition: output-flow.c:123
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
FlowCounters_::flow_mgr_rows_maxlen
uint16_t flow_mgr_rows_maxlen
Definition: flow-manager.c:617
FLOWLOCK_WRLOCK
#define FLOWLOCK_WRLOCK(fb)
Definition: flow.h:265
FlowTimeoutsReset
#define FlowTimeoutsReset()
Definition: flow-manager.h:30
FlowForceReassemblyNeedReassembly
int FlowForceReassemblyNeedReassembly(Flow *f)
Check if a flow needs forced reassembly, or any other processing.
Definition: flow-timeout.c:287
FlowDisableFlowManagerThread
void FlowDisableFlowManagerThread(void)
Used to disable flow manager thread(s).
Definition: flow-manager.c:148
SCCtrlCondT
#define SCCtrlCondT
Definition: threads-debug.h:382
detect.h
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1762
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:66
TimeModeIsReady
bool TimeModeIsReady(void)
Definition: util-time.c:92
Flow_::flow_end_flags
uint8_t flow_end_flags
Definition: flow.h:447
THV_KILL
#define THV_KILL
Definition: threadvars.h:39
MemcapsGetPressure
float MemcapsGetPressure(void)
Definition: runmode-unix-socket.c:135
FlowBypassInfo_::todstpktcnt
uint64_t todstpktcnt
Definition: flow.h:532
util-time.h
FlowQueuePrivateGetFromTop
Flow * FlowQueuePrivateGetFromTop(FlowQueuePrivate *fqc)
Definition: flow-queue.c:151
FlowBypassInfo_::bypass_data
void * bypass_data
Definition: flow.h:529
FlowQueueAppendPrivate
void FlowQueueAppendPrivate(FlowQueue *fq, FlowQueuePrivate *fqc)
Definition: flow-queue.c:119
FlowQueueTimeoutCounters
struct FlowQueueTimeoutCounters FlowQueueTimeoutCounters
FlowCounters_::flow_mgr_flows_aside_needs_work
uint16_t flow_mgr_flows_aside_needs_work
Definition: flow-manager.c:615
app-layer-parser.h
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:124
FlowRecyclerThreadData_
Definition: flow-manager.c:980
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:300
FLOW_IS_IPV4
#define FLOW_IS_IPV4(f)
Definition: flow.h:164
FlowRecyclerThreadData_::counter_tcp_active_sessions
uint16_t counter_tcp_active_sessions
Definition: flow-manager.c:988
FlowCounters_::flow_bypassed_cnt_clo
uint16_t flow_bypassed_cnt_clo
Definition: flow-manager.c:619
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:83
SC_ATOMIC_SUB
#define SC_ATOMIC_SUB(name, val)
sub a value from our atomic variable
Definition: util-atomic.h:341
flow_timeouts_emerg
FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]
Definition: flow.c:98
TimeModeIsLive
bool TimeModeIsLive(void)
Definition: util-time.c:111
stream.h
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:376
FlowTimeoutsEmergency
void FlowTimeoutsEmergency(void)
Definition: flow-manager.c:111
FlowTimeoutCounters
struct FlowTimeoutCounters_ FlowTimeoutCounters
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:33
TimeGet
SCTime_t TimeGet(void)
Definition: util-time.c:152
conf.h
FlowRecyclerThreadData_::output_thread_data
void * output_thread_data
Definition: flow-manager.c:981
FlowTimeoutCounters_::rows_checked
uint32_t rows_checked
Definition: flow-manager.c:125
FBLOCK_UNLOCK
#define FBLOCK_UNLOCK(fb)
Definition: flow-hash.h:75
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:83
FlowClearMemory
int FlowClearMemory(Flow *f, uint8_t proto_map)
Function clear the flow memory before queueing it to spare flow queue.
Definition: flow.c:1103
flow_timeouts_delta
FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]
Definition: flow.c:99
FlowRecyclerThreadSpawn
void FlowRecyclerThreadSpawn(void)
spawn the flow recycler thread
Definition: flow-manager.c:1152
output-flow.h
detect-engine-state.h
Data structures and function prototypes for keeping state for the detection engine.
flow-timeout.h
flow-queue.h
TmModule_::name
const char * name
Definition: tm-modules.h:45
FlowRecyclerThreadData
struct FlowRecyclerThreadData_ FlowRecyclerThreadData
FlowBypassInfo_::tosrcpktcnt
uint64_t tosrcpktcnt
Definition: flow.h:530
host-timeout.h
SCCtrlCondTimedwait
#define SCCtrlCondTimedwait
Definition: threads-debug.h:385
StreamTcpThreadCacheCleanup
void StreamTcpThreadCacheCleanup(void)
Definition: stream-tcp-cache.c:134
runmodes.h
FlowTimeoutCounters_::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-manager.c:135
FlowGetStorageById
void * FlowGetStorageById(const Flow *f, FlowStorageId id)
Definition: flow-storage.c:40
Flow_::next
struct Flow_ * next
Definition: flow.h:396
FlowQueue_
Definition: flow-queue.h:49
FlowCounters_::flow_bypassed_pkts
uint16_t flow_bypassed_pkts
Definition: flow-manager.c:620
TMM_FLOWMANAGER
@ TMM_FLOWMANAGER
Definition: tm-threads-common.h:72
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:38
flow_hash
FlowBucket * flow_hash
Definition: flow-hash.c:58
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:378
FlowQueueExtractPrivate
FlowQueuePrivate FlowQueueExtractPrivate(FlowQueue *fq)
Definition: flow-queue.c:140
flow-storage.h
cnt
uint32_t cnt
Definition: tmqh-packetpool.h:7
FlowTimeoutCounters_
Definition: flow-manager.c:124
FlowManagerThreadData_
Definition: flow-manager.c:627
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flow-manager.h
suricata-common.h
SCCtrlMutex
#define SCCtrlMutex
Definition: threads-debug.h:373
DefragTimeoutHash
uint32_t DefragTimeoutHash(SCTime_t ts)
time out tracker from the hash
Definition: defrag-timeout.c:121
FlowManagerThreadData
struct FlowManagerThreadData_ FlowManagerThreadData
flow_config
FlowConfig flow_config
Definition: flow.c:102
FlowQueueTimeoutCounters::flows_timeout
uint32_t flows_timeout
Definition: flow-manager.c:600
TmModuleFlowManagerRegister
void TmModuleFlowManagerRegister(void)
Definition: flow-manager.c:1248
FlowCounters_::memcap_pressure
uint16_t memcap_pressure
Definition: flow-manager.c:623
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
FlowTimeoutsInit
void FlowTimeoutsInit(void)
Definition: flow-manager.c:106
SCTIME_SECS
#define SCTIME_SECS(t)
Definition: util-time.h:57
SC_ATOMIC_LOAD_EXPLICIT
#define SC_ATOMIC_LOAD_EXPLICIT(name, order)
Definition: util-atomic.h:378
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:48
FatalError
#define FatalError(...)
Definition: util-debug.h:502
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
StatsSyncCounters
void StatsSyncCounters(ThreadVars *tv)
Definition: counters.c:456
Flow_::livedev
struct LiveDevice_ * livedev
Definition: flow.h:398
threadvars.h
FlowQueuePrivate_
Definition: flow-queue.h:41
StatsAddUI64
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition: counters.c:146
SCLogConfig
struct SCLogConfig_ SCLogConfig
Holds the config state used by the logging api.
FlowRecyclerThreadData_::counter_queue_max
uint16_t counter_queue_max
Definition: flow-manager.c:985
FlowTimeoutCounters_::bypassed_bytes
uint64_t bypassed_bytes
Definition: flow-manager.c:139
FlowManagerThreadData_::min
uint32_t min
Definition: flow-manager.c:629
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
OutputFlowLogThreadDeinit
TmEcode OutputFlowLogThreadDeinit(ThreadVars *tv, void *thread_data)
Definition: output-flow.c:163
flow_recycle_q
FlowQueue flow_recycle_q
Definition: flow-manager.c:73
SCFree
#define SCFree(p)
Definition: util-mem.h:61
FlowTimeoutCounters_::bypassed_pkts
uint64_t bypassed_pkts
Definition: flow-manager.c:138
FlowCounters_::memcap_pressure_max
uint16_t memcap_pressure_max
Definition: flow-manager.c:624
Flow_::flags
uint32_t flags
Definition: flow.h:421
HostTimeoutHash
uint32_t HostTimeoutHash(SCTime_t ts)
time out hosts from the hash
Definition: host-timeout.c:128
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:317
FlowManagerThreadData_::timeout
FlowManagerTimeoutThread timeout
Definition: flow-manager.c:634
util-random.h
FlowCounters
struct FlowCounters_ FlowCounters
FlowCounters_::flow_emerg_mode_enter
uint16_t flow_emerg_mode_enter
Definition: flow-manager.c:608
FLOW_END_FLAG_SHUTDOWN
#define FLOW_END_FLAG_SHUTDOWN
Definition: flow.h:240
FLOW_EMERGENCY
#define FLOW_EMERGENCY
Definition: flow-private.h:37
ippair-timeout.h
FlowRecyclerThreadData_::counter_flow_active
uint16_t counter_flow_active
Definition: flow-manager.c:987
FlowCounters_::flow_mgr_flows_timeout
uint16_t flow_mgr_flows_timeout
Definition: flow-manager.c:613
FlowManagerTimeoutThread::aside_queue
FlowQueuePrivate aside_queue
Definition: flow-manager.c:277
timeradd
#define timeradd(a, b, r)
Definition: util-time.h:128
FLOW_END_FLAG_TIMEOUT
#define FLOW_END_FLAG_TIMEOUT
Definition: flow.h:238
FlowTimeoutCounters_::flows_notimeout
uint32_t flows_notimeout
Definition: flow-manager.c:131
flow_timeouts_normal
FlowProtoTimeout flow_timeouts_normal[FLOW_PROTO_MAX]
Definition: flow.c:97
StatsRegisterAvgCounter
uint16_t StatsRegisterAvgCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the average of all the values assigned to it.
Definition: counters.c:991
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition: counters.c:461
FlowCounters_::flow_mgr_spare
uint16_t flow_mgr_spare
Definition: flow-manager.c:607
FlowTimeoutCounters_::flows_aside
uint32_t flows_aside
Definition: flow-manager.c:134
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:277
flow.h
FlowQueuePrivateAppendFlow
void FlowQueuePrivateAppendFlow(FlowQueuePrivate *fqc, Flow *f)
Definition: flow-queue.c:65
FlowEndCountersRegister
void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec)
Definition: flow-util.c:236
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:91
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
evicted
Flow * evicted
Definition: flow-hash.h:4
StatsRegisterCounter
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:971
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
BITS
#define BITS
Flow_::timeout_at
uint32_t timeout_at
Definition: flow.h:391
FlowEndCounters_
Definition: flow-util.h:148
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:77
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:359
FlowCounters_::flow_emerg_mode_over
uint16_t flow_emerg_mode_over
Definition: flow-manager.c:609
TM_FLAG_MANAGEMENT_TM
#define TM_FLAG_MANAGEMENT_TM
Definition: tm-modules.h:37
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:172
FlowTimeoutCounters_::rows_skipped
uint32_t rows_skipped
Definition: flow-manager.c:126
FlowCounters_::flow_mgr_flows_checked
uint16_t flow_mgr_flows_checked
Definition: flow-manager.c:611
SCTIME_USECS
#define SCTIME_USECS(t)
Definition: util-time.h:56
FlowCounters_::flow_mgr_rows_sec
uint16_t flow_mgr_rows_sec
Definition: flow-manager.c:605
FlowQueueTimeoutCounters
Definition: flow-manager.c:598
FlowTimeoutCounters::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-worker.c:62
FQLOCK_UNLOCK
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:75
HttpRangeContainersTimeoutHash
uint32_t HttpRangeContainersTimeoutHash(const SCTime_t ts)
Definition: app-layer-htp-range.c:188