Go to the documentation of this file.
76 static uint32_t flowmgr_number = 1;
81 static uint32_t flowrec_number = 1;
103 #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
104 #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
106 #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
107 #define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000
108 #define NEW_FLOW_COUNT_COND 10
147 struct timeval start_ts;
148 struct timeval cur_ts;
149 gettimeofday(&start_ts, NULL);
152 gettimeofday(&cur_ts, NULL);
153 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
155 "threads to shutdown in time");
187 static int FlowManagerFlowTimeout(
Flow *f,
SCTime_t ts, uint32_t *next_ts,
const bool emerg)
194 if (*next_ts == 0 || flow_times_out_at < *next_ts)
195 *next_ts = flow_times_out_at;
217 #ifdef CAPTURE_OFFLOAD
218 if (f->
flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
231 SCLogDebug(
"Updated flow: %"PRId64
"", FlowGetId(f));
238 pkts_tosrc + pkts_todst);
240 counters->bypassed_pkts += pkts_tosrc + pkts_todst;
241 counters->bypassed_bytes += bytes_tosrc + bytes_todst;
244 SCLogDebug(
"No new packet, dead flow %"PRId64
"", FlowGetId(f));
252 counters->bypassed_count++;
276 if (f->
proto == IPPROTO_TCP &&
290 if (recycle.
len == 100) {
316 uint32_t checked = 0;
328 if (FlowManagerFlowTimeout(f,
ts, next_ts, emergency) == 0) {
330 counters->flows_notimeout++;
343 if (!FlowBypassedTimeout(f,
ts, counters)) {
352 counters->flows_timeout++;
354 RemoveFromHash(f, prev_f);
362 counters->flows_checked += checked;
363 if (checked > counters->rows_maxlen)
364 counters->rows_maxlen = checked;
367 static void FlowManagerHashRowClearEvictedList(
398 const uint32_t rows_checked = hash_max - hash_min;
399 uint32_t rows_skipped = 0;
400 uint32_t rows_empty = 0;
404 #define TYPE uint64_t
407 #define TYPE uint32_t
411 for (uint32_t idx = hash_min; idx < hash_max; idx+=
BITS) {
413 const uint32_t check =
MIN(
BITS, (hash_max - idx));
414 for (uint32_t i = 0; i < check; i++) {
423 for (uint32_t i = 0; i < check; i++) {
428 if (fb->evicted != NULL || fb->head != NULL) {
429 if (fb->evicted != NULL) {
435 if (fb->head != NULL) {
436 uint32_t next_ts = 0;
437 FlowManagerHashRowTimeout(td, fb->head,
ts, emergency, counters, &next_ts);
442 if (fb->evicted == NULL && fb->head == NULL) {
452 FlowManagerHashRowClearEvictedList(td,
evicted,
ts, counters);
459 cnt += ProcessAsideQueue(td, counters);
463 counters->rows_checked += rows_checked;
464 counters->rows_skipped += rows_skipped;
465 counters->rows_empty += rows_empty;
468 cnt += ProcessAsideQueue(td, counters);
470 counters->flows_removed += cnt;
480 const uint32_t rows, uint32_t *pos)
485 uint32_t rows_left = rows;
488 start = hash_min + (*pos);
489 if (start >= hash_max) {
492 end = start + rows_left;
493 if (end > hash_max) {
496 *pos = (end == hash_max) ? hash_min : end;
497 rows_left = rows_left - (end - start);
499 cnt += FlowTimeoutHash(td,
ts, start, end, counters);
526 RemoveFromHash(f, NULL);
528 FlowBucket *fb = f->
fb;
553 static uint32_t FlowCleanupHash(
void)
563 if (fb->head != NULL) {
565 cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
567 if (fb->evicted != NULL) {
569 cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
573 if (local_queue.
len >= 25) {
647 static void FlowCountersUpdate(
665 static TmEcode FlowManagerThreadInit(
ThreadVars *t,
const void *initdata,
void **data)
682 if ((ftd->
instance + 1) == flowmgr_number) {
692 FlowCountersInit(t, &ftd->
cnt);
720 static void GetWorkUnitSizing(
const uint32_t rows,
const uint32_t mp,
const bool emergency,
721 uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
729 const uint32_t emp =
MAX(mp, 10);
730 const uint32_t rows_per_sec = (uint32_t)((
float)rows * (float)((
float)emp / (float)100));
733 const uint32_t work_per_unit =
MIN(rows_per_sec / 1000, 1000);
736 const uint32_t sleep_per_unit =
MAX(250, 1000 - work_per_unit);
737 SCLogDebug(
"mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,
740 *wu_sleep = sleep_per_unit;
741 *wu_rows = rows_per_sec;
742 *rows_sec = rows_per_sec;
754 const uint32_t rows = ftd->
max - ftd->
min;
757 uint32_t emerg_over_cnt = 0;
758 uint64_t next_run_ms = 0;
760 uint32_t rows_sec = 0;
761 uint32_t rows_per_wu = 0;
762 uint64_t sleep_per_wu = 0;
763 bool prev_emerg =
false;
764 uint32_t other_last_sec = 0;
779 GetWorkUnitSizing(rows, mp,
false, &sleep_per_wu, &rows_per_wu, &rows_sec);
798 const bool emerge_p = (emerg && !prev_emerg);
805 if (ts_ms >= next_run_ms) {
810 if (spare_perc < 90 || spare_perc > 110) {
817 FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
825 SCLogDebug(
"hash %u:%u slice starting at %u with %u rows", ftd->
min, ftd->
max, pos,
828 const uint32_t ppos = pos;
829 FlowTimeoutHashInChunks(
839 FlowCountersUpdate(th_v, ftd, &counters);
842 SCLogDebug(
"flow_sparse_q.len = %" PRIu32
" prealloc: %" PRIu32
843 "flow_spare_q status: %" PRIu32
"%% flows at the queue",
856 if (emerg_over_cnt >= 30) {
863 SCLogNotice(
"Flow emergency mode over, back to normal... unsetting"
864 " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX
", "
865 "ts.tv_usec:%" PRIuMAX
") flow_spare_q status(): %" PRIu32
866 "%% flows at the queue",
875 const uint32_t pmp = mp;
881 GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
886 next_run_ms = ts_ms + sleep_per_wu;
888 if (other_last_sec == 0 || other_last_sec < (uint32_t)
SCTIME_SECS(
ts)) {
903 if (emerg || !time_is_live) {
906 struct timeval cond_tv;
907 gettimeofday(&cond_tv, NULL);
908 struct timeval add_tv;
910 add_tv.tv_usec = (sleep_per_wu * 1000);
911 timeradd(&cond_tv, &add_tv, &cond_tv);
918 if (rc == ETIMEDOUT || rc < 0)
937 intmax_t setting = 1;
940 if (setting < 1 || setting > 1024) {
941 FatalError(
"invalid flow.managers setting %" PRIdMAX, setting);
943 flowmgr_number = (uint32_t)setting;
948 SCLogConfig(
"using %u flow manager threads", flowmgr_number);
951 for (uint32_t u = 0; u < flowmgr_number; u++) {
957 BUG_ON(tv_flowmgr == NULL);
959 if (tv_flowmgr == NULL) {
960 FatalError(
"flow manager thread creation failed");
963 FatalError(
"flow manager thread spawn failed");
981 static TmEcode FlowRecyclerThreadInit(
ThreadVars *t,
const void *initdata,
void **data)
987 SCLogError(
"initializing flow log API for thread failed");
1024 FlowEndCountersUpdate(
tv, &ftd->
fec, f);
1045 uint64_t recycled_cnt = 0;
1071 Recycler(th_v, ftd, f);
1080 if (ret_queue.
len > 0) {
1084 recycled_cnt += cnt;
1094 if (emerg || !time_is_live) {
1097 struct timeval cond_tv;
1098 gettimeofday(&cond_tv, NULL);
1099 cond_tv.tv_sec += 1;
1105 if (rc == ETIMEDOUT || rc < 0) {
1123 SCLogPerf(
"%"PRIu64
" flows processed", recycled_cnt);
1127 static bool FlowRecyclerReadyToShutdown(
void)
1137 return ((
len == 0));
1143 intmax_t setting = 1;
1144 (void)
ConfGetInt(
"flow.recyclers", &setting);
1146 if (setting < 1 || setting > 1024) {
1147 FatalError(
"invalid flow.recyclers setting %" PRIdMAX, setting);
1149 flowrec_number = (uint32_t)setting;
1154 SCLogConfig(
"using %u flow recycler threads", flowrec_number);
1156 for (uint32_t u = 0; u < flowrec_number; u++) {
1163 if (tv_flowrec == NULL) {
1164 FatalError(
"flow recycler thread creation failed");
1167 FatalError(
"flow recycler thread spawn failed");
1185 (void)FlowCleanupHash();
1187 uint32_t flows = FlowCleanupHash();
1195 }
while (FlowRecyclerReadyToShutdown() ==
false);
1208 struct timeval start_ts;
1209 struct timeval cur_ts;
1210 gettimeofday(&start_ts, NULL);
1213 gettimeofday(&cur_ts, NULL);
1214 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1215 FatalError(
"unable to get all flow recycler "
1216 "threads to shutdown in time");
void FlowSparePoolUpdate(uint32_t size)
void FlowManagerThreadSpawn(void)
spawn the flow manager thread(s)
int ConfGetInt(const char *name, intmax_t *val)
Retrieve a configuration value as an integer.
#define FROM_TIMEVAL(timev)
initialize a 'struct timespec' from a 'struct timeval'.
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
SCCtrlMutex flow_manager_ctrl_mutex
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
void FlowForceReassemblyForFlow(Flow *f)
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread (MGMT). This function supports only custom...
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
const char * thread_name_flow_mgr
uint32_t FlowSpareGetPoolSize(void)
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
#define FlowWakeupFlowRecyclerThread()
FlowStorageId GetFlowBypassInfoID(void)
uint32_t emergency_recovery
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
uint32_t IPPairTimeoutHash(SCTime_t ts)
time out ippairs from the hash
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
uint16_t StatsRegisterGlobalCounter(const char *name, uint64_t(*Func)(void))
Registers a counter, which represents a global value.
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt)
SCCtrlCondT flow_manager_ctrl_cond
uint16_t flow_mgr_flows_notimeout
SCCtrlCondT flow_recycler_ctrl_cond
SC_ATOMIC_EXTERN(unsigned int, flow_flags)
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
const char * thread_name_flow_rec
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type uint64_t to the local counter.
void TmModuleFlowRecyclerRegister(void)
uint32_t flow_spare_pool_block_size
uint64_t FlowGetMemuse(void)
ThreadVars * tv_root[TVT_MAX]
#define StatsSyncCountersIfSignalled(tv)
void LiveDevSubBypassStats(LiveDevice *dev, uint64_t cnt, int family)
struct FlowManagerTimeoutThread FlowManagerTimeoutThread
uint16_t flow_mgr_flows_aside
#define FLOW_TIMEOUT_REASSEMBLY_DONE
void FlowSparePoolReturnFlows(FlowQueuePrivate *fqp)
#define TM_THREAD_NAME_MAX
#define FLOWLOCK_UNLOCK(fb)
void PacketPoolInit(void)
void FlowDisableFlowRecyclerThread(void)
Used to disable flow recycler thread(s).
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
uint16_t flow_bypassed_bytes
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
void StatsDecr(ThreadVars *tv, uint16_t id)
Decrements the local counter.
uint16_t flow_mgr_full_pass
uint16_t StatsRegisterMaxCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the maximum of all the values assigned to it.
#define SC_ATOMIC_MEMORY_ORDER_RELAXED
bool(* BypassUpdate)(Flow *f, void *data, time_t tsec)
uint16_t counter_queue_avg
TmEcode OutputFlowLogThreadInit(ThreadVars *tv, void *initdata, void **data)
Thread init for the flow logger. This will run the thread init functions for the individual registered...
#define SCMutexUnlock(mut)
uint16_t flow_mgr_rows_maxlen
#define FLOWLOCK_WRLOCK(fb)
#define FlowTimeoutsReset()
int FlowForceReassemblyNeedReassembly(Flow *f)
Check if a flow needs forced reassembly, or any other processing.
void FlowDisableFlowManagerThread(void)
Used to disable flow manager thread(s).
Per thread variable structure.
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
TmEcode(* Management)(ThreadVars *, void *)
bool TimeModeIsReady(void)
float MemcapsGetPressure(void)
Flow * FlowQueuePrivateGetFromTop(FlowQueuePrivate *fqc)
void FlowQueueAppendPrivate(FlowQueue *fq, FlowQueuePrivate *fqc)
struct FlowQueueTimeoutCounters FlowQueueTimeoutCounters
uint16_t flow_mgr_flows_aside_needs_work
struct ThreadVars_ * next
uint16_t counter_tcp_active_sessions
uint16_t flow_bypassed_cnt_clo
#define SC_ATOMIC_SUB(name, val)
subtract a value from our atomic variable
FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]
bool TimeModeIsLive(void)
#define SCCtrlMutexLock(mut)
void FlowTimeoutsEmergency(void)
struct FlowTimeoutCounters_ FlowTimeoutCounters
TmModule tmm_modules[TMM_SIZE]
void * output_thread_data
#define FBLOCK_UNLOCK(fb)
int FlowClearMemory(Flow *f, uint8_t proto_map)
Function to clear the flow memory before queueing it to the spare flow queue.
FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]
void FlowRecyclerThreadSpawn(void)
spawn the flow recycler thread(s)
Data structures and function prototypes for keeping state for the detection engine.
struct FlowRecyclerThreadData_ FlowRecyclerThreadData
#define SCCtrlCondTimedwait
void StreamTcpThreadCacheCleanup(void)
uint32_t flows_aside_needs_work
void * FlowGetStorageById(const Flow *f, FlowStorageId id)
uint16_t flow_bypassed_pkts
#define SCCtrlMutexUnlock(mut)
FlowQueuePrivate FlowQueueExtractPrivate(FlowQueue *fq)
uint32_t DefragTimeoutHash(SCTime_t ts)
time out trackers from the hash
struct FlowManagerThreadData_ FlowManagerThreadData
void TmModuleFlowManagerRegister(void)
void FlowTimeoutsInit(void)
#define SC_ATOMIC_LOAD_EXPLICIT(name, order)
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
struct LiveDevice_ * livedev
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
struct SCLogConfig_ SCLogConfig
Holds the config state used by the logging api.
uint16_t counter_queue_max
#define SCLogError(...)
Macro used to log ERROR messages.
TmEcode OutputFlowLogThreadDeinit(ThreadVars *tv, void *thread_data)
uint16_t memcap_pressure_max
uint32_t HostTimeoutHash(SCTime_t ts)
time out hosts from the hash
#define SC_ATOMIC_INITPTR(name)
FlowManagerTimeoutThread timeout
struct FlowCounters_ FlowCounters
uint16_t flow_emerg_mode_enter
#define SCCtrlMutexInit(mut, mutattr)
#define FLOW_END_FLAG_SHUTDOWN
uint16_t counter_flow_active
uint16_t flow_mgr_flows_timeout
FlowQueuePrivate aside_queue
#define timeradd(a, b, r)
#define FLOW_END_FLAG_TIMEOUT
#define StatsSyncCounters(tv)
FlowProtoTimeout flow_timeouts_normal[FLOW_PROTO_MAX]
uint16_t StatsRegisterAvgCounter(const char *name, struct ThreadVars_ *tv)
Registers a counter, whose value holds the average of all the values assigned to it.
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
void PacketPoolDestroy(void)
void FlowQueuePrivateAppendFlow(FlowQueuePrivate *fqc, Flow *f)
void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec)
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
#define SCLogNotice(...)
Macro used to log NOTICE messages.
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
SCCtrlMutex flow_recycler_ctrl_mutex
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
uint16_t flow_emerg_mode_over
#define TM_FLAG_MANAGEMENT_TM
volatile uint8_t suricata_ctl_flags
uint16_t flow_mgr_flows_checked
uint16_t flow_mgr_rows_sec
uint32_t flows_aside_needs_work
uint32_t HttpRangeContainersTimeoutHash(const SCTime_t ts)