suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2022 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-affinity.h"
39 #include "util-debug.h"
40 #include "util-privs.h"
41 #include "util-cpu.h"
42 #include "util-optimize.h"
43 #include "util-profiling.h"
44 #include "util-signal.h"
45 #include "queue.h"
46 
47 #ifdef PROFILE_LOCKING
/* Per-thread (thread_local) 64-bit counters, defined only when
 * PROFILE_LOCKING is set. There is one contention / wait_ticks / cnt triple
 * for each of the four prefixes mutex, spin, rww and rwr. Nothing in the
 * visible part of this file reads or writes them.
 * NOTE(review): presumably maintained by the lock wrapper macros -- confirm
 * in the threads headers. */
48 thread_local uint64_t mutex_lock_contention;
49 thread_local uint64_t mutex_lock_wait_ticks;
50 thread_local uint64_t mutex_lock_cnt;
51 
52 thread_local uint64_t spin_lock_contention;
53 thread_local uint64_t spin_lock_wait_ticks;
54 thread_local uint64_t spin_lock_cnt;
55 
56 thread_local uint64_t rww_lock_contention;
57 thread_local uint64_t rww_lock_wait_ticks;
58 thread_local uint64_t rww_lock_cnt;
59 
60 thread_local uint64_t rwr_lock_contention;
61 thread_local uint64_t rwr_lock_wait_ticks;
62 thread_local uint64_t rwr_lock_cnt;
63 #endif
64 
65 #ifdef OS_FREEBSD
66 #include <sched.h>
67 #include <sys/param.h>
68 #include <sys/resource.h>
69 #include <sys/cpuset.h>
70 #include <sys/thr.h>
71 #define cpu_set_t cpuset_t
72 #endif /* OS_FREEBSD */
73 
74 /* prototypes */
/* forward declarations of two file-local (static) functions */
75 static int SetCPUAffinity(uint16_t cpu);
76 static void TmThreadDeinitMC(ThreadVars *tv);
77 
78 /* root of the threadvars list */
/* one list head per thread type (TVT_MAX entries, all initially NULL);
 * the code further down indexes it by type and chains entries via ->next */
79 ThreadVars *tv_root[TVT_MAX] = { NULL };
80 
81 /* lock to protect tv_root */
83 
84 /**
85  * \brief Check if a thread flag is set.
86  *
87  * \retval 1 flag is set.
88  * \retval 0 flag is not set.
89  */
90 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
91 {
92  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
93 }
94 
95 /**
96  * \brief Set a thread flag.
97  */
98 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
99 {
100  SC_ATOMIC_OR(tv->flags, flag);
101 }
102 
103 /**
104  * \brief Unset a thread flag.
105  */
106 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
107 {
108  SC_ATOMIC_AND(tv->flags, ~flag);
109 }
110 
111 /**
112  * \brief Separate run function so we can call it recursively.
 *
 * Runs the packet p through every slot of the chain starting at `slot`.
 * After each slot, all packets queued in tv->decode_pq are dequeued; each
 * one is run through the remaining slots (s->slot_next) by a recursive call
 * and then handed to tv->tmqh_out.
 *
 * Returns TM_ECODE_FAILED as soon as a SlotFunc or a nested run reports
 * TM_ECODE_FAILED (after calling TmThreadsSlotProcessPktFail), and
 * TM_ECODE_OK once the whole chain has run.
113  */
/* NOTE(review): source line 114, which should hold the function signature,
 * is missing from this listing; the body below uses tv, p and slot. */
115 {
116  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
117  PACKET_PROFILING_TMM_START(p, s->tm_id);
118  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
119  PACKET_PROFILING_TMM_END(p, s->tm_id);
120 
121  /* handle error */
122  if (unlikely(r == TM_ECODE_FAILED)) {
123  /* Encountered error. Return packets to packetpool and return */
124  TmThreadsSlotProcessPktFail(tv, s, NULL);
125  return TM_ECODE_FAILED;
126  }
127 
128  /* handle new pseudo packets immediately */
129  while (tv->decode_pq.top != NULL) {
130  Packet *extra_p = PacketDequeueNoLock(&tv->decode_pq);
131  if (unlikely(extra_p == NULL))
132  continue;
133 
134  /* see if we need to process the packet */
  /* only the slots after the current one are run for the extra packet */
135  if (s->slot_next != NULL) {
136  r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
137  if (unlikely(r == TM_ECODE_FAILED)) {
138  TmThreadsSlotProcessPktFail(tv, s, extra_p);
139  return TM_ECODE_FAILED;
140  }
141  }
  /* hand the extra packet to the thread's output queue handler */
142  tv->tmqh_out(tv, extra_p);
143  }
144  }
145 
146  return TM_ECODE_OK;
147 }
148 
149 /** \internal
150  *
151  * \brief Process flow timeout packets
152  *
153  * Process flow timeout pseudo packets. During shutdown this loop
154  * is run until the flow engine kills the thread and the queue is
155  * empty.
 *
 * \param tv thread whose stream_pq is processed
 * \param s  not referenced by any line visible in this listing
 *
 * \retval TM_ECODE_OK when the loop is not run (tv->stream_pq or
 *         tv->tm_flowworker is NULL); otherwise the last value stored in r
 *
 * NOTE(review): this listing omits source lines 169, 171, 174-176, 185 and
 * 192. Among other things, the statement that obtains `p` and the condition
 * guarding the `break` in the else branch are not visible here.
156  */
157 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
158 {
159  TmSlot *fw_slot = tv->tm_flowworker;
160  int r = TM_ECODE_OK;
161 
162  if (tv->stream_pq == NULL || fw_slot == NULL) {
163  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
164  return r;
165  }
166 
167  SCLogDebug("flow end loop starting");
168  while (1) {
170  uint32_t len = tv->stream_pq->len;
172  if (len > 0) {
  /* handle as many packets as were counted in len above */
173  while (len--) {
177  if (likely(p)) {
178  r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
179  if (r == TM_ECODE_FAILED) {
  /* this break only leaves the inner while (len--) loop */
180  break;
181  }
182  }
183  }
184  } else {
186  break;
187  }
188  SleepUsec(1);
189  }
190  }
191  SCLogDebug("flow end loop complete");
193 
194  return r;
195 }
196 
197 /*
198 
199  pcap/nfq
200 
201  pkt read
202  callback
203  process_pkt
204 
205  pfring
206 
207  pkt read
208  process_pkt
209 
210  slot:
211  setup
212 
213  pkt_ack_loop(tv, slot_data)
214 
215  deinit
216 
217  process_pkt:
218  while(s)
219  run s;
220  queue;
221 
222  */
223 
/**
 * \brief Thread entry point installed for the "pktacqloop" slot variant.
 *
 * Visible flow: drop capabilities, initialise the packet pool, validate the
 * setup, run SlotThreadInit for every slot, call the first slot's PktAcqLoop
 * while `run` is set, run TmThreadTimeoutLoop(), then for each slot print
 * the exit stats and run SlotThreadDeinit. The thread leaves through
 * pthread_exit() with (void *)0 on the normal path and (void *)-1 on the
 * error paths.
 *
 * \param td the ThreadVars of this thread, passed as void *
 *
 * NOTE(review): the embedded source line numbers have many gaps in this
 * function (e.g. 232, 235, 248, 260, 263, 275, 277, 286-287, 290, 292,
 * 299, 301, 304-307, 313, 316, 323, 325, 330-331, 333, 343, 351), so several
 * statements and some conditions are not visible in this listing.
 */
224 static void *TmThreadsSlotPktAcqLoop(void *td)
225 {
226  ThreadVars *tv = (ThreadVars *)td;
227  TmSlot *s = tv->tm_slots;
228  char run = 1;
229  TmEcode r = TM_ECODE_OK;
230  TmSlot *slot = NULL;
231 
233 
234  if (tv->thread_setup_flags != 0)
236 
237  /* Drop the capabilities for this thread */
238  SCDropCaps(tv);
239 
240  PacketPoolInit();
241 
242  /* check if we are setup properly */
243  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
244  SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
245  " PktAcqLoop=%p, tmqh_in=%p,"
246  " tmqh_out=%p",
247  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
249  pthread_exit((void *) -1);
250  return NULL;
251  }
252 
  /* per-slot thread init; the returned slot_data is published atomically */
253  for (slot = s; slot != NULL; slot = slot->slot_next) {
254  if (slot->SlotThreadInit != NULL) {
255  void *slot_data = NULL;
256  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
257  if (r != TM_ECODE_OK) {
258  if (r == TM_ECODE_DONE) {
259  EngineDone();
261  goto error;
262  } else {
264  goto error;
265  }
266  }
267  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
268  }
269 
270  /* if the flowworker module is the first, get the threads input queue */
271  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
272  tv->stream_pq = tv->inq->pq;
273  tv->tm_flowworker = slot;
274  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
276  if (tv->flow_queue == NULL) {
278  pthread_exit((void *) -1);
279  return NULL;
280  }
281  /* setup a queue */
282  } else if (slot->tm_id == TMM_FLOWWORKER) {
283  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
284  if (tv->stream_pq_local == NULL)
285  FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
288  tv->tm_flowworker = slot;
289  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
291  if (tv->flow_queue == NULL) {
293  pthread_exit((void *) -1);
294  return NULL;
295  }
296  }
297  }
298 
300 
302 
303  while(run) {
308  }
309 
  /* the first slot's acquisition loop is called with its own slot_data */
310  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
311 
312  if (r == TM_ECODE_FAILED) {
314  run = 0;
315  }
317  run = 0;
318  }
319  if (r == TM_ECODE_DONE) {
320  run = 0;
321  }
322  }
324 
326 
327  /* process all pseudo packets the flow timeout may throw at us */
328  TmThreadTimeoutLoop(tv, s);
329 
332 
334 
  /* per-slot teardown: exit stats first, then thread deinit */
