Go to the documentation of this file.
69 static uint32_t flowmgr_number = 1;
74 static uint32_t flowrec_number = 1;
80 static SCCtrlCondT flow_manager_ctrl_cond = PTHREAD_COND_INITIALIZER;
81 static SCCtrlMutex flow_manager_ctrl_mutex = PTHREAD_MUTEX_INITIALIZER;
82 static SCCtrlCondT flow_recycler_ctrl_cond = PTHREAD_COND_INITIALIZER;
83 static SCCtrlMutex flow_recycler_ctrl_mutex = PTHREAD_MUTEX_INITIALIZER;
146 struct timeval start_ts;
147 struct timeval cur_ts;
148 gettimeofday(&start_ts, NULL);
151 gettimeofday(&cur_ts, NULL);
152 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
154 "threads to shutdown in time");
191 static bool FlowManagerFlowTimeout(
Flow *f,
SCTime_t ts, uint32_t *next_ts,
const bool emerg)
203 if (*next_ts == 0 || (uint32_t)
SCTIME_SECS(timesout_at) < *next_ts)
224 #ifdef CAPTURE_OFFLOAD
237 if (f->
flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
250 SCLogDebug(
"Updated flow: %" PRIu64
"", FlowGetId(f));
257 pkts_tosrc + pkts_todst);
259 counters->bypassed_pkts += pkts_tosrc + pkts_todst;
260 counters->bypassed_bytes += bytes_tosrc + bytes_todst;
263 SCLogDebug(
"No new packet, dead flow %" PRIu64
"", FlowGetId(f));
271 counters->bypassed_count++;
306 if (f->
proto == IPPROTO_TCP &&
320 if (recycle.
len == 100) {
346 uint32_t checked = 0;
360 if (!FlowManagerFlowTimeout(f,
ts, next_ts, emergency)) {
362 counters->flows_notimeout++;
371 #ifdef CAPTURE_OFFLOAD
374 if (!FlowBypassedTimeout(f,
ts, counters)) {
383 counters->flows_timeout++;
385 RemoveFromHash(f, prev_f);
393 counters->flows_checked += checked;
394 if (checked > counters->rows_maxlen)
395 counters->rows_maxlen = checked;
438 const uint32_t rows_checked = hash_max - hash_min;
439 uint32_t rows_skipped = 0;
440 uint32_t rows_empty = 0;
444 #define TYPE uint64_t
447 #define TYPE uint32_t
451 for (uint32_t idx = hash_min; idx < hash_max; idx+=
BITS) {
453 const uint32_t check =
MIN(
BITS, (hash_max - idx));
454 for (uint32_t i = 0; i < check; i++) {
463 for (uint32_t i = 0; i < check; i++) {
468 if (fb->evicted != NULL || fb->head != NULL) {
469 if (fb->evicted != NULL) {
475 if (fb->head != NULL) {
476 uint32_t next_ts = 0;
477 FlowManagerHashRowTimeout(td, fb->head,
ts, emergency, counters, &next_ts);
482 if (fb->evicted == NULL && fb->head == NULL) {
493 FlowManagerHashRowClearEvictedList(td,
evicted);
500 cnt += ProcessAsideQueue(td, counters);
504 counters->rows_checked += rows_checked;
505 counters->rows_skipped += rows_skipped;
506 counters->rows_empty += rows_empty;
509 cnt += ProcessAsideQueue(td, counters);
511 counters->flows_removed +=
cnt;
533 const uint32_t rows, uint32_t *pos,
const uint32_t instance)
538 uint32_t rows_left = rows;
542 if (start >= hash_max) {
545 end = start + rows_left;
546 if (end > hash_max) {
549 *pos = (end == hash_max) ? hash_min : end;
550 rows_left = rows_left - (end - start);
552 SCLogDebug(
"instance %u: %u:%u (hash_min %u, hash_max %u *pos %u)", instance, start, end,
553 hash_min, hash_max, *pos);
555 cnt += FlowTimeoutHash(td,
ts, start, end, counters);
584 RemoveFromHash(f, NULL);
586 FlowBucket *fb = f->
fb;
606 #define RECYCLE_MAX_QUEUE_ITEMS 25
612 static uint32_t FlowCleanupHash(
void)
622 if (fb->head != NULL) {
624 cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
626 if (fb->evicted != NULL) {
628 cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
705 static void FlowCountersUpdate(
729 static TmEcode FlowManagerThreadInit(
ThreadVars *t,
const void *initdata,
void **data)
746 if ((ftd->
instance + 1) == flowmgr_number) {
756 FlowCountersInit(t, &ftd->
cnt);
793 static void GetWorkUnitSizing(
const uint32_t rows,
const uint32_t mp,
const bool emergency,
794 uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
802 const uint32_t emp =
MAX(mp, 10);
803 const uint32_t rows_per_sec = (uint32_t)((
float)rows * (float)((
float)emp / (float)100));
806 const uint32_t work_per_unit =
MIN(rows_per_sec / 1000, 1000);
809 const uint32_t sleep_per_unit =
MAX(250, 1000 - work_per_unit);
810 SCLogDebug(
"mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,
813 *wu_sleep = sleep_per_unit;
814 *wu_rows = rows_per_sec;
815 *rows_sec = rows_per_sec;
827 const uint32_t rows = ftd->
max - ftd->
min;
830 uint32_t emerg_over_cnt = 0;
831 uint64_t next_run_ms = 0;
832 uint32_t pos = ftd->
min;
833 uint32_t rows_sec = 0;
834 uint32_t rows_per_wu = 0;
835 uint64_t sleep_per_wu = 0;
836 bool prev_emerg =
false;
837 uint32_t other_last_sec = 0;
844 GetWorkUnitSizing(rows, mp,
false, &sleep_per_wu, &rows_per_wu, &rows_sec);
865 const bool emerge_p = (emerg && !prev_emerg);
872 if (ts_ms >= next_run_ms) {
877 if (spare_perc < 90 || spare_perc > 110) {
884 FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
892 SCLogDebug(
"hash %u:%u slice starting at %u with %u rows", ftd->
min, ftd->
max, pos,
895 const uint32_t ppos = pos;
896 FlowTimeoutHashInChunks(&ftd->
timeout,
ts, ftd->
min, ftd->
max, &counters,
906 FlowCountersUpdate(th_v, ftd, &counters);
909 SCLogDebug(
"flow_sparse_q.len = %" PRIu32
" prealloc: %" PRIu32
910 "flow_spare_q status: %" PRIu32
"%% flows at the queue",
923 if (emerg_over_cnt >= 30) {
930 SCLogNotice(
"Flow emergency mode over, back to normal... unsetting"
931 " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX
", "
932 "ts.tv_usec:%" PRIuMAX
") flow_spare_q status(): %" PRIu32
933 "%% flows at the queue",
942 const uint32_t pmp = mp;
948 GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
953 next_run_ms = ts_ms + sleep_per_wu;
955 if (other_last_sec == 0 || other_last_sec < (uint32_t)
SCTIME_SECS(
ts)) {
977 if (emerg || !time_is_live) {
980 struct timeval cond_tv;
981 gettimeofday(&cond_tv, NULL);
982 struct timeval add_tv;
983 add_tv.tv_sec = sleep_per_wu / 1000;
984 add_tv.tv_usec = (sleep_per_wu % 1000) * 1000;
985 timeradd(&cond_tv, &add_tv, &cond_tv);
994 &flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, &cond_time);
995 if (rc == ETIMEDOUT || rc < 0) {
1012 intmax_t setting = 1;
1015 if (setting < 1 || setting > 1024) {
1016 FatalError(
"invalid flow.managers setting %" PRIdMAX, setting);
1018 flowmgr_number = (uint32_t)setting;
1020 SCLogConfig(
"using %u flow manager threads", flowmgr_number);
1023 for (uint32_t u = 0; u < flowmgr_number; u++) {
1029 BUG_ON(tv_flowmgr == NULL);
1031 if (tv_flowmgr == NULL) {
1032 FatalError(
"flow manager thread creation failed");
1035 FatalError(
"flow manager thread spawn failed");
1052 static TmEcode FlowRecyclerThreadInit(
ThreadVars *t,
const void *initdata,
void **data)
1058 SCLogError(
"initializing flow log API for thread failed");
1095 FlowEndCountersUpdate(
tv, &ftd->
fec, f);
1114 uint64_t recycled_cnt = 0;
1135 Recycler(th_v, ftd, f);
1144 if (ret_queue.
len > 0) {
1148 recycled_cnt +=
cnt;
1158 if (emerg || !time_is_live) {
1161 struct timeval cond_tv;
1162 gettimeofday(&cond_tv, NULL);
1163 cond_tv.tv_sec += 1;
1174 &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time);
1175 if (rc == ETIMEDOUT || rc < 0) {
1187 SCLogPerf(
"%"PRIu64
" flows processed", recycled_cnt);
1191 static bool FlowRecyclerReadyToShutdown(
void)
1201 return ((
len == 0));
1207 intmax_t setting = 1;
1210 if (setting < 1 || setting > 1024) {
1211 FatalError(
"invalid flow.recyclers setting %" PRIdMAX, setting);
1213 flowrec_number = (uint32_t)setting;
1215 SCLogConfig(
"using %u flow recycler threads", flowrec_number);
1217 for (uint32_t u = 0; u < flowrec_number; u++) {
1224 if (tv_flowrec == NULL) {
1225 FatalError(
"flow recycler thread creation failed");
1228 FatalError(
"flow recycler thread spawn failed");
1245 (void)FlowCleanupHash();
1247 uint32_t flows = FlowCleanupHash();
1255 }
while (!FlowRecyclerReadyToShutdown());
1268 struct timeval start_ts;
1269 struct timeval cur_ts;
1270 gettimeofday(&start_ts, NULL);
1273 gettimeofday(&cur_ts, NULL);
1274 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1275 FatalError(
"unable to get all flow recycler "
1276 "threads to shutdown in time");
void FlowSparePoolUpdate(uint32_t size)
void FlowManagerThreadSpawn(void)
spawn the flow manager thread(s)
#define FROM_TIMEVAL(timev)
initialize a 'struct timespec' from a 'struct timeval'.
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
StatsCounterGlobalId StatsRegisterGlobalCounter(const char *name, uint64_t(*Func)(void))
Registers a counter, which represents a global value.
StatsCounterMaxId flow_mgr_rows_maxlen
void StatsCounterMaxUpdateI64(StatsThreadContext *stats, StatsCounterMaxId id, int64_t x)
update the value of the localmax counter
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
TmEcode OutputFlowLogThreadInit(ThreadVars *tv, void **data)
thread init for the flow logger. This will run the thread init functions for the individual registered...
const char * thread_name_flow_mgr
void StatsSyncCountersIfSignalled(StatsThreadContext *stats)
uint32_t FlowSpareGetPoolSize(void)
StatsCounterId flow_mgr_flows_timeout
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
StatsCounterId memcap_pressure
FlowStorageId GetFlowBypassInfoID(void)
uint32_t emergency_recovery
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
uint32_t IPPairTimeoutHash(SCTime_t ts)
time out ippairs from the hash
StatsCounterId StatsRegisterCounter(const char *name, StatsThreadContext *stats)
Registers a normal, unqualified counter.
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt)
void FlowSendToLocalThread(Flow *f)
SC_ATOMIC_EXTERN(unsigned int, flow_flags)
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
const char * thread_name_flow_rec
StatsCounterAvgId counter_queue_avg
void TmModuleFlowRecyclerRegister(void)
StatsCounterId flow_mgr_full_pass
uint64_t FlowGetMemuse(void)
ThreadVars * tv_root[TVT_MAX]
StatsCounterId flow_bypassed_pkts
void LiveDevSubBypassStats(LiveDevice *dev, uint64_t cnt, int family)
StatsCounterMaxId memcap_pressure_max
StatsCounterId flow_mgr_rows_sec
struct FlowManagerTimeoutThread FlowManagerTimeoutThread
StatsCounterAvgId StatsRegisterAvgCounter(const char *name, StatsThreadContext *stats)
Registers a counter, whose value holds the average of all the values assigned to it.
void StatsCounterDecr(StatsThreadContext *stats, StatsCounterId id)
Decrements the local counter.
#define FLOW_TIMEOUT_REASSEMBLY_DONE
SCTime_t TmThreadsGetThreadTime(const int idx)
void FlowSparePoolReturnFlows(FlowQueuePrivate *fqp)
#define TM_THREAD_NAME_MAX
#define FLOWLOCK_UNLOCK(fb)
StatsCounterId flow_bypassed_bytes
void PacketPoolInit(void)
void FlowWakeupFlowManagerThread(void)
void FlowDisableFlowRecyclerThread(void)
Used to disable flow recycler thread(s).
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
void StatsCounterAvgAddI64(StatsThreadContext *stats, StatsCounterAvgId id, int64_t x)
#define SC_ATOMIC_MEMORY_ORDER_RELAXED
void FlowWakeupFlowRecyclerThread(void)
void SCFlowRunFinishCallbacks(ThreadVars *tv, Flow *f)
bool(* BypassUpdate)(Flow *f, void *data, time_t tsec)
#define SCMutexUnlock(mut)
int SCConfGetInt(const char *name, intmax_t *val)
Retrieve a configuration value as an integer.
#define FLOWLOCK_WRLOCK(fb)
#define FlowTimeoutsReset()
void FlowDisableFlowManagerThread(void)
Used to disable flow manager thread(s).
Per thread variable structure.
TmEcode(* Management)(ThreadVars *, void *)
bool TimeModeIsReady(void)
float MemcapsGetPressure(void)
void StatsCounterIncr(StatsThreadContext *stats, StatsCounterId id)
Increments the local counter.
Flow * FlowQueuePrivateGetFromTop(FlowQueuePrivate *fqc)
void FlowQueueAppendPrivate(FlowQueue *fq, FlowQueuePrivate *fqc)
uint64_t DefragTrackerGetMemcap(void)
Return memcap value.
struct ThreadVars_ * next
#define SC_ATOMIC_SUB(name, val)
sub a value from our atomic variable
FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]
bool TimeModeIsLive(void)
#define SCCtrlMutexLock(mut)
void FlowTimeoutsEmergency(void)
struct FlowTimeoutCounters_ FlowTimeoutCounters
TmModule tmm_modules[TMM_SIZE]
void * output_thread_data
#define FBLOCK_UNLOCK(fb)
int FlowClearMemory(Flow *f, uint8_t proto_map)
Function clear the flow memory before queueing it to spare flow queue.
void FlowRecyclerThreadSpawn(void)
spawn the flow recycler thread(s)
struct FlowRecyclerThreadData_ FlowRecyclerThreadData
#define SCCtrlCondTimedwait
void StreamTcpThreadCacheCleanup(void)
uint32_t flows_aside_needs_work
void * FlowGetStorageById(const Flow *f, FlowStorageId id)
StatsCounterId flow_mgr_flows_aside_needs_work
StatsCounterId flow_bypassed_cnt_clo
#define SCTIME_CMP_LT(a, b)
StatsCounterId flow_mgr_flows_checked
#define RECYCLE_MAX_QUEUE_ITEMS
#define SCCtrlMutexUnlock(mut)
StatsCounterId flow_mgr_spare
FlowQueuePrivate FlowQueueExtractPrivate(FlowQueue *fq)
StatsCounterId flow_emerg_mode_over
StatsCounterId flow_mgr_flows_notimeout
StatsCounterId counter_defrag_memuse
uint32_t DefragTimeoutHash(SCTime_t ts)
time out trackers from the hash
struct FlowManagerThreadData_ FlowManagerThreadData
bool TmThreadsWaitForUnpause(ThreadVars *tv)
Wait for a thread to become unpaused.
void TmModuleFlowManagerRegister(void)
void FlowTimeoutsInit(void)
void StatsSyncCounters(StatsThreadContext *stats)
#define SC_ATOMIC_LOAD_EXPLICIT(name, order)
StatsCounterId flow_emerg_mode_enter
StatsCounterMaxId counter_queue_max
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
StatsCounterId counter_flows
struct LiveDevice_ * livedev
struct SCLogConfig_ SCLogConfig
Holds the config state used by the logging api.
void StatsCounterSetI64(StatsThreadContext *stats, StatsCounterId id, int64_t x)
set, so overwrite, the value of the local counter
StatsCounterId counter_flow_active
#define SCLogError(...)
Macro used to log ERROR messages.
TmEcode OutputFlowLogThreadDeinit(ThreadVars *tv, void *thread_data)
uint32_t HostTimeoutHash(SCTime_t ts)
time out hosts from the hash
StatsCounterId counter_defrag_timeout
StatsCounterMaxId StatsRegisterMaxCounter(const char *name, StatsThreadContext *stats)
Registers a counter, whose value holds the maximum of all the values assigned to it.
#define SC_ATOMIC_INITPTR(name)
FlowManagerTimeoutThread timeout
struct FlowCounters_ FlowCounters
StatsCounterId counter_tcp_active_sessions
#define FLOW_END_FLAG_SHUTDOWN
FlowQueuePrivate aside_queue
#define timeradd(a, b, r)
#define FLOW_END_FLAG_TIMEOUT
FlowProtoTimeout flow_timeouts_normal[FLOW_PROTO_MAX]
bool FlowNeedsReassembly(Flow *f)
Check if a flow needs forced reassembly, or any other processing.
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
void PacketPoolDestroy(void)
void FlowQueuePrivateAppendFlow(FlowQueuePrivate *fqc, Flow *f)
void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec)
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
#define SCLogNotice(...)
Macro used to log NOTICE messages.
#define SCTIME_ADD_SECS(ts, s)
StatsCounterId flow_mgr_flows_aside
#define FLOW_SPARE_POOL_BLOCK_SIZE
void StatsCounterAddI64(StatsThreadContext *stats, StatsCounterId id, int64_t x)
Adds a value of type int64_t to the local counter.
#define DEBUG_VALIDATE_BUG_ON(exp)
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
#define TM_FLAG_MANAGEMENT_TM
volatile uint8_t suricata_ctl_flags
FlowThreadId thread_id[2]
uint32_t ThresholdsExpire(const SCTime_t ts)
uint32_t flows_aside_needs_work
uint32_t HttpRangeContainersTimeoutHash(const SCTime_t ts)