suricata
flow-manager.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2024 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
22  * \author Victor Julien <victor@inliniac.net>
23  */
24 
25 #include "suricata-common.h"
26 #include "conf.h"
27 #include "threadvars.h"
28 #include "tm-threads.h"
29 #include "runmodes.h"
30 
31 #include "util-time.h"
32 
33 #include "flow.h"
34 #include "flow-queue.h"
35 #include "flow-hash.h"
36 #include "flow-util.h"
37 #include "flow-private.h"
38 #include "flow-timeout.h"
39 #include "flow-manager.h"
40 #include "flow-storage.h"
41 #include "flow-spare-pool.h"
42 
43 #include "stream-tcp.h"
44 
45 #include "util-device.h"
46 
47 #include "util-debug.h"
48 
49 #include "threads.h"
51 
52 #include "host-timeout.h"
53 #include "defrag-hash.h"
54 #include "defrag-timeout.h"
55 #include "ippair-timeout.h"
56 #include "app-layer-htp-range.h"
57 
58 #include "output-flow.h"
59 
60 #include "runmode-unix-socket.h"
61 
62 /** queue to pass flows to cleanup/log thread(s) */
64 
65 /* multi flow manager support */
66 static uint32_t flowmgr_number = 1;
67 /* atomic counter for flow managers, to assign instance id */
68 SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
69 
70 /* multi flow recycler support */
71 static uint32_t flowrec_number = 1;
72 /* atomic counter for flow recyclers, to assign instance id */
73 SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
74 SC_ATOMIC_DECLARE(uint32_t, flowrec_busy);
75 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
76 
77 static SCCtrlCondT flow_manager_ctrl_cond = PTHREAD_COND_INITIALIZER;
78 static SCCtrlMutex flow_manager_ctrl_mutex = PTHREAD_MUTEX_INITIALIZER;
79 static SCCtrlCondT flow_recycler_ctrl_cond = PTHREAD_COND_INITIALIZER;
80 static SCCtrlMutex flow_recycler_ctrl_mutex = PTHREAD_MUTEX_INITIALIZER;
81 
83 {
84  SCCtrlMutexLock(&flow_manager_ctrl_mutex);
85  SCCtrlCondSignal(&flow_manager_ctrl_cond);
86  SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
87 }
88 
90 {
91  SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
92  SCCtrlCondSignal(&flow_recycler_ctrl_cond);
93  SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
94 }
95 
96 void FlowTimeoutsInit(void)
97 {
98  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
99 }
100 
102 {
103  SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
104 }
105 
106 typedef struct FlowTimeoutCounters_ {
107  uint32_t rows_checked;
108  uint32_t rows_skipped;
109  uint32_t rows_empty;
110  uint32_t rows_maxlen;
111 
112  uint32_t flows_checked;
113  uint32_t flows_notimeout;
114  uint32_t flows_timeout;
115  uint32_t flows_removed;
116  uint32_t flows_aside;
118 
119  uint32_t bypassed_count;
120  uint64_t bypassed_pkts;
121  uint64_t bypassed_bytes;
123 
124 /**
125  * \brief Used to disable flow manager thread(s).
126  *
127  * \todo Kinda hackish since it uses the tv name to identify flow manager
128  * thread. We need an all weather identification scheme.
129  */
131 {
133  /* flow manager thread(s) is/are a part of mgmt threads */
134  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
135  if (strncasecmp(tv->name, thread_name_flow_mgr,
136  strlen(thread_name_flow_mgr)) == 0)
137  {
139  }
140  }
142 
143  struct timeval start_ts;
144  struct timeval cur_ts;
145  gettimeofday(&start_ts, NULL);
146 
147 again:
148  gettimeofday(&cur_ts, NULL);
149  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
150  FatalError("unable to get all flow manager "
151  "threads to shutdown in time");
152  }
153 
155  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
156  if (strncasecmp(tv->name, thread_name_flow_mgr,
157  strlen(thread_name_flow_mgr)) == 0)
158  {
161  /* sleep outside lock */
162  SleepMsec(1);
163  goto again;
164  }
165  }
166  }
168 
169  /* reset count, so we can kill and respawn (unix socket) */
170  SC_ATOMIC_SET(flowmgr_cnt, 0);
171 }
172 
173 /** \internal
174  * \brief check if a flow is timed out
175  *
176  * \param f flow
177  * \param ts timestamp
 * \param next_ts in/out: overwritten with this flow's (possibly adjusted)
 *                timeout second when it is 0 or later than that second,
 *                so the caller ends up with the smallest value seen
 * \param emerg when true, the value FlowGetFlowTimeoutDirect() returns for
 *              flow_timeouts_delta is subtracted from the flow's timeout_at
 *              before it is used
178  *
179  * \retval false not timed out
180  * \retval true timed out
181  */
182 static bool FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
183 {
184  uint32_t flow_times_out_at = f->timeout_at;
185  if (emerg) {
 /* NOTE(review): the embedded listing numbers jump from 185 to 187, so a
  * line appears to be missing from this copy -- confirm against the
  * original file. */
 /* NOTE(review): unsigned subtraction; it would wrap if the delta were
  * larger than timeout_at -- confirm that cannot happen. */
187  flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
188  }
 /* keep the earliest timeout in *next_ts; 0 is treated as "not set yet" */
189  if (*next_ts == 0 || flow_times_out_at < *next_ts)
190  *next_ts = flow_times_out_at;
191 
192  /* do the timeout check */
 /* the flow only counts as timed out once the current second is strictly
  * past its timeout second */
193  if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) {
194  return false;
195  }
196 
197  return true;
198 }
199 
200 #ifdef CAPTURE_OFFLOAD
201 /** \internal
202  * \brief check timeout of captured bypassed flow by querying capture method
203  *
 * Flows that are not in FLOW_STATE_CAPTURE_BYPASSED are reported as timed
 * out right away. Otherwise the BypassUpdate callback is asked whether the
 * flow saw new traffic; if so the packet/byte deltas are accounted and the
 * flow is kept.
 *
204  * \param f Flow
205  * \param ts timestamp
206  * \param counters Flow timeout counters
207  *
208  * \retval false not timeout
209  * \retval true timeout (or not capture bypassed)
210  */
211 static inline bool FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
212 {
213  if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
214  return true;
215  }
216 
 /* NOTE(review): the embedded listing numbers jump from 216 to 218; the
  * declaration of `fc` is not visible in this copy -- confirm against the
  * original file. */
218  if (fc && fc->BypassUpdate) {
219  /* flow will be possibly updated */
 /* snapshot the counters so the delta can be computed after the callback */
220  uint64_t pkts_tosrc = fc->tosrcpktcnt;
221  uint64_t bytes_tosrc = fc->tosrcbytecnt;
222  uint64_t pkts_todst = fc->todstpktcnt;
223  uint64_t bytes_todst = fc->todstbytecnt;
224  bool update = fc->BypassUpdate(f, fc->bypass_data, SCTIME_SECS(ts));
225  if (update) {
226  SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f));
 /* from here on the four locals hold deltas, not totals */
227  pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
228  bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
229  pkts_todst = fc->todstpktcnt - pkts_todst;
230  bytes_todst = fc->todstbytecnt - bytes_todst;
231  if (f->livedev) {
232  SC_ATOMIC_ADD(f->livedev->bypassed,
233  pkts_tosrc + pkts_todst);
234  }
235  counters->bypassed_pkts += pkts_tosrc + pkts_todst;
236  counters->bypassed_bytes += bytes_tosrc + bytes_todst;
237  return false;
238  }
 /* callback reported no update: account the flow as a closed bypass */