335  for (slot = s; slot != NULL; slot = slot->slot_next) {
336  if (slot->SlotThreadExitPrintStats != NULL) {
337  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
338  }
339 
340  if (slot->SlotThreadDeinit != NULL) {
341  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
342  if (r != TM_ECODE_OK) {
344  goto error;
345  }
346  }
347  }
348 
349  tv->stream_pq = NULL;
350  SCLogDebug("%s ending", tv->name);
352  pthread_exit((void *) 0);
353  return NULL;
354 
355 error:
356  tv->stream_pq = NULL;
357  pthread_exit((void *) -1);
358  return NULL;
359 }
360 
/**
 * \brief Thread entry point installed for the "varslot" slot variant.
 *
 * Visible flow: initialise the packet pool, drop capabilities, validate the
 * setup, run SlotThreadInit for every slot, then loop while `run` is set:
 * fetch a packet with tv->tmqh_in, run it through the slot chain with
 * TmThreadsSlotVarRun(), pass it to tv->tmqh_out and call
 * TmThreadsHandleInjectedPackets(). Afterwards each slot prints its exit
 * stats and runs SlotThreadDeinit. The thread leaves through pthread_exit()
 * with (void *)0 on the normal path and (void *)-1 on the error paths.
 *
 * \param td the ThreadVars of this thread, passed as void *
 *
 * NOTE(review): the embedded source line numbers have many gaps in this
 * function (e.g. 371, 374, 381, 391, 405, 407, 416-417, 420, 422, 429, 435,
 * 440-443, 452, 454-455, 464-465, 476, 480, 482-483, 485, 497, 505), so
 * several statements and some conditions are not visible in this listing.
 */
361 static void *TmThreadsSlotVar(void *td)
362 {
363  ThreadVars *tv = (ThreadVars *)td;
364  TmSlot *s = (TmSlot *)tv->tm_slots;
365  Packet *p = NULL;
366  char run = 1;
367  TmEcode r = TM_ECODE_OK;
368 
369  PacketPoolInit();//Empty();
370 
372 
373  if (tv->thread_setup_flags != 0)
375 
376  /* Drop the capabilities for this thread */
377  SCDropCaps(tv);
378 
379  /* check if we are setup properly */
380  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
382  pthread_exit((void *) -1);
383  return NULL;
384  }
385 
  /* per-slot thread init; s walks the chain and is reset to the head below */
386  for (; s != NULL; s = s->slot_next) {
387  if (s->SlotThreadInit != NULL) {
388  void *slot_data = NULL;
389  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
390  if (r != TM_ECODE_OK) {
392  goto error;
393  }
394  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
395  }
396 
397  /* special case: we need to access the stream queue
398  * from the flow timeout code */
399 
400  /* if the flowworker module is the first, get the threads input queue */
401  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
402  tv->stream_pq = tv->inq->pq;
403  tv->tm_flowworker = s;
404  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
406  if (tv->flow_queue == NULL) {
408  pthread_exit((void *) -1);
409  return NULL;
410  }
411  /* setup a queue */
412  } else if (s->tm_id == TMM_FLOWWORKER) {
413  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
414  if (tv->stream_pq_local == NULL)
415  FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
418  tv->tm_flowworker = s;
419  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
421  if (tv->flow_queue == NULL) {
423  pthread_exit((void *) -1);
424  return NULL;
425  }
426  }
427  }
428 
430 
431  // Each 'worker' thread uses this func to process/decode the packet read.
432  // Each decode method is different to receive methods in that they do not
433  // enter infinite loops. They use this as the core loop. As a result, at this
434  // point the worker threads can be considered both initialized and running.
436 
  /* rewind to the first slot: the init loop above left s at NULL */
437  s = (TmSlot *)tv->tm_slots;
438 
439  while (run) {
444  }
445 
446  /* input a packet */
447  p = tv->tmqh_in(tv);
448 
449  /* if we didn't get a packet see if we need to do some housekeeping */
450  if (unlikely(p == NULL)) {
451  if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
453  if (p != NULL) {
456  }
457  }
458  }
459 
460  if (p != NULL) {
461  /* run the thread module(s) */
462  r = TmThreadsSlotVarRun(tv, p, s);
463  if (r == TM_ECODE_FAILED) {
466  break;
467  }
468 
469  /* output the packet */
470  tv->tmqh_out(tv, p);
471 
472  /* now handle the stream pq packets */
473  TmThreadsHandleInjectedPackets(tv);
474  }
475 
477  run = 0;
478  }
479  } /* while (run) */
481 
484 
486 
487  s = (TmSlot *)tv->tm_slots;
488 
  /* per-slot teardown: exit stats first, then thread deinit */
489  for ( ; s != NULL; s = s->slot_next) {
490  if (s->SlotThreadExitPrintStats != NULL) {
491  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
492  }
493 
494  if (s->SlotThreadDeinit != NULL) {
495  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
496  if (r != TM_ECODE_OK) {
498  goto error;
499  }
500  }
501  }
502 
503  SCLogDebug("%s ending", tv->name);
504  tv->stream_pq = NULL;
506  pthread_exit((void *) 0);
507  return NULL;
508 
509 error:
510  tv->stream_pq = NULL;
511  pthread_exit((void *) -1);
512  return NULL;
513 }
514 
/**
 * \brief Thread entry point installed for the "management" and "command"
 *        slot variants.
 *
 * Only the first slot of the thread is used: its SlotThreadInit (if any) is
 * run, then its Management function is called with the slot data, and
 * finally its exit stats and SlotThreadDeinit callbacks are run. The thread
 * leaves through pthread_exit() with (void *)0 on the normal path and
 * (void *)-1 when init or deinit does not return TM_ECODE_OK.
 *
 * \param td the ThreadVars of this thread, passed as void *; must have at
 *           least one slot (enforced with BUG_ON)
 *
 * NOTE(review): the embedded source line numbers have gaps in this function
 * (e.g. 523, 526, 537, 544, 546, 551, 554-555, 558-559, 568, 574), so some
 * statements and conditions are not visible in this listing.
 */
515 static void *TmThreadsManagement(void *td)
516 {
517  ThreadVars *tv = (ThreadVars *)td;
518  TmSlot *s = (TmSlot *)tv->tm_slots;
519  TmEcode r = TM_ECODE_OK;
520 
521  BUG_ON(s == NULL);
522 
524 
525  if (tv->thread_setup_flags != 0)
527 
528  /* Drop the capabilities for this thread */
529  SCDropCaps(tv);
530 
531  SCLogDebug("%s starting", tv->name);
532 
533  if (s->SlotThreadInit != NULL) {
534  void *slot_data = NULL;
535  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
536  if (r != TM_ECODE_OK) {
538  pthread_exit((void *) -1);
539  return NULL;
540  }
541  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
542  }
543 
545 
547 
  /* the module's Management function does the thread's actual work */
548  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
549  /* handle error */
550  if (r == TM_ECODE_FAILED) {
552  }
553 
556  }
557 
560 
561  if (s->SlotThreadExitPrintStats != NULL) {
562  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
563  }
564 
565  if (s->SlotThreadDeinit != NULL) {
566  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
567  if (r != TM_ECODE_OK) {
569  pthread_exit((void *) -1);
570  return NULL;
571  }
572  }
573 
575  pthread_exit((void *) 0);
576  return NULL;
577 }
578 
579 /**
580  * \brief We set the slot functions.
581  *
582  * \param tv Pointer to the TV to set the slot function for.
583  * \param name Name of the slot variant.
584  * \param fn_p Pointer to a custom slot function. Used only if slot variant
585  * "name" is "custom".
586  *
587  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
588  */
589 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
590 {
591  if (name == NULL) {
592  if (fn_p == NULL) {
593  printf("Both slot name and function pointer can't be NULL inside "
594  "TmThreadSetSlots\n");
595  goto error;
596  } else {
597  name = "custom";
598  }
599  }
600 
601  if (strcmp(name, "varslot") == 0) {
602  tv->tm_func = TmThreadsSlotVar;
603  } else if (strcmp(name, "pktacqloop") == 0) {
604  tv->tm_func = TmThreadsSlotPktAcqLoop;
605  } else if (strcmp(name, "management") == 0) {
606  tv->tm_func = TmThreadsManagement;
607  } else if (strcmp(name, "command") == 0) {
608  tv->tm_func = TmThreadsManagement;
609  } else if (strcmp(name, "custom") == 0) {
610  if (fn_p == NULL)
611  goto error;
612  tv->tm_func = fn_p;
613  } else {
614  printf("Error: Slot \"%s\" not supported\n", name);
615  goto error;
616  }
617 
618  return TM_ECODE_OK;
619 
620 error:
621  return TM_ECODE_FAILED;
622 }
623 
624 /**
625  * \brief Appends a new entry to the slots.
626  *
627  * \param tv TV the slot is attached to.
628  * \param tm TM to append.
629  * \param data Data to be passed on to the slot init function.
630  *
631  * The function returns void: when the slot allocation fails it returns
 * without appending anything and without reporting the failure.
 *
 * NOTE(review): source line 652 is missing from this listing, so one
 * statement between the Management assignment and the SlotThreadDeinit
 * assignment is not visible here.
632  */
633 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
634 {
635  TmSlot *slot = SCMalloc(sizeof(TmSlot));
636  if (unlikely(slot == NULL))
637  return;
638  memset(slot, 0, sizeof(TmSlot));
639  SC_ATOMIC_INITPTR(slot->slot_data);
640  slot->SlotThreadInit = tm->ThreadInit;
641  slot->slot_initdata = data;
  /* exactly one of Func / PktAcqLoop / Management is copied, in that order
   * of preference */
642  if (tm->Func) {
643  slot->SlotFunc = tm->Func;
644  } else if (tm->PktAcqLoop) {
645  slot->PktAcqLoop = tm->PktAcqLoop;
646  if (tm->PktAcqBreakLoop) {
647  tv->break_loop = true;
648  }
649  } else if (tm->Management) {
650  slot->Management = tm->Management;
651  }
653  slot->SlotThreadDeinit = tm->ThreadDeinit;
654  /* we don't have to check for the return value "-1". We wouldn't have
655  * received a TM as arg, if it didn't exist */
656  slot->tm_id = TmModuleGetIDForTM(tm);
657 
  /* the thread accumulates the module and capability flags of all its slots */
658  tv->tmm_flags |= tm->flags;
659  tv->cap_flags |= tm->cap_flags;
660 
661  if (tv->tm_slots == NULL) {
662  tv->tm_slots = slot;
663  } else {
664  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
665 
666  /* get the last slot */
667  for ( ; a != NULL; a = a->slot_next) {
668  b = a;
669  }
670  /* append the new slot */
671  if (b != NULL) {
672  b->slot_next = slot;
673  }
674  }
675  return;
676 }
677 
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
/**
 * \brief Apply a cpu set to the calling thread using the platform's call
 *        (cpuset_setaffinity, thread_policy_set or sched_setaffinity).
 *
 * \param cs cpu set to apply
 *
 * \retval 0 on success; -1 when the platform call returned non-zero (a
 *         warning is printed to stdout in that case).
 */
