suricata
flow-manager.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2023 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
22  * \author Victor Julien <victor@inliniac.net>
23  */
24 
25 #include "suricata-common.h"
26 #include "conf.h"
27 #include "threadvars.h"
28 #include "tm-threads.h"
29 #include "runmodes.h"
30 
31 #include "util-time.h"
32 
33 #include "flow.h"
34 #include "flow-queue.h"
35 #include "flow-hash.h"
36 #include "flow-util.h"
37 #include "flow-private.h"
38 #include "flow-timeout.h"
39 #include "flow-manager.h"
40 #include "flow-storage.h"
41 #include "flow-spare-pool.h"
42 
43 #include "stream-tcp.h"
44 
45 #include "util-device.h"
46 
47 #include "util-debug.h"
48 
49 #include "threads.h"
50 
51 #include "host-timeout.h"
52 #include "defrag-timeout.h"
53 #include "ippair-timeout.h"
54 #include "app-layer-htp-range.h"
55 
56 #include "output-flow.h"
57 
58 #include "runmode-unix-socket.h"
59 
60 /** queue to pass flows to cleanup/log thread(s) */
62 
63 /* multi flow manager support */
64 static uint32_t flowmgr_number = 1;
65 /* atomic counter for flow managers, to assign instance id */
66 SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
67 
68 /* multi flow recycler support */
69 static uint32_t flowrec_number = 1;
70 /* atomic counter for flow recyclers, to assign instance id */
71 SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
72 SC_ATOMIC_DECLARE(uint32_t, flowrec_busy);
73 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
74 
75 static SCCtrlCondT flow_manager_ctrl_cond = PTHREAD_COND_INITIALIZER;
76 static SCCtrlMutex flow_manager_ctrl_mutex = PTHREAD_MUTEX_INITIALIZER;
77 static SCCtrlCondT flow_recycler_ctrl_cond = PTHREAD_COND_INITIALIZER;
78 static SCCtrlMutex flow_recycler_ctrl_mutex = PTHREAD_MUTEX_INITIALIZER;
79 
81 {
82  SCCtrlMutexLock(&flow_manager_ctrl_mutex);
83  SCCtrlCondSignal(&flow_manager_ctrl_cond);
84  SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
85 }
86 
88 {
89  SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
90  SCCtrlCondSignal(&flow_recycler_ctrl_cond);
91  SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
92 }
93 
94 void FlowTimeoutsInit(void)
95 {
96  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
97 }
98 
100 {
101  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
102 }
103 
/* 1 second */
105 #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
106 #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
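/* 0.3 seconds -- NOTE(review): 300000 is 0.3 s only when read as
 * microseconds; as nanoseconds (per the macro name) it is 0.3 ms. Confirm
 * against the users of these macros. */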
108 #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
109 #define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000
110 #define NEW_FLOW_COUNT_COND 10
111 
112 typedef struct FlowTimeoutCounters_ {
113  uint32_t rows_checked;
114  uint32_t rows_skipped;
115  uint32_t rows_empty;
116  uint32_t rows_maxlen;
117 
118  uint32_t flows_checked;
119  uint32_t flows_notimeout;
120  uint32_t flows_timeout;
121  uint32_t flows_removed;
122  uint32_t flows_aside;
124 
125  uint32_t bypassed_count;
126  uint64_t bypassed_pkts;
127  uint64_t bypassed_bytes;
129 
130 /**
131  * \brief Used to disable flow manager thread(s).
132  *
133  * \todo Kinda hackish since it uses the tv name to identify flow manager
134  * thread. We need an all weather identification scheme.
135  */
137 {
139  /* flow manager thread(s) is/are a part of mgmt threads */
140  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
141  if (strncasecmp(tv->name, thread_name_flow_mgr,
142  strlen(thread_name_flow_mgr)) == 0)
143  {
145  }
146  }
148 
149  struct timeval start_ts;
150  struct timeval cur_ts;
151  gettimeofday(&start_ts, NULL);
152 
153 again:
154  gettimeofday(&cur_ts, NULL);
155  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
156  FatalError("unable to get all flow manager "
157  "threads to shutdown in time");
158  }
159 
161  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
162  if (strncasecmp(tv->name, thread_name_flow_mgr,
163  strlen(thread_name_flow_mgr)) == 0)
164  {
167  /* sleep outside lock */
168  SleepMsec(1);
169  goto again;
170  }
171  }
172  }
174 
175  /* reset count, so we can kill and respawn (unix socket) */
176  SC_ATOMIC_SET(flowmgr_cnt, 0);
177  return;
178 }
179 
180 /** \internal
181  * \brief check if a flow is timed out
182  *
183  * \param f flow
184  * \param ts timestamp
185  *
186  * \retval 0 not timed out
187  * \retval 1 timed out
188  */
189 static int FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
190 {
191  uint32_t flow_times_out_at = f->timeout_at;
192  if (emerg) {
194  flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
195  }
196  if (*next_ts == 0 || flow_times_out_at < *next_ts)
197  *next_ts = flow_times_out_at;
198 
199  /* do the timeout check */
200  if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) {
201  return 0;
202  }
203 
204  return 1;
205 }
206 
207 /** \internal
208  * \brief check timeout of captured bypassed flow by querying capture method
209  *
210  * \param f Flow
211  * \param ts timestamp
212  * \param counters Flow timeout counters
213  *
214  * \retval 0 not timeout
215  * \retval 1 timeout (or not capture bypassed)
216  */
217 static inline int FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
218 {
219 #ifdef CAPTURE_OFFLOAD
220  if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
221  return 1;
222  }
223 
225  if (fc && fc->BypassUpdate) {
226  /* flow will be possibly updated */
227  uint64_t pkts_tosrc = fc->tosrcpktcnt;
228  uint64_t bytes_tosrc = fc->tosrcbytecnt;
229  uint64_t pkts_todst = fc->todstpktcnt;
230  uint64_t bytes_todst = fc->todstbytecnt;
231  bool update = fc->BypassUpdate(f, fc->bypass_data, SCTIME_SECS(ts));
232  if (update) {
233  SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f));
234  pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
235  bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
236  pkts_todst = fc->todstpktcnt - pkts_todst;
237  bytes_todst = fc->todstbytecnt - bytes_todst;
238  if (f->livedev) {
239  SC_ATOMIC_ADD(f->livedev->bypassed,
240  pkts_tosrc + pkts_todst);
241  }
242  counters->bypassed_pkts += pkts_tosrc + pkts_todst;
243  counters->bypassed_bytes += bytes_tosrc + bytes_todst;
244  return 0;
245  } else {
246  SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
247  if (f->livedev) {
248  if (FLOW_IS_IPV4(f)) {
249  LiveDevSubBypassStats(f->livedev, 1, AF_INET);
250  } else if (FLOW_IS_IPV6(f)) {
251  LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
252  }
253  }
254  counters->bypassed_count++;
255  return 1;
256  }
257  }
258 #endif /* CAPTURE_OFFLOAD */
259  return 1;
260 }
261 
262 typedef struct FlowManagerTimeoutThread {
263  /* used to temporarily store flows that have timed out and are
264  * removed from the hash */
267 
268 static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
269 {
270  FlowQueuePrivate recycle = { NULL, NULL, 0 };
271  counters->flows_aside += td->aside_queue.len;
272 
273  uint32_t cnt = 0;
274  Flow *f;
275  while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
276  /* flow is still locked */
277 
278  if (f->proto == IPPROTO_TCP &&
280  !FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) == 1) {
281  /* Send the flow to its thread */
283  FLOWLOCK_UNLOCK(f);
284  /* flow ownership is passed to the worker thread */
285 
286  counters->flows_aside_needs_work++;
287  continue;
288  }
289  FLOWLOCK_UNLOCK(f);
290 
291  FlowQueuePrivateAppendFlow(&recycle, f);
292  if (recycle.len == 100) {
295  }
296  cnt++;
297  }
298  if (recycle.len) {
301  }
302  return cnt;
303 }
304 
305 /**
306  * \internal
307  *
308  * \brief check all flows in a hash row for timing out
309  *
 * \param f first flow (bucket head) of the hash row; the row is walked via f->next
311  * \param ts timestamp
312  * \param emergency bool indicating emergency mode
313  * \param counters ptr to FlowTimeoutCounters structure
314  */
315 static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts,
316  int emergency, FlowTimeoutCounters *counters, uint32_t *next_ts)
317 {
318  uint32_t checked = 0;
319  Flow *prev_f = NULL;
320 
321  do {
322  checked++;
323 
324  /* check flow timeout based on lastts and state. Both can be
325  * accessed w/o Flow lock as we do have the hash row lock (so flow
326  * can't disappear) and flow_state is atomic. lastts can only
327  * be modified when we have both the flow and hash row lock */
328 
329  /* timeout logic goes here */
330  if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {
331 
332  counters->flows_notimeout++;
333 
334  prev_f = f;
335  f = f->next;
336  continue;
337  }
338 
339  FLOWLOCK_WRLOCK(f);
340 
341  Flow *next_flow = f->next;
342 
343  /* never prune a flow that is used by a packet we
344  * are currently processing in one of the threads */
345  if (!FlowBypassedTimeout(f, ts, counters)) {
346  FLOWLOCK_UNLOCK(f);
347  prev_f = f;
348  f = f->next;
349  continue;
350  }
351 
353 
354  counters->flows_timeout++;
355 
356  RemoveFromHash(f, prev_f);
357 
359  /* flow is still locked in the queue */
360 
361  f = next_flow;
362  } while (f != NULL);
363 
364  counters->flows_checked += checked;
365  if (checked > counters->rows_maxlen)
366  counters->rows_maxlen = checked;
367 }
368 
369 static void FlowManagerHashRowClearEvictedList(
371 {
372  do {
373  FLOWLOCK_WRLOCK(f);
374  Flow *next_flow = f->next;
375  f->next = NULL;
376  f->fb = NULL;
377 
379  /* flow is still locked in the queue */
380 
381  f = next_flow;
382  } while (f != NULL);
383 }
384 
385 /**
386  * \brief time out flows from the hash
387  *
388  * \param ts timestamp
389  * \param hash_min min hash index to consider
390  * \param hash_max max hash index to consider
391  * \param counters ptr to FlowTimeoutCounters structure
392  *
393  * \retval cnt number of timed out flow
394  */
395 static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const uint32_t hash_min,
396  const uint32_t hash_max, FlowTimeoutCounters *counters)
397 {
398  uint32_t cnt = 0;
399  const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
400  const uint32_t rows_checked = hash_max - hash_min;
401  uint32_t rows_skipped = 0;
402  uint32_t rows_empty = 0;
403 
404 #if __WORDSIZE==64
405 #define BITS 64
406 #define TYPE uint64_t
407 #else
408 #define BITS 32
409 #define TYPE uint32_t
410 #endif
411 
412  const uint32_t ts_secs = SCTIME_SECS(ts);
413  for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
414  TYPE check_bits = 0;
415  const uint32_t check = MIN(BITS, (hash_max - idx));
416  for (uint32_t i = 0; i < check; i++) {
417  FlowBucket *fb = &flow_hash[idx+i];
418  check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(
419  fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs)
420  << (TYPE)i;
421  }
422  if (check_bits == 0)
423  continue;
424 
425  for (uint32_t i = 0; i < check; i++) {
426  FlowBucket *fb = &flow_hash[idx+i];
427  if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= ts_secs) {
428  FBLOCK_LOCK(fb);
429  Flow *evicted = NULL;
430  if (fb->evicted != NULL || fb->head != NULL) {
431  if (fb->evicted != NULL) {
432  /* transfer out of bucket so we can do additional work outside
433  * of the bucket lock */
434  evicted = fb->evicted;
435  fb->evicted = NULL;
436  }
437  if (fb->head != NULL) {
438  uint32_t next_ts = 0;
439  FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts);
440 
441  if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
442  SC_ATOMIC_SET(fb->next_ts, next_ts);
443  }
444  if (fb->evicted == NULL && fb->head == NULL) {
445  SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
446  }
447  } else {
448  SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
449  rows_empty++;
450  }
451  FBLOCK_UNLOCK(fb);
452  /* processed evicted list */
453  if (evicted) {
454  FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
455  }
456  } else {
457  rows_skipped++;
458  }
459  }
460  if (td->aside_queue.len) {
461  cnt += ProcessAsideQueue(td, counters);
462  }
463  }
464 
465  counters->rows_checked += rows_checked;
466  counters->rows_skipped += rows_skipped;
467  counters->rows_empty += rows_empty;
468 
469  if (td->aside_queue.len) {
470  cnt += ProcessAsideQueue(td, counters);
471  }
472  counters->flows_removed += cnt;
473  /* coverity[missing_unlock : FALSE] */
474  return cnt;
475 }
476 
477 /** \internal
478  * \brief handle timeout for a slice of hash rows
479  * If we wrap around we call FlowTimeoutHash twice */
480 static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t ts,
481  const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
482  const uint32_t rows, uint32_t *pos)
483 {
484  uint32_t start = 0;
485  uint32_t end = 0;
486  uint32_t cnt = 0;
487  uint32_t rows_left = rows;
488 
489 again:
490  start = hash_min + (*pos);
491  if (start >= hash_max) {
492  start = hash_min;
493  }
494  end = start + rows_left;
495  if (end > hash_max) {
496  end = hash_max;
497  }
498  *pos = (end == hash_max) ? hash_min : end;
499  rows_left = rows_left - (end - start);
500 
501  cnt += FlowTimeoutHash(td, ts, start, end, counters);
502  if (rows_left) {
503  goto again;
504  }
505  return cnt;
506 }
507 
508 /**
509  * \internal
510  *
511  * \brief move all flows out of a hash row
512  *
 * \param f first flow of the list to clean up (bucket head or evicted list)
514  *
 * \retval cnt number of flows moved to the recycle queue
516  */
517 static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
518 {
519  uint32_t cnt = 0;
520 
521  do {
522  FLOWLOCK_WRLOCK(f);
523 
524  Flow *next_flow = f->next;
525 
526  /* remove from the hash */
527  if (mode == 0) {
528  RemoveFromHash(f, NULL);
529  } else {
530  FlowBucket *fb = f->fb;
531  fb->evicted = f->next;
532  f->next = NULL;
533  f->fb = NULL;
534  }
536 
537  /* no one is referring to this flow, removed from hash
538  * so we can unlock it and move it to the recycle queue. */
539  FLOWLOCK_UNLOCK(f);
540  FlowQueuePrivateAppendFlow(recycle_q, f);
541 
542  cnt++;
543 
544  f = next_flow;
545  } while (f != NULL);
546 
547  return cnt;
548 }
549 
550 /**
551  * \brief remove all flows from the hash
552  *
 * \retval cnt number of flows removed from the hash
554  */
555 static uint32_t FlowCleanupHash(void)
556 {
557  FlowQueuePrivate local_queue = { NULL, NULL, 0 };
558  uint32_t cnt = 0;
559 
560  for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
561  FlowBucket *fb = &flow_hash[idx];
562 
563  FBLOCK_LOCK(fb);
564 
565  if (fb->head != NULL) {
566  /* we have a flow, or more than one */
567  cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
568  }
569  if (fb->evicted != NULL) {
570  /* we have a flow, or more than one */
571  cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
572  }
573 
574  FBLOCK_UNLOCK(fb);
575  if (local_queue.len >= 25) {
576  FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
578  }
579  }
580  FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
582 
583  return cnt;
584 }
585 
586 typedef struct FlowQueueTimeoutCounters {
587  uint32_t flows_removed;
588  uint32_t flows_timeout;
590 
591 typedef struct FlowCounters_ {
594 
595  uint16_t flow_mgr_spare;
598 
604 
606 
610 
611  uint16_t memcap_pressure;
614 
615 typedef struct FlowManagerThreadData_ {
616  uint32_t instance;
617  uint32_t min;
618  uint32_t max;
619 
621 
624 
625 static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
626 {
627  fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t);
628  fc->flow_mgr_rows_sec = StatsRegisterCounter("flow.mgr.rows_per_sec", t);
629 
630  fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
631  fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
632  fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
633 
634  fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", t);
635  fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", t);
636  fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", t);
637  fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", t);
638  fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", t);
639  fc->flow_mgr_flows_aside_needs_work = StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", t);
640 
641  fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
642  fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
643  fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
644 
645  fc->memcap_pressure = StatsRegisterCounter("memcap.pressure", t);
646  fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap.pressure_max", t);
647 }
648 
649 static void FlowCountersUpdate(
650  ThreadVars *th_v, const FlowManagerThreadData *ftd, const FlowTimeoutCounters *counters)
651 {
652  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_checked, (uint64_t)counters->flows_checked);
653  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters->flows_notimeout);
654 
655  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters->flows_timeout);
656  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters->flows_aside);
658  (uint64_t)counters->flows_aside_needs_work);
659 
660  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_cnt_clo, (uint64_t)counters->bypassed_count);
661  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_pkts, (uint64_t)counters->bypassed_pkts);
662  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters->bypassed_bytes);
663 
664  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters->rows_maxlen);
665 }
666 
667 static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
668 {
670  if (ftd == NULL)
671  return TM_ECODE_FAILED;
672 
673  ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
674  SCLogDebug("flow manager instance %u", ftd->instance);
675 
676  /* set the min and max value used for hash row walking
677  * each thread has it's own section of the flow hash */
678  uint32_t range = flow_config.hash_size / flowmgr_number;
679 
680  ftd->min = ftd->instance * range;
681  ftd->max = (ftd->instance + 1) * range;
682 
683  /* last flow-manager takes on hash_size % flowmgr_number extra rows */
684  if ((ftd->instance + 1) == flowmgr_number) {
685  ftd->max = flow_config.hash_size;
686  }
688 
689  SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
690 
691  /* pass thread data back to caller */
692  *data = ftd;
693 
694  FlowCountersInit(t, &ftd->cnt);
695 
696  PacketPoolInit();
697  return TM_ECODE_OK;
698 }
699 
700 static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
701 {
704  SCFree(data);
705  return TM_ECODE_OK;
706 }
707 
/** \internal
 *  \brief calculate number of rows to scan and how much time to sleep
 *         based on the busy score `mp` (0 idle, 100 max busy).
 *
 *  We try to make sure we scan the hash once a second. The size of the
 *  slice of the hash that is scanned is determined by our busy score `mp`.
 *  We sleep for the remainder of the second after processing the slice,
 *  or at least an approximation of it.
 *  A minimum busy score of 10 is assumed to avoid a longer than 10 second
 *  full hash pass. This is to avoid burstiness in scanning when there is
 *  a rapid increase of the busy score, which could lead to the flow manager
 *  suddenly scanning a much larger slice of the hash leading to a burst
 *  in scan/eviction work.
 *
 *  \param rows number of hash rows this flow manager is responsible for
 *  \param mp busy score in percent
 *  \param emergency true when in emergency mode: the whole range is one work
 *                   unit and the sleep is fixed at 250ms
 *  \param wu_sleep [out] time to sleep between work units, in milliseconds
 *  \param wu_rows [out] number of rows to scan per work unit
 *  \param rows_sec [out] rows scanned per second; NOT written when
 *                  `emergency` is true, the caller's previous value remains
 */
static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
        uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
{
    if (emergency) {
        *wu_rows = rows;
        *wu_sleep = 250;
        return;
    }
    /* minimum busy score is 10 */
    const uint32_t emp = MAX(mp, 10);
    /* rows * emp / 100 in 64 bit integer math: exact for every row count
     * (float only has a 24 bit mantissa) and the product cannot overflow.
     * Clamp so the narrowing back to 32 bits is always well defined. */
    const uint64_t scaled = ((uint64_t)rows * (uint64_t)emp) / 100;
    const uint32_t rows_per_sec = (scaled > UINT32_MAX) ? UINT32_MAX : (uint32_t)scaled;
    /* calc how much time we estimate the work will take, in ms. We assume
     * each row takes an average of 1usec. Maxing out at 1sec. */
    const uint32_t work_per_unit = MIN(rows_per_sec / 1000, 1000);
    /* calc how much time we need to sleep to get to the per second cadence
     * but sleeping for at least 250ms. work_per_unit is <= 1000 so the
     * subtraction cannot wrap. */
    const uint32_t sleep_per_unit = MAX(250, 1000 - work_per_unit);
    SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,
            sleep_per_unit);

    *wu_sleep = sleep_per_unit;
    *wu_rows = rows_per_sec;
    *rows_sec = rows_per_sec;
}
746 
747 /** \brief Thread that manages the flow table and times out flows.
748  *
 * \param th_v ThreadVars of this flow manager thread
 * \param thread_data FlowManagerThreadData of this instance, passed as void ptr
750  *
751  * Keeps an eye on the spare list, alloc flows if needed...
752  */
753 static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
754 {
755  FlowManagerThreadData *ftd = thread_data;
756  const uint32_t rows = ftd->max - ftd->min;
757  const bool time_is_live = TimeModeIsLive();
758 
759  uint32_t emerg_over_cnt = 0;
760  uint64_t next_run_ms = 0;
761  uint32_t pos = 0;
762  uint32_t rows_sec = 0;
763  uint32_t rows_per_wu = 0;
764  uint64_t sleep_per_wu = 0;
765  bool prev_emerg = false;
766  uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
767  SCTime_t ts;
768 
769  /* don't start our activities until time is setup */
770  while (!TimeModeIsReady()) {
771  if (suricata_ctl_flags != 0)
772  return TM_ECODE_OK;
773  usleep(10);
774  }
775 
776  uint32_t mp = MemcapsGetPressure() * 100;
777  if (ftd->instance == 0) {
778  StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
779  StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
780  }
781  GetWorkUnitSizing(rows, mp, false, &sleep_per_wu, &rows_per_wu, &rows_sec);
782  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
783 
785 
786  while (1)
787  {
788  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
792  }
793 
794  bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);
795 
796  /* Get the time */
797  ts = TimeGet();
798  SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts));
799  uint64_t ts_ms = SCTIME_MSECS(ts);
800  const bool emerge_p = (emerg && !prev_emerg);
801  if (emerge_p) {
802  next_run_ms = 0;
803  prev_emerg = true;
804  SCLogNotice("Flow emergency mode entered...");
805  StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter);
806  }
807  if (ts_ms >= next_run_ms) {
808  if (ftd->instance == 0) {
809  const uint32_t sq_len = FlowSpareGetPoolSize();
810  const uint32_t spare_perc = sq_len * 100 / MAX(flow_config.prealloc, 1);
811  /* see if we still have enough spare flows */
812  if (spare_perc < 90 || spare_perc > 110) {
813  FlowSparePoolUpdate(sq_len);
814  }
815  }
816 
817  /* try to time out flows */
818  // clang-format off
819  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
820  // clang-format on
821 
822  if (emerg) {
823  /* in emergency mode, do a full pass of the hash table */
824  FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters);
825  StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
826  } else {
827  SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,
828  rows_per_wu);
829 
830  const uint32_t ppos = pos;
831  FlowTimeoutHashInChunks(
832  &ftd->timeout, ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos);
833  if (ppos > pos) {
834  StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
835  }
836  }
837 
838  const uint32_t spare_pool_len = FlowSpareGetPoolSize();
839  StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)spare_pool_len);
840 
841  FlowCountersUpdate(th_v, ftd, &counters);
842 
843  if (emerg == true) {
844  SCLogDebug("flow_sparse_q.len = %" PRIu32 " prealloc: %" PRIu32
845  "flow_spare_q status: %" PRIu32 "%% flows at the queue",
846  spare_pool_len, flow_config.prealloc,
847  spare_pool_len * 100 / MAX(flow_config.prealloc, 1));
848 
849  /* only if we have pruned this "emergency_recovery" percentage
850  * of flows, we will unset the emergency bit */
851  if ((spare_pool_len * 100 / MAX(flow_config.prealloc, 1)) >
853  emerg_over_cnt++;
854  } else {
855  emerg_over_cnt = 0;
856  }
857 
858  if (emerg_over_cnt >= 30) {
859  SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
861 
862  emerg = false;
863  prev_emerg = false;
864  emerg_over_cnt = 0;
865  SCLogNotice("Flow emergency mode over, back to normal... unsetting"
866  " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", "
867  "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32
868  "%% flows at the queue",
869  (uintmax_t)SCTIME_SECS(ts), (uintmax_t)SCTIME_USECS(ts),
870  spare_pool_len * 100 / MAX(flow_config.prealloc, 1));
871 
872  StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
873  }
874  }
875 
876  /* update work units */
877  const uint32_t pmp = mp;
878  mp = MemcapsGetPressure() * 100;
879  if (ftd->instance == 0) {
880  StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
881  StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
882  }
883  GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
884  if (pmp != mp) {
885  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
886  }
887 
888  next_run_ms = ts_ms + sleep_per_wu;
889  }
890  if (other_last_sec == 0 || other_last_sec < (uint32_t)SCTIME_SECS(ts)) {
891  if (ftd->instance == 0) {
896  other_last_sec = (uint32_t)SCTIME_SECS(ts);
897  }
898  }
899 
900  if (TmThreadsCheckFlag(th_v, THV_KILL)) {
901  StatsSyncCounters(th_v);
902  break;
903  }
904 
905  if (emerg || !time_is_live) {
906  usleep(250);
907  } else {
908  struct timeval cond_tv;
909  gettimeofday(&cond_tv, NULL);
910  struct timeval add_tv;
911  add_tv.tv_sec = 0;
912  add_tv.tv_usec = (sleep_per_wu * 1000);
913  timeradd(&cond_tv, &add_tv, &cond_tv);
914 
915  struct timespec cond_time = FROM_TIMEVAL(cond_tv);
916  SCCtrlMutexLock(&flow_manager_ctrl_mutex);
917  while (1) {
918  int rc = SCCtrlCondTimedwait(
919  &flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, &cond_time);
920  if (rc == ETIMEDOUT || rc < 0)
921  break;
922  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
923  break;
924  }
925  }
926  SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
927  }
928 
929  SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
930 
932  }
933  return TM_ECODE_OK;
934 }
935 
936 /** \brief spawn the flow manager thread */
938 {
939  intmax_t setting = 1;
940  (void)ConfGetInt("flow.managers", &setting);
941 
942  if (setting < 1 || setting > 1024) {
943  FatalError("invalid flow.managers setting %" PRIdMAX, setting);
944  }
945  flowmgr_number = (uint32_t)setting;
946 
947  SCLogConfig("using %u flow manager threads", flowmgr_number);
949 
950  for (uint32_t u = 0; u < flowmgr_number; u++) {
951  char name[TM_THREAD_NAME_MAX];
952  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);
953 
954  ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
955  "FlowManager", 0);
956  BUG_ON(tv_flowmgr == NULL);
957 
958  if (tv_flowmgr == NULL) {
959  FatalError("flow manager thread creation failed");
960  }
961  if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
962  FatalError("flow manager thread spawn failed");
963  }
964  }
965  return;
966 }
967 
968 typedef struct FlowRecyclerThreadData_ {
970 
971  uint16_t counter_flows;
974 
979 
980 static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
981 {
983  if (ftd == NULL)
984  return TM_ECODE_FAILED;
985  if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
986  SCLogError("initializing flow log API for thread failed");
987  SCFree(ftd);
988  return TM_ECODE_FAILED;
989  }
990  SCLogDebug("output_thread_data %p", ftd->output_thread_data);
991 
992  ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t);
993  ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t);
994  ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t);
995 
996  ftd->counter_flow_active = StatsRegisterCounter("flow.active", t);
997  ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t);
998 
999  FlowEndCountersRegister(t, &ftd->fec);
1000 
1001  *data = ftd;
1002  return TM_ECODE_OK;
1003 }
1004 
1005 static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
1006 {
1008 
1010  if (ftd->output_thread_data != NULL)
1012 
1013  SCFree(data);
1014  return TM_ECODE_OK;
1015 }
1016 
1017 static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)
1018 {
1019  FLOWLOCK_WRLOCK(f);
1020 
1021  (void)OutputFlowLog(tv, ftd->output_thread_data, f);
1022 
1023  FlowEndCountersUpdate(tv, &ftd->fec, f);
1024  if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
1026  }
1028 
1029  FlowClearMemory(f, f->protomap);
1030  FLOWLOCK_UNLOCK(f);
1031 }
1032 
1033 extern uint32_t flow_spare_pool_block_size;
1034 
1035 /** \brief Thread that manages timed out flows.
1036  *
 * \param th_v ThreadVars of this flow recycler thread
 * \param thread_data FlowRecyclerThreadData of this instance, passed as void ptr
1038  */
1039 static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
1040 {
1041  FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
1042  BUG_ON(ftd == NULL);
1043  const bool time_is_live = TimeModeIsLive();
1044  uint64_t recycled_cnt = 0;
1045  FlowQueuePrivate ret_queue = { NULL, NULL, 0 };
1046 
1048 
1049  while (1)
1050  {
1051  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
1055  }
1056  SC_ATOMIC_ADD(flowrec_busy,1);
1058 
1059  StatsAddUI64(th_v, ftd->counter_queue_avg, list.len);
1060  StatsSetUI64(th_v, ftd->counter_queue_max, list.len);
1061 
1062  const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));
1063 
1064  /* Get the time */
1065  SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(TimeGet()));
1066 
1067  uint64_t cnt = 0;
1068  Flow *f;
1069  while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
1070  Recycler(th_v, ftd, f);
1071  cnt++;
1072 
1073  /* for every full sized block, add it to the spare pool */
1074  FlowQueuePrivateAppendFlow(&ret_queue, f);
1075  if (ret_queue.len == flow_spare_pool_block_size) {
1076  FlowSparePoolReturnFlows(&ret_queue);
1077  }
1078  }
1079  if (ret_queue.len > 0) {
1080  FlowSparePoolReturnFlows(&ret_queue);
1081  }
1082  if (cnt > 0) {
1083  recycled_cnt += cnt;
1084  StatsAddUI64(th_v, ftd->counter_flows, cnt);
1085  }
1086  SC_ATOMIC_SUB(flowrec_busy,1);
1087 
1088  if (bail) {
1089  break;
1090  }
1091 
1092  const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY);
1093  if (emerg || !time_is_live) {
1094  usleep(250);
1095  } else {
1096  struct timeval cond_tv;
1097  gettimeofday(&cond_tv, NULL);
1098  cond_tv.tv_sec += 1;
1099  struct timespec cond_time = FROM_TIMEVAL(cond_tv);
1100  SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
1101  while (1) {
1102  int rc = SCCtrlCondTimedwait(
1103  &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time);
1104  if (rc == ETIMEDOUT || rc < 0) {
1105  break;
1106  }
1107  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
1108  break;
1109  }
1110  if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) {
1111  break;
1112  }
1113  }
1114  SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
1115  }
1116 
1117  SCLogDebug("woke up...");
1118 
1120  }
1121  StatsSyncCounters(th_v);
1122  SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
1123  return TM_ECODE_OK;
1124 }
1125 
/**
 * \brief Check whether the flow recyclers have finished all pending work.
 *
 * \retval true  no recycler is marked busy and flow_recycle_q.qlen is 0
 * \retval false a recycler is still busy, or flows remain in flow_recycle_q
 */
1126 static bool FlowRecyclerReadyToShutdown(void)
1127 {
/* flowrec_busy is incremented by a recycler before it works through a batch
 * and decremented afterwards, so non-zero means work is still in flight */
1128  if (SC_ATOMIC_GET(flowrec_busy) != 0) {
1129  return false;
1130  }
1131  uint32_t len = 0;
/* NOTE(review): listing lines 1132 and 1134 are absent from this extract;
 * presumably they lock/unlock the queue around the qlen read -- confirm
 * against the original file. */
1133  len = flow_recycle_q.qlen;
1135 
/* ready only when nothing is left queued for recycling */
1136  return ((len == 0));
1137 }
1138 
1139 /** \brief spawn the flow recycler thread */
/* Reads the "flow.recyclers" setting, validates it, stores it in
 * flowrec_number and creates + spawns that many management threads using
 * the "FlowRecycler" module. Any failure is reported through FatalError.
 * NOTE(review): listing line 1140 (the function signature) is absent from
 * this extract. */
1141 {
/* default of one recycler; the ConfGetInt return value is deliberately
 * ignored -- presumably `setting` keeps this default when the option is
 * not configured (confirm against ConfGetInt) */
1142  intmax_t setting = 1;
1143  (void)ConfGetInt("flow.recyclers", &setting);
1144 
/* accepted range is 1..1024 inclusive */
1145  if (setting < 1 || setting > 1024) {
1146  FatalError("invalid flow.recyclers setting %" PRIdMAX, setting);
1147  }
/* the range check above makes the narrowing cast safe */
1148  flowrec_number = (uint32_t)setting;
1149 
1150  SCLogConfig("using %u flow recycler threads", flowrec_number);
1151 
1152  for (uint32_t u = 0; u < flowrec_number; u++) {
/* thread name is "<thread_name_flow_rec>#NN", NN being the 1-based,
 * zero-padded index; snprintf bounds the write to the buffer size */
1153  char name[TM_THREAD_NAME_MAX];
1154  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);
1155 
1156  ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name,
1157  "FlowRecycler", 0);
1158 
1159  if (tv_flowrec == NULL) {
1160  FatalError("flow recycler thread creation failed");
1161  }
1162  if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) {
1163  FatalError("flow recycler thread spawn failed");
1164  }
1165  }
1166  return;
1167 }
1168 
1169 /**
1170  * \brief Used to disable flow recycler thread(s).
1171  *
1172  * \note this should only be called when the flow manager is already gone
1173  *
1174  * \todo Kinda hackish since it uses the tv name to identify flow recycler
1175  * thread. We need an all weather identification scheme.
1176  */
/* Visible sequence: flush the flow hash into the recycle queue, poll until
 * FlowRecyclerReadyToShutdown() returns true, walk the management thread
 * list twice (matching recycler threads by name), then reset flowrec_cnt.
 * NOTE(review): this listing skips numbered lines 1177, 1189, 1193, 1199,
 * 1202, 1215, 1220-1222 and 1229, so the function signature, the bodies of
 * both name-match branches and any locking are not visible here. */
1178 {
1179  /* move all flows still in the hash to the recycler queue */
/* the returned flow count is only used for the debug log in DEBUG builds */
1180 #ifndef DEBUG
1181  (void)FlowCleanupHash();
1182 #else
1183  uint32_t flows = FlowCleanupHash();
1184  SCLogDebug("flows to progress: %u", flows);
1185 #endif
1186 
1187  /* make sure all flows are processed */
1188  do {
/* NOTE(review): line 1189 is missing; presumably it wakes the recycler
 * thread(s) before each short sleep -- confirm */
1190  usleep(10);
1191  } while (FlowRecyclerReadyToShutdown() == false);
1192 
1194  /* flow recycler thread(s) is/are a part of mgmt threads */
1195  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
/* case-insensitive match of the thread name's leading characters against
 * thread_name_flow_rec */
1196  if (strncasecmp(tv->name, thread_name_flow_rec,
1197  strlen(thread_name_flow_rec)) == 0)
1198  {
/* NOTE(review): line 1199 is missing; presumably it flags the matched
 * thread to stop (e.g. THV_KILL) -- confirm */
1200  }
1201  }
1203 
1204  struct timeval start_ts;
1205  struct timeval cur_ts;
1206  gettimeofday(&start_ts, NULL);
1207 
/* retry point: the scan below restarts from here until no match triggers
 * the retry branch, or the time limit is exceeded */
1208 again:
1209  gettimeofday(&cur_ts, NULL);
/* give up once the whole-second difference to start_ts exceeds 60 */
1210  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1211  FatalError("unable to get all flow recycler "
1212  "threads to shutdown in time");
1213  }
1214 
1216  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
1217  if (strncasecmp(tv->name, thread_name_flow_rec,
1218  strlen(thread_name_flow_rec)) == 0)
1219  {
/* NOTE(review): lines 1220-1222 are missing; the extra closing brace below
 * suggests an inner condition (presumably "thread not yet finished") that
 * guards this sleep-and-retry -- confirm */
1223  /* sleep outside lock */
1224  SleepMsec(1);
1225  goto again;
1226  }
1227  }
1228  }
1230 
1231  /* reset count, so we can kill and respawn (unix socket) */
1232  SC_ATOMIC_SET(flowrec_cnt, 0);
1233  return;
1234 }
1235 
/* Fills the TMM_FLOWMANAGER slot of tmm_modules with the flow manager's
 * name and its ThreadInit / ThreadDeinit / Management callbacks, then
 * initializes the flowmgr_cnt atomic and the flow_timeouts atomic pointer.
 * NOTE(review): listing lines 1236 (function signature) and 1242-1243 are
 * absent from this extract; presumably the latter set further TmModule
 * fields such as cap_flags/flags -- confirm against the original file. */
1237 {
1238  tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
1239  tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
1240  tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
1241  tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
1244  SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
1245 
/* flowmgr_cnt is the atomic counter used to assign flow manager instance ids */
1246  SC_ATOMIC_INIT(flowmgr_cnt);
1247  SC_ATOMIC_INITPTR(flow_timeouts);
1248 }
1249 
/* Fills the TMM_FLOWRECYCLER slot of tmm_modules with the flow recycler's
 * name and its ThreadInit / ThreadDeinit / Management callbacks, then
 * initializes the flowrec_cnt and flowrec_busy atomics.
 * NOTE(review): listing lines 1250 (function signature) and 1256-1257 are
 * absent from this extract; presumably the latter set further TmModule
 * fields such as cap_flags/flags -- confirm against the original file. */
1251 {
1252  tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
1253  tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
1254  tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
1255  tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
1258  SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
1259 
/* flowrec_cnt is reset to 0 when the recyclers are disabled; flowrec_busy
 * is what FlowRecyclerReadyToShutdown() polls */
1260  SC_ATOMIC_INIT(flowrec_cnt);
1261  SC_ATOMIC_INIT(flowrec_busy);
1262 }
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:74
FlowTimeoutCounters_::rows_empty
uint32_t rows_empty
Definition: flow-manager.c:115
FlowSparePoolUpdate
void FlowSparePoolUpdate(uint32_t size)
Definition: flow-spare-pool.c:219
FlowManagerThreadSpawn
void FlowManagerThreadSpawn(void)
spawn the flow manager thread
Definition: flow-manager.c:937
tm-threads.h
ConfGetInt
int ConfGetInt(const char *name, intmax_t *val)
Retrieve a configuration value as an integer.
Definition: conf.c:399
FROM_TIMEVAL
#define FROM_TIMEVAL(timev)
initialize a 'struct timespec' from a 'struct timeval'.
Definition: util-time.h:116
OutputFlowLog
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
Definition: output-flow.c:85
FlowRecyclerThreadData_::fec
FlowEndCounters fec
Definition: flow-manager.c:977
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1660
app-layer-htp-range.h
FlowTimeoutCounters
Definition: flow-worker.c:61
FlowForceReassemblyForFlow
void FlowForceReassemblyForFlow(Flow *f)
Definition: flow-timeout.c:345
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1093
FlowBucket_::evicted
Flow * evicted
Definition: flow-hash.h:48
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:167
FLOW_IS_IPV6
#define FLOW_IS_IPV6(f)
Definition: flow.h:166
FlowManagerThreadData_::max
uint32_t max
Definition: flow-manager.c:618
FlowManagerTimeoutThread
Definition: flow-manager.c:262
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:72
FlowSpareGetPoolSize
uint32_t FlowSpareGetPoolSize(void)
Definition: flow-spare-pool.c:47
flow-util.h
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:314
FlowManagerThreadData_::cnt
FlowCounters cnt
Definition: flow-manager.c:620
FBLOCK_LOCK
#define FBLOCK_LOCK(fb)
Definition: flow-hash.h:73
SCTIME_MSECS
#define SCTIME_MSECS(t)
Definition: util-time.h:58
TMM_FLOWRECYCLER
@ TMM_FLOWRECYCLER
Definition: tm-threads-common.h:73
stream-tcp.h
GetFlowBypassInfoID
FlowStorageId GetFlowBypassInfoID(void)
Definition: flow-util.c:211
FlowBypassInfo_
Definition: flow.h:530
FlowCnf_::emergency_recovery
uint32_t emergency_recovery
Definition: flow.h:299
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
FlowCnf_::hash_size
uint32_t hash_size
Definition: flow.h:293
FlowManagerThreadData_::instance
uint32_t instance
Definition: flow-manager.c:616
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
IPPairTimeoutHash
uint32_t IPPairTimeoutHash(SCTime_t ts)
time out ippairs from the hash
Definition: ippair-timeout.c:132
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:99
StatsRegisterGlobalCounter
uint16_t StatsRegisterGlobalCounter(const char *name, uint64_t(*Func)(void))
Registers a counter, which represents a global value.
Definition: counters.c:1029
Flow_::proto
uint8_t proto
Definition: flow.h:377
SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt)
threads.h
flow-private.h
FlowCounters_::flow_mgr_flows_notimeout
uint16_t flow_mgr_flows_notimeout
Definition: flow-manager.c:600
Flow_
Flow data structure.
Definition: flow.h:355
TYPE
#define TYPE
Flow_::protomap
uint8_t protomap
Definition: flow.h:449
SC_ATOMIC_EXTERN
SC_ATOMIC_EXTERN(unsigned int, flow_flags)
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:332
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:73
FlowProtoTimeout_
Definition: flow.h:519
StatsSetUI64
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type uint64_t to the local counter.
Definition: counters.c:210
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:54
flow-hash.h
TmModuleFlowRecyclerRegister
void TmModuleFlowRecyclerRegister(void)
Definition: flow-manager.c:1250
FlowTimeoutCounters_::flows_removed
uint32_t flows_removed
Definition: flow-manager.c:121
flow_spare_pool_block_size
uint32_t flow_spare_pool_block_size
Definition: flow-spare-pool.c:43
FlowBypassInfo_::tosrcbytecnt
uint64_t tosrcbytecnt
Definition: flow.h:535
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
FlowGetMemuse
uint64_t FlowGetMemuse(void)
Definition: flow.c:125
MIN
#define MIN(x, y)
Definition: suricata-common.h:391
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:80
FlowTimeoutCounters_::flows_timeout
uint32_t flows_timeout
Definition: flow-manager.c:120
defrag-timeout.h
FlowRecyclerThreadData_::counter_flows
uint16_t counter_flows
Definition: flow-manager.c:971
FLOW_ACTION_DROP
#define FLOW_ACTION_DROP
Definition: flow.h:67
LiveDevSubBypassStats
void LiveDevSubBypassStats(LiveDevice *dev, uint64_t cnt, int family)
Definition: util-device.c:515
MAX
#define MAX(x, y)
Definition: suricata-common.h:395
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:85
FlowQueuePrivate_::len
uint32_t len
Definition: flow-queue.h:44
Flow_::protoctx
void * protoctx
Definition: flow.h:445
FlowCounters_
Definition: flow-manager.c:591
FlowManagerTimeoutThread
struct FlowManagerTimeoutThread FlowManagerTimeoutThread
SCCtrlCondSignal
#define SCCtrlCondSignal
Definition: threads-debug.h:384
FlowCounters_::flow_mgr_flows_aside
uint16_t flow_mgr_flows_aside
Definition: flow-manager.c:602
FLOW_TIMEOUT_REASSEMBLY_DONE
#define FLOW_TIMEOUT_REASSEMBLY_DONE
Definition: flow.h:94
runmode-unix-socket.h
FlowTimeoutCounters_::bypassed_count
uint32_t bypassed_count
Definition: flow-manager.c:125
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:37
FlowSparePoolReturnFlows
void FlowSparePoolReturnFlows(FlowQueuePrivate *fqp)
Definition: flow-spare-pool.c:122
TM_THREAD_NAME_MAX
#define TM_THREAD_NAME_MAX
Definition: tm-threads.h:49
FlowCnf_::prealloc
uint32_t prealloc
Definition: flow.h:294
FlowQueueTimeoutCounters::flows_removed
uint32_t flows_removed
Definition: flow-manager.c:587
FLOWLOCK_UNLOCK
#define FLOWLOCK_UNLOCK(fb)
Definition: flow.h:272
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:84
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:246
Flow_::flow_state
FlowStateType flow_state
Definition: flow.h:416
FQLOCK_LOCK
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:73
FlowWakeupFlowManagerThread
void FlowWakeupFlowManagerThread(void)
Definition: flow-manager.c:80
FlowDisableFlowRecyclerThread
void FlowDisableFlowRecyclerThread(void)
Used to disable flow recycler thread(s).
Definition: flow-manager.c:1177
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:50
FlowTimeoutCounters_::rows_maxlen
uint32_t rows_maxlen
Definition: flow-manager.c:116
FlowCounters_::flow_bypassed_bytes
uint16_t flow_bypassed_bytes
Definition: flow-manager.c:609
FlowTimeoutCounters_::flows_checked
uint32_t flows_checked
Definition: flow-manager.c:118
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:45
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:107
flow-spare-pool.h
StatsDecr
void StatsDecr(ThreadVars *tv, uint16_t id)
Decrements the local counter.
Definition: counters.c:188
Flow_::fb
struct FlowBucket_ * fb
Definition: flow.h:492
FlowCounters_::flow_mgr_full_pass
uint16_t flow_mgr_full_pass
Definition: flow-manager.c:592
StatsRegisterMaxCounter
uint16_t StatsRegisterMaxCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the maximum of all the values assigned to it.
Definition: counters.c:1011
SC_ATOMIC_MEMORY_ORDER_RELAXED
#define SC_ATOMIC_MEMORY_ORDER_RELAXED
Definition: util-atomic.h:165
util-device.h
util-debug.h
FlowWakeupFlowRecyclerThread
void FlowWakeupFlowRecyclerThread(void)
Definition: flow-manager.c:87
FlowBypassInfo_::todstbytecnt
uint64_t todstbytecnt
Definition: flow.h:537
FlowBypassInfo_::BypassUpdate
bool(* BypassUpdate)(Flow *f, void *data, time_t tsec)
Definition: flow.h:531
FlowRecyclerThreadData_::counter_queue_avg
uint16_t counter_queue_avg
Definition: flow-manager.c:972
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:92
OutputFlowLogThreadInit
TmEcode OutputFlowLogThreadInit(ThreadVars *tv, void *initdata, void **data)
thread init for the flow logger This will run the thread init functions for the individual registered...
Definition: output-flow.c:123
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
FlowCounters_::flow_mgr_rows_maxlen
uint16_t flow_mgr_rows_maxlen
Definition: flow-manager.c:605
FLOWLOCK_WRLOCK
#define FLOWLOCK_WRLOCK(fb)
Definition: flow.h:269
FlowTimeoutsReset
#define FlowTimeoutsReset()
Definition: flow-manager.h:30
FlowForceReassemblyNeedReassembly
int FlowForceReassemblyNeedReassembly(Flow *f)
Check if a flow needs forced reassembly, or any other processing.
Definition: flow-timeout.c:287
FlowDisableFlowManagerThread
void FlowDisableFlowManagerThread(void)
Used to disable flow manager thread(s).
Definition: flow-manager.c:136
SCCtrlCondT
#define SCCtrlCondT
Definition: threads-debug.h:382
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1762
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:66
TimeModeIsReady
bool TimeModeIsReady(void)
Definition: util-time.c:92
Flow_::flow_end_flags
uint8_t flow_end_flags
Definition: flow.h:451
THV_KILL
#define THV_KILL
Definition: threadvars.h:39
MemcapsGetPressure
float MemcapsGetPressure(void)
Definition: runmode-unix-socket.c:135
FlowBypassInfo_::todstpktcnt
uint64_t todstpktcnt
Definition: flow.h:536
util-time.h
FlowQueuePrivateGetFromTop
Flow * FlowQueuePrivateGetFromTop(FlowQueuePrivate *fqc)
Definition: flow-queue.c:151
FlowBypassInfo_::bypass_data
void * bypass_data
Definition: flow.h:533
FlowQueueAppendPrivate
void FlowQueueAppendPrivate(FlowQueue *fq, FlowQueuePrivate *fqc)
Definition: flow-queue.c:119
FlowQueueTimeoutCounters
struct FlowQueueTimeoutCounters FlowQueueTimeoutCounters
FlowCounters_::flow_mgr_flows_aside_needs_work
uint16_t flow_mgr_flows_aside_needs_work
Definition: flow-manager.c:603
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:124
FlowRecyclerThreadData_
Definition: flow-manager.c:968
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:300
FLOW_IS_IPV4
#define FLOW_IS_IPV4(f)
Definition: flow.h:164
FlowRecyclerThreadData_::counter_tcp_active_sessions
uint16_t counter_tcp_active_sessions
Definition: flow-manager.c:976
FlowCounters_::flow_bypassed_cnt_clo
uint16_t flow_bypassed_cnt_clo
Definition: flow-manager.c:607
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:83
SC_ATOMIC_SUB
#define SC_ATOMIC_SUB(name, val)
sub a value from our atomic variable
Definition: util-atomic.h:341
flow_timeouts_emerg
FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]
Definition: flow.c:86
TimeModeIsLive
bool TimeModeIsLive(void)
Definition: util-time.c:111
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:376
FlowTimeoutsEmergency
void FlowTimeoutsEmergency(void)
Definition: flow-manager.c:99
FlowTimeoutCounters
struct FlowTimeoutCounters_ FlowTimeoutCounters
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:33
TimeGet
SCTime_t TimeGet(void)
Definition: util-time.c:152
conf.h
FlowRecyclerThreadData_::output_thread_data
void * output_thread_data
Definition: flow-manager.c:969
FlowTimeoutCounters_::rows_checked
uint32_t rows_checked
Definition: flow-manager.c:113
FBLOCK_UNLOCK
#define FBLOCK_UNLOCK(fb)
Definition: flow-hash.h:75
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:83
FlowClearMemory
int FlowClearMemory(Flow *f, uint8_t proto_map)
Function clear the flow memory before queueing it to spare flow queue.
Definition: flow.c:1093
flow_timeouts_delta
FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]
Definition: flow.c:87
FlowRecyclerThreadSpawn
void FlowRecyclerThreadSpawn(void)
spawn the flow recycler thread
Definition: flow-manager.c:1140
output-flow.h
flow-timeout.h
flow-queue.h
TmModule_::name
const char * name
Definition: tm-modules.h:45
FlowRecyclerThreadData
struct FlowRecyclerThreadData_ FlowRecyclerThreadData
FlowBypassInfo_::tosrcpktcnt
uint64_t tosrcpktcnt
Definition: flow.h:534
host-timeout.h
SCCtrlCondTimedwait
#define SCCtrlCondTimedwait
Definition: threads-debug.h:385
StreamTcpThreadCacheCleanup
void StreamTcpThreadCacheCleanup(void)
Definition: stream-tcp-cache.c:134
runmodes.h
FlowTimeoutCounters_::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-manager.c:123
FlowGetStorageById
void * FlowGetStorageById(const Flow *f, FlowStorageId id)
Definition: flow-storage.c:40
Flow_::next
struct Flow_ * next
Definition: flow.h:400
FlowQueue_
Definition: flow-queue.h:49
FlowCounters_::flow_bypassed_pkts
uint16_t flow_bypassed_pkts
Definition: flow-manager.c:608
TMM_FLOWMANAGER
@ TMM_FLOWMANAGER
Definition: tm-threads-common.h:72
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:38
flow_hash
FlowBucket * flow_hash
Definition: flow-hash.c:58
FLOW_PROTO_MAX
@ FLOW_PROTO_MAX
Definition: flow-private.h:74
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:378
FlowQueueExtractPrivate
FlowQueuePrivate FlowQueueExtractPrivate(FlowQueue *fq)
Definition: flow-queue.c:140
flow-storage.h
cnt
uint32_t cnt
Definition: tmqh-packetpool.h:7
FlowTimeoutCounters_
Definition: flow-manager.c:112
FlowManagerThreadData_
Definition: flow-manager.c:615
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flow-manager.h
suricata-common.h
SCCtrlMutex
#define SCCtrlMutex
Definition: threads-debug.h:373
DefragTimeoutHash
uint32_t DefragTimeoutHash(SCTime_t ts)
time out tracker from the hash
Definition: defrag-timeout.c:121
FlowManagerThreadData
struct FlowManagerThreadData_ FlowManagerThreadData
flow_config
FlowConfig flow_config
Definition: flow.c:90
FlowQueueTimeoutCounters::flows_timeout
uint32_t flows_timeout
Definition: flow-manager.c:588
TmModuleFlowManagerRegister
void TmModuleFlowManagerRegister(void)
Definition: flow-manager.c:1236
FlowCounters_::memcap_pressure
uint16_t memcap_pressure
Definition: flow-manager.c:611
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
FlowTimeoutsInit
void FlowTimeoutsInit(void)
Definition: flow-manager.c:94
SCTIME_SECS
#define SCTIME_SECS(t)
Definition: util-time.h:57
SC_ATOMIC_LOAD_EXPLICIT
#define SC_ATOMIC_LOAD_EXPLICIT(name, order)
Definition: util-atomic.h:378
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:48
FatalError
#define FatalError(...)
Definition: util-debug.h:502
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
StatsSyncCounters
void StatsSyncCounters(ThreadVars *tv)
Definition: counters.c:456
Flow_::livedev
struct LiveDevice_ * livedev
Definition: flow.h:402
threadvars.h
FlowQueuePrivate_
Definition: flow-queue.h:41
StatsAddUI64
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition: counters.c:146
SCLogConfig
struct SCLogConfig_ SCLogConfig
Holds the config state used by the logging api.
FlowRecyclerThreadData_::counter_queue_max
uint16_t counter_queue_max
Definition: flow-manager.c:973
FlowTimeoutCounters_::bypassed_bytes
uint64_t bypassed_bytes
Definition: flow-manager.c:127
FlowManagerThreadData_::min
uint32_t min
Definition: flow-manager.c:617
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
OutputFlowLogThreadDeinit
TmEcode OutputFlowLogThreadDeinit(ThreadVars *tv, void *thread_data)
Definition: output-flow.c:163
flow_recycle_q
FlowQueue flow_recycle_q
Definition: flow-manager.c:61
SCFree
#define SCFree(p)
Definition: util-mem.h:61
FlowTimeoutCounters_::bypassed_pkts
uint64_t bypassed_pkts
Definition: flow-manager.c:126
FlowCounters_::memcap_pressure_max
uint16_t memcap_pressure_max
Definition: flow-manager.c:612
Flow_::flags
uint32_t flags
Definition: flow.h:425
HostTimeoutHash
uint32_t HostTimeoutHash(SCTime_t ts)
time out hosts from the hash
Definition: host-timeout.c:128
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:317
FlowManagerThreadData_::timeout
FlowManagerTimeoutThread timeout
Definition: flow-manager.c:622
FlowCounters
struct FlowCounters_ FlowCounters
FlowCounters_::flow_emerg_mode_enter
uint16_t flow_emerg_mode_enter
Definition: flow-manager.c:596
FLOW_END_FLAG_SHUTDOWN
#define FLOW_END_FLAG_SHUTDOWN
Definition: flow.h:244
FLOW_EMERGENCY
#define FLOW_EMERGENCY
Definition: flow-private.h:37
ippair-timeout.h
FlowRecyclerThreadData_::counter_flow_active
uint16_t counter_flow_active
Definition: flow-manager.c:975
FlowCounters_::flow_mgr_flows_timeout
uint16_t flow_mgr_flows_timeout
Definition: flow-manager.c:601
FlowManagerTimeoutThread::aside_queue
FlowQueuePrivate aside_queue
Definition: flow-manager.c:265
timeradd
#define timeradd(a, b, r)
Definition: util-time.h:128
FLOW_END_FLAG_TIMEOUT
#define FLOW_END_FLAG_TIMEOUT
Definition: flow.h:242
FlowTimeoutCounters_::flows_notimeout
uint32_t flows_notimeout
Definition: flow-manager.c:119
flow_timeouts_normal
FlowProtoTimeout flow_timeouts_normal[FLOW_PROTO_MAX]
Definition: flow.c:85
StatsRegisterAvgCounter
uint16_t StatsRegisterAvgCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the average of all the values assigned to it.
Definition: counters.c:991
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition: counters.c:461
FlowCounters_::flow_mgr_spare
uint16_t flow_mgr_spare
Definition: flow-manager.c:595
FlowTimeoutCounters_::flows_aside
uint32_t flows_aside
Definition: flow-manager.c:122
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:277
flow.h
FlowQueuePrivateAppendFlow
void FlowQueuePrivateAppendFlow(FlowQueuePrivate *fqc, Flow *f)
Definition: flow-queue.c:65
FlowEndCountersRegister
void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec)
Definition: flow-util.c:235
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:91
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
evicted
Flow * evicted
Definition: flow-hash.h:4
StatsRegisterCounter
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:971
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
BITS
#define BITS
Flow_::timeout_at
uint32_t timeout_at
Definition: flow.h:395
FlowEndCounters_
Definition: flow-util.h:148
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:77
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:359
FlowCounters_::flow_emerg_mode_over
uint16_t flow_emerg_mode_over
Definition: flow-manager.c:597
TM_FLAG_MANAGEMENT_TM
#define TM_FLAG_MANAGEMENT_TM
Definition: tm-modules.h:37
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:172
FlowTimeoutCounters_::rows_skipped
uint32_t rows_skipped
Definition: flow-manager.c:114
FlowCounters_::flow_mgr_flows_checked
uint16_t flow_mgr_flows_checked
Definition: flow-manager.c:599
SCTIME_USECS
#define SCTIME_USECS(t)
Definition: util-time.h:56
FlowCounters_::flow_mgr_rows_sec
uint16_t flow_mgr_rows_sec
Definition: flow-manager.c:593
FlowQueueTimeoutCounters
Definition: flow-manager.c:586
FlowTimeoutCounters::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-worker.c:62
FQLOCK_UNLOCK
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:75
HttpRangeContainersTimeoutHash
uint32_t HttpRangeContainersTimeoutHash(const SCTime_t ts)
Definition: app-layer-htp-range.c:188