239  SCLogDebug("No new packet, dead flow %" PRId64 "", FlowGetId(f));
240  if (f->livedev) {
241  if (FLOW_IS_IPV4(f)) {
242  LiveDevSubBypassStats(f->livedev, 1, AF_INET);
243  } else if (FLOW_IS_IPV6(f)) {
244  LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
245  }
246  }
247  counters->bypassed_count++;
248  }
249  return true;
250 }
251 #endif /* CAPTURE_OFFLOAD */
252 
253 typedef struct FlowManagerTimeoutThread {
254  /* used to temporarily store flows that have timed out and are
255  * removed from the hash to reduce locking contention */
258 
259 /**
260  * \internal
261  *
262  * \brief Process the temporary Aside Queue
263  * This means that as long as a flow f is not waiting on detection
264  * engine to finish dealing with it, f will be put in the recycle
265  * queue for further processing later on.
266  *
 * Flows are popped from td->aside_queue still locked. TCP flows that are
 * not bypassed and still need reassembly are unlocked and counted in
 * flows_aside_needs_work instead of being recycled.
 *
267  * \param td FM Timeout Thread instance
268  * \param counters Flow Timeout counters to be updated
269  *
270  * \retval Number of flows that were recycled
271  */
272 static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
273 {
274  FlowQueuePrivate recycle = { NULL, NULL, 0 };
 /* account the whole queue up front, before it is drained */
275  counters->flows_aside += td->aside_queue.len;
276 
277  uint32_t cnt = 0;
278  Flow *f;
279  while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
280  /* flow is still locked */
281 
 /* NOTE(review): the embedded listing numbers skip 283 (inside this
  * condition) and 286 (after the "Send the flow" comment); part of the
  * condition and the statement that hands the flow over are not visible
  * in this copy -- confirm against the original file. */
282  if (f->proto == IPPROTO_TCP &&
284  !FlowIsBypassed(f) && FlowNeedsReassembly(f)) {
285  /* Send the flow to its thread */
287  FLOWLOCK_UNLOCK(f);
288  /* flow ownership is already passed to the worker thread */
289 
290  counters->flows_aside_needs_work++;
291  continue;
292  }
293  FLOWLOCK_UNLOCK(f);
294 
295  FlowQueuePrivateAppendFlow(&recycle, f);
 /* NOTE(review): listing numbers skip 297-298; the body of this batch
  * check (every 100 flows) is missing from this copy. */
296  if (recycle.len == 100) {
299  }
300  cnt++;
301  }
 /* NOTE(review): listing numbers skip 303-304; the handling of the
  * left-over flows in `recycle` is missing from this copy. */
302  if (recycle.len) {
305  }
306  return cnt;
307 }
308 
309 /**
310  * \internal
311  *
312  * \brief check all flows in a hash row for timing out
313  *
 * \param td FM timeout thread instance
314  * \param f flow to start the walk at; the walk follows f->next until NULL
315  * \param ts timestamp
316  * \param emergency bool indicating emergency mode
317  * \param counters ptr to FlowTimeoutCounters structure
 * \param next_ts in/out: passed on to FlowManagerFlowTimeout() for every
 *                flow in the row
318  */
319 static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts,
320  int emergency, FlowTimeoutCounters *counters, uint32_t *next_ts)
321 {
322  uint32_t checked = 0;
 /* last flow that stayed in the row; handed to RemoveFromHash() */
323  Flow *prev_f = NULL;
324 
325  do {
326  checked++;
327 
328  /* check flow timeout based on lastts and state. Both can be
329  * accessed w/o Flow lock as we do have the hash row lock (so flow
330  * can't disappear) and flow_state is atomic. lastts can only
331  * be modified when we have both the flow and hash row lock */
332 
333  /* timeout logic goes here */
334  if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == false) {
335  counters->flows_notimeout++;
336 
337  prev_f = f;
338  f = f->next;
 /* `continue` in a do/while jumps to the `f != NULL` test below */
339  continue;
340  }
341 
342  FLOWLOCK_WRLOCK(f);
343 
 /* remember the successor before f is unlinked from the row */
344  Flow *next_flow = f->next;
345 
346 #ifdef CAPTURE_OFFLOAD
347  /* never prune a flow that is used by a packet we
348  * are currently processing in one of the threads */
349  if (!FlowBypassedTimeout(f, ts, counters)) {
350  FLOWLOCK_UNLOCK(f);
351  prev_f = f;
352  f = f->next;
353  continue;
354  }
355 #endif
 /* NOTE(review): the embedded listing numbers skip 356 here -- a statement
  * appears to be missing from this copy. */
357 
358  counters->flows_timeout++;
359 
 /* prev_f is left unchanged: f is gone from the row after this call */
360  RemoveFromHash(f, prev_f);
361 
 /* NOTE(review): the embedded listing numbers skip 362; the statement that
  * puts the still-locked flow on a queue (see the comment below) is not
  * visible in this copy -- confirm against the original file. */
363  /* flow is still locked in the queue */
364 
365  f = next_flow;
366  } while (f != NULL);
367 
368  counters->flows_checked += checked;
 /* rows_maxlen tracks the longest row walked so far */
369  if (checked > counters->rows_maxlen)
370  counters->rows_maxlen = checked;
371 }
372 
373 /**
374  * \internal
375  *
376  * \brief Clear evicted list from Flow Manager.
377  * All the evicted flows are removed from the Flow bucket and added
378  * to the temporary Aside Queue.
379  *
 * Each flow is write-locked and has its next and fb pointers cleared; the
 * lock is not released in this function.
 *
380  * \param td FM timeout thread instance
381  * \param f head of the evicted list
382  */
383 static void FlowManagerHashRowClearEvictedList(FlowManagerTimeoutThread *td, Flow *f)
384 {
385  do {
386  FLOWLOCK_WRLOCK(f);
 /* save the successor before the link is cleared */
387  Flow *next_flow = f->next;
388  f->next = NULL;
389  f->fb = NULL;
390 
 /* NOTE(review): the embedded listing numbers skip 391; the statement that
  * adds the flow to the aside queue (per the function description) is not
  * visible in this copy -- confirm against the original file. */
392  /* flow is still locked in the queue */
393 
394  f = next_flow;
395  } while (f != NULL);
396 }
397 
398 /**
399  * \brief time out flows from the hash
400  *
401  * \param ts timestamp
402  * \param hash_min min hash index to consider
403  * \param hash_max max hash index to consider
404  * \param counters ptr to FlowTimeoutCounters structure
405  *
406  * \retval cnt number of timed out flow
407  */
408 static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const uint32_t hash_min,
409  const uint32_t hash_max, FlowTimeoutCounters *counters)
410 {
411  uint32_t cnt = 0;
412  const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
413  const uint32_t rows_checked = hash_max - hash_min;
414  uint32_t rows_skipped = 0;
415  uint32_t rows_empty = 0;
416 
417 #if __WORDSIZE==64
418 #define BITS 64
419 #define TYPE uint64_t
420 #else
421 #define BITS 32
422 #define TYPE uint32_t
423 #endif
424 
425  const uint32_t ts_secs = (uint32_t)SCTIME_SECS(ts);
426  for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
427  TYPE check_bits = 0;
428  const uint32_t check = MIN(BITS, (hash_max - idx));
429  for (uint32_t i = 0; i < check; i++) {
430  FlowBucket *fb = &flow_hash[idx+i];
431  check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(
432  fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs)
433  << (TYPE)i;
434  }
435  if (check_bits == 0)
436  continue;
437 
438  for (uint32_t i = 0; i < check; i++) {
439  FlowBucket *fb = &flow_hash[idx+i];
440  if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= ts_secs) {
441  FBLOCK_LOCK(fb);
442  Flow *evicted = NULL;
443  if (fb->evicted != NULL || fb->head != NULL) {
444  if (fb->evicted != NULL) {
445  /* transfer out of bucket so we can do additional work outside
446  * of the bucket lock */
447  evicted = fb->evicted;
448  fb->evicted = NULL;
449  }
450  if (fb->head != NULL) {
451  uint32_t next_ts = 0;
452  FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts);
453 
454  if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
455  SC_ATOMIC_SET(fb->next_ts, next_ts);
456  }
457  if (fb->evicted == NULL && fb->head == NULL) {
458  /* row is empty */
459  SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
460  }
461  } else {
462  SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
463  rows_empty++;
464  }
465  FBLOCK_UNLOCK(fb);
466  /* processed evicted list */
467  if (evicted) {
468  FlowManagerHashRowClearEvictedList(td, evicted);
469  }
470  } else {
471  rows_skipped++;
472  }
473  }
474  if (td->aside_queue.len) {
475  cnt += ProcessAsideQueue(td, counters);
476  }
477  }
478 
479  counters->rows_checked += rows_checked;
480  counters->rows_skipped += rows_skipped;
481  counters->rows_empty += rows_empty;
482 
483  if (td->aside_queue.len) {
484  cnt += ProcessAsideQueue(td, counters);
485  }
486  counters->flows_removed += cnt;
487  /* coverity[missing_unlock : FALSE] */
488  return cnt;
489 }
490 
491 /** \internal
492  *
493  * \brief handle timeout for a slice of hash rows
494  * If we wrap around we call FlowTimeoutHash twice
495  * \param td FM timeout thread
496  * \param ts timeout in seconds
497  * \param hash_min lower bound of the row slice
498  * \param hash_max upper bound of the row slice
499  * \param counters Flow timeout counters to be passed
500  * \param rows number of rows for this worker unit
501  * \param pos position of the beginning of row slice in the hash table
502  *
503  * \retval number of successfully timed out flows
504  */
505 static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t ts,
506  const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
507  const uint32_t rows, uint32_t *pos)
508 {
509  uint32_t start = 0;
510  uint32_t end = 0;
511  uint32_t cnt = 0;
512  uint32_t rows_left = rows;
513 
514 again:
515  start = hash_min + (*pos);
516  if (start >= hash_max) {
517  start = hash_min;
518  }
519  end = start + rows_left;
520  if (end > hash_max) {
521  end = hash_max;
522  }
523  *pos = (end == hash_max) ? hash_min : end;
524  rows_left = rows_left - (end - start);
525 
526  cnt += FlowTimeoutHash(td, ts, start, end, counters);
527  if (rows_left) {
528  goto again;
529  }
530  return cnt;
531 }
532 
533 /**
534  * \internal
535  *
536  * \brief move all flows out of a hash row
537  *
538  * \param f flow to start at; the walk follows f->next until NULL
539  * \param recycle_q Flow recycle queue
540  * \param mode 0: unlink each flow with RemoveFromHash(); any other value:
 *             treat f as the front of its bucket's evicted list and advance
 *             fb->evicted past it
541  *
542  * \retval cnt number of flows removed from the hash and added to the recycle queue
543  */
544 static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
545 {
546  uint32_t cnt = 0;
547 
548  do {
549  FLOWLOCK_WRLOCK(f);
550 
 /* save the successor before f's links are cleared */
551  Flow *next_flow = f->next;
552 
553  /* remove from the hash */
554  if (mode == 0) {
555  RemoveFromHash(f, NULL);
556  } else {
557  FlowBucket *fb = f->fb;
558  fb->evicted = f->next;
559  f->next = NULL;
560  f->fb = NULL;
561  }
 /* NOTE(review): the embedded listing numbers skip 562 here -- a statement
  * appears to be missing from this copy; confirm against the original. */
563 
564  /* no one is referring to this flow, removed from hash
565  * so we can unlock it and move it to the recycle queue. */
566  FLOWLOCK_UNLOCK(f);
567  FlowQueuePrivateAppendFlow(recycle_q, f);
568 
569  cnt++;
570 
571  f = next_flow;
572  } while (f != NULL);
573 
574  return cnt;
575 }
576 
577 #define RECYCLE_MAX_QUEUE_ITEMS 25
578 /**
579  * \brief remove all flows from the hash
580  *
 * Walks every bucket of flow_hash under its bucket lock, moving the flows of
 * both the head list and the evicted list into a local queue, which is
 * appended to flow_recycle_q in batches.
 *
581  * \retval cnt number of flows removed from the hash
582  */
583 static uint32_t FlowCleanupHash(void)
584 {
585  FlowQueuePrivate local_queue = { NULL, NULL, 0 };
586  uint32_t cnt = 0;
587 
588  for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
589  FlowBucket *fb = &flow_hash[idx];
590 
591  FBLOCK_LOCK(fb);
592 
593  if (fb->head != NULL) {
594  /* we have a flow, or more than one */
595  cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
596  }
597  if (fb->evicted != NULL) {
598  /* we have a flow, or more than one */
599  cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
600  }
601 
602  FBLOCK_UNLOCK(fb);
 /* flush outside the bucket lock once enough flows have piled up */
603  if (local_queue.len >= RECYCLE_MAX_QUEUE_ITEMS) {
604  FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
 /* NOTE(review): the embedded listing numbers skip 605 here -- a statement
  * appears to be missing from this copy. */
606  }
607  }
 /* NOTE(review): the embedded listing numbers skip 608 and 610 around the
  * final flush -- confirm against the original file. */
