suricata
flow-manager.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2022 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
22  * \author Victor Julien <victor@inliniac.net>
23  */
24 
25 #include "suricata-common.h"
26 #include "suricata.h"
27 #include "decode.h"
28 #include "conf.h"
29 #include "threadvars.h"
30 #include "tm-threads.h"
31 #include "runmodes.h"
32 
33 #include "util-random.h"
34 #include "util-time.h"
35 
36 #include "flow.h"
37 #include "flow-queue.h"
38 #include "flow-hash.h"
39 #include "flow-util.h"
40 #include "flow-var.h"
41 #include "flow-private.h"
42 #include "flow-timeout.h"
43 #include "flow-manager.h"
44 #include "flow-storage.h"
45 #include "flow-spare-pool.h"
46 
47 #include "stream-tcp-private.h"
48 #include "stream-tcp-reassemble.h"
49 #include "stream-tcp.h"
50 
51 #include "util-unittest.h"
52 #include "util-unittest-helper.h"
53 #include "util-byte.h"
54 #include "util-device.h"
55 
56 #include "util-debug.h"
57 #include "util-privs.h"
58 #include "util-signal.h"
59 
60 #include "threads.h"
61 #include "detect.h"
62 #include "detect-engine-state.h"
63 #include "stream.h"
64 
65 #include "app-layer-parser.h"
66 
67 #include "host-timeout.h"
68 #include "defrag-timeout.h"
69 #include "ippair-timeout.h"
70 #include "app-layer-htp-range.h"
71 
72 #include "output-flow.h"
73 #include "util-validate.h"
74 
75 #include "runmode-unix-socket.h"
76 
77 /* Run mode selected at suricata.c */
78 extern int run_mode;
79 
80 /** queue to pass flows to cleanup/log thread(s) */
82 
83 /* multi flow mananger support */
84 static uint32_t flowmgr_number = 1;
85 /* atomic counter for flow managers, to assign instance id */
86 SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
87 
88 /* multi flow recycler support */
89 static uint32_t flowrec_number = 1;
90 /* atomic counter for flow recyclers, to assign instance id */
91 SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
92 SC_ATOMIC_DECLARE(uint32_t, flowrec_busy);
93 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
94 
99 
101 {
102  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
103 }
104 
106 {
107  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
108 }
109 
110 /* 1 seconds */
111 #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
112 #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
113 /* 0.3 seconds */
114 #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
115 #define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000
116 #define NEW_FLOW_COUNT_COND 10
117 
118 typedef struct FlowTimeoutCounters_ {
119  uint32_t rows_checked;
120  uint32_t rows_skipped;
121  uint32_t rows_empty;
122  uint32_t rows_maxlen;
123 
124  uint32_t flows_checked;
125  uint32_t flows_notimeout;
126  uint32_t flows_timeout;
128  uint32_t flows_removed;
129  uint32_t flows_aside;
131 
132  uint32_t bypassed_count;
133  uint64_t bypassed_pkts;
134  uint64_t bypassed_bytes;
136 
137 /**
138  * \brief Used to disable flow manager thread(s).
139  *
140  * \todo Kinda hackish since it uses the tv name to identify flow manager
141  * thread. We need an all weather identification scheme.
142  */
144 {
146  /* flow manager thread(s) is/are a part of mgmt threads */
147  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
148  if (strncasecmp(tv->name, thread_name_flow_mgr,
149  strlen(thread_name_flow_mgr)) == 0)
150  {
152  }
153  }
155 
156  struct timeval start_ts;
157  struct timeval cur_ts;
158  gettimeofday(&start_ts, NULL);
159 
160 again:
161  gettimeofday(&cur_ts, NULL);
162  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
163  FatalError(SC_ERR_SHUTDOWN, "unable to get all flow manager "
164  "threads to shutdown in time");
165  }
166 
168  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
169  if (strncasecmp(tv->name, thread_name_flow_mgr,
170  strlen(thread_name_flow_mgr)) == 0)
171  {
174  /* sleep outside lock */
175  SleepMsec(1);
176  goto again;
177  }
178  }
179  }
181 
182  /* reset count, so we can kill and respawn (unix socket) */
183  SC_ATOMIC_SET(flowmgr_cnt, 0);
184  return;
185 }
186 
187 /** \internal
188  * \brief check if a flow is timed out
189  *
190  * \param f flow
191  * \param ts timestamp
192  *
193  * \retval 0 not timed out
194  * \retval 1 timed out
195  */
196 static int FlowManagerFlowTimeout(Flow *f, struct timeval *ts, int32_t *next_ts, const bool emerg)
197 {
198  int32_t flow_times_out_at = f->timeout_at;
199  if (emerg) {
201  flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
202  }
203  if (*next_ts == 0 || flow_times_out_at < *next_ts)
204  *next_ts = flow_times_out_at;
205 
206  /* do the timeout check */
207  if (flow_times_out_at >= ts->tv_sec) {
208  return 0;
209  }
210 
211  return 1;
212 }
213 
214 /** \internal
215  * \brief check timeout of captured bypassed flow by querying capture method
216  *
217  * \param f Flow
218  * \param ts timestamp
219  * \param counters Flow timeout counters
220  *
221  * \retval 0 not timeout
222  * \retval 1 timeout (or not capture bypassed)
223  */
224 static inline int FlowBypassedTimeout(Flow *f, struct timeval *ts,
225  FlowTimeoutCounters *counters)
226 {
227 #ifdef CAPTURE_OFFLOAD
228  if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
229  return 1;
230  }
231 
233  if (fc && fc->BypassUpdate) {
234  /* flow will be possibly updated */
235  uint64_t pkts_tosrc = fc->tosrcpktcnt;
236  uint64_t bytes_tosrc = fc->tosrcbytecnt;
237  uint64_t pkts_todst = fc->todstpktcnt;
238  uint64_t bytes_todst = fc->todstbytecnt;
239  bool update = fc->BypassUpdate(f, fc->bypass_data, ts->tv_sec);
240  if (update) {
241  SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f));
242  pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
243  bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
244  pkts_todst = fc->todstpktcnt - pkts_todst;
245  bytes_todst = fc->todstbytecnt - bytes_todst;
246  if (f->livedev) {
247  SC_ATOMIC_ADD(f->livedev->bypassed,
248  pkts_tosrc + pkts_todst);
249  }
250  counters->bypassed_pkts += pkts_tosrc + pkts_todst;
251  counters->bypassed_bytes += bytes_tosrc + bytes_todst;
252  return 0;
253  } else {
254  SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
255  if (f->livedev) {
256  if (FLOW_IS_IPV4(f)) {
257  LiveDevSubBypassStats(f->livedev, 1, AF_INET);
258  } else if (FLOW_IS_IPV6(f)) {
259  LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
260  }
261  }
262  counters->bypassed_count++;
263  return 1;
264  }
265  }
266 #endif /* CAPTURE_OFFLOAD */
267  return 1;
268 }
269 
270 typedef struct FlowManagerTimeoutThread {
271  /* used to temporarily store flows that have timed out and are
272  * removed from the hash */
275 
276 static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
277 {
278  FlowQueuePrivate recycle = { NULL, NULL, 0 };
279  counters->flows_aside += td->aside_queue.len;
280 
281  uint32_t cnt = 0;
282  Flow *f;
283  while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
284  /* flow is still locked */
285 
286  if (f->proto == IPPROTO_TCP && !(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
287  !FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) == 1) {
288  /* Send the flow to its thread */
290  FLOWLOCK_UNLOCK(f);
291  /* flow ownership is passed to the worker thread */
292 
293  counters->flows_aside_needs_work++;
294  continue;
295  }
296  FLOWLOCK_UNLOCK(f);
297 
298  FlowQueuePrivateAppendFlow(&recycle, f);
299  if (recycle.len == 100) {
302  }
303  cnt++;
304  }
305  if (recycle.len) {
308  }
309  return cnt;
310 }
311 
312 /**
313  * \internal
314  *
315  * \brief check all flows in a hash row for timing out
316  *
317  * \param f last flow in the hash row
318  * \param ts timestamp
319  * \param emergency bool indicating emergency mode
320  * \param counters ptr to FlowTimeoutCounters structure
321  */
322 static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td,
323  Flow *f, struct timeval *ts,
324  int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
325 {
326  uint32_t checked = 0;
327  Flow *prev_f = NULL;
328 
329  do {
330  checked++;
331 
332  /* check flow timeout based on lastts and state. Both can be
333  * accessed w/o Flow lock as we do have the hash row lock (so flow
334  * can't disappear) and flow_state is atomic. lastts can only
335  * be modified when we have both the flow and hash row lock */
336 
337  /* timeout logic goes here */
338  if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {
339 
340  counters->flows_notimeout++;
341 
342  prev_f = f;
343  f = f->next;
344  continue;
345  }
346 
347  FLOWLOCK_WRLOCK(f);
348 
349  Flow *next_flow = f->next;
350 
351  /* never prune a flow that is used by a packet we
352  * are currently processing in one of the threads */
353  if (f->use_cnt > 0 || !FlowBypassedTimeout(f, ts, counters)) {
354  FLOWLOCK_UNLOCK(f);
355  prev_f = f;
356  if (f->use_cnt > 0) {
357  counters->flows_timeout_inuse++;
358  }
359  f = f->next;
360  continue;
361  }
362 
364 
365  counters->flows_timeout++;
366 
367  RemoveFromHash(f, prev_f);
368 
370  /* flow is still locked in the queue */
371 
372  f = next_flow;
373  } while (f != NULL);
374 
375  counters->flows_checked += checked;
376  if (checked > counters->rows_maxlen)
377  counters->rows_maxlen = checked;
378 }
379 
380 static void FlowManagerHashRowClearEvictedList(FlowManagerTimeoutThread *td,
381  Flow *f, struct timeval *ts, FlowTimeoutCounters *counters)
382 {
383  do {
384  FLOWLOCK_WRLOCK(f);
385  Flow *next_flow = f->next;
386  f->next = NULL;
387  f->fb = NULL;
388 
390 
392  /* flow is still locked in the queue */
393 
394  f = next_flow;
395  } while (f != NULL);
396 }
397 
398 /**
399  * \brief time out flows from the hash
400  *
401  * \param ts timestamp
402  * \param hash_min min hash index to consider
403  * \param hash_max max hash index to consider
404  * \param counters ptr to FlowTimeoutCounters structure
405  *
406  * \retval cnt number of timed out flow
407  */
408 static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td,
409  struct timeval *ts,
410  const uint32_t hash_min, const uint32_t hash_max,
411  FlowTimeoutCounters *counters)
412 {
413  uint32_t cnt = 0;
414  const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
415  const uint32_t rows_checked = hash_max - hash_min;
416  uint32_t rows_skipped = 0;
417  uint32_t rows_empty = 0;
418 
419 #if __WORDSIZE==64
420 #define BITS 64
421 #define TYPE uint64_t
422 #else
423 #define BITS 32
424 #define TYPE uint32_t
425 #endif
426 
427  for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
428  TYPE check_bits = 0;
429  const uint32_t check = MIN(BITS, (hash_max - idx));
430  for (uint32_t i = 0; i < check; i++) {
431  FlowBucket *fb = &flow_hash[idx+i];
432  check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= (int32_t)ts->tv_sec) << (TYPE)i;
433  }
434  if (check_bits == 0)
435  continue;
436 
437  for (uint32_t i = 0; i < check; i++) {
438  FlowBucket *fb = &flow_hash[idx+i];
439  if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= (int32_t)ts->tv_sec) {
440  FBLOCK_LOCK(fb);
441  Flow *evicted = NULL;
442  if (fb->evicted != NULL || fb->head != NULL) {
443  if (fb->evicted != NULL) {
444  /* transfer out of bucket so we can do additional work outside
445  * of the bucket lock */
446  evicted = fb->evicted;
447  fb->evicted = NULL;
448  }
449  if (fb->head != NULL) {
450  int32_t next_ts = 0;
451  FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts);
452 
453  if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
454  SC_ATOMIC_SET(fb->next_ts, next_ts);
455  }
456  if (fb->evicted == NULL && fb->head == NULL) {
457  SC_ATOMIC_SET(fb->next_ts, INT_MAX);
458  }
459  } else {
460  SC_ATOMIC_SET(fb->next_ts, INT_MAX);
461  rows_empty++;
462  }
463  FBLOCK_UNLOCK(fb);
464  /* processed evicted list */
465  if (evicted) {
466  FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
467  }
468  } else {
469  rows_skipped++;
470  }
471  }
472  if (td->aside_queue.len) {
473  cnt += ProcessAsideQueue(td, counters);
474  }
475  }
476 
477  counters->rows_checked += rows_checked;
478  counters->rows_skipped += rows_skipped;
479  counters->rows_empty += rows_empty;
480 
481  if (td->aside_queue.len) {
482  cnt += ProcessAsideQueue(td, counters);
483  }
484  counters->flows_removed += cnt;
485  /* coverity[missing_unlock : FALSE] */
486  return cnt;
487 }
488 
489 /** \internal
490  * \brief handle timeout for a slice of hash rows
491  * If we wrap around we call FlowTimeoutHash twice */
492 static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, struct timeval *ts,
493  const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
494  const uint32_t rows, uint32_t *pos)
495 {
496  uint32_t start = 0;
497  uint32_t end = 0;
498  uint32_t cnt = 0;
499  uint32_t rows_left = rows;
500 
501 again:
502  start = hash_min + (*pos);
503  if (start >= hash_max) {
504  start = hash_min;
505  }
506  end = start + rows_left;
507  if (end > hash_max) {
508  end = hash_max;
509  }
510  *pos = (end == hash_max) ? hash_min : end;
511  rows_left = rows_left - (end - start);
512 
513  cnt += FlowTimeoutHash(td, ts, start, end, counters);
514  if (rows_left) {
515  goto again;
516  }
517  return cnt;
518 }
519 
520 /**
521  * \internal
522  *
523  * \brief move all flows out of a hash row
524  *
525  * \param f last flow in the hash row
526  *
527  * \retval cnt removed out flows
528  */
529 static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
530 {
531  uint32_t cnt = 0;
532 
533  do {
534  FLOWLOCK_WRLOCK(f);
535 
536  Flow *next_flow = f->next;
537 
538  /* remove from the hash */
539  if (mode == 0) {
540  RemoveFromHash(f, NULL);
541  } else {
542  FlowBucket *fb = f->fb;
543  fb->evicted = f->next;
544  f->next = NULL;
545  f->fb = NULL;
546  }
548 
549  /* no one is referring to this flow, use_cnt 0, removed from hash
550  * so we can unlock it and move it to the recycle queue. */
551  FLOWLOCK_UNLOCK(f);
552  FlowQueuePrivateAppendFlow(recycle_q, f);
553 
554  cnt++;
555 
556  f = next_flow;
557  } while (f != NULL);
558 
559  return cnt;
560 }
561 
562 /**
563  * \brief remove all flows from the hash
564  *
565  * \retval cnt number of removes out flows
566  */
567 static uint32_t FlowCleanupHash(void)
568 {
569  FlowQueuePrivate local_queue = { NULL, NULL, 0 };
570  uint32_t cnt = 0;
571 
572  for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
573  FlowBucket *fb = &flow_hash[idx];
574 
575  FBLOCK_LOCK(fb);
576 
577  if (fb->head != NULL) {
578  /* we have a flow, or more than one */
579  cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
580  }
581  if (fb->evicted != NULL) {
582  /* we have a flow, or more than one */
583  cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
584  }
585 
586  FBLOCK_UNLOCK(fb);
587  if (local_queue.len >= 25) {
588  FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
590  }
591  }
592  FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
594 
595  return cnt;
596 }
597 
598 typedef struct FlowQueueTimeoutCounters {
599  uint32_t flows_removed;
600  uint32_t flows_timeout;
602 
603 typedef struct FlowCounters_ {
606 
607  uint16_t flow_mgr_spare;
610 
617 
619 
623 
624  uint16_t memcap_pressure;
627 
628 typedef struct FlowManagerThreadData_ {
629  uint32_t instance;
630  uint32_t min;
631  uint32_t max;
632 
634 
637 
638 static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
639 {
640  fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t);
641  fc->flow_mgr_rows_sec = StatsRegisterCounter("flow.mgr.rows_per_sec", t);
642 
643  fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
644  fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
645  fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
646 
647  fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", t);
648  fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", t);
649  fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", t);
650  fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", t);
651  fc->flow_mgr_flows_timeout_inuse = StatsRegisterCounter("flow.mgr.flows_timeout_inuse", t);
652  fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", t);
653  fc->flow_mgr_flows_aside_needs_work = StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", t);
654 
655  fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
656  fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
657  fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
658 
659  fc->memcap_pressure = StatsRegisterCounter("memcap_pressure", t);
660  fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap_pressure_max", t);
661 }
662 
663 static void FlowCountersUpdate(
664  ThreadVars *th_v, const FlowManagerThreadData *ftd, const FlowTimeoutCounters *counters)
665 {
666  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_checked, (uint64_t)counters->flows_checked);
667  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters->flows_notimeout);
668 
669  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters->flows_timeout);
670  StatsAddUI64(
671  th_v, ftd->cnt.flow_mgr_flows_timeout_inuse, (uint64_t)counters->flows_timeout_inuse);
672  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters->flows_aside);
674  (uint64_t)counters->flows_aside_needs_work);
675 
676  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_cnt_clo, (uint64_t)counters->bypassed_count);
677  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_pkts, (uint64_t)counters->bypassed_pkts);
678  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters->bypassed_bytes);
679 
680  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters->rows_maxlen);
681 }
682 
683 static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
684 {
686  if (ftd == NULL)
687  return TM_ECODE_FAILED;
688 
689  ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
690  SCLogDebug("flow manager instance %u", ftd->instance);
691 
692  /* set the min and max value used for hash row walking
693  * each thread has it's own section of the flow hash */
694  uint32_t range = flow_config.hash_size / flowmgr_number;
695 
696  ftd->min = ftd->instance * range;
697  ftd->max = (ftd->instance + 1) * range;
698 
699  /* last flow-manager takes on hash_size % flowmgr_number extra rows */
700  if ((ftd->instance + 1) == flowmgr_number) {
701  ftd->max = flow_config.hash_size;
702  }
704 
705  SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
706 
707  /* pass thread data back to caller */
708  *data = ftd;
709 
710  FlowCountersInit(t, &ftd->cnt);
711 
712  PacketPoolInit();
713  return TM_ECODE_OK;
714 }
715 
716 static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
717 {
720  SCFree(data);
721  return TM_ECODE_OK;
722 }
723 
724 /** \internal
725  * \brief calculate number of rows to scan and how much time to sleep
726  * based on the busy score `mp` (0 idle, 100 max busy).
727  *
728  * We try to to make sure we scan the hash once a second. The number size
729  * of the slice of the hash scanned is determined by our busy score 'mp'.
730  * We sleep for the remainder of the second after processing the slice,
731  * or at least an approximation of it.
732  * A minimum busy score of 10 is assumed to avoid a longer than 10 second
733  * full hash pass. This is to avoid burstiness in scanning when there is
734  * a rapid increase of the busy score, which could lead to the flow manager
735  * suddenly scanning a much larger slice of the hash leading to a burst
736  * in scan/eviction work.
737  */
738 static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
739  uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
740 {
741  if (emergency) {
742  *wu_rows = rows;
743  *wu_sleep = 250;
744  return;
745  }
746  /* minimum busy score is 10 */
747  const uint32_t emp = MAX(mp, 10);
748  const uint32_t rows_per_sec = (uint32_t)((float)rows * (float)((float)emp / (float)100));
749  /* calc how much time we estimate the work will take, in ms. We assume
750  * each row takes an average of 1usec. Maxing out at 1sec. */
751  const uint32_t work_per_unit = MIN(rows_per_sec / 1000, 1000);
752  /* calc how much time we need to sleep to get to the per second cadence
753  * but sleeping for at least 250ms. */
754  const uint32_t sleep_per_unit = MAX(250, 1000 - work_per_unit);
755  SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,
756  sleep_per_unit);
757 
758  *wu_sleep = sleep_per_unit;
759  *wu_rows = rows_per_sec;
760  *rows_sec = rows_per_sec;
761 }
762 
763 /** \brief Thread that manages the flow table and times out flows.
764  *
765  * \param td ThreadVars casted to void ptr
766  *
767  * Keeps an eye on the spare list, alloc flows if needed...
768  */
769 static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
770 {
771  FlowManagerThreadData *ftd = thread_data;
772  const uint32_t rows = ftd->max - ftd->min;
773  const bool time_is_live = TimeModeIsLive();
774 
775  uint32_t emerg_over_cnt = 0;
776  uint64_t next_run_ms = 0;
777  uint32_t pos = 0;
778  uint32_t rows_sec = 0;
779  uint32_t rows_per_wu = 0;
780  uint64_t sleep_per_wu = 0;
781  bool emerg = false;
782  bool prev_emerg = false;
783  uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
784  struct timeval ts;
785  memset(&ts, 0, sizeof(ts));
786 
787  /* don't start our activities until time is setup */
788  while (!TimeModeIsReady()) {
789  if (suricata_ctl_flags != 0)
790  return TM_ECODE_OK;
791  usleep(10);
792  }
793 
794  uint32_t mp = MemcapsGetPressure() * 100;
795  if (ftd->instance == 0) {
796  StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
797  StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
798  }
799  GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
800  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
801 
803 
804  while (1)
805  {
806  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
810  }
811 
812  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
813  emerg = true;
814  }
815  /* Get the time */
816  memset(&ts, 0, sizeof(ts));
817  TimeGet(&ts);
818  SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
819  uint64_t ts_ms = ts.tv_sec * 1000 + ts.tv_usec / 1000;
820  const bool emerge_p = (emerg && !prev_emerg);
821  if (emerge_p) {
822  next_run_ms = 0;
823  prev_emerg = true;
824  SCLogNotice("Flow emergency mode entered...");
825  StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter);
826  }
827  if (ts_ms >= next_run_ms) {
828  if (ftd->instance == 0) {
829  const uint32_t sq_len = FlowSpareGetPoolSize();
830  const uint32_t spare_perc = sq_len * 100 / flow_config.prealloc;
831  /* see if we still have enough spare flows */
832  if (spare_perc < 90 || spare_perc > 110) {
833  FlowSparePoolUpdate(sq_len);
834  }
835  }
836 
837  /* try to time out flows */
838  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
839 
840  if (emerg) {
841  /* in emergency mode, do a full pass of the hash table */
842  FlowTimeoutHash(&ftd->timeout, &ts, ftd->min, ftd->max, &counters);
843  StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
844  } else {
845  SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,
846  rows_per_wu);
847 
848  const uint32_t ppos = pos;
849  FlowTimeoutHashInChunks(
850  &ftd->timeout, &ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos);
851  if (ppos > pos) {
852  StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
853  }
854  }
855 
856  const uint32_t spare_pool_len = FlowSpareGetPoolSize();
857  StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)spare_pool_len);
858 
859  FlowCountersUpdate(th_v, ftd, &counters);
860 
861  if (emerg == true) {
862  SCLogDebug("flow_sparse_q.len = %" PRIu32 " prealloc: %" PRIu32
863  "flow_spare_q status: %" PRIu32 "%% flows at the queue",
864  spare_pool_len, flow_config.prealloc,
865  spare_pool_len * 100 / flow_config.prealloc);
866 
867  /* only if we have pruned this "emergency_recovery" percentage
868  * of flows, we will unset the emergency bit */
869  if (spare_pool_len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
870  emerg_over_cnt++;
871  } else {
872  emerg_over_cnt = 0;
873  }
874 
875  if (emerg_over_cnt >= 30) {
876  SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
878 
879  emerg = false;
880  prev_emerg = false;
881  emerg_over_cnt = 0;
882  SCLogNotice("Flow emergency mode over, back to normal... unsetting"
883  " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", "
884  "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32
885  "%% flows at the queue",
886  (uintmax_t)ts.tv_sec, (uintmax_t)ts.tv_usec,
887  spare_pool_len * 100 / flow_config.prealloc);
888 
889  StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
890  }
891  }
892 
893  /* update work units */
894  const uint32_t pmp = mp;
895  mp = MemcapsGetPressure() * 100;
896  if (ftd->instance == 0) {
897  StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
898  StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
899  }
900  GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
901  if (pmp != mp) {
902  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
903  }
904 
905  next_run_ms = ts_ms + sleep_per_wu;
906  }
907  if (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec) {
908  if (ftd->instance == 0) {
913  other_last_sec = (uint32_t)ts.tv_sec;
914  }
915  }
916 
917  if (TmThreadsCheckFlag(th_v, THV_KILL)) {
918  StatsSyncCounters(th_v);
919  break;
920  }
921 
922  if (emerg || !time_is_live) {
923  usleep(250);
924  } else {
925  struct timeval cond_tv;
926  gettimeofday(&cond_tv, NULL);
927  struct timeval add_tv;
928  add_tv.tv_sec = 0;
929  add_tv.tv_usec = (sleep_per_wu * 1000);
930  timeradd(&cond_tv, &add_tv, &cond_tv);
931 
932  struct timespec cond_time = FROM_TIMEVAL(cond_tv);
934  while (1) {
935  int rc = SCCtrlCondTimedwait(
937  if (rc == ETIMEDOUT || rc < 0)
938  break;
939  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
940  break;
941  }
942  }
944  }
945 
946  SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
947 
949  }
950  return TM_ECODE_OK;
951 }
952 
953 /** \brief spawn the flow manager thread */
955 {
956  intmax_t setting = 1;
957  (void)ConfGetInt("flow.managers", &setting);
958 
959  if (setting < 1 || setting > 1024) {
961  "invalid flow.managers setting %"PRIdMAX, setting);
962  }
963  flowmgr_number = (uint32_t)setting;
964 
967 
968  SCLogConfig("using %u flow manager threads", flowmgr_number);
970 
971  for (uint32_t u = 0; u < flowmgr_number; u++) {
972  char name[TM_THREAD_NAME_MAX];
973  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);
974 
975  ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
976  "FlowManager", 0);
977  BUG_ON(tv_flowmgr == NULL);
978 
979  if (tv_flowmgr == NULL) {
980  FatalError(SC_ERR_FATAL, "flow manager thread creation failed");
981  }
982  if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
983  FatalError(SC_ERR_FATAL, "flow manager thread spawn failed");
984  }
985  }
986  return;
987 }
988 
989 typedef struct FlowRecyclerThreadData_ {
991 
992  uint16_t counter_flows;
995 
1000 
1001 static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
1002 {
1004  if (ftd == NULL)
1005  return TM_ECODE_FAILED;
1006  if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
1007  SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
1008  SCFree(ftd);
1009  return TM_ECODE_FAILED;
1010  }
1011  SCLogDebug("output_thread_data %p", ftd->output_thread_data);
1012 
1013  ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t);
1014  ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t);
1015  ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t);
1016 
1017  ftd->counter_flow_active = StatsRegisterCounter("flow.active", t);
1018  ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t);
1019 
1020  FlowEndCountersRegister(t, &ftd->fec);
1021 
1022  *data = ftd;
1023  return TM_ECODE_OK;
1024 }
1025 
1026 static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
1027 {
1029 
1031  if (ftd->output_thread_data != NULL)
1033 
1034  SCFree(data);
1035  return TM_ECODE_OK;
1036 }
1037 
1038 static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)
1039 {
1040  FLOWLOCK_WRLOCK(f);
1041 
1042  (void)OutputFlowLog(tv, ftd->output_thread_data, f);
1043 
1044  FlowEndCountersUpdate(tv, &ftd->fec, f);
1045  if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
1047  }
1049 
1050  FlowClearMemory(f, f->protomap);
1051  FLOWLOCK_UNLOCK(f);
1053 }
1054 
1055 /** \brief Thread that manages timed out flows.
1056  *
1057  * \param td ThreadVars casted to void ptr
1058  */
1059 static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
1060 {
1061  FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
1062  BUG_ON(ftd == NULL);
1063  const bool time_is_live = TimeModeIsLive();
1064  uint64_t recycled_cnt = 0;
1065  struct timeval ts;
1066  memset(&ts, 0, sizeof(ts));
1067 
1069 
1070  while (1)
1071  {
1072  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
1076  }
1077  SC_ATOMIC_ADD(flowrec_busy,1);
1079 
1080  StatsAddUI64(th_v, ftd->counter_queue_avg, list.len);
1081  StatsSetUI64(th_v, ftd->counter_queue_max, list.len);
1082 
1083  const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));
1084 
1085  /* Get the time */
1086  memset(&ts, 0, sizeof(ts));
1087  TimeGet(&ts);
1088  SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
1089 
1090  Flow *f;
1091  while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
1092  Recycler(th_v, ftd, f);
1093  recycled_cnt++;
1094  StatsIncr(th_v, ftd->counter_flows);
1095  }
1096  SC_ATOMIC_SUB(flowrec_busy,1);
1097 
1098  if (bail) {
1099  break;
1100  }
1101 
1102  const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY);
1103  if (emerg || !time_is_live) {
1104  usleep(250);
1105  } else {
1106  struct timeval cond_tv;
1107  gettimeofday(&cond_tv, NULL);
1108  cond_tv.tv_sec += 1;
1109  struct timespec cond_time = FROM_TIMEVAL(cond_tv);
1111  while (1) {
1112  int rc = SCCtrlCondTimedwait(
1114  if (rc == ETIMEDOUT || rc < 0) {
1115  break;
1116  }
1117  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
1118  break;
1119  }
1120  if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) {
1121  break;
1122  }
1123  }
1125  }
1126 
1127  SCLogDebug("woke up...");
1128 
1130  }
1131  StatsSyncCounters(th_v);
1132  SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
1133  return TM_ECODE_OK;
1134 }
1135 
1136 static bool FlowRecyclerReadyToShutdown(void)
1137 {
1138  if (SC_ATOMIC_GET(flowrec_busy) != 0) {
1139  return false;
1140  }
1141  uint32_t len = 0;
1143  len = flow_recycle_q.qlen;
1145 
1146  return ((len == 0));
1147 }
1148 
1149 /** \brief spawn the flow recycler thread */
1151 {
1152  intmax_t setting = 1;
1153  (void)ConfGetInt("flow.recyclers", &setting);
1154 
1155  if (setting < 1 || setting > 1024) {
1157  "invalid flow.recyclers setting %"PRIdMAX, setting);
1158  }
1159  flowrec_number = (uint32_t)setting;
1160 
1163 
1164  SCLogConfig("using %u flow recycler threads", flowrec_number);
1165 
1166  for (uint32_t u = 0; u < flowrec_number; u++) {
1167  char name[TM_THREAD_NAME_MAX];
1168  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);
1169 
1170  ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name,
1171  "FlowRecycler", 0);
1172 
1173  if (tv_flowrec == NULL) {
1174  FatalError(SC_ERR_FATAL, "flow recycler thread creation failed");
1175  }
1176  if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) {
1177  FatalError(SC_ERR_FATAL, "flow recycler thread spawn failed");
1178  }
1179  }
1180  return;
1181 }
1182 
1183 /**
1184  * \brief Used to disable flow recycler thread(s).
1185  *
1186  * \note this should only be called when the flow manager is already gone
1187  *
1188  * \todo Kinda hackish since it uses the tv name to identify flow recycler
1189  * thread. We need an all weather identification scheme.
1190  */
1192 {
1193  int cnt = 0;
1194 
1195  /* move all flows still in the hash to the recycler queue */
1196 #ifndef DEBUG
1197  (void)FlowCleanupHash();
1198 #else
1199  uint32_t flows = FlowCleanupHash();
1200  SCLogDebug("flows to progress: %u", flows);
1201 #endif
1202 
1203  /* make sure all flows are processed */
1204  do {
1206  usleep(10);
1207  } while (FlowRecyclerReadyToShutdown() == false);
1208 
1210  /* flow recycler thread(s) is/are a part of mgmt threads */
1211  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
1212  if (strncasecmp(tv->name, thread_name_flow_rec,
1213  strlen(thread_name_flow_rec)) == 0)
1214  {
1216  cnt++;
1217  }
1218  }
1220 
1221  struct timeval start_ts;
1222  struct timeval cur_ts;
1223  gettimeofday(&start_ts, NULL);
1224 
1225 again:
1226  gettimeofday(&cur_ts, NULL);
1227  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1228  FatalError(SC_ERR_SHUTDOWN, "unable to get all flow recycler "
1229  "threads to shutdown in time");
1230  }
1231 
1233  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
1234  if (strncasecmp(tv->name, thread_name_flow_rec,
1235  strlen(thread_name_flow_rec)) == 0)
1236  {
1240  /* sleep outside lock */
1241  SleepMsec(1);
1242  goto again;
1243  }
1244  }
1245  }
1247 
1248  /* reset count, so we can kill and respawn (unix socket) */
1249  SC_ATOMIC_SET(flowrec_cnt, 0);
1250  return;
1251 }
1252 
1254 {
1255  tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
1256  tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
1257  tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
1258  tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
1261  SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
1262 
1263  SC_ATOMIC_INIT(flowmgr_cnt);
1264  SC_ATOMIC_INITPTR(flow_timeouts);
1265 }
1266 
1268 {
1269  tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
1270  tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
1271  tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
1272  tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
1275  SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
1276 
1277  SC_ATOMIC_INIT(flowrec_cnt);
1278  SC_ATOMIC_INIT(flowrec_busy);
1279 }
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:67
FlowTimeoutCounters_::rows_empty
uint32_t rows_empty
Definition: flow-manager.c:121
FlowSparePoolUpdate
void FlowSparePoolUpdate(uint32_t size)
Definition: flow-spare-pool.c:171
util-byte.h
tm-threads.h
ConfGetInt
int ConfGetInt(const char *name, intmax_t *val)
Retrieve a configuration value as an integer.
Definition: conf.c:393
FROM_TIMEVAL
#define FROM_TIMEVAL(timev)
intialize a 'struct timespec' from a 'struct timeval'.
Definition: util-time.h:34
OutputFlowLog
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
Definition: output-flow.c:86
flow_manager_ctrl_mutex
SCCtrlMutex flow_manager_ctrl_mutex
Definition: flow-manager.c:96
FlowRecyclerThreadData_::fec
FlowEndCounters fec
Definition: flow-manager.c:998
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1639
app-layer-htp-range.h
FlowTimeoutCounters
Definition: flow-worker.c:59
FlowForceReassemblyForFlow
void FlowForceReassemblyForFlow(Flow *f)
Definition: flow-timeout.c:350
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1083
run_mode
int run_mode
Definition: suricata.c:164
FlowBucket_::evicted
Flow * evicted
Definition: flow-hash.h:48
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:167
FLOW_IS_IPV6
#define FLOW_IS_IPV6(f)
Definition: flow.h:161
FlowManagerThreadData_::max
uint32_t max
Definition: flow-manager.c:631
FlowManagerTimeoutThread
Definition: flow-manager.c:270
FlowSparePoolReturnFlow
void FlowSparePoolReturnFlow(Flow *f)
Definition: flow-spare-pool.c:100
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:82
FlowSpareGetPoolSize
uint32_t FlowSpareGetPoolSize(void)
Definition: flow-spare-pool.c:47
flow-util.h
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:315
FlowManagerThreadData_::cnt
FlowCounters cnt
Definition: flow-manager.c:633
FBLOCK_LOCK
#define FBLOCK_LOCK(fb)
Definition: flow-hash.h:73
FlowWakeupFlowRecyclerThread
#define FlowWakeupFlowRecyclerThread()
Definition: flow-manager.h:33
TMM_FLOWRECYCLER
@ TMM_FLOWRECYCLER
Definition: tm-threads-common.h:71
stream-tcp.h
GetFlowBypassInfoID
FlowStorageId GetFlowBypassInfoID(void)
Definition: flow-util.c:218
FlowBypassInfo_
Definition: flow.h:537
FlowCnf_::emergency_recovery
uint32_t emergency_recovery
Definition: flow.h:301
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:387
FlowCnf_::hash_size
uint32_t hash_size
Definition: flow.h:292
FlowManagerThreadData_::instance
uint32_t instance
Definition: flow-manager.c:629
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:296
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:98
StatsRegisterGlobalCounter
uint16_t StatsRegisterGlobalCounter(const char *name, uint64_t(*Func)(void))
Registers a counter, which represents a global value.
Definition: counters.c:1013
Flow_::proto
uint8_t proto
Definition: flow.h:378
SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt)
flow_manager_ctrl_cond
SCCtrlCondT flow_manager_ctrl_cond
Definition: flow-manager.c:95
threads.h
flow-private.h
FlowCounters_::flow_mgr_flows_notimeout
uint16_t flow_mgr_flows_notimeout
Definition: flow-manager.c:612
Flow_
Flow data structure.
Definition: flow.h:356
TYPE
#define TYPE
flow_recycler_ctrl_cond
SCCtrlCondT flow_recycler_ctrl_cond
Definition: flow-manager.c:97
Flow_::protomap
uint8_t protomap
Definition: flow.h:458
SC_ATOMIC_EXTERN
SC_ATOMIC_EXTERN(unsigned int, flow_flags)
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:333
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:83
FlowProtoTimeout_
Definition: flow.h:526
StatsSetUI64
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type double to the local counter.
Definition: counters.c:210
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:54
flow-hash.h
TmModuleFlowRecyclerRegister
void TmModuleFlowRecyclerRegister(void)
Definition: flow-manager.c:1267
FlowTimeoutCounters_::flows_removed
uint32_t flows_removed
Definition: flow-manager.c:128
Flow_::use_cnt
FlowRefCount use_cnt
Definition: flow.h:386
FlowBypassInfo_::tosrcbytecnt
uint64_t tosrcbytecnt
Definition: flow.h:542
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
FlowGetMemuse
uint64_t FlowGetMemuse(void)
Definition: flow.c:137
MIN
#define MIN(x, y)
Definition: suricata-common.h:380
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:79
FlowTimeoutCounters_::flows_timeout
uint32_t flows_timeout
Definition: flow-manager.c:126
util-privs.h
defrag-timeout.h
FlowRecyclerThreadData_::counter_flows
uint16_t counter_flows
Definition: flow-manager.c:992
stream-tcp-reassemble.h
StatsSyncCountersIfSignalled
#define StatsSyncCountersIfSignalled(tv)
Definition: counters.h:140
FlowCounters_::flow_mgr_flows_timeout_inuse
uint16_t flow_mgr_flows_timeout_inuse
Definition: flow-manager.c:614
LiveDevSubBypassStats
void LiveDevSubBypassStats(LiveDevice *dev, uint64_t cnt, int family)
Definition: util-device.c:563
MAX
#define MAX(x, y)
Definition: suricata-common.h:384
SC_ERR_SHUTDOWN
@ SC_ERR_SHUTDOWN
Definition: util-error.h:220
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:83
FlowQueuePrivate_::len
uint32_t len
Definition: flow-queue.h:44
Flow_::protoctx
void * protoctx
Definition: flow.h:454
FlowCounters_
Definition: flow-manager.c:603
FlowManagerTimeoutThread
struct FlowManagerTimeoutThread FlowManagerTimeoutThread
SC_ERR_INVALID_ARGUMENTS
@ SC_ERR_INVALID_ARGUMENTS
Definition: util-error.h:82
FlowCounters_::flow_mgr_flows_aside
uint16_t flow_mgr_flows_aside
Definition: flow-manager.c:615
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:90
FLOW_TIMEOUT_REASSEMBLY_DONE
#define FLOW_TIMEOUT_REASSEMBLY_DONE
Definition: flow.h:93
runmode-unix-socket.h
util-unittest.h
FlowTimeoutCounters_::bypassed_count
uint32_t bypassed_count
Definition: flow-manager.c:132
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:37
TM_THREAD_NAME_MAX
#define TM_THREAD_NAME_MAX
Definition: tm-threads.h:49
util-unittest-helper.h
FlowCnf_::prealloc
uint32_t prealloc
Definition: flow.h:294
FlowQueueTimeoutCounters::flows_removed
uint32_t flows_removed
Definition: flow-manager.c:599
FLOWLOCK_UNLOCK
#define FLOWLOCK_UNLOCK(fb)
Definition: flow.h:271
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:82
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:284
Flow_::flow_state
FlowStateType flow_state
Definition: flow.h:425
FQLOCK_LOCK
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:73
FlowDisableFlowRecyclerThread
void FlowDisableFlowRecyclerThread(void)
Used to disable flow recycler thread(s).
Definition: flow-manager.c:1191
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
FlowTimeoutCounters_::rows_maxlen
uint32_t rows_maxlen
Definition: flow-manager.c:122
FlowCounters_::flow_bypassed_bytes
uint16_t flow_bypassed_bytes
Definition: flow-manager.c:622
FlowTimeoutCounters_::flows_checked
uint32_t flows_checked
Definition: flow-manager.c:124
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:45
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:106
util-signal.h
SC_ERR_THREAD_INIT
@ SC_ERR_THREAD_INIT
Definition: util-error.h:79
flow-spare-pool.h
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
StatsDecr
void StatsDecr(ThreadVars *tv, uint16_t id)
Decrements the local counter.
Definition: counters.c:188
Flow_::fb
struct FlowBucket_ * fb
Definition: flow.h:501
FlowCounters_::flow_mgr_full_pass
uint16_t flow_mgr_full_pass
Definition: flow-manager.c:604
StatsRegisterMaxCounter
uint16_t StatsRegisterMaxCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the maximum of all the values assigned to it.
Definition: counters.c:995
SC_ATOMIC_MEMORY_ORDER_RELAXED
#define SC_ATOMIC_MEMORY_ORDER_RELAXED
Definition: util-atomic.h:166
decode.h
util-device.h
util-debug.h
FlowBypassInfo_::todstbytecnt
uint64_t todstbytecnt
Definition: flow.h:544
DefragTimeoutHash
uint32_t DefragTimeoutHash(struct timeval *ts)
time out tracker from the hash
Definition: defrag-timeout.c:120
FlowBypassInfo_::BypassUpdate
bool(* BypassUpdate)(Flow *f, void *data, time_t tsec)
Definition: flow.h:538
FLOW_PROTO_MAX
@ FLOW_PROTO_MAX
Definition: flow-private.h:76
FlowRecyclerThreadData_::counter_queue_avg
uint16_t counter_queue_avg
Definition: flow-manager.c:993
OutputFlowLogThreadInit
TmEcode OutputFlowLogThreadInit(ThreadVars *tv, void *initdata, void **data)
thread init for the flow logger This will run the thread init functions for the individual registered...
Definition: output-flow.c:124
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
FlowCounters_::flow_mgr_rows_maxlen
uint16_t flow_mgr_rows_maxlen
Definition: flow-manager.c:618
FLOWLOCK_WRLOCK
#define FLOWLOCK_WRLOCK(fb)
Definition: flow.h:268
FlowTimeoutsReset
#define FlowTimeoutsReset()
Definition: flow-manager.h:35
FlowForceReassemblyNeedReassembly
int FlowForceReassemblyNeedReassembly(Flow *f)
Check if a flow needs forced reassembly, or any other processing.
Definition: flow-timeout.c:296
FlowDisableFlowManagerThread
void FlowDisableFlowManagerThread(void)
Used to disable flow manager thread(s).
Definition: flow-manager.c:143
SCCtrlCondT
#define SCCtrlCondT
Definition: threads-debug.h:382
detect.h
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1744
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
TimeModeIsReady
bool TimeModeIsReady(void)
Definition: util-time.c:92
Flow_::flow_end_flags
uint8_t flow_end_flags
Definition: flow.h:460
THV_KILL
#define THV_KILL
Definition: threadvars.h:39
MemcapsGetPressure
float MemcapsGetPressure(void)
Definition: runmode-unix-socket.c:134
FlowBypassInfo_::todstpktcnt
uint64_t todstpktcnt
Definition: flow.h:543
util-time.h
FlowQueuePrivateGetFromTop
Flow * FlowQueuePrivateGetFromTop(FlowQueuePrivate *fqc)
Definition: flow-queue.c:151
FlowBypassInfo_::bypass_data
void * bypass_data
Definition: flow.h:540
FlowQueueAppendPrivate
void FlowQueueAppendPrivate(FlowQueue *fq, FlowQueuePrivate *fqc)
Definition: flow-queue.c:119
FlowQueueTimeoutCounters
struct FlowQueueTimeoutCounters FlowQueueTimeoutCounters
FlowCounters_::flow_mgr_flows_aside_needs_work
uint16_t flow_mgr_flows_aside_needs_work
Definition: flow-manager.c:616
app-layer-parser.h
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:124
FlowRecyclerThreadData_
Definition: flow-manager.c:989
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:289
FLOW_IS_IPV4
#define FLOW_IS_IPV4(f)
Definition: flow.h:159
FlowRecyclerThreadData_::counter_tcp_active_sessions
uint16_t counter_tcp_active_sessions
Definition: flow-manager.c:997
FlowCounters_::flow_bypassed_cnt_clo
uint16_t flow_bypassed_cnt_clo
Definition: flow-manager.c:620
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:82
SC_ATOMIC_SUB
#define SC_ATOMIC_SUB(name, val)
sub a value from our atomic variable
Definition: util-atomic.h:342
flow_timeouts_emerg
FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]
Definition: flow.c:95
TimeModeIsLive
bool TimeModeIsLive(void)
Definition: util-time.c:111
stream.h
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:376
FlowTimeoutsEmergency
void FlowTimeoutsEmergency(void)
Definition: flow-manager.c:105
FlowTimeoutCounters
struct FlowTimeoutCounters_ FlowTimeoutCounters
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:33
FlowManagerThreadSpawn
void FlowManagerThreadSpawn()
spawn the flow manager thread
Definition: flow-manager.c:954
stream-tcp-private.h
conf.h
FlowRecyclerThreadData_::output_thread_data
void * output_thread_data
Definition: flow-manager.c:990
FlowTimeoutCounters_::rows_checked
uint32_t rows_checked
Definition: flow-manager.c:119
FBLOCK_UNLOCK
#define FBLOCK_UNLOCK(fb)
Definition: flow-hash.h:75
TmEcode
TmEcode
Definition: tm-threads-common.h:81
FlowClearMemory
int FlowClearMemory(Flow *f, uint8_t proto_map)
Function clear the flow memory before queueing it to spare flow queue.
Definition: flow.c:1106
flow_timeouts_delta
FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]
Definition: flow.c:96
output-flow.h
detect-engine-state.h
Data structures and function prototypes for keeping state for the detection engine.
flow-timeout.h
flow-queue.h
TmModule_::name
const char * name
Definition: tm-modules.h:44
FlowRecyclerThreadData
struct FlowRecyclerThreadData_ FlowRecyclerThreadData
FlowBypassInfo_::tosrcpktcnt
uint64_t tosrcpktcnt
Definition: flow.h:541
host-timeout.h
SCCtrlCondTimedwait
#define SCCtrlCondTimedwait
Definition: threads-debug.h:385
FlowRecyclerThreadSpawn
void FlowRecyclerThreadSpawn()
spawn the flow recycler thread
Definition: flow-manager.c:1150
StreamTcpThreadCacheCleanup
void StreamTcpThreadCacheCleanup(void)
Definition: stream-tcp-cache.c:134
runmodes.h
FlowTimeoutCounters_::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-manager.c:130
FlowGetStorageById
void * FlowGetStorageById(const Flow *f, FlowStorageId id)
Definition: flow-storage.c:40
HttpRangeContainersTimeoutHash
uint32_t HttpRangeContainersTimeoutHash(struct timeval *ts)
Definition: app-layer-htp-range.c:190
Flow_::next
struct Flow_ * next
Definition: flow.h:407
FlowQueue_
Definition: flow-queue.h:49
FlowCounters_::flow_bypassed_pkts
uint16_t flow_bypassed_pkts
Definition: flow-manager.c:621
TMM_FLOWMANAGER
@ TMM_FLOWMANAGER
Definition: tm-threads-common.h:70
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:38
HostTimeoutHash
uint32_t HostTimeoutHash(struct timeval *ts)
time out hosts from the hash
Definition: host-timeout.c:156
flow_hash
FlowBucket * flow_hash
Definition: flow-hash.c:57
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:378
FlowQueueExtractPrivate
FlowQueuePrivate FlowQueueExtractPrivate(FlowQueue *fq)
Definition: flow-queue.c:140
flow-storage.h
FlowTimeoutCounters_
Definition: flow-manager.c:118
FlowManagerThreadData_
Definition: flow-manager.c:628
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flow-manager.h
suricata-common.h
SCCtrlMutex
#define SCCtrlMutex
Definition: threads-debug.h:373
FlowManagerThreadData
struct FlowManagerThreadData_ FlowManagerThreadData
flow_config
FlowConfig flow_config
Definition: flow.c:99
FlowQueueTimeoutCounters::flows_timeout
uint32_t flows_timeout
Definition: flow-manager.c:600
TmModuleFlowManagerRegister
void TmModuleFlowManagerRegister(void)
Definition: flow-manager.c:1253
FlowCounters_::memcap_pressure
uint16_t memcap_pressure
Definition: flow-manager.c:624
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:222
FlowTimeoutsInit
void FlowTimeoutsInit(void)
Definition: flow-manager.c:100
SC_ATOMIC_LOAD_EXPLICIT
#define SC_ATOMIC_LOAD_EXPLICIT(name, order)
Definition: util-atomic.h:379
SCLogError
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:255
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
FatalError
#define FatalError(x,...)
Definition: util-debug.h:530
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
Flow_::livedev
struct LiveDevice_ * livedev
Definition: flow.h:409
threadvars.h
util-validate.h
FlowQueuePrivate_
Definition: flow-queue.h:41
StatsAddUI64
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition: counters.c:146
SCLogConfig
struct SCLogConfig_ SCLogConfig
Holds the config state used by the logging api.
FlowRecyclerThreadData_::counter_queue_max
uint16_t counter_queue_max
Definition: flow-manager.c:994
FlowTimeoutCounters_::bypassed_bytes
uint64_t bypassed_bytes
Definition: flow-manager.c:134
FlowManagerThreadData_::min
uint32_t min
Definition: flow-manager.c:630
OutputFlowLogThreadDeinit
TmEcode OutputFlowLogThreadDeinit(ThreadVars *tv, void *thread_data)
Definition: output-flow.c:166
flow_recycle_q
FlowQueue flow_recycle_q
Definition: flow-manager.c:81
SCFree
#define SCFree(p)
Definition: util-mem.h:61
FlowTimeoutCounters_::bypassed_pkts
uint64_t bypassed_pkts
Definition: flow-manager.c:133
SC_ERR_FATAL
@ SC_ERR_FATAL
Definition: util-error.h:203
FlowCounters_::memcap_pressure_max
uint16_t memcap_pressure_max
Definition: flow-manager.c:625
Flow_::flags
uint32_t flags
Definition: flow.h:434
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:318
FlowManagerThreadData_::timeout
FlowManagerTimeoutThread timeout
Definition: flow-manager.c:635
util-random.h
FlowCounters
struct FlowCounters_ FlowCounters
FlowCounters_::flow_emerg_mode_enter
uint16_t flow_emerg_mode_enter
Definition: flow-manager.c:608
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
TimeGet
void TimeGet(struct timeval *tv)
Definition: util-time.c:155
FLOW_END_FLAG_SHUTDOWN
#define FLOW_END_FLAG_SHUTDOWN
Definition: flow.h:243
FLOW_EMERGENCY
#define FLOW_EMERGENCY
Definition: flow-private.h:37
IPPairTimeoutHash
uint32_t IPPairTimeoutHash(struct timeval *ts)
time out ippairs from the hash
Definition: ippair-timeout.c:142
suricata.h
ippair-timeout.h
FlowRecyclerThreadData_::counter_flow_active
uint16_t counter_flow_active
Definition: flow-manager.c:996
FlowCounters_::flow_mgr_flows_timeout
uint16_t flow_mgr_flows_timeout
Definition: flow-manager.c:613
FlowManagerTimeoutThread::aside_queue
FlowQueuePrivate aside_queue
Definition: flow-manager.c:273
timeradd
#define timeradd(a, b, r)
Definition: util-time.h:60
FLOW_END_FLAG_TIMEOUT
#define FLOW_END_FLAG_TIMEOUT
Definition: flow.h:241
StatsSyncCounters
#define StatsSyncCounters(tv)
Definition: counters.h:137
FlowTimeoutCounters_::flows_notimeout
uint32_t flows_notimeout
Definition: flow-manager.c:125
flow_timeouts_normal
FlowProtoTimeout flow_timeouts_normal[FLOW_PROTO_MAX]
Definition: flow.c:94
StatsRegisterAvgCounter
uint16_t StatsRegisterAvgCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the average of all the values assigned to it.
Definition: counters.c:975
FlowCounters_::flow_mgr_spare
uint16_t flow_mgr_spare
Definition: flow-manager.c:607
FlowTimeoutCounters_::flows_aside
uint32_t flows_aside
Definition: flow-manager.c:129
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:376
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:317
flow.h
FlowQueuePrivateAppendFlow
void FlowQueuePrivateAppendFlow(FlowQueuePrivate *fqc, Flow *f)
Definition: flow-queue.c:65
FlowEndCountersRegister
void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec)
Definition: flow-util.c:242
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:90
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:230
evicted
Flow * evicted
Definition: flow-hash.h:4
StatsRegisterCounter
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:955
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
BITS
#define BITS
FlowTimeoutCounters_::flows_timeout_inuse
uint32_t flows_timeout_inuse
Definition: flow-manager.c:127
Flow_::timeout_at
uint32_t timeout_at
Definition: flow.h:402
FlowEndCounters_
Definition: flow-util.h:156
flow_recycler_ctrl_mutex
SCCtrlMutex flow_recycler_ctrl_mutex
Definition: flow-manager.c:98
flow-var.h
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:111
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:70
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:360
FlowCounters_::flow_emerg_mode_over
uint16_t flow_emerg_mode_over
Definition: flow-manager.c:609
TM_FLAG_MANAGEMENT_TM
#define TM_FLAG_MANAGEMENT_TM
Definition: tm-modules.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:161
FlowTimeoutCounters_::rows_skipped
uint32_t rows_skipped
Definition: flow-manager.c:120
FlowCounters_::flow_mgr_flows_checked
uint16_t flow_mgr_flows_checked
Definition: flow-manager.c:611
FlowCounters_::flow_mgr_rows_sec
uint16_t flow_mgr_rows_sec
Definition: flow-manager.c:605
FlowQueueTimeoutCounters
Definition: flow-manager.c:598
FlowTimeoutCounters::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-worker.c:60
FQLOCK_UNLOCK
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:75