static int SetCPUAffinitySet(cpu_set_t *cs)
{
    int rc;

#if defined OS_FREEBSD
    rc = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, SCGetThreadIdLong(),
            sizeof(cpu_set_t), cs);
#elif OS_DARWIN
    rc = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY, (void *)cs,
            THREAD_AFFINITY_POLICY_COUNT);
#else
    const pid_t self_tid = syscall(SYS_gettid);
    rc = sched_setaffinity(self_tid, sizeof(cpu_set_t), cs);
#endif /* OS_FREEBSD */

    if (rc == 0) {
        return 0;
    }

    printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", rc,
            strerror(errno));
    return -1;
}
#endif
701 
702 
/**
 * \brief Set the thread affinity on the calling thread.
 *
 * On OpenBSD and Solaris this is a no-op that reports success. On Windows
 * and Cygwin the cpu id is turned into a single-bit affinity mask; elsewhere
 * a cpu_set_t holding only that cpu is passed to SetCPUAffinitySet().
 *
 * A cpu id that cannot be represented (not enough bits in the Windows
 * affinity mask, or >= CPU_SETSIZE where that macro exists) is rejected
 * with -1 instead of being shifted or set out of range.
 *
 * \param cpuid Id of the core/cpu to setup the affinity.
 *
 * \retval 0 If all goes well; -1 if something is wrong.
 */
static int SetCPUAffinity(uint16_t cpuid)
{
#if defined __OpenBSD__ || defined sun
    return 0;
#else
    int cpu = (int)cpuid;

#if defined OS_WIN32 || defined __CYGWIN__
    /* the affinity mask is a DWORD_PTR with one bit per cpu; shifting by the
     * mask width or more is undefined, so refuse such ids up front */
    if (cpu >= (int)(sizeof(DWORD_PTR) * 8)) {
        printf("Warning: cpu %d does not fit in the thread affinity mask\n", cpu);
        return -1;
    }
    DWORD_PTR cs = (DWORD_PTR)1 << cpu;

    /* SetThreadAffinityMask returns 0 on failure */
    int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
    if (r != 0) {
        printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
                strerror(errno));
        return -1;
    }
    SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
            SCGetThreadIdLong(), cpu);

    return 0;

#else
    cpu_set_t cs;

#ifdef CPU_SETSIZE
    /* CPU_SETSIZE is one more than the highest cpu id a cpu_set_t can hold */
    if (cpu >= (int)CPU_SETSIZE) {
        printf("Warning: cpu %d does not fit in a cpu_set_t\n", cpu);
        return -1;
    }
#endif

    CPU_ZERO(&cs);
    CPU_SET(cpu, &cs);
    return SetCPUAffinitySet(&cs);
#endif /* windows */
#endif /* not supported */
}
740 
741 
742 /**
743  * \brief Set the thread options (thread priority).
744  *
745  * \param tv Pointer to the ThreadVars to setup the thread priority.
 * \param prio value stored in tv->thread_priority
746  *
747  * \retval TM_ECODE_OK.
 *
 * NOTE(review): source lines 749 (the function signature) and 751 are
 * missing from this listing.
748  */
750 {
752  tv->thread_priority = prio;
753 
754  return TM_ECODE_OK;
755 }
756 
757 /**
758  * \brief Adjusting nice value for threads.
 *
 * Applies tv->thread_priority with SetThreadPriority() on OS_WIN32 and with
 * nice() elsewhere; nothing is done when __CYGWIN__ is defined. Failures are
 * only logged.
 *
 * NOTE(review): source lines 760 (the function signature), 770 and 780 (the
 * remaining SCLogDebug arguments) are missing from this listing.
 * NOTE(review): nice() can legitimately return -1 as the new nice value;
 * POSIX asks callers to clear errno before the call and check it afterwards
 * -- confirm whether the plain `ret == -1` test is intended.
759  */
761 {
762  SCEnter();
763 #ifndef __CYGWIN__
764 #ifdef OS_WIN32
765  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
766  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for "
767  "thread %s: %s", tv->name, strerror(errno));
768  } else {
769  SCLogDebug("Priority set to %"PRId32" for thread %s",
771  }
772 #else
773  int ret = nice(tv->thread_priority);
774  if (ret == -1) {
775  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value %d "
776  "for thread %s: %s", tv->thread_priority, tv->name,
777  strerror(errno));
778  } else {
779  SCLogDebug("Nice value set to %"PRId32" for thread %s",
781  }
782 #endif /* OS_WIN32 */
783 #endif
784  SCReturn;
785 }
786 
787 
788 /**
789  * \brief Set the thread options (cpu affinity).
790  *
791  * \param tv pointer to the ThreadVars to setup the affinity.
792  * \param cpu cpu on which affinity is set.
793  *
794  * \retval TM_ECODE_OK
 *
 * The visible code only records the cpu in tv->cpu_affinity; it does not
 * call any affinity API itself.
 *
 * NOTE(review): source lines 796 (the function signature) and 798 are
 * missing from this listing.
795  */
797 {
799  tv->cpu_affinity = cpu;
800 
801  return TM_ECODE_OK;
802 }
803 
804 
/* Stores a cpu set type in tv->cpu_affinity after range-checking it.
 * Returns TM_ECODE_FAILED for an invalid type, TM_ECODE_OK otherwise.
 *
 * NOTE(review): source lines 805 (the function signature), 807 (presumably
 * the condition guarding the early return below -- confirm against the full
 * source) and 815 are missing from this listing.
 * NOTE(review): this check uses `type > MAX_CPU_SET` while the check at
 * listing line 823 uses `type >= MAX_CPU_SET`; confirm which bound is
 * intended, since type == MAX_CPU_SET is accepted here. */
806 {
808  return TM_ECODE_OK;
809 
810  if (type > MAX_CPU_SET) {
811  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
812  return TM_ECODE_FAILED;
813  }
814 
816  tv->cpu_affinity = type;
817 
818  return TM_ECODE_OK;
819 }
820 
/* Range-checks a cpu set type: logs an error and returns 0 when
 * type >= MAX_CPU_SET.
 *
 * NOTE(review): source lines 821 (the function signature) and 828 (the
 * return statement for a valid type) are missing from this listing. */
822 {
823  if (type >= MAX_CPU_SET) {
824  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
825  return 0;
826  }
827 
829 }
830 
831 /**
832  * \brief Set the thread options (cpu affinity).
833  * Priority should be already set by pthread_create.
834  *
835  * \param tv pointer to the ThreadVars of the calling thread.
 *
 * \retval TM_ECODE_OK always, in the visible code.
 *
 * The visible code calls SetCPUAffinity() / SetCPUAffinitySet() and also
 * stores a priority in tv->thread_priority, taken from the affinity settings
 * `taf`.
 *
 * NOTE(review): source lines 837 (the function signature), 839, 842,
 * 847-850, 856, 858, 860, 872 and 874 are missing from this listing; the
 * conditions opening the blocks, the definition of `taf` and the bodies of
 * the three CPU_ISSET branches are therefore not visible here.
836  */
838 {
840  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
841  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
843  SetCPUAffinity(tv->cpu_affinity);
844  }
845 
846 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
  /* exclusive mode: pin to the next cpu handed out by AffinityGetNextCPU();
   * otherwise the whole cpu set of taf is applied */
