80 static uint32_t flowmgr_number = 1;
85 static uint32_t flowrec_number = 1;
102 #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
103 #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
105 #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
106 #define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000
107 #define NEW_FLOW_COUNT_COND 10
153 struct timeval start_ts;
154 struct timeval cur_ts;
155 gettimeofday(&start_ts, NULL);
158 gettimeofday(&cur_ts, NULL);
159 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
161 "threads to shutdown in time");
193 static int FlowManagerFlowTimeout(
Flow *f,
struct timeval *
ts, int32_t *next_ts,
const bool emerg)
200 if (*next_ts == 0 || flow_times_out_at < *next_ts)
201 *next_ts = flow_times_out_at;
204 if (flow_times_out_at >=
ts->tv_sec) {
211 static inline int FlowBypassedTimeout(
Flow *f,
struct timeval *
ts,
214 #ifdef CAPTURE_OFFLOAD
215 if (f->
flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
228 SCLogDebug(
"Updated flow: %"PRId64
"", FlowGetId(f));
235 pkts_tosrc + pkts_todst);
237 counters->bypassed_pkts += pkts_tosrc + pkts_todst;
238 counters->bypassed_bytes += bytes_tosrc + bytes_todst;
241 SCLogDebug(
"No new packet, dead flow %"PRId64
"", FlowGetId(f));
249 counters->bypassed_count++;
268 static inline int FlowManagerFlowTimedOut(
Flow *f,
struct timeval *
ts,
277 if (!FlowBypassedTimeout(f,
ts, counters)) {
281 int server = 0, client = 0;
284 #ifdef CAPTURE_OFFLOAD
285 f->
flow_state != FLOW_STATE_CAPTURE_BYPASSED &&
301 static inline int FMTryLockBucket(FlowBucket *fb)
306 static inline void FMFlowLock(
Flow *f)
327 if (f->
proto == IPPROTO_TCP &&
329 #ifdef CAPTURE_OFFLOAD
330 f->
flow_state != FLOW_STATE_CAPTURE_BYPASSED &&
345 if (recycle.
len == 100) {
367 Flow *f,
struct timeval *
ts,
370 uint32_t checked = 0;
382 if (FlowManagerFlowTimeout(f,
ts, next_ts, emergency) == 0) {
384 counters->flows_notimeout++;
395 counters->flows_timeout++;
399 if (f->
use_cnt > 0 || !FlowBypassedTimeout(f,
ts, counters)) {
402 counters->flows_timeout_inuse++;
407 RemoveFromHash(f, prev_f);
415 counters->flows_checked += checked;
416 if (checked > counters->rows_maxlen)
417 counters->rows_maxlen = checked;
450 const uint32_t hash_min,
const uint32_t hash_max,
455 const uint32_t rows_checked = hash_max - hash_min;
456 uint32_t rows_skipped = 0;
457 uint32_t rows_busy = 0;
458 uint32_t rows_empty = 0;
462 #define TYPE uint64_t
465 #define TYPE uint32_t
468 for (uint32_t idx = hash_min; idx < hash_max; idx+=
BITS) {
470 const uint32_t check =
MIN(
BITS, (hash_max - idx));
471 for (uint32_t i = 0; i < check; i++) {
478 for (uint32_t i = 0; i < check; i++) {
481 if (FMTryLockBucket(fb) == 0) {
483 if (fb->evicted != NULL || fb->head != NULL) {
489 if (fb->evicted != NULL) {
494 }
else if (fb->head != NULL) {
496 FlowManagerHashRowTimeout(td,
497 fb->head,
ts, emergency, counters, &next_ts);
502 if (fb->evicted == NULL && fb->head == NULL) {
504 }
else if (fb->evicted != NULL && fb->head == NULL) {
514 FlowManagerHashRowClearEvictedList(td,
evicted,
ts, counters);
524 cnt += ProcessAsideQueue(td, counters);
528 counters->rows_checked += rows_checked;
529 counters->rows_skipped += rows_skipped;
530 counters->rows_busy += rows_busy;
531 counters->rows_empty += rows_empty;
534 cnt += ProcessAsideQueue(td, counters);
536 counters->flows_removed += cnt;
543 const uint32_t hash_min,
const uint32_t hash_max,
546 const uint32_t rows = hash_max - hash_min;
547 const uint32_t chunk_size = rows / chunks;
549 const uint32_t min = iter * chunk_size + hash_min;
550 uint32_t max = min + chunk_size;
551 if (iter + 1 == chunks) {
554 const uint32_t cnt = FlowTimeoutHash(td,
ts, min, max, counters);
578 RemoveFromHash(f, NULL);
580 FlowBucket *fb = f->
fb;
605 static uint32_t FlowCleanupHash(
void)
615 if (fb->head != NULL) {
617 cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
619 if (fb->evicted != NULL) {
621 cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
625 if (local_queue.
len >= 25) {
710 static TmEcode FlowManagerThreadInit(
ThreadVars *t,
const void *initdata,
void **data)
724 else if ((ftd->
instance + 1) == flowmgr_number) {
738 FlowCountersInit(t, &ftd->
cnt);
751 static uint32_t FlowTimeoutsMin(
void)
756 m =
MIN(
m, t[i].new_timeout);
757 m =
MIN(
m, t[i].est_timeout);
760 m =
MIN(
m, t[i].closed_timeout);
763 m =
MIN(
m, t[i].bypassed_timeout);
781 uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
783 bool prev_emerg =
false;
784 uint32_t other_last_sec = 0;
785 uint32_t flow_last_sec = 0;
792 memset(&
ts, 0,
sizeof(
ts));
793 uint32_t hash_passes = 0;
794 uint32_t hash_row_checks = 0;
795 uint32_t hash_passes_chunks = 0;
796 uint32_t hash_full_passes = 0;
798 const uint32_t min_timeout = FlowTimeoutsMin();
799 const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;
807 SCLogDebug(
"FM %s/%d starting. min_timeout %us. Full hash pass in %us", th_v->
name,
808 ftd->
instance, min_timeout, pass_in_sec);
811 struct timeval endts;
812 struct timeval active;
813 struct timeval paused;
814 struct timeval sleeping;
815 memset(&endts, 0,
sizeof(endts));
816 memset(&active, 0,
sizeof(active));
817 memset(&paused, 0,
sizeof(paused));
818 memset(&sleeping, 0,
sizeof(sleeping));
821 struct timeval startts;
822 memset(&startts, 0,
sizeof(startts));
823 gettimeofday(&startts, NULL);
825 uint32_t hash_pass_iter = 0;
826 uint32_t emerg_over_cnt = 0;
827 uint64_t next_run_ms = 0;
834 struct timeval pause_startts;
835 memset(&pause_startts, 0,
sizeof(pause_startts));
836 gettimeofday(&pause_startts, NULL);
840 struct timeval pause_endts;
841 memset(&pause_endts, 0,
sizeof(pause_endts));
842 gettimeofday(&pause_endts, NULL);
843 struct timeval pause_time;
844 memset(&pause_time, 0,
sizeof(pause_time));
845 timersub(&pause_endts, &pause_startts, &pause_time);
846 timeradd(&paused, &pause_time, &paused);
855 struct timeval run_startts;
856 memset(&run_startts, 0,
sizeof(run_startts));
857 gettimeofday(&run_startts, NULL);
860 memset(&
ts, 0,
sizeof(
ts));
863 const uint64_t ts_ms =
ts.tv_sec * 1000 +
ts.tv_usec / 1000;
864 const uint32_t rt = (uint32_t)
ts.tv_sec;
865 const bool emerge_p = (emerg && !prev_emerg);
872 if (ts_ms >= next_run_ms) {
877 if (spare_perc < 90 || spare_perc > 110) {
881 const uint32_t secs_passed = rt - flow_last_sec;
884 FlowTimeoutCounters counters = { 0, 0, 0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0};
891 hash_passes_chunks += 1;
893 hash_row_checks += counters.rows_checked;
897 const uint32_t chunks =
MIN(secs_passed, pass_in_sec);
898 for (uint32_t i = 0; i < chunks; i++) {
900 &counters, hash_pass_iter, pass_in_sec);
902 if (hash_pass_iter == pass_in_sec) {
909 hash_row_checks += counters.rows_checked;
910 hash_passes_chunks += chunks;
947 SCLogDebug(
"flow_sparse_q.len = %"PRIu32
" prealloc: %"PRIu32
948 "flow_spare_q status: %"PRIu32
"%% flows at the queue",
959 if (emerg_over_cnt >= 30) {
967 SCLogNotice(
"Flow emergency mode over, back to normal... unsetting"
968 " FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX
", "
969 "ts.tv_usec:%"PRIuMAX
") flow_spare_q status(): %"PRIu32
970 "%% flows at the queue", (uintmax_t)
ts.tv_sec,
976 next_run_ms = ts_ms + 667;
978 next_run_ms = ts_ms + 250;
980 if (flow_last_sec == 0) {
985 (other_last_sec == 0 || other_last_sec < (uint32_t)
ts.tv_sec)) {
990 other_last_sec = (uint32_t)
ts.tv_sec;
995 struct timeval run_endts;
996 memset(&run_endts, 0,
sizeof(run_endts));
997 gettimeofday(&run_endts, NULL);
998 struct timeval run_time;
999 memset(&run_time, 0,
sizeof(run_time));
1000 timersub(&run_endts, &run_startts, &run_time);
1001 timeradd(&active, &run_time, &active);
1010 struct timeval sleep_startts;
1011 memset(&sleep_startts, 0,
sizeof(sleep_startts));
1012 gettimeofday(&sleep_startts, NULL);
1017 struct timeval sleep_endts;
1018 memset(&sleep_endts, 0,
sizeof(sleep_endts));
1019 gettimeofday(&sleep_endts, NULL);
1021 struct timeval sleep_time;
1022 memset(&sleep_time, 0,
sizeof(sleep_time));
1023 timersub(&sleep_endts, &sleep_startts, &sleep_time);
1024 timeradd(&sleeping, &sleep_time, &sleeping);
1030 SCLogPerf(
"%" PRIu32
" new flows, %" PRIu32
" established flows were "
1031 "timed out, %"PRIu32
" flows in closed state", new_cnt,
1032 established_cnt, closing_cnt);
1035 SCLogNotice(
"hash passes %u avg chunks %u full %u rows %u (rows/s %u)",
1036 hash_passes, hash_passes_chunks / (hash_passes ? hash_passes : 1),
1037 hash_full_passes, hash_row_checks,
1038 hash_row_checks / ((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));
1040 gettimeofday(&endts, NULL);
1041 struct timeval total_run_time;
1042 timersub(&endts, &startts, &total_run_time);
1044 SCLogNotice(
"FM: active %u.%us out of %u.%us; sleeping %u.%us, paused %u.%us",
1045 (uint32_t)active.tv_sec, (uint32_t)active.tv_usec,
1046 (uint32_t)total_run_time.tv_sec, (uint32_t)total_run_time.tv_usec,
1047 (uint32_t)sleeping.tv_sec, (uint32_t)sleeping.tv_usec,
1048 (uint32_t)paused.tv_sec, (uint32_t)paused.tv_usec);
1056 intmax_t setting = 1;
1059 if (setting < 1 || setting > 1024) {
1061 "invalid flow.managers setting %"PRIdMAX, setting);
1063 flowmgr_number = (uint32_t)setting;
1065 SCLogConfig(
"using %u flow manager threads", flowmgr_number);
1068 for (uint32_t u = 0; u < flowmgr_number; u++) {
1074 BUG_ON(tv_flowmgr == NULL);
1076 if (tv_flowmgr == NULL) {
1090 static TmEcode FlowRecyclerThreadInit(
ThreadVars *t,
const void *initdata,
void **data)
1123 uint64_t recycled_cnt = 0;
1127 memset(&
ts, 0,
sizeof(
ts));
1128 uint32_t fr_passes = 0;
1131 struct timeval endts;
1132 struct timeval active;
1133 struct timeval paused;
1134 struct timeval sleeping;
1135 memset(&endts, 0,
sizeof(endts));
1136 memset(&active, 0,
sizeof(active));
1137 memset(&paused, 0,
sizeof(paused));
1138 memset(&sleeping, 0,
sizeof(sleeping));
1140 struct timeval startts;
1141 memset(&startts, 0,
sizeof(startts));
1142 gettimeofday(&startts, NULL);
1149 struct timeval pause_startts;
1150 memset(&pause_startts, 0,
sizeof(pause_startts));
1151 gettimeofday(&pause_startts, NULL);
1156 struct timeval pause_endts;
1157 memset(&pause_endts, 0,
sizeof(pause_endts));
1158 gettimeofday(&pause_endts, NULL);
1160 struct timeval pause_time;
1161 memset(&pause_time, 0,
sizeof(pause_time));
1162 timersub(&pause_endts, &pause_startts, &pause_time);
1163 timeradd(&paused, &pause_time, &paused);
1169 struct timeval run_startts;
1170 memset(&run_startts, 0,
sizeof(run_startts));
1171 gettimeofday(&run_startts, NULL);
1179 memset(&
ts, 0,
sizeof(
ts));
1191 struct timeval run_endts;
1192 memset(&run_endts, 0,
sizeof(run_endts));
1193 gettimeofday(&run_endts, NULL);
1195 struct timeval run_time;
1196 memset(&run_time, 0,
sizeof(run_time));
1197 timersub(&run_endts, &run_startts, &run_time);
1198 timeradd(&active, &run_time, &active);
1206 struct timeval sleep_startts;
1207 memset(&sleep_startts, 0,
sizeof(sleep_startts));
1208 gettimeofday(&sleep_startts, NULL);
1212 struct timeval sleep_endts;
1213 memset(&sleep_endts, 0,
sizeof(sleep_endts));
1214 gettimeofday(&sleep_endts, NULL);
1215 struct timeval sleep_time;
1216 memset(&sleep_time, 0,
sizeof(sleep_time));
1217 timersub(&sleep_endts, &sleep_startts, &sleep_time);
1218 timeradd(&sleeping, &sleep_time, &sleeping);
1227 gettimeofday(&endts, NULL);
1228 struct timeval total_run_time;
1229 timersub(&endts, &startts, &total_run_time);
1230 SCLogNotice(
"FR: active %u.%us out of %u.%us; sleeping %u.%us, paused %u.%us",
1231 (uint32_t)active.tv_sec, (uint32_t)active.tv_usec,
1232 (uint32_t)total_run_time.tv_sec, (uint32_t)total_run_time.tv_usec,
1233 (uint32_t)sleeping.tv_sec, (uint32_t)sleeping.tv_usec,
1234 (uint32_t)paused.tv_sec, (uint32_t)paused.tv_usec);
1236 SCLogNotice(
"FR passes %u passes/s %u", fr_passes,
1237 (uint32_t)fr_passes/((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));
1239 SCLogPerf(
"%"PRIu64
" flows processed", recycled_cnt);
1243 static bool FlowRecyclerReadyToShutdown(
void)
1253 return ((
len == 0));
1259 intmax_t setting = 1;
1260 (void)
ConfGetInt(
"flow.recyclers", &setting);
1262 if (setting < 1 || setting > 1024) {
1264 "invalid flow.recyclers setting %"PRIdMAX, setting);
1266 flowrec_number = (uint32_t)setting;
1268 SCLogConfig(
"using %u flow recycler threads", flowrec_number);
1270 for (uint32_t u = 0; u < flowrec_number; u++) {
1277 if (tv_flowrec == NULL) {
1301 (void)FlowCleanupHash();
1303 uint32_t flows = FlowCleanupHash();
1310 }
while (FlowRecyclerReadyToShutdown() ==
false);
1324 struct timeval start_ts;
1325 struct timeval cur_ts;
1326 gettimeofday(&start_ts, NULL);
1329 gettimeofday(&cur_ts, NULL);
1330 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1332 "threads to shutdown in time");