Go to the documentation of this file.
46 #ifdef PROFILE_LOCKING
66 #include <sys/param.h>
67 #include <sys/resource.h>
68 #include <sys/cpuset.h>
70 #define cpu_set_t cpuset_t
74 static int SetCPUAffinity(uint16_t cpu);
123 TmThreadsSlotProcessPktFail(
tv, s, NULL);
134 if (s->slot_next != NULL) {
137 TmThreadsSlotProcessPktFail(
tv, s, extra_p);
177 if ((r = TmThreadsSlotProcessPkt(
tv, fw_slot, p) !=
TM_ECODE_OK)) {
223 static void *TmThreadsSlotPktAcqLoop(
void *td)
247 " PktAcqLoop=%p, tmqh_in=%p,"
251 pthread_exit((
void *) -1);
255 for (slot = s; slot != NULL; slot = slot->
slot_next) {
257 void *slot_data = NULL;
280 pthread_exit((
void *) -1);
295 pthread_exit((
void *) -1);
330 TmThreadTimeoutLoop(
tv, s);
337 for (slot = s; slot != NULL; slot = slot->
slot_next) {
354 pthread_exit((
void *) 0);
359 pthread_exit((
void *) -1);
363 static void *TmThreadsSlotVar(
void *td)
387 pthread_exit((
void *) -1);
393 void *slot_data = NULL;
413 pthread_exit((
void *) -1);
428 pthread_exit((
void *) -1);
463 TmThreadsHandleInjectedPackets(
tv);
496 pthread_exit((
void *) 0);
501 pthread_exit((
void *) -1);
505 static void *TmThreadsManagement(
void *td)
527 void *slot_data = NULL;
531 pthread_exit((
void *) -1);
562 pthread_exit((
void *) -1);
568 pthread_exit((
void *) 0);
586 printf(
"Both slot name and function pointer can't be NULL inside "
587 "TmThreadSetSlots\n");
594 if (strcmp(name,
"varslot") == 0) {
596 }
else if (strcmp(name,
"pktacqloop") == 0) {
598 }
else if (strcmp(name,
"management") == 0) {
600 }
else if (strcmp(name,
"command") == 0) {
602 }
else if (strcmp(name,
"custom") == 0) {
607 printf(
"Error: Slot \"%s\" not supported\n", name);
620 for (
int i = 0; i <
TVT_MAX; i++) {
624 while (slots != NULL) {
625 if (slots == tm_slot) {
652 memset(slot, 0,
sizeof(
TmSlot));
699 for (
int i = 0; i <
TVT_MAX; i++) {
703 while (slots != NULL) {
704 if (slots->
tm_id == tm_id) {
717 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
718 static int SetCPUAffinitySet(cpu_set_t *cs)
720 #if defined OS_FREEBSD
721 int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
724 int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
725 (
void*)cs, THREAD_AFFINITY_POLICY_COUNT);
727 pid_t tid = syscall(SYS_gettid);
728 int r = sched_setaffinity(tid,
sizeof(cpu_set_t), cs);
732 printf(
"Warning: sched_setaffinity failed (%" PRId32
"): %s\n", r,
749 static int SetCPUAffinity(uint16_t cpuid)
751 #if defined __OpenBSD__ || defined sun
754 int cpu = (int)cpuid;
756 #if defined OS_WIN32 || defined __CYGWIN__
759 int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
761 printf(
"Warning: sched_setaffinity failed (%" PRId32
"): %s\n", r,
765 SCLogDebug(
"CPU Affinity for thread %lu set to CPU %" PRId32,
775 return SetCPUAffinitySet(&cs);
806 "thread %s: %s",
tv->
name, strerror(errno));
808 SCLogDebug(
"Priority set to %"PRId32
" for thread %s",
818 SCLogDebug(
"Nice value set to %"PRId32
" for thread %s",
879 SCLogPerf(
"Setting affinity for thread \"%s\"to cpu/core "
885 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
898 }
else if (CPU_ISSET(cpu, &taf->
hiprio_cpu)) {
903 SCLogPerf(
"Setting prio %d for thread \"%s\" to cpu/core "
907 SetCPUAffinitySet(&taf->
cpu_set);
909 SCLogPerf(
"Setting prio %d for thread \"%s\", "
936 const char *outq_name,
const char *outqh_name,
const char *slots,
937 void * (*fn_p)(
void *),
int mucond)
943 SCLogDebug(
"creating thread \"%s\"...", name);
961 if (inq_name != NULL && strcmp(inq_name,
"packetpool") != 0) {
976 if (inqh_name != NULL) {
993 if (outqh_name != NULL) {
1008 if (outq_name != NULL && strcmp(outq_name,
"packetpool") != 0) {
1066 const char *inqh_name,
const char *outq_name,
1067 const char *outqh_name,
const char *slots)
1128 tv =
TmThreadCreate(name, NULL, NULL, NULL, NULL,
"management", NULL, mucond);
1161 tv =
TmThreadCreate(name, NULL, NULL, NULL, NULL,
"command", NULL, mucond);
1198 if (t->
next == NULL) {
1270 if (
tv->
inq != NULL) {
1294 pthread_join(
tv->
t, NULL);
1305 static void TmThreadDrainPacketThreads(
void)
1308 struct timeval start_ts;
1309 struct timeval cur_ts;
1310 gettimeofday(&start_ts, NULL);
1313 gettimeofday(&cur_ts, NULL);
1314 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1316 "to process their packets in time");
1325 if (ThreadStillHasPackets(
tv)) {
1376 struct timeval start_ts;
1377 struct timeval cur_ts;
1378 gettimeofday(&start_ts, NULL);
1381 gettimeofday(&cur_ts, NULL);
1382 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1384 "thread - \"%s\". Killing engine",
tv->
name);
1401 while (slots != NULL) {
1414 if (ThreadStillHasPackets(
tv)) {
1455 if (
tv->
inq != NULL) {
1480 TmThreadDrainPacketThreads();
1484 static void TmThreadDebugValidateNoMorePackets(
void)
1486 #ifdef DEBUG_VALIDATION
1489 if (ThreadStillHasPackets(
tv)) {
1504 struct timeval start_ts;
1505 struct timeval cur_ts;
1508 TmThreadDrainPacketThreads();
1513 TmThreadDebugValidateNoMorePackets();
1515 gettimeofday(&start_ts, NULL);
1517 gettimeofday(&cur_ts, NULL);
1518 if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1520 "threads. Killing engine");
1531 if (
tv->
inq != NULL) {
1562 while (slots != NULL) {
1565 char *found = strstr(tm->
name, tm_name);
1580 #define MIN_WAIT_TIME 100
1581 #define MAX_WAIT_TIME 999999
1594 int r = TmThreadKillThread(
tv);
1608 #undef MIN_WAIT_TIME
1609 #undef MAX_WAIT_TIME
1615 for (i = 0; i <
TVT_MAX; i++) {
1638 TmThreadDeinitMC(
tv);
1667 char *thread_group_name = NULL;
1675 thread_group_name =
SCStrdup(name);
1676 if (
unlikely(thread_group_name == NULL)) {
1688 if ((family < 0) || (family >=
TVT_MAX))
1710 pthread_attr_t attr;
1712 printf(
"ERROR: no thread function set\n");
1717 pthread_attr_init(&attr);
1719 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1723 printf(
"ERROR; return code from pthread_create() is %" PRId32
"\n", rc);
1745 "Fatal error encountered in TmThreadInitMC. "
1750 printf(
"Error initializing the tv->m mutex\n");
1756 "Fatal error encountered in TmThreadInitMC. "
1834 for (
int i = 0; i <
TVT_MAX; i++) {
1836 while (
tv != NULL) {
1864 for (
int i = 0; i <
TVT_MAX; i++) {
1866 while (
tv != NULL) {
1880 for (
int i = 0; i <
TVT_MAX; i++) {
1903 uint16_t RX_num = 0;
1905 uint16_t FM_num = 0;
1906 uint16_t FR_num = 0;
1907 uint16_t TX_num = 0;
1909 struct timeval start_ts;
1910 struct timeval cur_ts;
1911 gettimeofday(&start_ts, NULL);
1915 for (
int i = 0; i <
TVT_MAX; i++) {
1917 while (
tv != NULL) {
1922 "initialize: flags %04x",
tv->
name,
1930 gettimeofday(&cur_ts, NULL);
1931 if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
1933 "initialize in time: flags %04x",
tv->
name,
1947 "initialize.",
tv->
name);
1953 "initialization.",
tv->
name);
1975 uint16_t app_len = 32;
1976 uint16_t buf_len = 256;
1978 char append_str[app_len];
1979 char thread_counts[buf_len];
1981 strlcpy(thread_counts,
"Threads created -> ", strlen(
"Threads created -> ") + 1);
1983 snprintf(append_str, app_len,
"RX: %u ", RX_num);
1984 strlcat(thread_counts, append_str, buf_len);
1987 snprintf(append_str, app_len,
"W: %u ", W_num);
1988 strlcat(thread_counts, append_str, buf_len);
1991 snprintf(append_str, app_len,
"TX: %u ", TX_num);
1992 strlcat(thread_counts, append_str, buf_len);
1995 snprintf(append_str, app_len,
"FM: %u ", FM_num);
1996 strlcat(thread_counts, append_str, buf_len);
1999 snprintf(append_str, app_len,
"FR: %u ", FR_num);
2000 strlcat(thread_counts, append_str, buf_len);
2002 snprintf(append_str, app_len,
" Engine started.");
2003 strlcat(thread_counts, append_str, buf_len);
2017 pthread_t
self = pthread_self();
2020 for (
int i = 0; i <
TVT_MAX; i++) {
2023 if (pthread_equal(
self,
tv->
t)) {
2041 for (
int i = 0; i <
TVT_MAX; i++) {
2043 while (
tv != NULL) {
2066 for (
int i = 0; i <
TVT_MAX; i++) {
2068 while (
tv != NULL) {
2070 SCLogNotice(
"tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2076 SCLogNotice(
"tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2081 SCLogNotice(
"tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2084 TmThreadDoDumpSlots(
tv);
2098 struct timeval
pktts;
2110 static Threads thread_store = { NULL, 0, 0 };
2116 for (
size_t s = 0; s < thread_store.
threads_size; s++) {
2118 if (t == NULL || t->
in_use == 0)
2121 SCLogNotice(
"Thread %"PRIuMAX
", %s type %d, tv %p in_use %d",
2126 SCLogNotice(
"tv %p type %u name %s tmm_flags %02X flags %X",
2140 if (thread_store.
threads == NULL) {
2163 thread_store.
threads = newmem;
2224 struct timeval systs;
2225 gettimeofday(&systs, NULL);
2234 for (
size_t s = 0; s < thread_store.
threads_size; s++) {
2249 struct timeval systs;
2250 gettimeofday(&systs, NULL);
2252 for (
size_t s = 0; s < thread_store.
threads_size; s++) {
2264 struct timeval local, nullts;
2265 memset(&local, 0,
sizeof(local));
2266 memset(&nullts, 0,
sizeof(nullts));
2269 struct timeval systs;
2270 gettimeofday(&systs, NULL);
2277 if (!(timercmp(&t->
pktts, &nullts, ==))) {
2286 if (timercmp(&t->
pktts, &local, <)) {
2294 SCLogDebug(
"ts->tv_sec %"PRIuMAX, (uintmax_t)
ts->tv_sec);
2302 if (thread_max == 0)
2306 if (thread_max > 1024) {
2332 while (*packets != NULL) {
2339 if (
tv->
inq != NULL) {
2361 if (
tv->
inq != NULL) {
const char * thread_name_workers
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
struct FlowQueue_ * flow_queue
bool TmThreadsTimeSubsysIsReady(void)
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
thread_local uint64_t spin_lock_cnt
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread (MGMT). This function supports only custom...
int TmqhNameToID(const char *name)
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinity of the thread). Priority should already be set by pthread_create.
const char * thread_name_flow_mgr
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
#define SCCtrlMutexDestroy
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
ThreadVars * TmThreadsGetCallingThread(void)
Returns the TV for the calling thread.
thread_local uint64_t rwr_lock_cnt
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
void PacketEnqueue(PacketQueue *q, Packet *p)
ThreadVars * TmThreadsGetTVContainingSlot(TmSlot *tm_slot)
Simple fifo queue for packets with mutex and cond. Calling the mutex or triggering the cond is respons...
#define SCSetThreadName(n)
const char * thread_name_flow_rec
void(* OutHandler)(ThreadVars *, Packet *)
int AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
const char * thread_name_autofp
void TmThreadsInitThreadsTimestamp(const struct timeval *ts)
int StatsSetupPrivate(ThreadVars *tv)
struct PacketQueue_ * stream_pq_local
ThreadVars * tv_root[TVT_MAX]
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
#define SCCtrlCondDestroy
#define SCMUTEX_INITIALIZER
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
#define THREAD_SET_PRIORITY
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
thread_local uint64_t rww_lock_contention
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
void TmThreadDisablePacketThreads(void)
Disable all packet threads.
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
void PacketPoolInit(void)
Packet *(* InHandler)(ThreadVars *)
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a threads flow queue
int TmThreadsInjectPacketsById(Packet **packets, const int id)
thread_local uint64_t rwr_lock_contention
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
void TmThreadContinueThreads()
Unpauses all threads present in tv_root.
size_t strlcpy(char *dst, const char *src, size_t siz)
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
thread_local uint64_t spin_lock_wait_ticks
#define PKT_SET_SRC(p, src_val)
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
void(* InShutdownHandler)(ThreadVars *)
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
size_t strlcat(char *, const char *src, size_t siz)
struct TmSlot_ * tm_slots
TmSlot * TmSlotGetSlotForTM(int tm_id)
Returns the slot holding a TM with the particular tm_id.
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
TmEcode(* Management)(ThreadVars *, void *)
#define SCMutexUnlock(mut)
void TmThreadPauseThreads()
Pauses all threads present in tv_root.
StatsPublicThreadContext perf_public_ctx
#define PKT_PSEUDO_STREAM_END
void TmThreadDumpThreads(void)
thread_local uint64_t rww_lock_cnt
Per thread variable structure.
thread_local uint64_t mutex_lock_contention
Tmq * TmqGetQueueByName(const char *name)
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
TmEcode(* Management)(ThreadVars *, void *)
thread_local uint64_t spin_lock_contention
TmEcode(* Func)(ThreadVars *, Packet *, void *)
void TmThreadClearThreadsFamily(int family)
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
const char * PktSrcToString(enum PktSrcEnum pkt_src)
void TmThreadsUnregisterThread(const int id)
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
int threading_set_cpu_affinity
void(* OutHandlerCtxFree)(void *)
@ SC_ERR_INVALID_ARGUMENT
struct ThreadVars_ * next
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
void TmThreadKillThreads(void)
#define PACKET_PROFILING_TMM_END(p, id)
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
uint8_t thread_setup_flags
thread_local uint64_t rww_lock_wait_ticks
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread (MGMT). This function supports only custom...
thread_local uint64_t rwr_lock_wait_ticks
#define SCMutexInit(mut, mutattrs)
#define TM_FLAG_RECEIVE_TM
void TmThreadPause(ThreadVars *tv)
Pauses a thread.
#define SCGetThreadIdLong(...)
#define SCRealloc(ptr, sz)
struct PacketQueue_ * stream_pq
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
TmSlot * TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name)
void TmThreadKillThreadsFamily(int family)
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
struct TmSlot_ * tm_flowworker
int TmThreadGetNbThreads(uint8_t type)
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
const void * slot_initdata
#define FatalError(x,...)
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
#define THREAD_SET_AFFTYPE
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Packet * PacketDequeue(PacketQueue *q)
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
thread_local uint64_t mutex_lock_wait_ticks
#define PACKET_PROFILING_TMM_START(p, id)
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
void *(* OutHandlerCtxSetup)(const char *)
Tmq * TmqCreateQueue(const char *name)
#define SCLogWarning(err_code,...)
Macro used to log WARNING messages.
@ PKT_SRC_DETECT_RELOAD_FLUSH
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
const char * thread_name_verdict
@ SC_ERR_THREAD_NICE_PRIO
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Tmqh * TmqhGetQueueHandlerByID(const int id)
#define SC_ATOMIC_INITPTR(name)
PacketQueueNoLock decode_pq
#define SCCtrlMutexInit(mut, mutattr)
FlowQueue * FlowQueueNew()
void EngineDone(void)
Used to indicate that the current task is done.
#define THREAD_SET_AFFINITY
#define StatsSyncCounters(tv)
void TmThreadsListThreads(void)
struct TmSlot_ * slot_next
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
uint16_t TmThreadsGetWorkerThreadMax()
void PacketPoolDestroy(void)
thread_local uint64_t mutex_lock_cnt
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Tmqh * TmqhGetQueueHandlerByName(const char *name)
void StatsThreadCleanup(ThreadVars *tv)
void *(* tm_func)(void *)
float threading_detect_ratio
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
volatile uint8_t suricata_ctl_flags
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.