851  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
852  uint16_t cpu = AffinityGetNextCPU(taf);
853  SetCPUAffinity(cpu);
854  /* If CPU is in a set overwrite the default thread prio */
855  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
857  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
859  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
861  } else {
862  tv->thread_priority = taf->prio;
863  }
864  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
865  "%d, thread id %lu", tv->thread_priority,
866  tv->name, cpu, SCGetThreadIdLong());
867  } else {
868  SetCPUAffinitySet(&taf->cpu_set);
869  tv->thread_priority = taf->prio;
870  SCLogPerf("Setting prio %d for thread \"%s\", "
871  "thread id %lu", tv->thread_priority,
873  }
875  }
876 #endif
877 
878  return TM_ECODE_OK;
879 }
880 
881 /**
882  * \brief Creates and returns the TV instance for a new thread.
883  *
884  * \param name Name of this TV instance
885  * \param inq_name Incoming queue name
886  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
887  * \param outq_name Outgoing queue name
888  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
889  * \param slots String representation for the slot function to be used
890  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
891  * \param mucond Flag to indicate whether to initialize the condition
892  * and the mutex variables for this newly created TV.
893  *
894  * \retval the newly created TV instance, or NULL on error
 *
 * A queue name of "packetpool" (for either direction) is skipped: no Tmq is
 * looked up or created for it. Named queues are looked up first and created
 * only when they do not exist yet.
 *
 * NOTE(review): source lines 918-919 (after the "default state" comment) and
 * 998 (the statement controlled by `if (mucond != 0)`) are missing from this
 * listing.
 * NOTE(review): the visible error path only frees tv; the mutex initialised
 * with SCMutexInit() and any reader_cnt/writer_cnt/outctx already set up are
 * not undone there -- confirm whether that is intended.
895  */
896 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
897  const char *outq_name, const char *outqh_name, const char *slots,
898  void * (*fn_p)(void *), int mucond)
899 {
900  ThreadVars *tv = NULL;
901  Tmq *tmq = NULL;
902  Tmqh *tmqh = NULL;
903 
904  SCLogDebug("creating thread \"%s\"...", name);
905 
906  /* XXX create separate function for this: allocate a thread container */
907  tv = SCMalloc(sizeof(ThreadVars));
908  if (unlikely(tv == NULL))
909  goto error;
910  memset(tv, 0, sizeof(ThreadVars));
911 
912  SC_ATOMIC_INIT(tv->flags);
913  SCMutexInit(&tv->perf_public_ctx.m, NULL);
914 
915  strlcpy(tv->name, name, sizeof(tv->name));
916 
917  /* default state for every newly created thread */
920 
921  /* set the incoming queue */
922  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
923  SCLogDebug("inq_name \"%s\"", inq_name);
924 
925  tmq = TmqGetQueueByName(inq_name);
926  if (tmq == NULL) {
927  tmq = TmqCreateQueue(inq_name);
928  if (tmq == NULL)
929  goto error;
930  }
931  SCLogDebug("tmq %p", tmq);
932 
933  tv->inq = tmq;
  /* this thread is registered as one more reader of the queue */
934  tv->inq->reader_cnt++;
935  SCLogDebug("tv->inq %p", tv->inq);
936  }
937  if (inqh_name != NULL) {
938  SCLogDebug("inqh_name \"%s\"", inqh_name);
939 
  /* ids <= 0 are treated as "handler name not known" */
940  int id = TmqhNameToID(inqh_name);
941  if (id <= 0) {
942  goto error;
943  }
944  tmqh = TmqhGetQueueHandlerByName(inqh_name);
945  if (tmqh == NULL)
946  goto error;
947 
948  tv->tmqh_in = tmqh->InHandler;
949  tv->inq_id = (uint8_t)id;
950  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
951  }
952 
953  /* set the outgoing queue */
954  if (outqh_name != NULL) {
955  SCLogDebug("outqh_name \"%s\"", outqh_name);
956 
957  int id = TmqhNameToID(outqh_name);
958  if (id <= 0) {
959  goto error;
960  }
961 
962  tmqh = TmqhGetQueueHandlerByName(outqh_name);
963  if (tmqh == NULL)
964  goto error;
965 
966  tv->tmqh_out = tmqh->OutHandler;
967  tv->outq_id = (uint8_t)id;
968 
969  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
970  SCLogDebug("outq_name \"%s\"", outq_name);
971 
  /* a handler with its own context setup gets an outctx instead of a
   * plain output queue; the two are mutually exclusive */
972  if (tmqh->OutHandlerCtxSetup != NULL) {
973  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
974  if (tv->outctx == NULL)
975  goto error;
976  tv->outq = NULL;
977  } else {
978  tmq = TmqGetQueueByName(outq_name);
979  if (tmq == NULL) {
980  tmq = TmqCreateQueue(outq_name);
981  if (tmq == NULL)
982  goto error;
983  }
984  SCLogDebug("tmq %p", tmq);
985 
986  tv->outq = tmq;
987  tv->outctx = NULL;
988  tv->outq->writer_cnt++;
989  }
990  }
991  }
992 
993  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
994  goto error;
995  }
996 
997  if (mucond != 0)
999 
1000  return tv;
1001 
1002 error:
1003  SCLogError(SC_ERR_THREAD_CREATE, "failed to setup a thread");
1004 
1005  if (tv != NULL)
1006  SCFree(tv);
1007  return NULL;
1008 }
1009 
1010 /**
1011  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1012  * This function doesn't support custom slots, and hence shouldn't be
1013  * supplied \"custom\" as its slot type. All PPT threads are created
1014  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1015  * conditional variables are not used to kill the thread.
1016  *
1017  * \param name Name of this TV instance
1018  * \param inq_name Incoming queue name
1019  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1020  * \param outq_name Outgoing queue name
1021  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1022  * \param slots String representation for the slot function to be used
1023  *
1024  * \retval the newly created TV instance, or NULL on error
 *
 * NOTE(review): source line 1037 (a statement inside the `tv != NULL`
 * block) is missing from this listing.
1025  */
1026 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1027  const char *inqh_name, const char *outq_name,
1028  const char *outqh_name, const char *slots)
1029 {
1030  ThreadVars *tv = NULL;
1031 
  /* no custom function pointer and mucond 0, as described above */
1032  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1033  slots, NULL, 0);
1034 
1035  if (tv != NULL) {
1036  tv->type = TVT_PPT;
1038  }
1039 
1040  return tv;
1041 }
1042 
1043 /**
1044  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1045  * This function supports only custom slot functions and hence a
1046  * function pointer should be sent as an argument.
1047  *
1048  * \param name Name of this TV instance
1049  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1050  * \param mucond Flag to indicate whether to initialize the condition
1051  * and the mutex variables for this newly created TV.
1052  *
1053  * \retval the newly created TV instance, or NULL on error
 *
 * NOTE(review): source lines 1064-1065 (statements inside the `tv != NULL`
 * block) are missing from this listing.
1054  */
1055 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1056  int mucond)
1057 {
1058  ThreadVars *tv = NULL;
1059 
  /* no queues or queue handlers; the slot variant is always "custom" */
1060  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1061 
1062  if (tv != NULL) {
1063  tv->type = TVT_MGMT;
1066  }
1067 
1068  return tv;
1069 }
1070 
1071 /**
1072  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1073  * The thread uses the "management" slot variant and gets the
1074  * TmModule named by \a module appended as its slot.
1075  *
1076  * \param name Name of this TV instance
1077  * \param module Name of TmModule with MANAGEMENT flag set.
1078  * \param mucond Flag to indicate whether to initialize the condition
1079  * and the mutex variables for this newly created TV.
1080  *
1081  * \retval the newly created TV instance, or NULL on error
 *
 * If TmModuleGetByName() does not find \a module, the TV is still returned,
 * with no slot appended.
 *
 * NOTE(review): source lines 1092-1093 (statements inside the `tv != NULL`
 * block) are missing from this listing.
1082  */
1083 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1084  int mucond)
1085 {
1086  ThreadVars *tv = NULL;
1087 
1088  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1089 
1090  if (tv != NULL) {
1091  tv->type = TVT_MGMT;
1094 
1095  TmModule *m = TmModuleGetByName(module);
1096  if (m) {
1097  TmSlotSetFuncAppend(tv, m, NULL);
1098  }
1099  }
1100 
1101  return tv;
1102 }
1103 
1104 /**
1105  * \brief Creates and returns the TV instance for a Command thread (CMD).
1106  * The thread uses the "command" slot variant and gets the
1107  * TmModule named by \a module appended as its slot.
1108  *
1109  * \param name Name of this TV instance
1110  * \param module Name of TmModule with COMMAND flag set.
1111  * \param mucond Flag to indicate whether to initialize the condition
1112  * and the mutex variables for this newly created TV.
1113  *
1114  * \retval the newly created TV instance, or NULL on error
 *
 * If TmModuleGetByName() does not find \a module, the TV is still returned,
 * with no slot appended.
 *
 * NOTE(review): source lines 1125-1126 (statements inside the `tv != NULL`
 * block) are missing from this listing.
1115  */
1116 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1117  int mucond)
1118 {
1119  ThreadVars *tv = NULL;
1120 
1121  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1122 
1123  if (tv != NULL) {
1124  tv->type = TVT_CMD;
1127 
1128  TmModule *m = TmModuleGetByName(module);
1129  if (m) {
1130  TmSlotSetFuncAppend(tv, m, NULL);
1131  }
1132  }
1133 
1134  return tv;
1135 }
1136 
1137 /**
1138  * \brief Appends this TV to tv_root based on its type
1139  *
1140  * \param type holds the type this TV belongs to.
 *
 * The TV becomes the list head when tv_root[type] is empty; otherwise the
 * list is walked to its last element and the TV is linked after it. In both
 * cases tv->next is set to NULL.
 *
 * NOTE(review): source lines 1142 (the function signature), 1144, 1150 and
 * 1167 are missing from this listing; presumably they take and release the
 * lock protecting tv_root -- confirm against the full source.
1141  */
1143 {
1145 
1146  if (tv_root[type] == NULL) {
1147  tv_root[type] = tv;
1148  tv->next = NULL;
1149 
1151 
1152  return;
1153  }
1154 
1155  ThreadVars *t = tv_root[type];
1156 
  /* walk to the tail and link the new TV there */
1157  while (t) {
1158  if (t->next == NULL) {
1159  t->next = tv;
1160  tv->next = NULL;
1161  break;
1162  }
1163 
1164  t = t->next;
1165  }
1166 
1168 
1169  return;
1170 }
1171 
/**
 * \brief Check whether a thread still has packets queued.
 *
 * \param tv thread to inspect
 *
 * \retval true  the input queue (when present and not flagged
 *               is_packet_pool) or tv->stream_pq has a non-zero len
 * \retval false otherwise
 *
 * NOTE(review): source lines 1188 and 1190, which surround the read of
 * tv->stream_pq->len, are missing from this listing; presumably they lock
 * and unlock that queue like the inq read above -- confirm.
 */