609  FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
611 
612  return cnt;
613 }
614 
615 typedef struct FlowCounters_ {
618 
619  uint16_t flow_mgr_spare;
622 
628 
630 
634 
635  uint16_t memcap_pressure;
638 
639 typedef struct FlowManagerThreadData_ {
640  uint32_t instance;
641  uint32_t min;
642  uint32_t max;
643 
645 
650 
651 static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
652 {
653  fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t);
654  fc->flow_mgr_rows_sec = StatsRegisterCounter("flow.mgr.rows_per_sec", t);
655 
656  fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
657  fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
658  fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
659 
660  fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", t);
661  fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", t);
662  fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", t);
663  fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", t);
664  fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", t);
665  fc->flow_mgr_flows_aside_needs_work = StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", t);
666 
667  fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
668  fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
669  fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
670 
671  fc->memcap_pressure = StatsRegisterCounter("memcap.pressure", t);
672  fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap.pressure_max", t);
673 }
674 
/**
 * \brief Push the results of one timeout pass into the thread's stats.
 *
 * \param th_v thread the stats belong to
 * \param ftd flow manager thread data holding the counter ids (ftd->cnt)
 * \param counters per-pass values to add
 */
675 static void FlowCountersUpdate(
676  ThreadVars *th_v, const FlowManagerThreadData *ftd, const FlowTimeoutCounters *counters)
677 {
678  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_checked, (uint64_t)counters->flows_checked);
679  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters->flows_notimeout);
680 
681  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters->flows_timeout);
682  StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters->flows_aside);
 /* NOTE(review): the embedded listing numbers skip 683; the start of the
  * call that the next line continues is missing from this copy -- confirm
  * against the original file. */
684  (uint64_t)counters->flows_aside_needs_work);
685 
686  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_cnt_clo, (uint64_t)counters->bypassed_count);
687  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_pkts, (uint64_t)counters->bypassed_pkts);
688  StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters->bypassed_bytes);
689 
 /* set rather than added, unlike the counters above */
690  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters->rows_maxlen);
691 }
692 
/**
 * \brief Thread init for a flow manager: assigns the instance id and its
 *        hash slice, and registers the counters.
 *
 * \param t thread vars
 * \param initdata not used in the visible code
 * \param data out: receives the thread data pointer
 *
 * \retval TM_ECODE_OK on success
 * \retval TM_ECODE_FAILED if ftd is NULL
 */
693 static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
694 {
 /* NOTE(review): the embedded listing numbers skip 695; the declaration and
  * allocation of `ftd` are not visible in this copy -- confirm against the
  * original file. */
696  if (ftd == NULL)
697  return TM_ECODE_FAILED;
698 
 /* NOTE(review): presumably SC_ATOMIC_ADD yields the value before the
  * increment, giving 0-based instance ids -- confirm. */
699  ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
700  SCLogDebug("flow manager instance %u", ftd->instance);
701 
702  /* set the min and max value used for hash row walking
703  * each thread has its own section of the flow hash */
704  uint32_t range = flow_config.hash_size / flowmgr_number;
705 
706  ftd->min = ftd->instance * range;
707  ftd->max = (ftd->instance + 1) * range;
708 
709  /* last flow-manager takes on hash_size % flowmgr_number extra rows */
710  if ((ftd->instance + 1) == flowmgr_number) {
711  ftd->max = flow_config.hash_size;
712  }
 /* NOTE(review): the embedded listing numbers skip 713 here -- a line
  * appears to be missing from this copy. */
714 
715  SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
716 
717  /* pass thread data back to caller */
718  *data = ftd;
719 
720  FlowCountersInit(t, &ftd->cnt);
721  ftd->counter_defrag_timeout = StatsRegisterCounter("defrag.mgr.tracker_timeout", t);
722  ftd->counter_defrag_memuse = StatsRegisterCounter("defrag.memuse", t);
723 
724  PacketPoolInit();
725  return TM_ECODE_OK;
726 }
727 
/**
 * \brief Thread deinit for a flow manager: frees the thread data.
 *
 * \param t thread vars (not used in the visible code)
 * \param data thread data to free
 *
 * \retval TM_ECODE_OK always
 */
