81 static uint32_t flowmgr_number = 1;
86 static uint32_t flowrec_number = 1;
103 #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
104 #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
106 #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
107 #define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000
108 #define NEW_FLOW_COUNT_COND 10
153 struct timeval start_ts;
154 struct timeval cur_ts;
155 gettimeofday(&start_ts, NULL);
158 gettimeofday(&cur_ts, NULL);
159 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
161 "threads to shutdown in time");
193 static int FlowManagerFlowTimeout(
Flow *f,
struct timeval *
ts, int32_t *next_ts,
const bool emerg)
200 if (*next_ts == 0 || flow_times_out_at < *next_ts)
201 *next_ts = flow_times_out_at;
204 if (flow_times_out_at >=
ts->tv_sec) {
221 static inline int FlowBypassedTimeout(
Flow *f,
struct timeval *
ts,
224 #ifdef CAPTURE_OFFLOAD
225 if (f->
flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
238 SCLogDebug(
"Updated flow: %"PRId64
"", FlowGetId(f));
245 pkts_tosrc + pkts_todst);
247 counters->bypassed_pkts += pkts_tosrc + pkts_todst;
248 counters->bypassed_bytes += bytes_tosrc + bytes_todst;
251 SCLogDebug(
"No new packet, dead flow %"PRId64
"", FlowGetId(f));
259 counters->bypassed_count++;
296 if (recycle.
len == 100) {
318 Flow *f,
struct timeval *
ts,
321 uint32_t checked = 0;
333 if (FlowManagerFlowTimeout(f,
ts, next_ts, emergency) == 0) {
335 counters->flows_notimeout++;
348 if (f->
use_cnt > 0 || !FlowBypassedTimeout(f,
ts, counters)) {
352 counters->flows_timeout_inuse++;
360 counters->flows_timeout++;
362 RemoveFromHash(f, prev_f);
370 counters->flows_checked += checked;
371 if (checked > counters->rows_maxlen)
372 counters->rows_maxlen = checked;
405 const uint32_t hash_min,
const uint32_t hash_max,
410 const uint32_t rows_checked = hash_max - hash_min;
411 uint32_t rows_skipped = 0;
412 uint32_t rows_empty = 0;
416 #define TYPE uint64_t
419 #define TYPE uint32_t
422 for (uint32_t idx = hash_min; idx < hash_max; idx+=
BITS) {
424 const uint32_t check =
MIN(
BITS, (hash_max - idx));
425 for (uint32_t i = 0; i < check; i++) {
432 for (uint32_t i = 0; i < check; i++) {
437 if (fb->evicted != NULL || fb->head != NULL) {
438 if (fb->evicted != NULL) {
444 if (fb->head != NULL) {
446 FlowManagerHashRowTimeout(td, fb->head,
ts, emergency, counters, &next_ts);
451 if (fb->evicted == NULL && fb->head == NULL) {
461 FlowManagerHashRowClearEvictedList(td,
evicted,
ts, counters);
468 cnt += ProcessAsideQueue(td, counters);
472 counters->rows_checked += rows_checked;
473 counters->rows_skipped += rows_skipped;
474 counters->rows_empty += rows_empty;
477 cnt += ProcessAsideQueue(td, counters);
479 counters->flows_removed += cnt;
486 const uint32_t hash_min,
const uint32_t hash_max,
489 const uint32_t rows = hash_max - hash_min;
490 const uint32_t chunk_size = rows / chunks;
492 const uint32_t min = iter * chunk_size + hash_min;
493 uint32_t max = min + chunk_size;
496 if (iter + 1 == chunks) {
499 const uint32_t cnt = FlowTimeoutHash(td,
ts, min, max, counters);
523 RemoveFromHash(f, NULL);
525 FlowBucket *fb = f->
fb;
550 static uint32_t FlowCleanupHash(
void)
560 if (fb->head != NULL) {
562 cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
564 if (fb->evicted != NULL) {
566 cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
570 if (local_queue.
len >= 25) {
653 static TmEcode FlowManagerThreadInit(
ThreadVars *t,
const void *initdata,
void **data)
670 if ((ftd->
instance + 1) == flowmgr_number) {
680 FlowCountersInit(t, &ftd->
cnt);
693 static uint32_t FlowTimeoutsMin(
void)
698 m =
MIN(
m, t[i].new_timeout);
699 m =
MIN(
m, t[i].est_timeout);
702 m =
MIN(
m, t[i].closed_timeout);
705 m =
MIN(
m, t[i].bypassed_timeout);
723 uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
725 bool prev_emerg =
false;
726 uint32_t other_last_sec = 0;
727 uint32_t flow_last_sec = 0;
734 memset(&
ts, 0,
sizeof(
ts));
735 uint32_t hash_passes = 0;
737 uint32_t hash_row_checks = 0;
738 uint32_t hash_passes_chunks = 0;
740 uint32_t hash_full_passes = 0;
742 const uint32_t min_timeout = FlowTimeoutsMin();
743 const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;
751 SCLogDebug(
"FM %s/%d starting. min_timeout %us. Full hash pass in %us", th_v->
name,
752 ftd->
instance, min_timeout, pass_in_sec);
755 struct timeval endts;
756 struct timeval active;
757 struct timeval paused;
758 struct timeval sleeping;
759 memset(&endts, 0,
sizeof(endts));
760 memset(&active, 0,
sizeof(active));
761 memset(&paused, 0,
sizeof(paused));
762 memset(&sleeping, 0,
sizeof(sleeping));
765 struct timeval startts;
766 memset(&startts, 0,
sizeof(startts));
767 gettimeofday(&startts, NULL);
769 uint32_t hash_pass_iter = 0;
770 uint32_t emerg_over_cnt = 0;
771 uint64_t next_run_ms = 0;
778 struct timeval pause_startts;
779 memset(&pause_startts, 0,
sizeof(pause_startts));
780 gettimeofday(&pause_startts, NULL);
784 struct timeval pause_endts;
785 memset(&pause_endts, 0,
sizeof(pause_endts));
786 gettimeofday(&pause_endts, NULL);
787 struct timeval pause_time;
788 memset(&pause_time, 0,
sizeof(pause_time));
789 timersub(&pause_endts, &pause_startts, &pause_time);
790 timeradd(&paused, &pause_time, &paused);
799 struct timeval run_startts;
800 memset(&run_startts, 0,
sizeof(run_startts));
801 gettimeofday(&run_startts, NULL);
804 memset(&
ts, 0,
sizeof(
ts));
807 const uint64_t ts_ms =
ts.tv_sec * 1000 +
ts.tv_usec / 1000;
808 const uint32_t rt = (uint32_t)
ts.tv_sec;
809 const bool emerge_p = (emerg && !prev_emerg);
816 if (ts_ms >= next_run_ms) {
821 if (spare_perc < 90 || spare_perc > 110) {
825 const uint32_t secs_passed = rt - flow_last_sec;
828 FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
837 hash_passes_chunks += 1;
838 hash_row_checks += counters.rows_checked;
843 const uint32_t chunks =
MIN(secs_passed, pass_in_sec);
844 for (uint32_t i = 0; i < chunks; i++) {
846 &counters, hash_pass_iter, pass_in_sec);
848 if (hash_pass_iter == pass_in_sec) {
856 hash_row_checks += counters.rows_checked;
857 hash_passes_chunks += chunks;
895 SCLogDebug(
"flow_sparse_q.len = %"PRIu32
" prealloc: %"PRIu32
896 "flow_spare_q status: %"PRIu32
"%% flows at the queue",
907 if (emerg_over_cnt >= 30) {
915 SCLogNotice(
"Flow emergency mode over, back to normal... unsetting"
916 " FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX
", "
917 "ts.tv_usec:%"PRIuMAX
") flow_spare_q status(): %"PRIu32
918 "%% flows at the queue", (uintmax_t)
ts.tv_sec,
924 next_run_ms = ts_ms + 667;
926 next_run_ms = ts_ms + 250;
928 if (flow_last_sec == 0) {
933 (other_last_sec == 0 || other_last_sec < (uint32_t)
ts.tv_sec)) {
939 other_last_sec = (uint32_t)
ts.tv_sec;
944 struct timeval run_endts;
945 memset(&run_endts, 0,
sizeof(run_endts));
946 gettimeofday(&run_endts, NULL);
947 struct timeval run_time;
948 memset(&run_time, 0,
sizeof(run_time));
949 timersub(&run_endts, &run_startts, &run_time);
950 timeradd(&active, &run_time, &active);
959 struct timeval sleep_startts;
960 memset(&sleep_startts, 0,
sizeof(sleep_startts));
961 gettimeofday(&sleep_startts, NULL);
966 struct timeval sleep_endts;
967 memset(&sleep_endts, 0,
sizeof(sleep_endts));
968 gettimeofday(&sleep_endts, NULL);
970 struct timeval sleep_time;
971 memset(&sleep_time, 0,
sizeof(sleep_time));
972 timersub(&sleep_endts, &sleep_startts, &sleep_time);
973 timeradd(&sleeping, &sleep_time, &sleeping);
979 SCLogPerf(
"%" PRIu32
" new flows, %" PRIu32
" established flows were "
980 "timed out, %"PRIu32
" flows in closed state", new_cnt,
981 established_cnt, closing_cnt);
984 SCLogNotice(
"hash passes %u avg chunks %u full %u rows %u (rows/s %u)",
985 hash_passes, hash_passes_chunks / (hash_passes ? hash_passes : 1),
986 hash_full_passes, hash_row_checks,
987 hash_row_checks / ((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));
989 gettimeofday(&endts, NULL);
990 struct timeval total_run_time;
991 timersub(&endts, &startts, &total_run_time);
993 SCLogNotice(
"FM: active %u.%us out of %u.%us; sleeping %u.%us, paused %u.%us",
994 (uint32_t)active.tv_sec, (uint32_t)active.tv_usec,
995 (uint32_t)total_run_time.tv_sec, (uint32_t)total_run_time.tv_usec,
996 (uint32_t)sleeping.tv_sec, (uint32_t)sleeping.tv_usec,
997 (uint32_t)paused.tv_sec, (uint32_t)paused.tv_usec);
1005 intmax_t setting = 1;
1008 if (setting < 1 || setting > 1024) {
1010 "invalid flow.managers setting %"PRIdMAX, setting);
1012 flowmgr_number = (uint32_t)setting;
1014 SCLogConfig(
"using %u flow manager threads", flowmgr_number);
1017 for (uint32_t u = 0; u < flowmgr_number; u++) {
1023 BUG_ON(tv_flowmgr == NULL);
1025 if (tv_flowmgr == NULL) {
1039 static TmEcode FlowRecyclerThreadInit(
ThreadVars *t,
const void *initdata,
void **data)
1072 uint64_t recycled_cnt = 0;
1076 memset(&
ts, 0,
sizeof(
ts));
1077 uint32_t fr_passes = 0;
1080 struct timeval endts;
1081 struct timeval active;
1082 struct timeval paused;
1083 struct timeval sleeping;
1084 memset(&endts, 0,
sizeof(endts));
1085 memset(&active, 0,
sizeof(active));
1086 memset(&paused, 0,
sizeof(paused));
1087 memset(&sleeping, 0,
sizeof(sleeping));
1089 struct timeval startts;
1090 memset(&startts, 0,
sizeof(startts));
1091 gettimeofday(&startts, NULL);
1098 struct timeval pause_startts;
1099 memset(&pause_startts, 0,
sizeof(pause_startts));
1100 gettimeofday(&pause_startts, NULL);
1105 struct timeval pause_endts;
1106 memset(&pause_endts, 0,
sizeof(pause_endts));
1107 gettimeofday(&pause_endts, NULL);
1109 struct timeval pause_time;
1110 memset(&pause_time, 0,
sizeof(pause_time));
1111 timersub(&pause_endts, &pause_startts, &pause_time);
1112 timeradd(&paused, &pause_time, &paused);
1118 struct timeval run_startts;
1119 memset(&run_startts, 0,
sizeof(run_startts));
1120 gettimeofday(&run_startts, NULL);
1128 memset(&
ts, 0,
sizeof(
ts));
1140 struct timeval run_endts;
1141 memset(&run_endts, 0,
sizeof(run_endts));
1142 gettimeofday(&run_endts, NULL);
1144 struct timeval run_time;
1145 memset(&run_time, 0,
sizeof(run_time));
1146 timersub(&run_endts, &run_startts, &run_time);
1147 timeradd(&active, &run_time, &active);
1155 struct timeval sleep_startts;
1156 memset(&sleep_startts, 0,
sizeof(sleep_startts));
1157 gettimeofday(&sleep_startts, NULL);
1161 struct timeval sleep_endts;
1162 memset(&sleep_endts, 0,
sizeof(sleep_endts));
1163 gettimeofday(&sleep_endts, NULL);
1164 struct timeval sleep_time;
1165 memset(&sleep_time, 0,
sizeof(sleep_time));
1166 timersub(&sleep_endts, &sleep_startts, &sleep_time);
1167 timeradd(&sleeping, &sleep_time, &sleeping);
1176 gettimeofday(&endts, NULL);
1177 struct timeval total_run_time;
1178 timersub(&endts, &startts, &total_run_time);
1179 SCLogNotice(
"FR: active %u.%us out of %u.%us; sleeping %u.%us, paused %u.%us",
1180 (uint32_t)active.tv_sec, (uint32_t)active.tv_usec,
1181 (uint32_t)total_run_time.tv_sec, (uint32_t)total_run_time.tv_usec,
1182 (uint32_t)sleeping.tv_sec, (uint32_t)sleeping.tv_usec,
1183 (uint32_t)paused.tv_sec, (uint32_t)paused.tv_usec);
1185 SCLogNotice(
"FR passes %u passes/s %u", fr_passes,
1186 (uint32_t)fr_passes/((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));
1188 SCLogPerf(
"%"PRIu64
" flows processed", recycled_cnt);
1192 static bool FlowRecyclerReadyToShutdown(
void)
1202 return ((
len == 0));
1208 intmax_t setting = 1;
1209 (void)
ConfGetInt(
"flow.recyclers", &setting);
1211 if (setting < 1 || setting > 1024) {
1213 "invalid flow.recyclers setting %"PRIdMAX, setting);
1215 flowrec_number = (uint32_t)setting;
1217 SCLogConfig(
"using %u flow recycler threads", flowrec_number);
1219 for (uint32_t u = 0; u < flowrec_number; u++) {
1226 if (tv_flowrec == NULL) {
1250 (void)FlowCleanupHash();
1252 uint32_t flows = FlowCleanupHash();
1259 }
while (FlowRecyclerReadyToShutdown() ==
false);
1273 struct timeval start_ts;
1274 struct timeval cur_ts;
1275 gettimeofday(&start_ts, NULL);
1278 gettimeofday(&cur_ts, NULL);
1279 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1281 "threads to shutdown in time");