1172 static bool ThreadStillHasPackets(ThreadVars *tv)
1173 {
1174  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1175  /* we wait till we dry out all the inq packets, before we
1176  * kill this thread. Do note that you should have disabled
1177  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1178  PacketQueue *q = tv->inq->pq;
  /* len is read while holding the queue mutex */
1179  SCMutexLock(&q->mutex_q);
1180  uint32_t len = q->len;
1181  SCMutexUnlock(&q->mutex_q);
1182  if (len != 0) {
1183  return true;
1184  }
1185  }
1186 
1187  if (tv->stream_pq != NULL) {
1189  uint32_t len = tv->stream_pq->len;
1191 
1192  if (len != 0) {
1193  return true;
1194  }
1195  }
1196  return false;
1197 }
1198 
1199 /**
1200  * \brief Kill a thread.
1201  *
1202  * \param tv A ThreadVars instance corresponding to the thread that has to be
1203  * killed.
1204  *
1205  * \retval r 1 killed successfully
1206  * 0 not yet ready, needs another look
 *
 * While THV_CLOSED is not yet set, the function only nudges the thread (input
 * queue shutdown handler, condition signals, ctrl_cond broadcast) and returns
 * 0. Once THV_CLOSED is set it frees the output handler context, joins the
 * thread and returns 1.
 *
 * NOTE(review): source lines 1219-1220, 1225, 1245 and 1256 are missing from
 * this listing; the statement that sets the kill flag, both definitions of
 * `qh` and the statement after pthread_join() are therefore not visible.
1207  */
1208 static int TmThreadKillThread(ThreadVars *tv)
1209 {
1210  BUG_ON(tv == NULL);
1211 
1212  /* kill only once :) */
1213  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1214  return 1;
1215  }
1216 
1217  /* set the thread flag informing the thread that it needs to be
1218  * terminated */
1221 
1222  /* to be sure, signal more */
1223  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1224  if (tv->inq_id != TMQH_NOT_SET) {
1226  if (qh != NULL && qh->InShutdownHandler != NULL) {
1227  qh->InShutdownHandler(tv);
1228  }
1229  }
1230  if (tv->inq != NULL) {
  /* one signal per registered reader and writer of the input queue */
1231  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1232  SCCondSignal(&tv->inq->pq->cond_q);
1233  }
1234  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1235  }
1236 
1237  if (tv->ctrl_cond != NULL ) {
1238  pthread_cond_broadcast(tv->ctrl_cond);
1239  }
  /* not closed yet: the caller has to look again later */