728 static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
729 {
 /* NOTE(review): the embedded listing numbers skip 730-731 -- statements
  * appear to be missing from this copy; confirm against the original. */
732  SCFree(data);
733  return TM_ECODE_OK;
734 }
735 
/** \internal
 *  \brief calculate number of rows to scan and how much time to sleep
 *         based on the busy score `mp` (0 idle, 100 max busy).
 *
 *  We try to make sure we scan the hash once a second. The size of the
 *  slice of the hash scanned is determined by our busy score 'mp'.
 *  We sleep for the remainder of the second after processing the slice,
 *  or at least an approximation of it.
 *  A minimum busy score of 10 is assumed to avoid a longer than 10 second
 *  full hash pass. This is to avoid burstiness in scanning when there is
 *  a rapid increase of the busy score, which could lead to the flow manager
 *  suddenly scanning a much larger slice of the hash leading to a burst
 *  in scan/eviction work.
 *
 *  In emergency mode the whole slice is scanned with a 250ms sleep, and
 *  `rows_sec` is left untouched.
 *
 *  \param rows number of rows for the work unit
 *  \param mp current memcap pressure value
 *  \param emergency emergency mode is set or not
 *  \param wu_sleep holds value of sleep time per work unit, in ms
 *  \param wu_rows holds value of calculated rows to be processed per work unit
 *  \param rows_sec same as wu_rows, only used for counter updates
 */
static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
        uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
{
    if (emergency) {
        *wu_rows = rows;
        *wu_sleep = 250;
        return;
    }

    /* minimum busy score is 10 */
    const uint32_t busy = (mp < 10) ? 10 : mp;
    const float busy_ratio = (float)busy / (float)100;
    const uint32_t slice_rows = (uint32_t)((float)rows * busy_ratio);

    /* estimated scan time in ms, assuming an average of 1usec per row,
     * capped at one second */
    const uint32_t scan_ms_uncapped = slice_rows / 1000;
    const uint32_t scan_ms = (scan_ms_uncapped < 1000) ? scan_ms_uncapped : 1000;

    /* sleep for what is left of the second, but never less than 250ms */
    const uint32_t idle_ms = 1000 - scan_ms;
    const uint32_t sleep_ms = (idle_ms > 250) ? idle_ms : 250;

    SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, busy, rows, slice_rows,
            sleep_ms);

    *wu_sleep = sleep_ms;
    *wu_rows = slice_rows;
    *rows_sec = slice_rows;
}
781 
782 /** \brief Thread that manages the flow table and times out flows.
783  *
784  * \param th_v ThreadVars of this flow manager thread
 * \param thread_data per-thread FlowManagerThreadData, passed as void ptr
785  *
786  * Keeps an eye on the spare list, alloc flows if needed...
787  */
788 static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
789 {
790  FlowManagerThreadData *ftd = thread_data;
791  const uint32_t rows = ftd->max - ftd->min;
792  const bool time_is_live = TimeModeIsLive();
793 
794  uint32_t emerg_over_cnt = 0;
795  uint64_t next_run_ms = 0;
796  uint32_t pos = 0;
797  uint32_t rows_sec = 0;
798  uint32_t rows_per_wu = 0;
799  uint64_t sleep_per_wu = 0;
800  bool prev_emerg = false;
801  uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
802  SCTime_t ts;
803 
804  /* don't start our activities until time is setup */
805  while (!TimeModeIsReady()) {
806  if (suricata_ctl_flags != 0)
807  return TM_ECODE_OK;
808  usleep(10);
809  }
810 
811  uint32_t mp = MemcapsGetPressure() * 100;
812  if (ftd->instance == 0) {
813  StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
814  StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
815  }
816  GetWorkUnitSizing(rows, mp, false, &sleep_per_wu, &rows_per_wu, &rows_sec);
817  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
818 
820 
821  while (1)
822  {
823  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
827  }
828 
829  bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);
830 
831  /* Get the time */
832  ts = TimeGet();
833  SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts));
834  uint64_t ts_ms = SCTIME_MSECS(ts);
835  const bool emerge_p = (emerg && !prev_emerg);
836  if (emerge_p) {
837  next_run_ms = 0;
838  prev_emerg = true;
839  SCLogNotice("Flow emergency mode entered...");
840  StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter);
841  }
842  if (ts_ms >= next_run_ms) {
843  if (ftd->instance == 0) {
844  const uint32_t sq_len = FlowSpareGetPoolSize();
845  const uint32_t spare_perc = sq_len * 100 / MAX(flow_config.prealloc, 1);
846  /* see if we still have enough spare flows */
847  if (spare_perc < 90 || spare_perc > 110) {
848  FlowSparePoolUpdate(sq_len);
849  }
850  }
851 
852  /* try to time out flows */
853  // clang-format off
854  FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
855  // clang-format on
856 
857  if (emerg) {
858  /* in emergency mode, do a full pass of the hash table */
859  FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters);
860  StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
861  } else {
862  SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,
863  rows_per_wu);
864 
865  const uint32_t ppos = pos;
866  FlowTimeoutHashInChunks(
867  &ftd->timeout, ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos);
868  if (ppos > pos) {
869  StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
870  }
871  }
872 
873  const uint32_t spare_pool_len = FlowSpareGetPoolSize();
874  StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)spare_pool_len);
875 
876  FlowCountersUpdate(th_v, ftd, &counters);
877 
878  if (emerg == true) {
879  SCLogDebug("flow_sparse_q.len = %" PRIu32 " prealloc: %" PRIu32
880  "flow_spare_q status: %" PRIu32 "%% flows at the queue",
881  spare_pool_len, flow_config.prealloc,
882  spare_pool_len * 100 / MAX(flow_config.prealloc, 1));
883 
884  /* only if we have pruned this "emergency_recovery" percentage
885  * of flows, we will unset the emergency bit */
886  if ((spare_pool_len * 100 / MAX(flow_config.prealloc, 1)) >
888  emerg_over_cnt++;
889  } else {
890  emerg_over_cnt = 0;
891  }
892 
893  if (emerg_over_cnt >= 30) {
894  SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
896 
897  emerg = false;
898  prev_emerg = false;
899  emerg_over_cnt = 0;
900  SCLogNotice("Flow emergency mode over, back to normal... unsetting"
901  " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", "
902  "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32
903  "%% flows at the queue",
904  (uintmax_t)SCTIME_SECS(ts), (uintmax_t)SCTIME_USECS(ts),
905  spare_pool_len * 100 / MAX(flow_config.prealloc, 1));
906 
907  StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
908  }
909  }
910 
911  /* update work units */
912  const uint32_t pmp = mp;
913  mp = MemcapsGetPressure() * 100;
914  if (ftd->instance == 0) {
915  StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
916  StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
917  }
918  GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
919  if (pmp != mp) {
920  StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
921  }
922 
923  next_run_ms = ts_ms + sleep_per_wu;
924  }
925  if (other_last_sec == 0 || other_last_sec < (uint32_t)SCTIME_SECS(ts)) {
926  if (ftd->instance == 0) {
928  uint32_t defrag_cnt = DefragTimeoutHash(ts);
929  if (defrag_cnt) {
930  StatsAddUI64(th_v, ftd->counter_defrag_timeout, defrag_cnt);
931  }
936  other_last_sec = (uint32_t)SCTIME_SECS(ts);
937  }
938  }
939 
940  if (TmThreadsCheckFlag(th_v, THV_KILL)) {
941  StatsSyncCounters(th_v);
942  break;
943  }
944 
945  if (emerg || !time_is_live) {
946  usleep(250);
947  } else {
948  struct timeval cond_tv;
949  gettimeofday(&cond_tv, NULL);
950  struct timeval add_tv;
951  add_tv.tv_sec = sleep_per_wu / 1000;
952  add_tv.tv_usec = (sleep_per_wu % 1000) * 1000;
953  timeradd(&cond_tv, &add_tv, &cond_tv);
954 
955  struct timespec cond_time = FROM_TIMEVAL(cond_tv);
956  SCCtrlMutexLock(&flow_manager_ctrl_mutex);
957  while (1) {
958  int rc = SCCtrlCondTimedwait(
959  &flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, &cond_time);
960  if (rc == ETIMEDOUT || rc < 0)
961  break;
962  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
963  break;
964  }
965  }
966  SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
967  }
968 
969  SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
970 
972  }
973  return TM_ECODE_OK;
974 }
975 
976 /** \brief spawn the flow manager thread */
978 {
979  intmax_t setting = 1;
980  (void)ConfGetInt("flow.managers", &setting);
981 
982  if (setting < 1 || setting > 1024) {
983  FatalError("invalid flow.managers setting %" PRIdMAX, setting);
984  }
985  flowmgr_number = (uint32_t)setting;
986 
987  SCLogConfig("using %u flow manager threads", flowmgr_number);
989 
990  for (uint32_t u = 0; u < flowmgr_number; u++) {
991  char name[TM_THREAD_NAME_MAX];
992  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);
993 
994  ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
995  "FlowManager", 0);
996  BUG_ON(tv_flowmgr == NULL);
997 
998  if (tv_flowmgr == NULL) {
999  FatalError("flow manager thread creation failed");
1000  }
1001  if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
1002  FatalError("flow manager thread spawn failed");
1003  }
1004  }
1005 }
1006 
1007 typedef struct FlowRecyclerThreadData_ {
1009 
1010  uint16_t counter_flows;
1013 
1018 
1019 static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
1020 {
1022  if (ftd == NULL)
1023  return TM_ECODE_FAILED;
1025  SCLogError("initializing flow log API for thread failed");
1026  SCFree(ftd);
1027  return TM_ECODE_FAILED;
1028  }
1029  SCLogDebug("output_thread_data %p", ftd->output_thread_data);
1030 
1031  ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t);
1032  ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t);
1033  ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t);
1034 
1035  ftd->counter_flow_active = StatsRegisterCounter("flow.active", t);
1036  ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t);
1037 
1038  FlowEndCountersRegister(t, &ftd->fec);
1039 
1040  *data = ftd;
1041  return TM_ECODE_OK;
1042 }
1043 
/**
 * \brief Thread deinit for a flow recycler: frees the thread data.
 *
 * \param t thread vars
 * \param data thread data to free
 *
 * \retval TM_ECODE_OK always
 */
