68 static uint32_t flowmgr_number = 1;
73 static uint32_t flowrec_number = 1;
79 static SCCtrlCondT flow_manager_ctrl_cond = PTHREAD_COND_INITIALIZER;
80 static SCCtrlMutex flow_manager_ctrl_mutex = PTHREAD_MUTEX_INITIALIZER;
81 static SCCtrlCondT flow_recycler_ctrl_cond = PTHREAD_COND_INITIALIZER;
82 static SCCtrlMutex flow_recycler_ctrl_mutex = PTHREAD_MUTEX_INITIALIZER;
145 struct timeval start_ts;
146 struct timeval cur_ts;
147 gettimeofday(&start_ts, NULL);
150 gettimeofday(&cur_ts, NULL);
151 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
153 "threads to shutdown in time");
184 static bool FlowManagerFlowTimeout(
Flow *f,
SCTime_t ts, uint32_t *next_ts,
const bool emerg)
191 if (*next_ts == 0 || flow_times_out_at < *next_ts)
192 *next_ts = flow_times_out_at;
202 #ifdef CAPTURE_OFFLOAD
215 if (f->
flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
228 SCLogDebug(
"Updated flow: %"PRId64
"", FlowGetId(f));
235 pkts_tosrc + pkts_todst);
237 counters->bypassed_pkts += pkts_tosrc + pkts_todst;
238 counters->bypassed_bytes += bytes_tosrc + bytes_todst;
241 SCLogDebug(
"No new packet, dead flow %" PRId64
"", FlowGetId(f));
249 counters->bypassed_count++;
284 if (f->
proto == IPPROTO_TCP &&
298 if (recycle.
len == 100) {
324 uint32_t checked = 0;
336 if (FlowManagerFlowTimeout(f,
ts, next_ts, emergency) ==
false) {
337 counters->flows_notimeout++;
348 #ifdef CAPTURE_OFFLOAD
351 if (!FlowBypassedTimeout(f,
ts, counters)) {
360 counters->flows_timeout++;
362 RemoveFromHash(f, prev_f);
370 counters->flows_checked += checked;
371 if (checked > counters->rows_maxlen)
372 counters->rows_maxlen = checked;
415 const uint32_t rows_checked = hash_max - hash_min;
416 uint32_t rows_skipped = 0;
417 uint32_t rows_empty = 0;
421 #define TYPE uint64_t
424 #define TYPE uint32_t
428 for (uint32_t idx = hash_min; idx < hash_max; idx+=
BITS) {
430 const uint32_t check =
MIN(
BITS, (hash_max - idx));
431 for (uint32_t i = 0; i < check; i++) {
440 for (uint32_t i = 0; i < check; i++) {
445 if (fb->evicted != NULL || fb->head != NULL) {
446 if (fb->evicted != NULL) {
452 if (fb->head != NULL) {
453 uint32_t next_ts = 0;
454 FlowManagerHashRowTimeout(td, fb->head,
ts, emergency, counters, &next_ts);
459 if (fb->evicted == NULL && fb->head == NULL) {
470 FlowManagerHashRowClearEvictedList(td,
evicted);
477 cnt += ProcessAsideQueue(td, counters);
481 counters->rows_checked += rows_checked;
482 counters->rows_skipped += rows_skipped;
483 counters->rows_empty += rows_empty;
486 cnt += ProcessAsideQueue(td, counters);
488 counters->flows_removed +=
cnt;
510 const uint32_t rows, uint32_t *pos,
const uint32_t instance)
515 uint32_t rows_left = rows;
519 if (start >= hash_max) {
522 end = start + rows_left;
523 if (end > hash_max) {
526 *pos = (end == hash_max) ? hash_min : end;
527 rows_left = rows_left - (end - start);
529 SCLogDebug(
"instance %u: %u:%u (hash_min %u, hash_max %u *pos %u)", instance, start, end,
530 hash_min, hash_max, *pos);
532 cnt += FlowTimeoutHash(td,
ts, start, end, counters);
561 RemoveFromHash(f, NULL);
563 FlowBucket *fb = f->
fb;
583 #define RECYCLE_MAX_QUEUE_ITEMS 25
589 static uint32_t FlowCleanupHash(
void)
599 if (fb->head != NULL) {
601 cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
603 if (fb->evicted != NULL) {
605 cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
681 static void FlowCountersUpdate(
699 static TmEcode FlowManagerThreadInit(
ThreadVars *t,
const void *initdata,
void **data)
716 if ((ftd->
instance + 1) == flowmgr_number) {
726 FlowCountersInit(t, &ftd->
cnt);
763 static void GetWorkUnitSizing(
const uint32_t rows,
const uint32_t mp,
const bool emergency,
764 uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
772 const uint32_t emp =
MAX(mp, 10);
773 const uint32_t rows_per_sec = (uint32_t)((
float)rows * (float)((
float)emp / (float)100));
776 const uint32_t work_per_unit =
MIN(rows_per_sec / 1000, 1000);
779 const uint32_t sleep_per_unit =
MAX(250, 1000 - work_per_unit);
780 SCLogDebug(
"mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,
783 *wu_sleep = sleep_per_unit;
784 *wu_rows = rows_per_sec;
785 *rows_sec = rows_per_sec;
797 const uint32_t rows = ftd->
max - ftd->
min;
800 uint32_t emerg_over_cnt = 0;
801 uint64_t next_run_ms = 0;
802 uint32_t pos = ftd->
min;
803 uint32_t rows_sec = 0;
804 uint32_t rows_per_wu = 0;
805 uint64_t sleep_per_wu = 0;
806 bool prev_emerg =
false;
807 uint32_t other_last_sec = 0;
822 GetWorkUnitSizing(rows, mp,
false, &sleep_per_wu, &rows_per_wu, &rows_sec);
835 const bool emerge_p = (emerg && !prev_emerg);
842 if (ts_ms >= next_run_ms) {
847 if (spare_perc < 90 || spare_perc > 110) {
854 FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
862 SCLogDebug(
"hash %u:%u slice starting at %u with %u rows", ftd->
min, ftd->
max, pos,
865 const uint32_t ppos = pos;
866 FlowTimeoutHashInChunks(&ftd->
timeout,
ts, ftd->
min, ftd->
max, &counters,
876 FlowCountersUpdate(th_v, ftd, &counters);
879 SCLogDebug(
"flow_sparse_q.len = %" PRIu32
" prealloc: %" PRIu32
880 "flow_spare_q status: %" PRIu32
"%% flows at the queue",
893 if (emerg_over_cnt >= 30) {
900 SCLogNotice(
"Flow emergency mode over, back to normal... unsetting"
901 " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX
", "
902 "ts.tv_usec:%" PRIuMAX
") flow_spare_q status(): %" PRIu32
903 "%% flows at the queue",
912 const uint32_t pmp = mp;
918 GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
923 next_run_ms = ts_ms + sleep_per_wu;
925 if (other_last_sec == 0 || other_last_sec < (uint32_t)
SCTIME_SECS(
ts)) {
945 if (emerg || !time_is_live) {
948 struct timeval cond_tv;
949 gettimeofday(&cond_tv, NULL);
950 struct timeval add_tv;
951 add_tv.tv_sec = sleep_per_wu / 1000;
952 add_tv.tv_usec = (sleep_per_wu % 1000) * 1000;
953 timeradd(&cond_tv, &add_tv, &cond_tv);
959 &flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, &cond_time);
960 if (rc == ETIMEDOUT || rc < 0)
979 intmax_t setting = 1;
982 if (setting < 1 || setting > 1024) {
983 FatalError(
"invalid flow.managers setting %" PRIdMAX, setting);
985 flowmgr_number = (uint32_t)setting;
987 SCLogConfig(
"using %u flow manager threads", flowmgr_number);
990 for (uint32_t u = 0; u < flowmgr_number; u++) {
996 BUG_ON(tv_flowmgr == NULL);
998 if (tv_flowmgr == NULL) {
999 FatalError(
"flow manager thread creation failed");
1002 FatalError(
"flow manager thread spawn failed");
1019 static TmEcode FlowRecyclerThreadInit(
ThreadVars *t,
const void *initdata,
void **data)
1025 SCLogError(
"initializing flow log API for thread failed");
1062 FlowEndCountersUpdate(
tv, &ftd->
fec, f);
1081 uint64_t recycled_cnt = 0;
1102 Recycler(th_v, ftd, f);
1111 if (ret_queue.
len > 0) {
1115 recycled_cnt +=
cnt;
1125 if (emerg || !time_is_live) {
1128 struct timeval cond_tv;
1129 gettimeofday(&cond_tv, NULL);
1130 cond_tv.tv_sec += 1;
1135 &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time);
1136 if (rc == ETIMEDOUT || rc < 0) {
1154 SCLogPerf(
"%"PRIu64
" flows processed", recycled_cnt);
1158 static bool FlowRecyclerReadyToShutdown(
void)
1168 return ((
len == 0));
1174 intmax_t setting = 1;
1175 (void)
ConfGetInt(
"flow.recyclers", &setting);
1177 if (setting < 1 || setting > 1024) {
1178 FatalError(
"invalid flow.recyclers setting %" PRIdMAX, setting);
1180 flowrec_number = (uint32_t)setting;
1182 SCLogConfig(
"using %u flow recycler threads", flowrec_number);
1184 for (uint32_t u = 0; u < flowrec_number; u++) {
1191 if (tv_flowrec == NULL) {
1192 FatalError(
"flow recycler thread creation failed");
1195 FatalError(
"flow recycler thread spawn failed");
1212 (void)FlowCleanupHash();
1214 uint32_t flows = FlowCleanupHash();
1222 }
while (FlowRecyclerReadyToShutdown() ==
false);
1235 struct timeval start_ts;
1236 struct timeval cur_ts;
1237 gettimeofday(&start_ts, NULL);
1240 gettimeofday(&cur_ts, NULL);
1241 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1242 FatalError(
"unable to get all flow recycler "
1243 "threads to shutdown in time");