1240  return 0;
1241  }
1242 
1243  if (tv->outctx != NULL) {
1244  if (tv->outq_id != TMQH_NOT_SET) {
1246  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1247  qh->OutHandlerCtxFree(tv->outctx);
1248  tv->outctx = NULL;
1249  }
1250  }
1251  }
1252 
1253  /* join it and flag it as dead */
1254  pthread_join(tv->t, NULL);
1255  SCLogDebug("thread %s stopped", tv->name);
1257  return 1;
1258 }
1259 
1260 /** \internal
1261  *
1262  * \brief make sure that all packet threads are done processing their
1263  * in-flight packets, including 'injected' flow packets.
 *
 * Walks tv_root[TVT_PPT]; whenever a thread still has packets or a
 * non-empty flow_queue, it sleeps 1 ms and restarts the walk from the
 * `again` label. Gives up with a warning once more than 60 seconds have
 * passed since the first timestamp.
 *
 * NOTE(review): source lines 1280, 1289, 1296, 1298, 1300, 1304-1305 and
 * 1321 are missing from this listing; presumably they include the locking
 * of tv_root and of the flow queue and the setup of the injected packet --
 * confirm against the full source.
1264  */
1265 static void TmThreadDrainPacketThreads(void)
1266 {
1267  ThreadVars *tv = NULL;
1268  struct timeval start_ts;
1269  struct timeval cur_ts;
1270  gettimeofday(&start_ts, NULL);
1271 
1272 again:
1273  gettimeofday(&cur_ts, NULL);
1274  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1275  SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads "
1276  "to process their packets in time");
1277  return;
1278  }
1279 
1281 
1282  /* all receive threads are part of packet processing threads */
1283  tv = tv_root[TVT_PPT];
1284  while (tv) {
1285  if (ThreadStillHasPackets(tv)) {
1286  /* we wait till we dry out all the inq packets, before we
1287  * kill this thread. Do note that you should have disabled
1288  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1290 
1291  /* sleep outside lock */
1292  SleepMsec(1);
1293  goto again;
1294  }
1295  if (tv->flow_queue) {
1297  bool fq_done = (tv->flow_queue->qlen == 0);
1299  if (!fq_done) {
1301 
  /* a freshly allocated packet is enqueued on the thread's stream_pq and
   * the queue's condition is signalled */
1302  Packet *p = PacketGetFromAlloc();
1303  if (p != NULL) {
1306  PacketQueue *q = tv->stream_pq;
1307  SCMutexLock(&q->mutex_q);
1308  PacketEnqueue(q, p);
1309  SCCondSignal(&q->cond_q);
1310  SCMutexUnlock(&q->mutex_q);
1311  }
1312 
1313  /* don't sleep while holding a lock */
1314  SleepMsec(1);
1315  goto again;
1316  }
1317  }
1318  tv = tv->next;
1319  }
1320 
1322  return;
1323 }
1324 
1325 /**
1326  * \brief Disable all threads having the specified TMs.
1327  *
1328  * Breaks out of the packet acquisition loop, and bumps
1329  * into the 'flow loop', where it will process packets
1330  * from the flow engine's shutdown handling.
1331  */
1333 {
1334  ThreadVars *tv = NULL;
1335  struct timeval start_ts;
1336  struct timeval cur_ts;
1337  gettimeofday(&start_ts, NULL);
1338 
1339 again:
1340  gettimeofday(&cur_ts, NULL);
1341  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1342  FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1343  "thread - \"%s\". Killing engine", tv->name);
1344  }
1345 
1347 
1348  /* all receive threads are part of packet processing threads */
1349  tv = tv_root[TVT_PPT];
1350 
1351  /* we do have to keep in mind that TVs are arranged in the order
1352  * right from receive to log. The moment we fail to find a
1353  * receive TM amongst the slots in a tv, it indicates we are done
1354  * with all receive threads */
1355  while (tv) {
1356  int disable = 0;
1357  TmModule *tm = NULL;
1358  /* obtain the slots for this TV */
1359  TmSlot *slots = tv->tm_slots;
1360  while (slots != NULL) {
1361  tm = TmModuleGetById(slots->tm_id);
1362 
1363  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1364  disable = 1;
1365  break;
1366  }
1367 
1368  slots = slots->slot_next;
1369  continue;
1370  }
1371 
1372  if (disable) {
1373  if (ThreadStillHasPackets(tv)) {
1374  /* we wait till we dry out all the inq packets, before we
1375  * kill this thread. Do note that you should have disabled
1376  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1378  /* don't sleep while holding a lock */
1379  SleepMsec(1);
1380  goto again;
1381  }
1382 
1383  if (tv->flow_queue) {
1385  bool fq_done = (tv->flow_queue->qlen == 0);
1387  if (!fq_done) {
1389 
1390  Packet *p = PacketGetFromAlloc();
1391  if (p != NULL) {
1394  PacketQueue *q = tv->stream_pq;
1395  SCMutexLock(&q->mutex_q);
1396  PacketEnqueue(q, p);
1397  SCCondSignal(&q->cond_q);
1398  SCMutexUnlock(&q->mutex_q);
1399  }
1400 
1401  /* don't sleep while holding a lock */
1402  SleepMsec(1);
1403  goto again;
1404  }
1405  }
1406 
1407  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1408  if (tm && tm->PktAcqBreakLoop != NULL) {
1409  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1410  }
1412 
1413  if (tv->inq != NULL) {
1414  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1415  SCCondSignal(&tv->inq->pq->cond_q);
1416  }
1417  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1418  }
1419 
1420  /* wait for it to enter the 'flow loop' stage */
1421  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1423 
1424  SleepMsec(1);
1425  goto again;
1426  }
1427  }
1428 
1429  tv = tv->next;
1430  }
1431 
1433 
1434  /* finally wait for all packet threads to have
1435  * processed all of their 'live' packets so we
1436  * don't process the last live packets together
1437  * with FFR packets */
1438  TmThreadDrainPacketThreads();
1439  return;
1440 }
1441 
1442 #ifdef DEBUG_VALIDATION
1443 static void TmThreadDumpThreads(void);
1444 #endif
1445 
1446 static void TmThreadDebugValidateNoMorePackets(void)
1447 {
1448 #ifdef DEBUG_VALIDATION
1450  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1451  if (ThreadStillHasPackets(tv)) {
1453  TmThreadDumpThreads();
1454  abort();
1455  }
1456  }
1458 #endif
1459 }
1460 
1461 /**
1462  * \brief Disable all packet threads
1463  */
1465 {
1466  struct timeval start_ts;
1467  struct timeval cur_ts;
1468 
1469  /* first drain all packet threads of their packets */
1470  TmThreadDrainPacketThreads();
1471 
1472  /* since all the threads possibly able to produce more packets
1473  * are now gone or inactive, we should see no packets anywhere
1474  * anymore. */
1475  TmThreadDebugValidateNoMorePackets();
1476 
1477  gettimeofday(&start_ts, NULL);
1478 again:
1479  gettimeofday(&cur_ts, NULL);
1480  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1481  FatalError(SC_ERR_FATAL, "Engine unable to disable packet "
1482  "threads. Killing engine");
1483  }
1484 
1485  /* loop through the packet threads and kill them */
1487  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1489 
1490  /* separate worker threads (autofp) will still wait at their
1491  * input queues. So nudge them here so they will observe the
1492  * THV_KILL flag. */
1493  if (tv->inq != NULL) {
1494  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1495  SCCondSignal(&tv->inq->pq->cond_q);
1496  }
1497  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1498  }
1499 
1502 
1503  SleepMsec(1);
1504  goto again;
1505  }
1506  }
1508  return;
1509 }
1510 
1511 #define MIN_WAIT_TIME 100
1512 #define MAX_WAIT_TIME 999999
1514 {
1515  ThreadVars *tv = NULL;
1516  unsigned int sleep_usec = MIN_WAIT_TIME;
1517 
1518  BUG_ON((family < 0) || (family >= TVT_MAX));
1519 
1520 again:
1522  tv = tv_root[family];
1523 
1524  while (tv) {
1525  int r = TmThreadKillThread(tv);
1526  if (r == 0) {
1528  SleepUsec(sleep_usec);
1529  sleep_usec *= 2; /* slowly back off */
1530  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1531  goto again;
1532  }
1533  sleep_usec = MIN_WAIT_TIME; /* reset */
1534 
1535  tv = tv->next;
1536  }
1538 }
1539 #undef MIN_WAIT_TIME
1540 #undef MAX_WAIT_TIME
1541 
1543 {
1544  int i = 0;
1545 
1546  for (i = 0; i < TVT_MAX; i++) {
1548  }
1549 
1550  return;
1551 }
1552 
1553 static void TmThreadFree(ThreadVars *tv)
1554 {
1555  TmSlot *s;
1556  TmSlot *ps;
1557  if (tv == NULL)
1558  return;
1559 
1560  SCLogDebug("Freeing thread '%s'.", tv->name);
1561 
1562  if (tv->flow_queue) {
1563  BUG_ON(tv->flow_queue->qlen != 0);
1564  SCFree(tv->flow_queue);
1565  }
1566 
1568 
1569  TmThreadDeinitMC(tv);
1570 
1571  if (tv->thread_group_name) {
1573  }
1574 
1575  if (tv->printable_name) {
1577  }
1578 
1579  if (tv->stream_pq_local) {
1583  }
1584 
1585  s = (TmSlot *)tv->tm_slots;
1586  while (s) {
1587  ps = s;
1588  s = s->slot_next;
1589  SCFree(ps);
1590  }
1591 
1593  SCFree(tv);
1594 }
1595 
1596 void TmThreadSetGroupName(ThreadVars *tv, const char *name)
1597 {
1598  char *thread_group_name = NULL;
1599 
1600  if (name == NULL)
1601  return;
1602 
1603  if (tv == NULL)
1604  return;
1605 
1606  thread_group_name = SCStrdup(name);
1607  if (unlikely(thread_group_name == NULL)) {
1608  SCLogError(SC_ERR_RUNMODE, "error allocating memory");
1609  return;
1610  }
1611  tv->thread_group_name = thread_group_name;
1612 }
1613 
1615 {
1616  ThreadVars *tv = NULL;
1617  ThreadVars *ptv = NULL;
1618 
1619  if ((family < 0) || (family >= TVT_MAX))
1620  return;
1621 
1623  tv = tv_root[family];
1624 
1625  while (tv) {
1626  ptv = tv;
1627  tv = tv->next;
1628  TmThreadFree(ptv);
1629  }
1630  tv_root[family] = NULL;
1632 }
1633 
1634 /**
1635  * \brief Spawns a thread associated with the ThreadVars instance tv
1636  *
1637  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1638  */
1640 {
1641  pthread_attr_t attr;
1642  if (tv->tm_func == NULL) {
1643  FatalError(SC_ERR_TM_THREADS_ERROR, "No thread function set");
1644  }
1645 
1646  /* Initialize the thread attributes and make the thread joinable (not detached) */
1647  pthread_attr_init(&attr);
1648 
1649  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1650 
1651  /* Adjust thread stack size if configured */
1653  SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
1654  if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
1656  "Unable to increase stack size to %" PRIu64 " in thread attributes",
1658  }
1659  }
1660 
1661  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1662  if (rc) {
1664  "Unable to create thread with pthread_create() is %" PRId32, rc);
1665  }
1666 
1667 #if DEBUG && HAVE_PTHREAD_GETATTR_NP
1669  if (pthread_getattr_np(tv->t, &attr) == 0) {
1670  size_t stack_size;
1671  void *stack_addr;
1672  pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1673  SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1674  } else {
1675  SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1676  "pthread_getattr_np() is %" PRId32,
1677  rc);
1678  }
1679  }
1680 #endif
1681 
1683 
1684  TmThreadAppend(tv, tv->type);
1685  return TM_ECODE_OK;
1686 }
1687 
1688 /**
1689  * \brief Initializes the mutex and condition variables for this TV
1690  *
1691  * It can be used by a thread to control a wait loop that can also be
1692  * influenced by other threads.
1693  *
1694  * \param tv Pointer to a TV instance
1695  */
1697 {
1698  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1700  "Fatal error encountered in TmThreadInitMC. "
1701  "Exiting...");
1702  }
1703 
1704  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1705  printf("Error initializing the tv->m mutex\n");
1706  exit(EXIT_FAILURE);
1707  }
1708 
1709  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1711  "Fatal error encountered in TmThreadInitMC. "
1712  "Exiting...");
1713  }
1714 
1715  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1716  FatalError(SC_ERR_FATAL, "Error initializing the tv->cond condition "
1717  "variable");
1718  }
1719 
1720  return;
1721 }
1722 
1723 static void TmThreadDeinitMC(ThreadVars *tv)
1724 {
1725  if (tv->ctrl_mutex) {
1727  SCFree(tv->ctrl_mutex);
1728  }
1729  if (tv->ctrl_cond) {
1731  SCFree(tv->ctrl_cond);
1732  }
1733  return;
1734 }
1735 
1736 /**
1737  * \brief Tests if the thread represented in the arg has been unpaused or not.
1738  *
1739  * The function would return if the thread tv has been unpaused or if the
1740  * kill flag for the thread has been set.
1741  *
1742  * \param tv Pointer to the TV instance.
1743  */
1745 {
1746  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1747  SleepUsec(100);
1748 
1750  break;
1751  }
1752 
1753  return;
1754 }
1755 
1756 /**
1757  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1758  * the kill flag has been set or not on the thread.
1759  *
1760  * \param tv Pointer to the TV instance.
1761  */
1763 {
1764  while (!TmThreadsCheckFlag(tv, flags)) {
1765  SleepUsec(100);
1766  }
1767 
1768  return;
1769 }
1770 
1771 /**
1772  * \brief Unpauses a thread
1773  *
1774  * \param tv Pointer to a TV instance that has to be unpaused
1775  */
1777 {
1779  return;
1780 }
1781 
1782 /**
1783  * \brief Waits for all threads to be in a running state
1784  *
1785  * \retval TM_ECODE_OK if all are running or error if a thread failed
1786  */
1788 {
1789  uint16_t RX_num = 0;
1790  uint16_t W_num = 0;
1791  uint16_t FM_num = 0;
1792  uint16_t FR_num = 0;
1793  uint16_t TX_num = 0;
1794 
1795  struct timeval start_ts;
1796  struct timeval cur_ts;
1797  gettimeofday(&start_ts, NULL);
1798 
1799 again:
1801  for (int i = 0; i < TVT_MAX; i++) {
1802  ThreadVars *tv = tv_root[i];
1803  while (tv != NULL) {
1806 
1808  "thread \"%s\" failed to "
1809  "start: flags %04x",
1810  tv->name, SC_ATOMIC_GET(tv->flags));
1811  return TM_ECODE_FAILED;
1812  }
1813 
1816 
1817  /* 60 seconds provided for the thread to transition from
1818  * THV_INIT_DONE to THV_RUNNING */
1819  gettimeofday(&cur_ts, NULL);
1820  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1822  "thread \"%s\" failed to "
1823  "start in time: flags %04x",
1824  tv->name, SC_ATOMIC_GET(tv->flags));
1825  return TM_ECODE_FAILED;
1826  }
1827 
1828  /* sleep a little to give the thread some
1829  * time to start running */
1830  SleepUsec(100);
1831  goto again;
1832  }
1833  tv = tv->next;
1834  }
1835  }
1836  for (int i = 0; i < TVT_MAX; i++) {
1837  for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
1838  if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1839  RX_num++;
1840  else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1841  W_num++;
1842  else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1843  TX_num++;
1844  else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1845  FM_num++;
1846  else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1847  FR_num++;
1848  }
1849  }
1851 
1852  /* Construct a welcome string displaying
1853  * initialized thread types and counts */
1854  uint16_t app_len = 32;
1855  uint16_t buf_len = 256;
1856 
1857  char append_str[app_len];
1858  char thread_counts[buf_len];
1859 
1860  strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1861  if (RX_num > 0) {
1862  snprintf(append_str, app_len, "RX: %u ", RX_num);
1863  strlcat(thread_counts, append_str, buf_len);
1864  }
1865  if (W_num > 0) {
1866  snprintf(append_str, app_len, "W: %u ", W_num);
1867  strlcat(thread_counts, append_str, buf_len);
1868  }
1869  if (TX_num > 0) {
1870  snprintf(append_str, app_len, "TX: %u ", TX_num);
1871  strlcat(thread_counts, append_str, buf_len);
1872  }
1873  if (FM_num > 0) {
1874  snprintf(append_str, app_len, "FM: %u ", FM_num);
1875  strlcat(thread_counts, append_str, buf_len);
1876  }
1877  if (FR_num > 0) {
1878  snprintf(append_str, app_len, "FR: %u ", FR_num);
1879  strlcat(thread_counts, append_str, buf_len);
1880  }
1881  snprintf(append_str, app_len, " Engine started.");
1882  strlcat(thread_counts, append_str, buf_len);
1883  SCLogNotice("%s", thread_counts);
1884 
1885  return TM_ECODE_OK;
1886 }
1887 
1888 /**
1889  * \brief Unpauses all threads present in tv_root
1890  */
1892 {
1894  for (int i = 0; i < TVT_MAX; i++) {
1895  ThreadVars *tv = tv_root[i];
1896  while (tv != NULL) {
1898  tv = tv->next;
1899  }
1900  }
1902  return;
1903 }
1904 
1905 /**
1906  * \brief Used to check the thread for certain conditions of failure.
1907  */
1909 {
1911  for (int i = 0; i < TVT_MAX; i++) {
1912  ThreadVars *tv = tv_root[i];
1913  while (tv) {
1915  FatalError(SC_ERR_FATAL, "thread %s failed", tv->name);
1916  }
1917  tv = tv->next;
1918  }
1919  }
1921  return;
1922 }
1923 
1924 /**
1925  * \brief Used to check if all threads have finished their initialization. On
1926  * finding an un-initialized thread, it waits till that thread completes
1927  * its initialization, before proceeding to the next thread.
1928  *
1929  * \retval TM_ECODE_OK all initialized properly
1930  * \retval TM_ECODE_FAILED failure
1931  */
1933 {
1934  struct timeval start_ts;
1935  struct timeval cur_ts;
1936  gettimeofday(&start_ts, NULL);
1937 
1938 again:
1940  for (int i = 0; i < TVT_MAX; i++) {
1941  ThreadVars *tv = tv_root[i];
1942  while (tv != NULL) {
1945 
1946  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1947  "initialize: flags %04x", tv->name,
1948  SC_ATOMIC_GET(tv->flags));
1949  return TM_ECODE_FAILED;
1950  }
1951 
1952  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
1954 
1955  gettimeofday(&cur_ts, NULL);
1956  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
1957  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1958  "initialize in time: flags %04x", tv->name,
1959  SC_ATOMIC_GET(tv->flags));
1960  return TM_ECODE_FAILED;
1961  }
1962 
1963  /* sleep a little to give the thread some
1964  * time to finish initialization */
1965  SleepUsec(100);
1966  goto again;
1967  }
1968 
1971  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1972  "initialize.", tv->name);
1973  return TM_ECODE_FAILED;
1974  }
1977  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" closed on "
1978  "initialization.", tv->name);
1979  return TM_ECODE_FAILED;
1980  }
1981 
1982  tv = tv->next;
1983  }
1984  }
1986 
1987  return TM_ECODE_OK;
1988 }
1989 
1990 /**
1991  * \brief returns a count of all the threads that match the flag
1992  */
1994 {
1995  uint32_t cnt = 0;
1997  for (int i = 0; i < TVT_MAX; i++) {
1998  ThreadVars *tv = tv_root[i];
1999  while (tv != NULL) {
2000  if ((tv->tmm_flags & flags) == flags)
2001  cnt++;
2002 
2003  tv = tv->next;
2004  }
2005  }
2007  return cnt;
2008 }
2009 
2010 #ifdef DEBUG_VALIDATION
2011 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2012 {
2013  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2015  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2016  tv, s, s->tm_id, m->name);
2017  }
2018 }
2019 
2020 static void TmThreadDumpThreads(void)
2021 {
2023  for (int i = 0; i < TVT_MAX; i++) {
2024  ThreadVars *tv = tv_root[i];
2025  while (tv != NULL) {
2026  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2027  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2028  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2029  if (tv->inq && tv->stream_pq == tv->inq->pq) {
2030  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2031  } else if (tv->stream_pq_local != NULL) {
2032  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2033  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2034  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2035  }
2036  }
2037  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2038  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2039  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2040  }
2041  TmThreadDoDumpSlots(tv);
2042  tv = tv->next;
2043  }
2044  }
2047 }
2048 #endif
2049 
2050 typedef struct Thread_ {
2051  ThreadVars *tv; /**< threadvars structure */
2052  const char *name;
2053  int type;
2054  int in_use; /**< bool to indicate this is in use */
2055 
2056  struct timeval pktts; /**< current packet time of this thread
2057  * (offline mode) */
2058  uint32_t sys_sec_stamp; /**< timestamp in seconds of the real system
2059  * time when the pktts was last updated. */
2061 
2062 typedef struct Threads_ {
2067 
2068 static Threads thread_store = { NULL, 0, 0 };
2069 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2070 
2072 {
2073  SCMutexLock(&thread_store_lock);
2074  for (size_t s = 0; s < thread_store.threads_size; s++) {
2075  Thread *t = &thread_store.threads[s];
2076  if (t == NULL || t->in_use == 0)
2077  continue;
2078 
2079  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2080  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2081  if (t->tv) {
2082  ThreadVars *tv = t->tv;
2083  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2084  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2085  tv, tv->type, tv->name, tv->tmm_flags, flags);
2086  }
2087  }
2088  SCMutexUnlock(&thread_store_lock);
2089 }
2090 
2091 #define STEP 32
2092 /**
2093  * \retval id thread id, or 0 if not found
2094  */
2096 {
2097  SCMutexLock(&thread_store_lock);
2098  if (thread_store.threads == NULL) {
2099  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2100  BUG_ON(thread_store.threads == NULL);
2101  thread_store.threads_size = STEP;
2102  }
2103 
2104  size_t s;
2105  for (s = 0; s < thread_store.threads_size; s++) {
2106  if (thread_store.threads[s].in_use == 0) {
2107  Thread *t = &thread_store.threads[s];
2108  t->name = tv->name;
2109  t->type = type;
2110  t->tv = tv;
2111  t->in_use = 1;
2112 
2113  SCMutexUnlock(&thread_store_lock);
2114  return (int)(s+1);
2115  }
2116  }
2117 
2118  /* if we get here the array is completely filled */
2119  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2120  BUG_ON(newmem == NULL);
2121  thread_store.threads = newmem;
2122  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2123 
2124  Thread *t = &thread_store.threads[thread_store.threads_size];
2125  t->name = tv->name;
2126  t->type = type;
2127  t->tv = tv;
2128  t->in_use = 1;
2129 
2130  s = thread_store.threads_size;
2131  thread_store.threads_size += STEP;
2132 
2133  SCMutexUnlock(&thread_store_lock);
2134  return (int)(s+1);
2135 }
2136 #undef STEP
2137 
2138 void TmThreadsUnregisterThread(const int id)
2139 {
2140  SCMutexLock(&thread_store_lock);
2141  if (id <= 0 || id > (int)thread_store.threads_size) {
2142  SCMutexUnlock(&thread_store_lock);
2143  return;
2144  }
2145 
2146  /* id is one higher than index */
2147  int idx = id - 1;
2148 
2149  /* reset thread_id, which serves as clearing the record */
2150  thread_store.threads[idx].in_use = 0;
2151 
2152  /* check if we have at least one registered thread left */
2153  size_t s;
2154  for (s = 0; s < thread_store.threads_size; s++) {
2155  Thread *t = &thread_store.threads[s];
2156  if (t->in_use == 1) {
2157  goto end;
2158  }
2159  }
2160 
2161  /* if we get here no threads are registered */
2162  SCFree(thread_store.threads);
2163  thread_store.threads = NULL;
2164  thread_store.threads_size = 0;
2165  thread_store.threads_cnt = 0;
2166 
2167 end:
2168  SCMutexUnlock(&thread_store_lock);
2169 }
2170 
2171 void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
2172 {
2173  SCMutexLock(&thread_store_lock);
2174  if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2175  SCMutexUnlock(&thread_store_lock);
2176  return;
2177  }
2178 
2179  int idx = id - 1;
2180  Thread *t = &thread_store.threads[idx];
2181  t->pktts = *ts;
2182  struct timeval systs;
2183  gettimeofday(&systs, NULL);
2184  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2185  SCMutexUnlock(&thread_store_lock);
2186 }
2187 
2189 {
2190  bool ready = true;
2191  SCMutexLock(&thread_store_lock);
2192  for (size_t s = 0; s < thread_store.threads_size; s++) {
2193  Thread *t = &thread_store.threads[s];
2194  if (!t->in_use)
2195  break;
2196  if (t->sys_sec_stamp == 0) {
2197  ready = false;
2198  break;
2199  }
2200  }
2201  SCMutexUnlock(&thread_store_lock);
2202  return ready;
2203 }
2204 
2205 void TmThreadsInitThreadsTimestamp(const struct timeval *ts)
2206 {
2207  struct timeval systs;
2208  gettimeofday(&systs, NULL);
2209  SCMutexLock(&thread_store_lock);
2210  for (size_t s = 0; s < thread_store.threads_size; s++) {
2211  Thread *t = &thread_store.threads[s];
2212  if (!t->in_use)
2213  break;
2214  t->pktts = *ts;
2215  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2216  }
2217  SCMutexUnlock(&thread_store_lock);
2218 }
2219 
2220 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2221 {
2222  struct timeval local, nullts;
2223  memset(&local, 0, sizeof(local));
2224  memset(&nullts, 0, sizeof(nullts));
2225  int set = 0;
2226  size_t s;
2227  struct timeval systs;
2228  gettimeofday(&systs, NULL);
2229 
2230  SCMutexLock(&thread_store_lock);
2231  for (s = 0; s < thread_store.threads_size; s++) {
2232  Thread *t = &thread_store.threads[s];
2233  if (t->in_use == 0)
2234  break;
2235  if (!(timercmp(&t->pktts, &nullts, ==))) {
2236  /* ignore sleeping threads */
2237  if (t->sys_sec_stamp + 1 < (uint32_t)systs.tv_sec)
2238  continue;
2239 
2240  if (!set) {
2241  local = t->pktts;
2242  set = 1;
2243  } else {
2244  if (timercmp(&t->pktts, &local, <)) {
2245  local = t->pktts;
2246  }
2247  }
2248  }
2249  }
2250  SCMutexUnlock(&thread_store_lock);
2251  *ts = local;
2252  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2253 }
2254 
2256 {
2257  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2258  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2259  /* always create at least one thread */
2260  if (thread_max == 0)
2261  thread_max = ncpus * threading_detect_ratio;
2262  if (thread_max < 1)
2263  thread_max = 1;
2264  if (thread_max > 1024) {
2265  SCLogWarning(SC_ERR_RUNMODE, "limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2266  thread_max = 1024;
2267  }
2268  return (uint16_t)thread_max;
2269 }
2270 
2271 static inline void ThreadBreakLoop(ThreadVars *tv)
2272 {
2273  if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
2274  return;
2275  }
2276  /* find the correct slot */
2277  TmSlot *s = tv->tm_slots;
2278  TmModule *tm = TmModuleGetById(s->tm_id);
2279  if (tm->flags & TM_FLAG_RECEIVE_TM) {
2280  /* if the method supports it, BreakLoop. Otherwise we rely on
2281  * the capture method's recv timeout */
2282  if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
2283  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
2284  }
2285  }
2286 }
2287 
2288 /** \brief inject a flow into a threads flow queue
2289  */
2290 void TmThreadsInjectFlowById(Flow *f, const int id)
2291 {
2292  BUG_ON(id <= 0 || id > (int)thread_store.threads_size);
2293 
2294  int idx = id - 1;
2295 
2296  Thread *t = &thread_store.threads[idx];
2297  ThreadVars *tv = t->tv;
2298 
2299  BUG_ON(tv == NULL || tv->flow_queue == NULL);
2300 
2301  FlowEnqueue(tv->flow_queue, f);
2302 
2303  /* wake up listening thread(s) if necessary */
2304  if (tv->inq != NULL) {
2305  SCCondSignal(&tv->inq->pq->cond_q);
2306  } else if (tv->break_loop) {
2307  ThreadBreakLoop(tv);
2308  }
2309 }
thread_name_workers
const char * thread_name_workers
Definition: runmodes.c:80
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:796
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:67
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:76
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:73
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:134
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2188
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1696
tm-threads.h
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:69
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
THV_USE
#define THV_USE
Definition: threadvars.h:35
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:114
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1639
Threads
struct Threads_ Threads
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1083
SC_ERR_TM_THREADS_ERROR
@ SC_ERR_TM_THREADS_ERROR
Definition: util-error.h:166
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:837
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:82
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:315
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1026
Thread_::pktts
struct timeval pktts
Definition: tm-threads.c:2056
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:387
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:379
TmThreadSetGroupName
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1596
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:104
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:47
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:296
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:98
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1762
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:173
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:56
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:44
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:84
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:64
Packet_::flags
uint32_t flags
Definition: decode.h:460
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:75
threads.h
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:356
ThreadVars_::t
pthread_t t
Definition: threadvars.h:58
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:301
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:83
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:54
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:1993
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:103
thread_name_autofp
const char * thread_name_autofp
Definition: runmodes.c:78
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const struct timeval *ts)
Definition: tm-threads.c:2205
StatsSetupPrivate
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1210
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:116
MIN
#define MIN(x, y)
Definition: suricata-common.h:380
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:79
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:73
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1332
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:387
SC_ERR_THREAD_CREATE
@ SC_ERR_THREAD_CREATE
Definition: util-error.h:78
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:121
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:749
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:89
m
SCMutex m
Definition: flow-hash.h:6
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:44
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:141
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:357
SC_ERR_SHUTDOWN
@ SC_ERR_SHUTDOWN
Definition: util-error.h:220
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:83
rww_lock_contention
thread_local uint64_t rww_lock_contention
SC_ERR_RUNMODE
@ SC_ERR_RUNMODE
Definition: util-error.h:219
Threads_::threads
Thread * threads
Definition: tm-threads.c:2063
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:173
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:90
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2091
Thread_::in_use
int in_use
Definition: tm-threads.c:2054
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:37
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(void)
Disable all packet threads.
Definition: tm-threads.c:1464
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:61
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:284
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:82
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a thread's flow queue
Definition: tm-threads.c:2290
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:80
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2220
FQLOCK_LOCK
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:73
TmThreadContinueThreads
void TmThreadContinueThreads()
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1891
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:53
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:45
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1055
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:106
util-signal.h
SC_ERR_THREAD_INIT
@ SC_ERR_THREAD_INIT
Definition: util-error.h:79
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:78
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:760
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1511
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:2051
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1776
AffinityGetNextCPU
uint16_t AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
Definition: util-affinity.c:296
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:57
type
uint8_t type
Definition: decode-icmpv4.h:0
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:1932
Threads_::threads_cnt
int threads_cnt
Definition: tm-threads.c:2065
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
strlcat
size_t strlcat(char *, const char *src, size_t siz)
Definition: util-strlcatu.c:45
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:77
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:95
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:206
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:58
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
threading_set_stack_size
uint64_t threading_set_stack_size
Definition: runmodes.c:75
ThreadVars_::perf_public_ctx
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:127
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:998
SCEnter
#define SCEnter(...)
Definition: util-debug.h:298
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
util-affinity.h
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1744
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1614
THV_KILL
#define THV_KILL
Definition: threadvars.h:39
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:89
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:896
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:733
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2138
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2095
threading_set_cpu_affinity
int threading_set_cpu_affinity
Definition: runmodes.c:74
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
SC_ERR_INVALID_ARGUMENT
@ SC_ERR_INVALID_ARGUMENT
Definition: util-error.h:43
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:124
ThreadVars_::id
int id
Definition: threadvars.h:86
ThreadVars_::type
uint8_t type
Definition: threadvars.h:71
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:289
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:109
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:82
SCReturn
#define SCReturn
Definition: util-debug.h:300
stream.h
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1542
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:159
Packet_
Definition: decode.h:425
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:56
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:132
ThreadVars_::thread_group_name
char * thread_group_name
Definition: threadvars.h:66
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:90
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1512
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:74
TmSlot_
Definition: tm-threads.h:53
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:67
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:68
TmEcode
TmEcode
Definition: tm-threads-common.h:81
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
runmodes.h
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread (MGMT). This function supports only custom...
Definition: tm-threads.c:1055
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:31
TmModule_
Definition: tm-modules.h:43
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:253
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:115
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:38
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:36
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:633
StatsPublicThreadContext_::m
SCMutex m
Definition: counters.h:74
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1513
Thread_::sys_sec_stamp
uint32_t sys_sec_stamp
Definition: tm-threads.c:2058
Thread_::type
int type
Definition: tm-threads.c:2053
Thread
struct Thread_ Thread
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:89
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1142
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:74
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:53
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:805
Threads_
Definition: tm-threads.c:2062
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:35
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:34
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:105
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:40
ThreadVars_::break_loop
bool break_loop
Definition: threadvars.h:135
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:100
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:821
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:222
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:66
SCLogError
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:255
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
Definition: tm-threads.c:2171
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:71
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
FatalError
#define FatalError(x,...)
Definition: util-debug.h:530
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:65
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1116
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:142
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:212
threadvars.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:173
ThreadsAffinityType_
Definition: util-affinity.h:65
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:37
THV_KILL_PKTACQ
#define THV_KILL_PKTACQ
Definition: threadvars.h:46
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:597
Thread_::name
const char * name
Definition: tm-threads.c:2052
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:151
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:50
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:90
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCLogWarning
#define SCLogWarning(err_code,...)
Macro used to log WARNING messages.
Definition: util-debug.h:242
PKT_SRC_DETECT_RELOAD_FLUSH
@ PKT_SRC_DETECT_RELOAD_FLUSH
Definition: decode.h:63
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:68
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:87
SC_ERR_FATAL
@ SC_ERR_FATAL
Definition: util-error.h:203
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:92
thread_name_verdict
const char * thread_name_verdict
Definition: runmodes.c:81
SC_ERR_THREAD_NICE_PRIO
@ SC_ERR_THREAD_NICE_PRIO
Definition: util-error.h:77
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:67
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:318
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:83
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:55
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:86
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:111
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:91
SC_ERR_MEM_ALLOC
@ SC_ERR_MEM_ALLOC
Definition: util-error.h:31
FlowQueueNew
FlowQueue * FlowQueueNew()
Definition: flow-queue.c:35
suricata.h
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:437
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:140
Tmq_
Definition: tm-queues.h:29
TmThreadWaitOnThreadRunning
TmEcode TmThreadWaitOnThreadRunning(void)
Waits for all threads to be in a running state.
Definition: tm-threads.c:1787
StatsSyncCounters
#define StatsSyncCounters(tv)
Definition: counters.h:137
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2071
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1908
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:131
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:82
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:68
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:376
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:106
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax()
Definition: tm-threads.c:2255
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:317
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:90
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:230
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:88
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:41
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:2050
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:120
Threads_::threads_size
size_t threads_size
Definition: tm-threads.c:2064
StatsThreadCleanup
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1309
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:62
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:208
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:70
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:962
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:360
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:161
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:351
FQLOCK_UNLOCK
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:75
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:53