1044 static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
1045 {
 /* NOTE(review): the embedded listing numbers skip 1046, 1048 and 1050. The
  * declaration of `ftd` and the statement guarded by the `if` below are not
  * visible in this copy; as shown, the `if` would govern SCFree(data),
  * which is most likely an artifact of the missing line -- confirm against
  * the original file. */
1047 
1049  if (ftd->output_thread_data != NULL)
1051 
1052  SCFree(data);
1053  return TM_ECODE_OK;
1054 }
1055 
/**
 * \brief Final per-flow processing in the recycler thread.
 *
 * Under the flow's write lock: passes the flow to OutputFlowLog() (return
 * value deliberately ignored), updates the flow end counters and clears
 * the flow's memory.
 *
 * \param tv thread vars
 * \param ftd recycler thread data
 * \param f flow to process
 */
1056 static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)
1057 {
1058  FLOWLOCK_WRLOCK(f);
1059 
1060  (void)OutputFlowLog(tv, ftd->output_thread_data, f);
1061 
1062  FlowEndCountersUpdate(tv, &ftd->fec, f);
 /* NOTE(review): the embedded listing numbers skip 1064 and 1066; the body
  * of this TCP branch and a statement after it are missing from this copy
  * -- confirm against the original file. */
1063  if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
1065  }
1067 
1068  FlowClearMemory(f, f->protomap);
1069  FLOWLOCK_UNLOCK(f);
1070 }
1071 
1072 /** \brief Thread that manages timed out flows.
1073  *
1074  * \param th_v ThreadVars of this recycler thread
 * \param thread_data per-thread FlowRecyclerThreadData, passed as void ptr
 *
 * \retval TM_ECODE_OK when the loop is left
1075  */
1076 static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
1077 {
1078  FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
1079  BUG_ON(ftd == NULL);
1080  const bool time_is_live = TimeModeIsLive();
1081  uint64_t recycled_cnt = 0;
 /* flows that are done, collected before going back to the spare pool */
1082  FlowQueuePrivate ret_queue = { NULL, NULL, 0 };
1083 
 /* NOTE(review): the embedded listing numbers skip 1084 here -- a line
  * appears to be missing from this copy. */
1085 
1086  while (1)
1087  {
 /* NOTE(review): listing numbers skip 1089-1091; the body of the pause
  * handling is missing from this copy. */
1088  if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
1092  }
1093  SC_ATOMIC_ADD(flowrec_busy,1);
 /* NOTE(review): listing numbers skip 1094; `list`, used below, is never
  * declared in this copy -- confirm against the original file. */
1095 
1096  StatsAddUI64(th_v, ftd->counter_queue_avg, list.len);
1097  StatsSetUI64(th_v, ftd->counter_queue_max, list.len);
1098 
 /* the kill flag is sampled before draining, so the flows already in
  * `list` are still processed before the loop is left */
1099  const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));
1100 
1101  /* Get the time */
1102  SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(TimeGet()));
1103 
1104  uint64_t cnt = 0;
1105  Flow *f;
1106  while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
1107  Recycler(th_v, ftd, f);
1108  cnt++;
1109 
1110  /* for every full sized block, add it to the spare pool */
1111  FlowQueuePrivateAppendFlow(&ret_queue, f);
1112  if (ret_queue.len == FLOW_SPARE_POOL_BLOCK_SIZE) {
1113  FlowSparePoolReturnFlows(&ret_queue);
1114  }
1115  }
 /* return whatever did not fill a whole block */
1116  if (ret_queue.len > 0) {
1117  FlowSparePoolReturnFlows(&ret_queue);
1118  }
1119  if (cnt > 0) {
1120  recycled_cnt += cnt;
1121  StatsAddUI64(th_v, ftd->counter_flows, cnt);
1122  }
1123  SC_ATOMIC_SUB(flowrec_busy,1);
1124 
1125  if (bail) {
1126  break;
1127  }
1128 
1129  const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY);
 /* short sleep in emergency mode or when time is not live; otherwise wait
  * on the condition variable for up to one second */
1130  if (emerg || !time_is_live) {
1131  usleep(250);
1132  } else {
1133  struct timeval cond_tv;
1134  gettimeofday(&cond_tv, NULL);
1135  cond_tv.tv_sec += 1;
1136  struct timespec cond_time = FROM_TIMEVAL(cond_tv);
1137  SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
 /* keep waiting until the deadline passes, the wait fails, emergency mode
  * is flagged or the recycle queue reports it is non-empty */
1138  while (1) {
1139  int rc = SCCtrlCondTimedwait(
1140  &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time);
1141  if (rc == ETIMEDOUT || rc < 0) {
1142  break;
1143  }
1144  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
1145  break;
1146  }
1147  if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) {
1148  break;
1149  }
1150  }
1151  SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
1152  }
1153 
1154  SCLogDebug("woke up...");
1155 
 /* NOTE(review): the embedded listing numbers skip 1156 here -- a statement
  * appears to be missing from this copy. */
1157  }
1158  StatsSyncCounters(th_v);
1159  SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
1160  return TM_ECODE_OK;
1161 }
1162 
/**
 * \brief Check whether the flow recyclers have no work left.
 *
 * \retval false flowrec_busy is non-zero, or flow_recycle_q.qlen is not 0
 * \retval true  flowrec_busy is zero and flow_recycle_q.qlen is 0
 *
 * NOTE(review): listing lines 1169 and 1171 are not part of this extract.
 * Presumably they take and release the queue lock around the qlen read --
 * confirm against the real source.
 */
1163 static bool FlowRecyclerReadyToShutdown(void)
1164 {
1165  if (SC_ATOMIC_GET(flowrec_busy) != 0) {
1166  return false;
1167  }
1168  uint32_t len = 0;
1170  len = flow_recycle_q.qlen;
1172 
1173  return ((len == 0));
1174 }
1175 
1176 /** \brief spawn the flow recycler thread(s)
      *
      * Reads the "flow.recyclers" config value (1 is used when it is not
      * set), stores it in flowrec_number and creates and spawns that many
      * "FlowRecycler" management threads, named thread_name_flow_rec followed
      * by "#NN" (counting from 01).
      *
      * Calls FatalError() when the setting is outside 1..1024, or when a
      * thread cannot be created or spawned.
      *
      * NOTE(review): the signature line (listing line 1177) is not part of
      * this extract.
      */
1178 {
1179  intmax_t setting = 1;
      /* return value ignored: `setting` keeps its default of 1 on a failed lookup */
1180  (void)ConfGetInt("flow.recyclers", &setting);
1181 
1182  if (setting < 1 || setting > 1024) {
1183  FatalError("invalid flow.recyclers setting %" PRIdMAX, setting);
1184  }
1185  flowrec_number = (uint32_t)setting;
1186 
1187  SCLogConfig("using %u flow recycler threads", flowrec_number);
1188 
1189  for (uint32_t u = 0; u < flowrec_number; u++) {
1190  char name[TM_THREAD_NAME_MAX];
1191  snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);
1192 
1193  ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name,
1194  "FlowRecycler", 0);
1195 
1196  if (tv_flowrec == NULL) {
1197  FatalError("flow recycler thread creation failed");
1198  }
1199  if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) {
1200  FatalError("flow recycler thread spawn failed");
1201  }
1202  }
1203 }
1204 
1205 /**
1206  * \brief Used to disable flow recycler thread(s).
      *
      * Steps visible here: run FlowCleanupHash(), poll until
      * FlowRecyclerReadyToShutdown() reports true, walk the management
      * threads whose name starts with thread_name_flow_rec, then poll those
      * threads again for up to 60 seconds before giving up with
      * FatalError(). Finally flowrec_cnt is reset to 0.
1207  *
1208  * \note this should only be called when the flow manager is already gone
1209  *
1210  * \todo Kinda hackish since it uses the tv name to identify flow recycler
1211  * thread. We need an all weather identification scheme.
      *
      * NOTE(review): the signature (listing line 1213) and listing lines
      * 1225, 1229, 1235, 1238, 1251, 1256-1258 and 1265 are not part of this
      * extract. The statements executed for each matching thread, the
      * locking around the thread list walks and the condition that triggers
      * the `goto again` retry are therefore not visible -- confirm against
      * the real source.
1212  */
1214 {
1215  /* move all flows still in the hash to the recycler queue */
1216 #ifndef DEBUG
1217  (void)FlowCleanupHash();
1218 #else
1219  uint32_t flows = FlowCleanupHash();
1220  SCLogDebug("flows to progress: %u", flows);
1221 #endif
1222 
1223  /* make sure all flows are processed */
1224  do {
1226  usleep(10);
1227  } while (FlowRecyclerReadyToShutdown() == false);
1228 
1230  /* flow recycler thread(s) is/are a part of mgmt threads */
1231  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
      /* case-insensitive prefix match on the thread name */
1232  if (strncasecmp(tv->name, thread_name_flow_rec,
1233  strlen(thread_name_flow_rec)) == 0)
1234  {
1236  }
1237  }
1239 
1240  struct timeval start_ts;
1241  struct timeval cur_ts;
1242  gettimeofday(&start_ts, NULL);
1243 
1244 again:
      /* give up once more than 60 seconds (wall clock) have passed since start_ts */
1245  gettimeofday(&cur_ts, NULL);
1246  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1247  FatalError("unable to get all flow recycler "
1248  "threads to shutdown in time");
1249  }
1250 
1252  for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
1253  if (strncasecmp(tv->name, thread_name_flow_rec,
1254  strlen(thread_name_flow_rec)) == 0)
1255  {
1259  /* sleep outside lock */
1260  SleepMsec(1);
1261  goto again;
1262  }
1263  }
1264  }
1266 
1267  /* reset count, so we can kill and respawn (unix socket) */
1268  SC_ATOMIC_SET(flowrec_cnt, 0);
1269 }
1270 
/**
 * \brief Register the flow manager thread module: fills the TMM_FLOWMANAGER
 *        slot of tmm_modules with its name and its ThreadInit, ThreadDeinit
 *        and Management callbacks, then initializes the flowmgr_cnt and
 *        flow_timeouts atomics.
 *
 * NOTE(review): the signature (listing line 1271) and listing lines
 * 1277-1278 are not part of this extract. Presumably the missing lines set
 * further TmModule fields -- confirm against the real source.
 */
1272 {
1273  tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
1274  tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
1275  tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
1276  tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
1279  SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
1280 
1281  SC_ATOMIC_INIT(flowmgr_cnt);
1282  SC_ATOMIC_INITPTR(flow_timeouts);
1283 }
1284 
/**
 * \brief Register the flow recycler thread module: fills the
 *        TMM_FLOWRECYCLER slot of tmm_modules with its name and its
 *        ThreadInit, ThreadDeinit and Management callbacks, then initializes
 *        the flowrec_cnt and flowrec_busy atomics.
 *
 * NOTE(review): the signature (listing line 1285) and listing lines
 * 1291-1292 are not part of this extract. Presumably the missing lines set
 * further TmModule fields -- confirm against the real source.
 */
1286 {
1287  tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
1288  tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
1289  tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
1290  tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
1293  SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
1294 
1295  SC_ATOMIC_INIT(flowrec_cnt);
1296  SC_ATOMIC_INIT(flowrec_busy);
1297 }
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:74
FlowTimeoutCounters_::rows_empty
uint32_t rows_empty
Definition: flow-manager.c:109
FlowSparePoolUpdate
void FlowSparePoolUpdate(uint32_t size)
Definition: flow-spare-pool.c:217
FlowManagerThreadSpawn
void FlowManagerThreadSpawn(void)
spawn the flow manager thread
Definition: flow-manager.c:977
tm-threads.h
ConfGetInt
int ConfGetInt(const char *name, intmax_t *val)
Retrieve a configuration value as an integer.
Definition: conf.c:399
FROM_TIMEVAL
#define FROM_TIMEVAL(timev)
initialize a 'struct timespec' from a 'struct timeval'.
Definition: util-time.h:116
OutputFlowLog
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
Definition: output-flow.c:87
FlowRecyclerThreadData_::fec
FlowEndCounters fec
Definition: flow-manager.c:1016
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1650
app-layer-htp-range.h
FlowTimeoutCounters
Definition: flow-worker.c:61
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1090
FlowBucket_::evicted
Flow * evicted
Definition: flow-hash.h:48
OutputFlowLogThreadInit
TmEcode OutputFlowLogThreadInit(ThreadVars *tv, void **data)
Thread init for the flow logger. This will run the thread init functions for the individual registered...
Definition: output-flow.c:125
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:166
FLOW_IS_IPV6
#define FLOW_IS_IPV6(f)
Definition: flow.h:171
FlowManagerThreadData_::max
uint32_t max
Definition: flow-manager.c:642
FlowManagerTimeoutThread
Definition: flow-manager.c:253
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:69
FlowSpareGetPoolSize
uint32_t FlowSpareGetPoolSize(void)
Definition: flow-spare-pool.c:46
flow-util.h
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:314
FlowManagerThreadData_::cnt
FlowCounters cnt
Definition: flow-manager.c:644
FBLOCK_LOCK
#define FBLOCK_LOCK(fb)
Definition: flow-hash.h:73
SCTIME_MSECS
#define SCTIME_MSECS(t)
Definition: util-time.h:58
TMM_FLOWRECYCLER
@ TMM_FLOWRECYCLER
Definition: tm-threads-common.h:71
stream-tcp.h
GetFlowBypassInfoID
FlowStorageId GetFlowBypassInfoID(void)
Definition: flow-util.c:211
FlowBypassInfo_
Definition: flow.h:535
FlowCnf_::emergency_recovery
uint32_t emergency_recovery
Definition: flow.h:304
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
FlowCnf_::hash_size
uint32_t hash_size
Definition: flow.h:298
FlowManagerThreadData_::instance
uint32_t instance
Definition: flow-manager.c:640
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
IPPairTimeoutHash
uint32_t IPPairTimeoutHash(SCTime_t ts)
time out ippairs from the hash
Definition: ippair-timeout.c:124
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:99
StatsRegisterGlobalCounter
uint16_t StatsRegisterGlobalCounter(const char *name, uint64_t(*Func)(void))
Registers a counter, which represents a global value.
Definition: counters.c:1019
Flow_::proto
uint8_t proto
Definition: flow.h:382
SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt)
threads.h
FlowSendToLocalThread
void FlowSendToLocalThread(Flow *f)
Definition: flow-timeout.c:344
flow-private.h
FlowCounters_::flow_mgr_flows_notimeout
uint16_t flow_mgr_flows_notimeout
Definition: flow-manager.c:624
Flow_
Flow data structure.
Definition: flow.h:360
TYPE
#define TYPE
Flow_::protomap
uint8_t protomap
Definition: flow.h:454
SC_ATOMIC_EXTERN
SC_ATOMIC_EXTERN(unsigned int, flow_flags)
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:332
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:70
FlowProtoTimeout_
Definition: flow.h:524
StatsSetUI64
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type uint64_t to the local counter.
Definition: counters.c:207
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:54
flow-hash.h
TmModuleFlowRecyclerRegister
void TmModuleFlowRecyclerRegister(void)
Definition: flow-manager.c:1285
FlowTimeoutCounters_::flows_removed
uint32_t flows_removed
Definition: flow-manager.c:115
FlowBypassInfo_::tosrcbytecnt
uint64_t tosrcbytecnt
Definition: flow.h:540
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
FlowGetMemuse
uint64_t FlowGetMemuse(void)
Definition: flow.c:125
MIN
#define MIN(x, y)
Definition: suricata-common.h:391
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:80
FlowTimeoutCounters_::flows_timeout
uint32_t flows_timeout
Definition: flow-manager.c:114
defrag-timeout.h
FlowRecyclerThreadData_::counter_flows
uint16_t counter_flows
Definition: flow-manager.c:1010
FlowManagerThreadData_::counter_defrag_memuse
uint16_t counter_defrag_memuse
Definition: flow-manager.c:648
FLOW_ACTION_DROP
#define FLOW_ACTION_DROP
Definition: flow.h:69
FLOW_PROTO_MAX
@ FLOW_PROTO_MAX
Definition: flow-private.h:74
LiveDevSubBypassStats
void LiveDevSubBypassStats(LiveDevice *dev, uint64_t cnt, int family)
Definition: util-device.c:515
MAX
#define MAX(x, y)
Definition: suricata-common.h:395
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:83
FlowQueuePrivate_::len
uint32_t len
Definition: flow-queue.h:44
Flow_::protoctx
void * protoctx
Definition: flow.h:450
FlowCounters_
Definition: flow-manager.c:615
FlowManagerTimeoutThread
struct FlowManagerTimeoutThread FlowManagerTimeoutThread
SCCtrlCondSignal
#define SCCtrlCondSignal
Definition: threads-debug.h:384
FlowCounters_::flow_mgr_flows_aside
uint16_t flow_mgr_flows_aside
Definition: flow-manager.c:626
FLOW_TIMEOUT_REASSEMBLY_DONE
#define FLOW_TIMEOUT_REASSEMBLY_DONE
Definition: flow.h:96
runmode-unix-socket.h
FlowTimeoutCounters_::bypassed_count
uint32_t bypassed_count
Definition: flow-manager.c:119
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:37
FlowSparePoolReturnFlows
void FlowSparePoolReturnFlows(FlowQueuePrivate *fqp)
Definition: flow-spare-pool.c:120
TM_THREAD_NAME_MAX
#define TM_THREAD_NAME_MAX
Definition: tm-threads.h:49
FlowCnf_::prealloc
uint32_t prealloc
Definition: flow.h:299
FLOWLOCK_UNLOCK
#define FLOWLOCK_UNLOCK(fb)
Definition: flow.h:277
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:82
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:244
Flow_::flow_state
FlowStateType flow_state
Definition: flow.h:421
FQLOCK_LOCK
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:73
FlowWakeupFlowManagerThread
void FlowWakeupFlowManagerThread(void)
Definition: flow-manager.c:82
FlowDisableFlowRecyclerThread
void FlowDisableFlowRecyclerThread(void)
Used to disable flow recycler thread(s).
Definition: flow-manager.c:1213
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:50
FlowTimeoutCounters_::rows_maxlen
uint32_t rows_maxlen
Definition: flow-manager.c:110
FlowCounters_::flow_bypassed_bytes
uint16_t flow_bypassed_bytes
Definition: flow-manager.c:633
FlowTimeoutCounters_::flows_checked
uint32_t flows_checked
Definition: flow-manager.c:112
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:45
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:107
flow-spare-pool.h
StatsDecr
void StatsDecr(ThreadVars *tv, uint16_t id)
Decrements the local counter.
Definition: counters.c:186
Flow_::fb
struct FlowBucket_ * fb
Definition: flow.h:497
FlowCounters_::flow_mgr_full_pass
uint16_t flow_mgr_full_pass
Definition: flow-manager.c:616
StatsRegisterMaxCounter
uint16_t StatsRegisterMaxCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the maximum of all the values assigned to it.
Definition: counters.c:1001
SC_ATOMIC_MEMORY_ORDER_RELAXED
#define SC_ATOMIC_MEMORY_ORDER_RELAXED
Definition: util-atomic.h:165
util-device.h
util-debug.h
FlowWakeupFlowRecyclerThread
void FlowWakeupFlowRecyclerThread(void)
Definition: flow-manager.c:89
FlowBypassInfo_::todstbytecnt
uint64_t todstbytecnt
Definition: flow.h:542
FlowBypassInfo_::BypassUpdate
bool(* BypassUpdate)(Flow *f, void *data, time_t tsec)
Definition: flow.h:536
FlowRecyclerThreadData_::counter_queue_avg
uint16_t counter_queue_avg
Definition: flow-manager.c:1011
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
FlowCounters_::flow_mgr_rows_maxlen
uint16_t flow_mgr_rows_maxlen
Definition: flow-manager.c:629
FLOWLOCK_WRLOCK
#define FLOWLOCK_WRLOCK(fb)
Definition: flow.h:274
FlowTimeoutsReset
#define FlowTimeoutsReset()
Definition: flow-manager.h:30
FlowDisableFlowManagerThread
void FlowDisableFlowManagerThread(void)
Used to disable flow manager thread(s).
Definition: flow-manager.c:130
SCCtrlCondT
#define SCCtrlCondT
Definition: threads-debug.h:382
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1749
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:66
TimeModeIsReady
bool TimeModeIsReady(void)
Definition: util-time.c:92
Flow_::flow_end_flags
uint8_t flow_end_flags
Definition: flow.h:456
THV_KILL
#define THV_KILL
Definition: threadvars.h:39
MemcapsGetPressure
float MemcapsGetPressure(void)
Definition: runmode-unix-socket.c:135
FlowBypassInfo_::todstpktcnt
uint64_t todstpktcnt
Definition: flow.h:541
util-time.h
FlowQueuePrivateGetFromTop
Flow * FlowQueuePrivateGetFromTop(FlowQueuePrivate *fqc)
Definition: flow-queue.c:151
FlowBypassInfo_::bypass_data
void * bypass_data
Definition: flow.h:538
FlowQueueAppendPrivate
void FlowQueueAppendPrivate(FlowQueue *fq, FlowQueuePrivate *fqc)
Definition: flow-queue.c:119
FlowCounters_::flow_mgr_flows_aside_needs_work
uint16_t flow_mgr_flows_aside_needs_work
Definition: flow-manager.c:627
DefragTrackerGetMemcap
uint64_t DefragTrackerGetMemcap(void)
Return memcap value.
Definition: defrag-hash.c:63
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:90
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:124
FlowRecyclerThreadData_
Definition: flow-manager.c:1007
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:300
FLOW_IS_IPV4
#define FLOW_IS_IPV4(f)
Definition: flow.h:169
FlowRecyclerThreadData_::counter_tcp_active_sessions
uint16_t counter_tcp_active_sessions
Definition: flow-manager.c:1015
FlowCounters_::flow_bypassed_cnt_clo
uint16_t flow_bypassed_cnt_clo
Definition: flow-manager.c:631
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:83
SC_ATOMIC_SUB
#define SC_ATOMIC_SUB(name, val)
sub a value from our atomic variable
Definition: util-atomic.h:341
flow_timeouts_emerg
FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]
Definition: flow.c:86
TimeModeIsLive
bool TimeModeIsLive(void)
Definition: util-time.c:111
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:376
FlowTimeoutsEmergency
void FlowTimeoutsEmergency(void)
Definition: flow-manager.c:101
FlowTimeoutCounters
struct FlowTimeoutCounters_ FlowTimeoutCounters
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:33
TimeGet
SCTime_t TimeGet(void)
Definition: util-time.c:152
conf.h
FlowRecyclerThreadData_::output_thread_data
void * output_thread_data
Definition: flow-manager.c:1008
FlowTimeoutCounters_::rows_checked
uint32_t rows_checked
Definition: flow-manager.c:107
FBLOCK_UNLOCK
#define FBLOCK_UNLOCK(fb)
Definition: flow-hash.h:75
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:81
FlowClearMemory
int FlowClearMemory(Flow *f, uint8_t proto_map)
Clears the flow memory before the flow is queued to the spare flow queue.
Definition: flow.c:1093
flow_timeouts_delta
FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]
Definition: flow.c:87
FlowRecyclerThreadSpawn
void FlowRecyclerThreadSpawn(void)
spawn the flow recycler thread
Definition: flow-manager.c:1177
output-flow.h
flow-timeout.h
flow-queue.h
FlowManagerThreadData_::counter_defrag_timeout
uint16_t counter_defrag_timeout
Definition: flow-manager.c:647
TmModule_::name
const char * name
Definition: tm-modules.h:45
FlowRecyclerThreadData
struct FlowRecyclerThreadData_ FlowRecyclerThreadData
FlowBypassInfo_::tosrcpktcnt
uint64_t tosrcpktcnt
Definition: flow.h:539
host-timeout.h
SCCtrlCondTimedwait
#define SCCtrlCondTimedwait
Definition: threads-debug.h:385
StreamTcpThreadCacheCleanup
void StreamTcpThreadCacheCleanup(void)
Definition: stream-tcp-cache.c:134
runmodes.h
FlowTimeoutCounters_::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-manager.c:117
FlowGetStorageById
void * FlowGetStorageById(const Flow *f, FlowStorageId id)
Definition: flow-storage.c:40
Flow_::next
struct Flow_ * next
Definition: flow.h:405
FlowQueue_
Definition: flow-queue.h:49
FlowCounters_::flow_bypassed_pkts
uint16_t flow_bypassed_pkts
Definition: flow-manager.c:632
TMM_FLOWMANAGER
@ TMM_FLOWMANAGER
Definition: tm-threads-common.h:70
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:38
RECYCLE_MAX_QUEUE_ITEMS
#define RECYCLE_MAX_QUEUE_ITEMS
Definition: flow-manager.c:577
flow_hash
FlowBucket * flow_hash
Definition: flow-hash.c:58
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:378
FlowQueueExtractPrivate
FlowQueuePrivate FlowQueueExtractPrivate(FlowQueue *fq)
Definition: flow-queue.c:140
flow-storage.h
cnt
uint32_t cnt
Definition: tmqh-packetpool.h:7
FlowTimeoutCounters_
Definition: flow-manager.c:106
FlowManagerThreadData_
Definition: flow-manager.c:639
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flow-manager.h
suricata-common.h
SCCtrlMutex
#define SCCtrlMutex
Definition: threads-debug.h:373
DefragTimeoutHash
uint32_t DefragTimeoutHash(SCTime_t ts)
time out tracker from the hash
Definition: defrag-timeout.c:122
FlowManagerThreadData
struct FlowManagerThreadData_ FlowManagerThreadData
flow_config
FlowConfig flow_config
Definition: flow.c:90
TmModuleFlowManagerRegister
void TmModuleFlowManagerRegister(void)
Definition: flow-manager.c:1271
FlowCounters_::memcap_pressure
uint16_t memcap_pressure
Definition: flow-manager.c:635
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
FlowTimeoutsInit
void FlowTimeoutsInit(void)
Definition: flow-manager.c:96
SCTIME_SECS
#define SCTIME_SECS(t)
Definition: util-time.h:57
SC_ATOMIC_LOAD_EXPLICIT
#define SC_ATOMIC_LOAD_EXPLICIT(name, order)
Definition: util-atomic.h:378
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:48
FatalError
#define FatalError(...)
Definition: util-debug.h:502
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
StatsSyncCounters
void StatsSyncCounters(ThreadVars *tv)
Definition: counters.c:449
Flow_::livedev
struct LiveDevice_ * livedev
Definition: flow.h:407
threadvars.h
FlowQueuePrivate_
Definition: flow-queue.h:41
StatsAddUI64
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition: counters.c:146
SCLogConfig
struct SCLogConfig_ SCLogConfig
Holds the config state used by the logging api.
FlowRecyclerThreadData_::counter_queue_max
uint16_t counter_queue_max
Definition: flow-manager.c:1012
FlowTimeoutCounters_::bypassed_bytes
uint64_t bypassed_bytes
Definition: flow-manager.c:121
FlowManagerThreadData_::min
uint32_t min
Definition: flow-manager.c:641
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
OutputFlowLogThreadDeinit
TmEcode OutputFlowLogThreadDeinit(ThreadVars *tv, void *thread_data)
Definition: output-flow.c:165
flow_recycle_q
FlowQueue flow_recycle_q
Definition: flow-manager.c:63
SCFree
#define SCFree(p)
Definition: util-mem.h:61
FlowTimeoutCounters_::bypassed_pkts
uint64_t bypassed_pkts
Definition: flow-manager.c:120
FlowCounters_::memcap_pressure_max
uint16_t memcap_pressure_max
Definition: flow-manager.c:636
Flow_::flags
uint32_t flags
Definition: flow.h:430
HostTimeoutHash
uint32_t HostTimeoutHash(SCTime_t ts)
time out hosts from the hash
Definition: host-timeout.c:126
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:317
FlowManagerThreadData_::timeout
FlowManagerTimeoutThread timeout
Definition: flow-manager.c:646
FlowCounters
struct FlowCounters_ FlowCounters
FlowCounters_::flow_emerg_mode_enter
uint16_t flow_emerg_mode_enter
Definition: flow-manager.c:620
FLOW_END_FLAG_SHUTDOWN
#define FLOW_END_FLAG_SHUTDOWN
Definition: flow.h:249
FLOW_EMERGENCY
#define FLOW_EMERGENCY
Definition: flow-private.h:37
defrag-hash.h
ippair-timeout.h
FlowRecyclerThreadData_::counter_flow_active
uint16_t counter_flow_active
Definition: flow-manager.c:1014
FlowCounters_::flow_mgr_flows_timeout
uint16_t flow_mgr_flows_timeout
Definition: flow-manager.c:625
FlowManagerTimeoutThread::aside_queue
FlowQueuePrivate aside_queue
Definition: flow-manager.c:256
timeradd
#define timeradd(a, b, r)
Definition: util-time.h:128
FLOW_END_FLAG_TIMEOUT
#define FLOW_END_FLAG_TIMEOUT
Definition: flow.h:247
FlowTimeoutCounters_::flows_notimeout
uint32_t flows_notimeout
Definition: flow-manager.c:113
flow_timeouts_normal
FlowProtoTimeout flow_timeouts_normal[FLOW_PROTO_MAX]
Definition: flow.c:85
FlowNeedsReassembly
bool FlowNeedsReassembly(Flow *f)
Check if a flow needs forced reassembly, or any other processing.
Definition: flow-timeout.c:288
StatsRegisterAvgCounter
uint16_t StatsRegisterAvgCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the average of all the values assigned to it.
Definition: counters.c:981
StatsSyncCountersIfSignalled
void StatsSyncCountersIfSignalled(ThreadVars *tv)
Definition: counters.c:454
FlowCounters_::flow_mgr_spare
uint16_t flow_mgr_spare
Definition: flow-manager.c:619
FlowTimeoutCounters_::flows_aside
uint32_t flows_aside
Definition: flow-manager.c:116
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:274
flow.h
FlowQueuePrivateAppendFlow
void FlowQueuePrivateAppendFlow(FlowQueuePrivate *fqc, Flow *f)
Definition: flow-queue.c:65
FlowEndCountersRegister
void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec)
Definition: flow-util.c:235
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:91
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
evicted
Flow * evicted
Definition: flow-hash.h:4
StatsRegisterCounter
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:961
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
BITS
#define BITS
Flow_::timeout_at
uint32_t timeout_at
Definition: flow.h:400
FlowEndCounters_
Definition: flow-util.h:148
FLOW_SPARE_POOL_BLOCK_SIZE
#define FLOW_SPARE_POOL_BLOCK_SIZE
Definition: flow-spare-pool.h:30
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:102
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:77
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:359
FlowCounters_::flow_emerg_mode_over
uint16_t flow_emerg_mode_over
Definition: flow-manager.c:621
TM_FLAG_MANAGEMENT_TM
#define TM_FLAG_MANAGEMENT_TM
Definition: tm-modules.h:37
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:170
detect-engine-threshold.h
FlowTimeoutCounters_::rows_skipped
uint32_t rows_skipped
Definition: flow-manager.c:108
FlowCounters_::flow_mgr_flows_checked
uint16_t flow_mgr_flows_checked
Definition: flow-manager.c:623
ThresholdsExpire
uint32_t ThresholdsExpire(const SCTime_t ts)
Definition: detect-engine-threshold.c:233
SCTIME_USECS
#define SCTIME_USECS(t)
Definition: util-time.h:56
FlowCounters_::flow_mgr_rows_sec
uint16_t flow_mgr_rows_sec
Definition: flow-manager.c:617
FlowTimeoutCounters::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-worker.c:62
FQLOCK_UNLOCK
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:75
HttpRangeContainersTimeoutHash
uint32_t HttpRangeContainersTimeoutHash(const SCTime_t ts)
Definition: app-layer-htp-range.c:187