suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2022 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-affinity.h"
39 #include "util-debug.h"
40 #include "util-privs.h"
41 #include "util-cpu.h"
42 #include "util-optimize.h"
43 #include "util-profiling.h"
44 #include "util-signal.h"
45 #include "queue.h"
46 #include "util-validate.h"
47 
48 #ifdef PROFILE_LOCKING
49 thread_local uint64_t mutex_lock_contention;
50 thread_local uint64_t mutex_lock_wait_ticks;
51 thread_local uint64_t mutex_lock_cnt;
52 
53 thread_local uint64_t spin_lock_contention;
54 thread_local uint64_t spin_lock_wait_ticks;
55 thread_local uint64_t spin_lock_cnt;
56 
57 thread_local uint64_t rww_lock_contention;
58 thread_local uint64_t rww_lock_wait_ticks;
59 thread_local uint64_t rww_lock_cnt;
60 
61 thread_local uint64_t rwr_lock_contention;
62 thread_local uint64_t rwr_lock_wait_ticks;
63 thread_local uint64_t rwr_lock_cnt;
64 #endif
65 
66 #ifdef OS_FREEBSD
67 #include <sched.h>
68 #include <sys/param.h>
69 #include <sys/resource.h>
70 #include <sys/cpuset.h>
71 #include <sys/thr.h>
72 #define cpu_set_t cpuset_t
73 #endif /* OS_FREEBSD */
74 
75 /* prototypes */
76 static int SetCPUAffinity(uint16_t cpu);
77 static void TmThreadDeinitMC(ThreadVars *tv);
78 
79 /* root of the threadvars list */
80 ThreadVars *tv_root[TVT_MAX] = { NULL };
81 
82 /* lock to protect tv_root */
84 
85 /**
86  * \brief Check if a thread flag is set.
87  *
88  * \retval 1 flag is set.
89  * \retval 0 flag is not set.
90  */
91 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
92 {
93  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
94 }
95 
96 /**
97  * \brief Set a thread flag.
98  */
99 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
100 {
101  SC_ATOMIC_OR(tv->flags, flag);
102 }
103 
104 /**
105  * \brief Unset a thread flag.
106  */
107 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
108 {
109  SC_ATOMIC_AND(tv->flags, ~flag);
110 }
111 
113  ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
114 {
115  while (decode_pq->top != NULL) {
116  Packet *extra_p = PacketDequeueNoLock(decode_pq);
117  if (unlikely(extra_p == NULL))
118  continue;
119  DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
120 
121  if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) {
123  }
124  }
126 }
127 
128 /**
129  * \brief Separate run function so we can call it recursively.
130  */
132 {
133  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
134  PACKET_PROFILING_TMM_START(p, s->tm_id);
135  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
136  PACKET_PROFILING_TMM_END(p, s->tm_id);
137  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
138 
139  /* handle error */
140  if (unlikely(r == TM_ECODE_FAILED)) {
141  /* Encountered error. Return packets to packetpool and return */
142  TmThreadsSlotProcessPktFail(tv, s, NULL);
143  return TM_ECODE_FAILED;
144  }
145 
147  return TM_ECODE_FAILED;
148  }
149  }
150 
151  return TM_ECODE_OK;
152 }
153 
154 /** \internal
155  *
156  * \brief Process flow timeout packets
157  *
158  * Process flow timeout pseudo packets. During shutdown this loop
159  * is run until the flow engine kills the thread and the queue is
160  * empty.
161  */
162 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
163 {
164  TmSlot *fw_slot = tv->tm_flowworker;
165  int r = TM_ECODE_OK;
166 
167  if (tv->stream_pq == NULL || fw_slot == NULL) {
168  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
169  return r;
170  }
171 
172  SCLogDebug("flow end loop starting");
173  while (1) {
175  uint32_t len = tv->stream_pq->len;
177  if (len > 0) {
178  while (len--) {
182  if (likely(p)) {
183  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
184  r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
185  if (r == TM_ECODE_FAILED) {
186  break;
187  }
188  }
189  }
190  } else {
192  break;
193  }
194  SleepUsec(1);
195  }
196  }
197  SCLogDebug("flow end loop complete");
199 
200  return r;
201 }
202 
203 /*
204 
205  pcap/nfq
206 
207  pkt read
208  callback
209  process_pkt
210 
211  pfring
212 
213  pkt read
214  process_pkt
215 
216  slot:
217  setup
218 
219  pkt_ack_loop(tv, slot_data)
220 
221  deinit
222 
223  process_pkt:
224  while(s)
225  run s;
226  queue;
227 
228  */
229 
230 static void *TmThreadsSlotPktAcqLoop(void *td)
231 {
232  ThreadVars *tv = (ThreadVars *)td;
233  TmSlot *s = tv->tm_slots;
234  char run = 1;
235  TmEcode r = TM_ECODE_OK;
236  TmSlot *slot = NULL;
237 
239 
240  if (tv->thread_setup_flags != 0)
242 
243  /* Drop the capabilities for this thread */
244  SCDropCaps(tv);
246  PacketPoolInit();
247 
248  /* check if we are setup properly */
249  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
250  SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
251  " PktAcqLoop=%p, tmqh_in=%p,"
252  " tmqh_out=%p",
253  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
255  pthread_exit((void *) -1);
256  return NULL;
257  }
258 
259  for (slot = s; slot != NULL; slot = slot->slot_next) {
260  if (slot->SlotThreadInit != NULL) {
261  void *slot_data = NULL;
262  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
263  if (r != TM_ECODE_OK) {
264  if (r == TM_ECODE_DONE) {
265  EngineDone();
267  goto error;
268  } else {
270  goto error;
271  }
272  }
273  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
274  }
275 
276  /* if the flowworker module is the first, get the threads input queue */
277  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
278  tv->stream_pq = tv->inq->pq;
279  tv->tm_flowworker = slot;
280  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
282  if (tv->flow_queue == NULL) {
284  pthread_exit((void *) -1);
285  return NULL;
286  }
287  /* setup a queue */
288  } else if (slot->tm_id == TMM_FLOWWORKER) {
289  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
290  if (tv->stream_pq_local == NULL)
291  FatalError("failed to alloc PacketQueue");
294  tv->tm_flowworker = slot;
295  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
297  if (tv->flow_queue == NULL) {
299  pthread_exit((void *) -1);
300  return NULL;
301  }
302  }
303  }
304 
306 
308 
309  while(run) {
314  }
315 
316  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
317 
318  if (r == TM_ECODE_FAILED) {
320  run = 0;
321  }
323  run = 0;
324  }
325  if (r == TM_ECODE_DONE) {
326  run = 0;
327  }
328  }
330 
332 
333  /* process all pseudo packets the flow timeout may throw at us */
334  TmThreadTimeoutLoop(tv, s);
335 
338 
340 
341  for (slot = s; slot != NULL; slot = slot->slot_next) {
342  if (slot->SlotThreadExitPrintStats != NULL) {
343  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
344  }
345 
346  if (slot->SlotThreadDeinit != NULL) {
347  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
348  if (r != TM_ECODE_OK) {
350  goto error;
351  }
352  }
353  }
354 
355  tv->stream_pq = NULL;
356  SCLogDebug("%s ending", tv->name);
358  pthread_exit((void *) 0);
359  return NULL;
360 
361 error:
362  tv->stream_pq = NULL;
363  pthread_exit((void *) -1);
364  return NULL;
365 }
366 
367 static void *TmThreadsSlotVar(void *td)
368 {
369  ThreadVars *tv = (ThreadVars *)td;
370  TmSlot *s = (TmSlot *)tv->tm_slots;
371  Packet *p = NULL;
372  char run = 1;
373  TmEcode r = TM_ECODE_OK;
374 
376  PacketPoolInit();//Empty();
377 
379 
380  if (tv->thread_setup_flags != 0)
382 
383  /* Drop the capabilities for this thread */
384  SCDropCaps(tv);
385 
386  /* check if we are setup properly */
387  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
389  pthread_exit((void *) -1);
390  return NULL;
391  }
392 
393  for (; s != NULL; s = s->slot_next) {
394  if (s->SlotThreadInit != NULL) {
395  void *slot_data = NULL;
396  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
397  if (r != TM_ECODE_OK) {
399  goto error;
400  }
401  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
402  }
403 
404  /* special case: we need to access the stream queue
405  * from the flow timeout code */
406 
407  /* if the flowworker module is the first, get the threads input queue */
408  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
409  tv->stream_pq = tv->inq->pq;
410  tv->tm_flowworker = s;
411  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
413  if (tv->flow_queue == NULL) {
415  pthread_exit((void *) -1);
416  return NULL;
417  }
418  /* setup a queue */
419  } else if (s->tm_id == TMM_FLOWWORKER) {
420  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
421  if (tv->stream_pq_local == NULL)
422  FatalError("failed to alloc PacketQueue");
425  tv->tm_flowworker = s;
426  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
428  if (tv->flow_queue == NULL) {
430  pthread_exit((void *) -1);
431  return NULL;
432  }
433  }
434  }
435 
437 
438  // Each 'worker' thread uses this func to process/decode the packet read.
439  // Each decode method is different to receive methods in that they do not
440  // enter infinite loops. They use this as the core loop. As a result, at this
441  // point the worker threads can be considered both initialized and running.
443 
444  s = (TmSlot *)tv->tm_slots;
445 
446  while (run) {
451  }
452 
453  /* input a packet */
454  p = tv->tmqh_in(tv);
455 
456  /* if we didn't get a packet see if we need to do some housekeeping */
457  if (unlikely(p == NULL)) {
458  if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
460  if (p != NULL) {
463  }
464  }
465  }
466 
467  if (p != NULL) {
468  /* run the thread module(s) */
469  r = TmThreadsSlotVarRun(tv, p, s);
470  if (r == TM_ECODE_FAILED) {
473  break;
474  }
475 
476  /* output the packet */
477  tv->tmqh_out(tv, p);
478 
479  /* now handle the stream pq packets */
480  TmThreadsHandleInjectedPackets(tv);
481  }
482 
484  run = 0;
485  }
486  } /* while (run) */
488 
491 
493 
494  s = (TmSlot *)tv->tm_slots;
495 
496  for ( ; s != NULL; s = s->slot_next) {
497  if (s->SlotThreadExitPrintStats != NULL) {
498  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
499  }
500 
501  if (s->SlotThreadDeinit != NULL) {
502  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
503  if (r != TM_ECODE_OK) {
505  goto error;
506  }
507  }
508  }
509 
510  SCLogDebug("%s ending", tv->name);
511  tv->stream_pq = NULL;
513  pthread_exit((void *) 0);
514  return NULL;
515 
516 error:
517  tv->stream_pq = NULL;
518  pthread_exit((void *) -1);
519  return NULL;
520 }
521 
522 static void *TmThreadsManagement(void *td)
523 {
524  ThreadVars *tv = (ThreadVars *)td;
525  TmSlot *s = (TmSlot *)tv->tm_slots;
526  TmEcode r = TM_ECODE_OK;
527 
528  BUG_ON(s == NULL);
529 
531 
532  if (tv->thread_setup_flags != 0)
534 
535  /* Drop the capabilities for this thread */
536  SCDropCaps(tv);
537 
538  SCLogDebug("%s starting", tv->name);
539 
540  if (s->SlotThreadInit != NULL) {
541  void *slot_data = NULL;
542  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
543  if (r != TM_ECODE_OK) {
545  pthread_exit((void *) -1);
546  return NULL;
547  }
548  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
549  }
550 
552 
554 
555  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
556  /* handle error */
557  if (r == TM_ECODE_FAILED) {
559  }
560 
563  }
564 
567 
568  if (s->SlotThreadExitPrintStats != NULL) {
569  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
570  }
571 
572  if (s->SlotThreadDeinit != NULL) {
573  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
574  if (r != TM_ECODE_OK) {
576  pthread_exit((void *) -1);
577  return NULL;
578  }
579  }
580 
582  pthread_exit((void *) 0);
583  return NULL;
584 }
585 
586 /**
587  * \brief We set the slot functions.
588  *
589  * \param tv Pointer to the TV to set the slot function for.
590  * \param name Name of the slot variant.
591  * \param fn_p Pointer to a custom slot function. Used only if slot variant
592  * "name" is "custom".
593  *
594  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
595  */
596 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
597 {
598  if (name == NULL) {
599  if (fn_p == NULL) {
600  printf("Both slot name and function pointer can't be NULL inside "
601  "TmThreadSetSlots\n");
602  goto error;
603  } else {
604  name = "custom";
605  }
606  }
607 
608  if (strcmp(name, "varslot") == 0) {
609  tv->tm_func = TmThreadsSlotVar;
610  } else if (strcmp(name, "pktacqloop") == 0) {
611  tv->tm_func = TmThreadsSlotPktAcqLoop;
612  } else if (strcmp(name, "management") == 0) {
613  tv->tm_func = TmThreadsManagement;
614  } else if (strcmp(name, "command") == 0) {
615  tv->tm_func = TmThreadsManagement;
616  } else if (strcmp(name, "custom") == 0) {
617  if (fn_p == NULL)
618  goto error;
619  tv->tm_func = fn_p;
620  } else {
621  printf("Error: Slot \"%s\" not supported\n", name);
622  goto error;
623  }
624 
625  return TM_ECODE_OK;
626 
627 error:
628  return TM_ECODE_FAILED;
629 }
630 
631 /**
632  * \brief Appends a new entry to the slots.
633  *
634  * \param tv TV the slot is attached to.
635  * \param tm TM to append.
636  * \param data Data to be passed on to the slot init function.
637  *
638  * \retval The allocated TmSlot or NULL if there is an error
639  */
640 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
641 {
642  TmSlot *slot = SCMalloc(sizeof(TmSlot));
643  if (unlikely(slot == NULL))
644  return;
645  memset(slot, 0, sizeof(TmSlot));
646  SC_ATOMIC_INITPTR(slot->slot_data);
647  slot->SlotThreadInit = tm->ThreadInit;
648  slot->slot_initdata = data;
649  if (tm->Func) {
650  slot->SlotFunc = tm->Func;
651  } else if (tm->PktAcqLoop) {
652  slot->PktAcqLoop = tm->PktAcqLoop;
653  if (tm->PktAcqBreakLoop) {
654  tv->break_loop = true;
655  }
656  } else if (tm->Management) {
657  slot->Management = tm->Management;
658  }
660  slot->SlotThreadDeinit = tm->ThreadDeinit;
661  /* we don't have to check for the return value "-1". We wouldn't have
662  * received a TM as arg, if it didn't exist */
663  slot->tm_id = TmModuleGetIDForTM(tm);
664 
665  tv->tmm_flags |= tm->flags;
666  tv->cap_flags |= tm->cap_flags;
667 
668  if (tv->tm_slots == NULL) {
669  tv->tm_slots = slot;
670  } else {
671  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
672 
673  /* get the last slot */
674  for ( ; a != NULL; a = a->slot_next) {
675  b = a;
676  }
677  /* append the new slot */
678  if (b != NULL) {
679  b->slot_next = slot;
680  }
681  }
682  return;
683 }
684 
685 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
686 static int SetCPUAffinitySet(cpu_set_t *cs)
687 {
688 #if defined OS_FREEBSD
689  int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
690  SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
691 #elif OS_DARWIN
692  int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
693  (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
694 #else
695  pid_t tid = syscall(SYS_gettid);
696  int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
697 #endif /* OS_FREEBSD */
698 
699  if (r != 0) {
700  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
701  strerror(errno));
702  return -1;
703  }
704 
705  return 0;
706 }
707 #endif
708 
709 
710 /**
711  * \brief Set the thread affinity on the calling thread.
712  *
713  * \param cpuid Id of the core/cpu to setup the affinity.
714  *
715  * \retval 0 If all goes well; -1 if something is wrong.
716  */
717 static int SetCPUAffinity(uint16_t cpuid)
718 {
719 #if defined __OpenBSD__ || defined sun
720  return 0;
721 #else
722  int cpu = (int)cpuid;
723 
724 #if defined OS_WIN32 || defined __CYGWIN__
725  DWORD cs = 1 << cpu;
726 
727  int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
728  if (r != 0) {
729  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
730  strerror(errno));
731  return -1;
732  }
733  SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
734  SCGetThreadIdLong(), cpu);
735 
736  return 0;
737 
738 #else
739  cpu_set_t cs;
740  memset(&cs, 0, sizeof(cs));
741 
742  CPU_ZERO(&cs);
743  CPU_SET(cpu, &cs);
744  return SetCPUAffinitySet(&cs);
745 #endif /* windows */
746 #endif /* not supported */
747 }
748 
749 
750 /**
751  * \brief Set the thread options (thread priority).
752  *
753  * \param tv Pointer to the ThreadVars to setup the thread priority.
754  *
755  * \retval TM_ECODE_OK.
756  */
758 {
760  tv->thread_priority = prio;
761 
762  return TM_ECODE_OK;
763 }
764 
765 /**
766  * \brief Adjusting nice value for threads.
767  */
769 {
770  SCEnter();
771 #ifndef __CYGWIN__
772 #ifdef OS_WIN32
773  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
774  SCLogError("Error setting priority for "
775  "thread %s: %s",
776  tv->name, strerror(errno));
777  } else {
778  SCLogDebug("Priority set to %"PRId32" for thread %s",
780  }
781 #else
782  int ret = nice(tv->thread_priority);
783  if (ret == -1) {
784  SCLogError("Error setting nice value %d "
785  "for thread %s: %s",
786  tv->thread_priority, tv->name, strerror(errno));
787  } else {
788  SCLogDebug("Nice value set to %"PRId32" for thread %s",
790  }
791 #endif /* OS_WIN32 */
792 #endif
793  SCReturn;
794 }
795 
796 
797 /**
798  * \brief Set the thread options (cpu affinity).
799  *
800  * \param tv pointer to the ThreadVars to setup the affinity.
801  * \param cpu cpu on which affinity is set.
802  *
803  * \retval TM_ECODE_OK
804  */
806 {
808  tv->cpu_affinity = cpu;
809 
810  return TM_ECODE_OK;
811 }
812 
813 
815 {
817  return TM_ECODE_OK;
818 
819  if (type > MAX_CPU_SET) {
820  SCLogError("invalid cpu type family");
821  return TM_ECODE_FAILED;
822  }
823 
825  tv->cpu_affinity = type;
826 
827  return TM_ECODE_OK;
828 }
829 
831 {
832  if (type >= MAX_CPU_SET) {
833  SCLogError("invalid cpu type family");
834  return 0;
835  }
836 
838 }
839 
840 /**
841  * \brief Set the thread options (cpu affinitythread).
842  * Priority should be already set by pthread_create.
843  *
844  * \param tv pointer to the ThreadVars of the calling thread.
845  */
847 {
849  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
850  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
852  SetCPUAffinity(tv->cpu_affinity);
853  }
854 
855 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
860  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
861  uint16_t cpu = AffinityGetNextCPU(taf);
862  SetCPUAffinity(cpu);
863  /* If CPU is in a set overwrite the default thread prio */
864  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
866  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
868  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
870  } else {
871  tv->thread_priority = taf->prio;
872  }
873  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
874  "%d, thread id %lu", tv->thread_priority,
875  tv->name, cpu, SCGetThreadIdLong());
876  } else {
877  SetCPUAffinitySet(&taf->cpu_set);
878  tv->thread_priority = taf->prio;
879  SCLogPerf("Setting prio %d for thread \"%s\", "
880  "thread id %lu", tv->thread_priority,
882  }
884  }
885 #endif
886 
887  return TM_ECODE_OK;
888 }
889 
890 /**
891  * \brief Creates and returns the TV instance for a new thread.
892  *
893  * \param name Name of this TV instance
894  * \param inq_name Incoming queue name
895  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
896  * \param outq_name Outgoing queue name
897  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
898  * \param slots String representation for the slot function to be used
899  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
900  * \param mucond Flag to indicate whether to initialize the condition
901  * and the mutex variables for this newly created TV.
902  *
903  * \retval the newly created TV instance, or NULL on error
904  */
905 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
906  const char *outq_name, const char *outqh_name, const char *slots,
907  void * (*fn_p)(void *), int mucond)
908 {
909  ThreadVars *tv = NULL;
910  Tmq *tmq = NULL;
911  Tmqh *tmqh = NULL;
912 
913  SCLogDebug("creating thread \"%s\"...", name);
914 
915  /* XXX create separate function for this: allocate a thread container */
916  tv = SCMalloc(sizeof(ThreadVars));
917  if (unlikely(tv == NULL))
918  goto error;
919  memset(tv, 0, sizeof(ThreadVars));
920 
921  SC_ATOMIC_INIT(tv->flags);
922  SCMutexInit(&tv->perf_public_ctx.m, NULL);
923 
924  strlcpy(tv->name, name, sizeof(tv->name));
925 
926  /* default state for every newly created thread */
929 
930  /* set the incoming queue */
931  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
932  SCLogDebug("inq_name \"%s\"", inq_name);
933 
934  tmq = TmqGetQueueByName(inq_name);
935  if (tmq == NULL) {
936  tmq = TmqCreateQueue(inq_name);
937  if (tmq == NULL)
938  goto error;
939  }
940  SCLogDebug("tmq %p", tmq);
941 
942  tv->inq = tmq;
943  tv->inq->reader_cnt++;
944  SCLogDebug("tv->inq %p", tv->inq);
945  }
946  if (inqh_name != NULL) {
947  SCLogDebug("inqh_name \"%s\"", inqh_name);
948 
949  int id = TmqhNameToID(inqh_name);
950  if (id <= 0) {
951  goto error;
952  }
953  tmqh = TmqhGetQueueHandlerByName(inqh_name);
954  if (tmqh == NULL)
955  goto error;
956 
957  tv->tmqh_in = tmqh->InHandler;
958  tv->inq_id = (uint8_t)id;
959  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
960  }
961 
962  /* set the outgoing queue */
963  if (outqh_name != NULL) {
964  SCLogDebug("outqh_name \"%s\"", outqh_name);
965 
966  int id = TmqhNameToID(outqh_name);
967  if (id <= 0) {
968  goto error;
969  }
970 
971  tmqh = TmqhGetQueueHandlerByName(outqh_name);
972  if (tmqh == NULL)
973  goto error;
974 
975  tv->tmqh_out = tmqh->OutHandler;
976  tv->outq_id = (uint8_t)id;
977 
978  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
979  SCLogDebug("outq_name \"%s\"", outq_name);
980 
981  if (tmqh->OutHandlerCtxSetup != NULL) {
982  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
983  if (tv->outctx == NULL)
984  goto error;
985  tv->outq = NULL;
986  } else {
987  tmq = TmqGetQueueByName(outq_name);
988  if (tmq == NULL) {
989  tmq = TmqCreateQueue(outq_name);
990  if (tmq == NULL)
991  goto error;
992  }
993  SCLogDebug("tmq %p", tmq);
994 
995  tv->outq = tmq;
996  tv->outctx = NULL;
997  tv->outq->writer_cnt++;
998  }
999  }
1000  }
1001 
1002  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1003  goto error;
1004  }
1005 
1006  if (mucond != 0)
1007  TmThreadInitMC(tv);
1008 
1009  return tv;
1010 
1011 error:
1012  SCLogError("failed to setup a thread");
1013 
1014  if (tv != NULL)
1015  SCFree(tv);
1016  return NULL;
1017 }
1018 
1019 /**
1020  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1021  * This function doesn't support custom slots, and hence shouldn't be
1022  * supplied \"custom\" as its slot type. All PPT threads are created
1023  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1024  * conditional variables are not used to kill the thread.
1025  *
1026  * \param name Name of this TV instance
1027  * \param inq_name Incoming queue name
1028  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1029  * \param outq_name Outgoing queue name
1030  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1031  * \param slots String representation for the slot function to be used
1032  *
1033  * \retval the newly created TV instance, or NULL on error
1034  */
1035 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1036  const char *inqh_name, const char *outq_name,
1037  const char *outqh_name, const char *slots)
1038 {
1039  ThreadVars *tv = NULL;
1040 
1041  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1042  slots, NULL, 0);
1043 
1044  if (tv != NULL) {
1045  tv->type = TVT_PPT;
1047  }
1048 
1049  return tv;
1050 }
1051 
1052 /**
1053  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1054  * This function supports only custom slot functions and hence a
1055  * function pointer should be sent as an argument.
1056  *
1057  * \param name Name of this TV instance
1058  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1059  * \param mucond Flag to indicate whether to initialize the condition
1060  * and the mutex variables for this newly created TV.
1061  *
1062  * \retval the newly created TV instance, or NULL on error
1063  */
1064 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1065  int mucond)
1066 {
1067  ThreadVars *tv = NULL;
1068 
1069  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1070 
1071  if (tv != NULL) {
1072  tv->type = TVT_MGMT;
1075  }
1076 
1077  return tv;
1078 }
1079 
1080 /**
1081  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1082  * This function supports only custom slot functions and hence a
1083  * function pointer should be sent as an argument.
1084  *
1085  * \param name Name of this TV instance
1086  * \param module Name of TmModule with MANAGEMENT flag set.
1087  * \param mucond Flag to indicate whether to initialize the condition
1088  * and the mutex variables for this newly created TV.
1089  *
1090  * \retval the newly created TV instance, or NULL on error
1091  */
1092 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1093  int mucond)
1094 {
1095  ThreadVars *tv = NULL;
1096 
1097  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1098 
1099  if (tv != NULL) {
1100  tv->type = TVT_MGMT;
1103 
1104  TmModule *m = TmModuleGetByName(module);
1105  if (m) {
1106  TmSlotSetFuncAppend(tv, m, NULL);
1107  }
1108  }
1109 
1110  return tv;
1111 }
1112 
1113 /**
1114  * \brief Creates and returns the TV instance for a Command thread (CMD).
1115  * This function supports only custom slot functions and hence a
1116  * function pointer should be sent as an argument.
1117  *
1118  * \param name Name of this TV instance
1119  * \param module Name of TmModule with COMMAND flag set.
1120  * \param mucond Flag to indicate whether to initialize the condition
1121  * and the mutex variables for this newly created TV.
1122  *
1123  * \retval the newly created TV instance, or NULL on error
1124  */
1125 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1126  int mucond)
1127 {
1128  ThreadVars *tv = NULL;
1129 
1130  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1131 
1132  if (tv != NULL) {
1133  tv->type = TVT_CMD;
1136 
1137  TmModule *m = TmModuleGetByName(module);
1138  if (m) {
1139  TmSlotSetFuncAppend(tv, m, NULL);
1140  }
1141  }
1142 
1143  return tv;
1144 }
1145 
1146 /**
1147  * \brief Appends this TV to tv_root based on its type
1148  *
1149  * \param type holds the type this TV belongs to.
1150  */
1152 {
1154 
1155  if (tv_root[type] == NULL) {
1156  tv_root[type] = tv;
1157  tv->next = NULL;
1158 
1160 
1161  return;
1162  }
1163 
1164  ThreadVars *t = tv_root[type];
1165 
1166  while (t) {
1167  if (t->next == NULL) {
1168  t->next = tv;
1169  tv->next = NULL;
1170  break;
1171  }
1172 
1173  t = t->next;
1174  }
1175 
1177 
1178  return;
1179 }
1180 
1181 static bool ThreadStillHasPackets(ThreadVars *tv)
1182 {
1183  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1184  /* we wait till we dry out all the inq packets, before we
1185  * kill this thread. Do note that you should have disabled
1186  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1187  PacketQueue *q = tv->inq->pq;
1188  SCMutexLock(&q->mutex_q);
1189  uint32_t len = q->len;
1190  SCMutexUnlock(&q->mutex_q);
1191  if (len != 0) {
1192  return true;
1193  }
1194  }
1195 
1196  if (tv->stream_pq != NULL) {
1198  uint32_t len = tv->stream_pq->len;
1200 
1201  if (len != 0) {
1202  return true;
1203  }
1204  }
1205  return false;
1206 }
1207 
1208 /**
1209  * \brief Kill a thread.
1210  *
1211  * \param tv A ThreadVars instance corresponding to the thread that has to be
1212  * killed.
1213  *
1214  * \retval r 1 killed successfully
1215  * 0 not yet ready, needs another look
1216  */
1217 static int TmThreadKillThread(ThreadVars *tv)
1218 {
1219  BUG_ON(tv == NULL);
1220 
1221  /* kill only once :) */
1222  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1223  return 1;
1224  }
1225 
1226  /* set the thread flag informing the thread that it needs to be
1227  * terminated */
1230 
1231  /* to be sure, signal more */
1232  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1233  if (tv->inq_id != TMQH_NOT_SET) {
1235  if (qh != NULL && qh->InShutdownHandler != NULL) {
1236  qh->InShutdownHandler(tv);
1237  }
1238  }
1239  if (tv->inq != NULL) {
1240  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1241  SCCondSignal(&tv->inq->pq->cond_q);
1242  }
1243  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1244  }
1245 
1246  if (tv->ctrl_cond != NULL ) {
1247  pthread_cond_broadcast(tv->ctrl_cond);
1248  }
1249  return 0;
1250  }
1251 
1252  if (tv->outctx != NULL) {
1253  if (tv->outq_id != TMQH_NOT_SET) {
1255  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1256  qh->OutHandlerCtxFree(tv->outctx);
1257  tv->outctx = NULL;
1258  }
1259  }
1260  }
1261 
1262  /* join it and flag it as dead */
1263  pthread_join(tv->t, NULL);
1264  SCLogDebug("thread %s stopped", tv->name);
1266  return 1;
1267 }
1268 
1269 static bool ThreadBusy(ThreadVars *tv)
1270 {
1271  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
1272  TmModule *tm = TmModuleGetById(s->tm_id);
1273  if (tm && tm->ThreadBusy != NULL) {
1274  if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
1275  return true;
1276  }
1277  }
1278  return false;
1279 }
1280 
1281 /** \internal
1282  *
1283  * \brief make sure that all packet threads are done processing their
1284  * in-flight packets, including 'injected' flow packets.
1285  */
1286 static void TmThreadDrainPacketThreads(void)
1287 {
1288  ThreadVars *tv = NULL;
1289  struct timeval start_ts;
1290  struct timeval cur_ts;
1291  gettimeofday(&start_ts, NULL);
1292 
1293 again:
1294  gettimeofday(&cur_ts, NULL);
1295  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1296  SCLogWarning("unable to get all packet threads "
1297  "to process their packets in time");
1298  return;
1299  }
1300 
1302 
1303  /* all receive threads are part of packet processing threads */
1304  tv = tv_root[TVT_PPT];
1305  while (tv) {
1306  if (ThreadStillHasPackets(tv)) {
1307  /* we wait till we dry out all the inq packets, before we
1308  * kill this thread. Do note that you should have disabled
1309  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1311 
1312  /* sleep outside lock */
1313  SleepMsec(1);
1314  goto again;
1315  }
1316  if (ThreadBusy(tv)) {
1318 
1319  Packet *p = PacketGetFromAlloc();
1320  if (p != NULL) {
1323  PacketQueue *q = tv->stream_pq;
1324  SCMutexLock(&q->mutex_q);
1325  PacketEnqueue(q, p);
1326  SCCondSignal(&q->cond_q);
1327  SCMutexUnlock(&q->mutex_q);
1328  }
1329 
1330  /* don't sleep while holding a lock */
1331  SleepMsec(1);
1332  goto again;
1333  }
1334  tv = tv->next;
1335  }
1336 
1338  return;
1339 }
1340 
1341 /**
1342  * \brief Disable all threads having the specified TMs.
1343  *
1344  * Breaks out of the packet acquisition loop, and bumps
1345  * into the 'flow loop', where it will process packets
1346  * from the flow engine's shutdown handling.
1347  */
1349 {
1350  ThreadVars *tv = NULL;
1351  struct timeval start_ts;
1352  struct timeval cur_ts;
1353  gettimeofday(&start_ts, NULL);
1354 
1355 again:
1356  gettimeofday(&cur_ts, NULL);
1357  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1358  FatalError("Engine unable to disable detect "
1359  "thread - \"%s\". Killing engine",
1360  tv->name);
1361  }
1362 
1364 
1365  /* all receive threads are part of packet processing threads */
1366  tv = tv_root[TVT_PPT];
1367 
1368  /* we do have to keep in mind that TVs are arranged in the order
1369  * right from receive to log. The moment we fail to find a
1370  * receive TM amongst the slots in a tv, it indicates we are done
1371  * with all receive threads */
1372  while (tv) {
1373  int disable = 0;
1374  TmModule *tm = NULL;
1375  /* obtain the slots for this TV */
1376  TmSlot *slots = tv->tm_slots;
1377  while (slots != NULL) {
1378  tm = TmModuleGetById(slots->tm_id);
1379 
1380  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1381  disable = 1;
1382  break;
1383  }
1384 
1385  slots = slots->slot_next;
1386  continue;
1387  }
1388 
1389  if (disable) {
1390  if (ThreadStillHasPackets(tv)) {
1391  /* we wait till we dry out all the inq packets, before we
1392  * kill this thread. Do note that you should have disabled
1393  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1395  /* don't sleep while holding a lock */
1396  SleepMsec(1);
1397  goto again;
1398  }
1399 
1400  if (ThreadBusy(tv)) {
1402 
1403  Packet *p = PacketGetFromAlloc();
1404  if (p != NULL) {
1407  PacketQueue *q = tv->stream_pq;
1408  SCMutexLock(&q->mutex_q);
1409  PacketEnqueue(q, p);
1410  SCCondSignal(&q->cond_q);
1411  SCMutexUnlock(&q->mutex_q);
1412  }
1413 
1414  /* don't sleep while holding a lock */
1415  SleepMsec(1);
1416  goto again;
1417  }
1418 
1419  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1420  if (tm && tm->PktAcqBreakLoop != NULL) {
1421  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1422  }
1424 
1425  if (tv->inq != NULL) {
1426  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1427  SCCondSignal(&tv->inq->pq->cond_q);
1428  }
1429  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1430  }
1431 
1432  /* wait for it to enter the 'flow loop' stage */
1433  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1435 
1436  SleepMsec(1);
1437  goto again;
1438  }
1439  }
1440 
1441  tv = tv->next;
1442  }
1443 
1445 
1446  /* finally wait for all packet threads to have
1447  * processed all of their 'live' packets so we
1448  * don't process the last live packets together
1449  * with FFR packets */
1450  TmThreadDrainPacketThreads();
1451  return;
1452 }
1453 
1454 #ifdef DEBUG_VALIDATION
1455 static void TmThreadDumpThreads(void);
1456 #endif
1457 
1458 static void TmThreadDebugValidateNoMorePackets(void)
1459 {
1460 #ifdef DEBUG_VALIDATION
1462  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1463  if (ThreadStillHasPackets(tv)) {
1465  TmThreadDumpThreads();
1466  abort();
1467  }
1468  }
1470 #endif
1471 }
1472 
1473 /**
1474  * \brief Disable all packet threads
1475  */
1477 {
1478  struct timeval start_ts;
1479  struct timeval cur_ts;
1480 
1481  /* first drain all packet threads of their packets */
1482  TmThreadDrainPacketThreads();
1483 
1484  /* since all the threads possibly able to produce more packets
1485  * are now gone or inactive, we should see no packets anywhere
1486  * anymore. */
1487  TmThreadDebugValidateNoMorePackets();
1488 
1489  gettimeofday(&start_ts, NULL);
1490 again:
1491  gettimeofday(&cur_ts, NULL);
1492  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1493  FatalError("Engine unable to disable packet "
1494  "threads. Killing engine");
1495  }
1496 
1497  /* loop through the packet threads and kill them */
1499  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1501 
1502  /* separate worker threads (autofp) will still wait at their
1503  * input queues. So nudge them here so they will observe the
1504  * THV_KILL flag. */
1505  if (tv->inq != NULL) {
1506  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1507  SCCondSignal(&tv->inq->pq->cond_q);
1508  }
1509  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1510  }
1511 
1514 
1515  SleepMsec(1);
1516  goto again;
1517  }
1518  }
1520  return;
1521 }
1522 
1523 #define MIN_WAIT_TIME 100
1524 #define MAX_WAIT_TIME 999999
1526 {
1527  ThreadVars *tv = NULL;
1528  unsigned int sleep_usec = MIN_WAIT_TIME;
1529 
1530  BUG_ON((family < 0) || (family >= TVT_MAX));
1531 
1532 again:
1534  tv = tv_root[family];
1535 
1536  while (tv) {
1537  int r = TmThreadKillThread(tv);
1538  if (r == 0) {
1540  SleepUsec(sleep_usec);
1541  sleep_usec *= 2; /* slowly back off */
1542  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1543  goto again;
1544  }
1545  sleep_usec = MIN_WAIT_TIME; /* reset */
1546 
1547  tv = tv->next;
1548  }
1550 }
1551 #undef MIN_WAIT_TIME
1552 #undef MAX_WAIT_TIME
1553 
1555 {
1556  int i = 0;
1557 
1558  for (i = 0; i < TVT_MAX; i++) {
1560  }
1561 
1562  return;
1563 }
1564 
1565 static void TmThreadFree(ThreadVars *tv)
1566 {
1567  TmSlot *s;
1568  TmSlot *ps;
1569  if (tv == NULL)
1570  return;
1571 
1572  SCLogDebug("Freeing thread '%s'.", tv->name);
1573 
1574  if (tv->flow_queue) {
1575  BUG_ON(tv->flow_queue->qlen != 0);
1576  SCFree(tv->flow_queue);
1577  }
1578 
1580 
1581  TmThreadDeinitMC(tv);
1582 
1583  if (tv->thread_group_name) {
1585  }
1586 
1587  if (tv->printable_name) {
1589  }
1590 
1591  if (tv->stream_pq_local) {
1595  }
1596 
1597  s = (TmSlot *)tv->tm_slots;
1598  while (s) {
1599  ps = s;
1600  s = s->slot_next;
1601  SCFree(ps);
1602  }
1603 
1605  SCFree(tv);
1606 }
1607 
1608 void TmThreadSetGroupName(ThreadVars *tv, const char *name)
1609 {
1610  char *thread_group_name = NULL;
1611 
1612  if (name == NULL)
1613  return;
1614 
1615  if (tv == NULL)
1616  return;
1617 
1618  thread_group_name = SCStrdup(name);
1619  if (unlikely(thread_group_name == NULL)) {
1620  SCLogError("error allocating memory");
1621  return;
1622  }
1623  tv->thread_group_name = thread_group_name;
1624 }
1625 
1627 {
1628  ThreadVars *tv = NULL;
1629  ThreadVars *ptv = NULL;
1630 
1631  if ((family < 0) || (family >= TVT_MAX))
1632  return;
1633 
1635  tv = tv_root[family];
1636 
1637  while (tv) {
1638  ptv = tv;
1639  tv = tv->next;
1640  TmThreadFree(ptv);
1641  }
1642  tv_root[family] = NULL;
1644 }
1645 
1646 /**
1647  * \brief Spawns a thread associated with the ThreadVars instance tv
1648  *
1649  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1650  */
1652 {
1653  pthread_attr_t attr;
1654  if (tv->tm_func == NULL) {
1655  FatalError("No thread function set");
1656  }
1657 
1658  /* Initialize and set thread detached attribute */
1659  pthread_attr_init(&attr);
1660 
1661  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1662 
1663  /* Adjust thread stack size if configured */
1665  SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
1666  if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
1667  FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
1669  }
1670  }
1671 
1672  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1673  if (rc) {
1674  FatalError("Unable to create thread with pthread_create(): retval %d: %s", rc,
1675  strerror(errno));
1676  }
1677 
1678 #if DEBUG && HAVE_PTHREAD_GETATTR_NP
1680  if (pthread_getattr_np(tv->t, &attr) == 0) {
1681  size_t stack_size;
1682  void *stack_addr;
1683  pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1684  SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1685  } else {
1686  SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1687  "pthread_getattr_np() is %" PRId32,
1688  rc);
1689  }
1690  }
1691 #endif
1692 
1694 
1695  TmThreadAppend(tv, tv->type);
1696  return TM_ECODE_OK;
1697 }
1698 
1699 /**
1700  * \brief Initializes the mutex and condition variables for this TV
1701  *
1702  * It can be used by a thread to control a wait loop that can also be
1703  * influenced by other threads.
1704  *
1705  * \param tv Pointer to a TV instance
1706  */
1708 {
1709  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1710  FatalError("Fatal error encountered in TmThreadInitMC. "
1711  "Exiting...");
1712  }
1713 
1714  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1715  printf("Error initializing the tv->m mutex\n");
1716  exit(EXIT_FAILURE);
1717  }
1718 
1719  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1720  FatalError("Fatal error encountered in TmThreadInitMC. "
1721  "Exiting...");
1722  }
1723 
1724  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1725  FatalError("Error initializing the tv->cond condition "
1726  "variable");
1727  }
1728 
1729  return;
1730 }
1731 
1732 static void TmThreadDeinitMC(ThreadVars *tv)
1733 {
1734  if (tv->ctrl_mutex) {
1736  SCFree(tv->ctrl_mutex);
1737  }
1738  if (tv->ctrl_cond) {
1740  SCFree(tv->ctrl_cond);
1741  }
1742  return;
1743 }
1744 
1745 /**
1746  * \brief Tests if the thread represented in the arg has been unpaused or not.
1747  *
1748  * The function would return if the thread tv has been unpaused or if the
1749  * kill flag for the thread has been set.
1750  *
1751  * \param tv Pointer to the TV instance.
1752  */
1754 {
1755  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1756  SleepUsec(100);
1757 
1759  break;
1760  }
1761 
1762  return;
1763 }
1764 
1765 /**
1766  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1767  * the kill flag has been set or not on the thread.
1768  *
1769  * \param tv Pointer to the TV instance.
1770  */
1772 {
1773  while (!TmThreadsCheckFlag(tv, flags)) {
1774  SleepUsec(100);
1775  }
1776 
1777  return;
1778 }
1779 
1780 /**
1781  * \brief Unpauses a thread
1782  *
1783  * \param tv Pointer to a TV instance that has to be unpaused
1784  */
1786 {
1788  return;
1789 }
1790 
1791 /**
1792  * \brief Waits for all threads to be in a running state
1793  *
1794  * \retval TM_ECODE_OK if all are running or error if a thread failed
1795  */
1797 {
1798  uint16_t RX_num = 0;
1799  uint16_t W_num = 0;
1800  uint16_t FM_num = 0;
1801  uint16_t FR_num = 0;
1802  uint16_t TX_num = 0;
1803 
1804  struct timeval start_ts;
1805  struct timeval cur_ts;
1806  gettimeofday(&start_ts, NULL);
1807 
1808 again:
1810  for (int i = 0; i < TVT_MAX; i++) {
1811  ThreadVars *tv = tv_root[i];
1812  while (tv != NULL) {
1815 
1816  SCLogError("thread \"%s\" failed to "
1817  "start: flags %04x",
1818  tv->name, SC_ATOMIC_GET(tv->flags));
1819  return TM_ECODE_FAILED;
1820  }
1821 
1824 
1825  /* 60 seconds provided for the thread to transition from
1826  * THV_INIT_DONE to THV_RUNNING */
1827  gettimeofday(&cur_ts, NULL);
1828  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1829  SCLogError("thread \"%s\" failed to "
1830  "start in time: flags %04x",
1831  tv->name, SC_ATOMIC_GET(tv->flags));
1832  return TM_ECODE_FAILED;
1833  }
1834 
1835  /* sleep a little to give the thread some
1836  * time to start running */
1837  SleepUsec(100);
1838  goto again;
1839  }
1840  tv = tv->next;
1841  }
1842  }
1843  for (int i = 0; i < TVT_MAX; i++) {
1844  for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
1845  if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1846  RX_num++;
1847  else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1848  W_num++;
1849  else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1850  TX_num++;
1851  else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1852  FM_num++;
1853  else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1854  FR_num++;
1855  }
1856  }
1858 
1859  /* Construct a welcome string displaying
1860  * initialized thread types and counts */
1861  uint16_t app_len = 32;
1862  uint16_t buf_len = 256;
1863 
1864  char append_str[app_len];
1865  char thread_counts[buf_len];
1866 
1867  strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1868  if (RX_num > 0) {
1869  snprintf(append_str, app_len, "RX: %u ", RX_num);
1870  strlcat(thread_counts, append_str, buf_len);
1871  }
1872  if (W_num > 0) {
1873  snprintf(append_str, app_len, "W: %u ", W_num);
1874  strlcat(thread_counts, append_str, buf_len);
1875  }
1876  if (TX_num > 0) {
1877  snprintf(append_str, app_len, "TX: %u ", TX_num);
1878  strlcat(thread_counts, append_str, buf_len);
1879  }
1880  if (FM_num > 0) {
1881  snprintf(append_str, app_len, "FM: %u ", FM_num);
1882  strlcat(thread_counts, append_str, buf_len);
1883  }
1884  if (FR_num > 0) {
1885  snprintf(append_str, app_len, "FR: %u ", FR_num);
1886  strlcat(thread_counts, append_str, buf_len);
1887  }
1888  snprintf(append_str, app_len, " Engine started.");
1889  strlcat(thread_counts, append_str, buf_len);
1890  SCLogNotice("%s", thread_counts);
1891 
1892  return TM_ECODE_OK;
1893 }
1894 
1895 /**
1896  * \brief Unpauses all threads present in tv_root
1897  */
1899 {
1901  for (int i = 0; i < TVT_MAX; i++) {
1902  ThreadVars *tv = tv_root[i];
1903  while (tv != NULL) {
1905  tv = tv->next;
1906  }
1907  }
1909  return;
1910 }
1911 
1912 /**
1913  * \brief Used to check the thread for certain conditions of failure.
1914  */
1916 {
1918  for (int i = 0; i < TVT_MAX; i++) {
1919  ThreadVars *tv = tv_root[i];
1920  while (tv) {
1922  FatalError("thread %s failed", tv->name);
1923  }
1924  tv = tv->next;
1925  }
1926  }
1928  return;
1929 }
1930 
1931 /**
1932  * \brief Used to check if all threads have finished their initialization. On
1933  * finding an un-initialized thread, it waits till that thread completes
1934  * its initialization, before proceeding to the next thread.
1935  *
1936  * \retval TM_ECODE_OK all initialized properly
1937  * \retval TM_ECODE_FAILED failure
1938  */
1940 {
1941  struct timeval start_ts;
1942  struct timeval cur_ts;
1943  gettimeofday(&start_ts, NULL);
1944 
1945 again:
1947  for (int i = 0; i < TVT_MAX; i++) {
1948  ThreadVars *tv = tv_root[i];
1949  while (tv != NULL) {
1952 
1953  SCLogError("thread \"%s\" failed to "
1954  "initialize: flags %04x",
1955  tv->name, SC_ATOMIC_GET(tv->flags));
1956  return TM_ECODE_FAILED;
1957  }
1958 
1959  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
1961 
1962  gettimeofday(&cur_ts, NULL);
1963  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
1964  SCLogError("thread \"%s\" failed to "
1965  "initialize in time: flags %04x",
1966  tv->name, SC_ATOMIC_GET(tv->flags));
1967  return TM_ECODE_FAILED;
1968  }
1969 
1970  /* sleep a little to give the thread some
1971  * time to finish initialization */
1972  SleepUsec(100);
1973  goto again;
1974  }
1975 
1978  SCLogError("thread \"%s\" failed to "
1979  "initialize.",
1980  tv->name);
1981  return TM_ECODE_FAILED;
1982  }
1985  SCLogError("thread \"%s\" closed on "
1986  "initialization.",
1987  tv->name);
1988  return TM_ECODE_FAILED;
1989  }
1990 
1991  tv = tv->next;
1992  }
1993  }
1995 
1996  return TM_ECODE_OK;
1997 }
1998 
1999 /**
2000  * \brief returns a count of all the threads that match the flag
2001  */
2003 {
2004  uint32_t cnt = 0;
2006  for (int i = 0; i < TVT_MAX; i++) {
2007  ThreadVars *tv = tv_root[i];
2008  while (tv != NULL) {
2009  if ((tv->tmm_flags & flags) == flags)
2010  cnt++;
2011 
2012  tv = tv->next;
2013  }
2014  }
2016  return cnt;
2017 }
2018 
2019 #ifdef DEBUG_VALIDATION
2020 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2021 {
2022  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2024  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2025  tv, s, s->tm_id, m->name);
2026  }
2027 }
2028 
2029 static void TmThreadDumpThreads(void)
2030 {
2032  for (int i = 0; i < TVT_MAX; i++) {
2033  ThreadVars *tv = tv_root[i];
2034  while (tv != NULL) {
2035  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2036  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2037  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2038  if (tv->inq && tv->stream_pq == tv->inq->pq) {
2039  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2040  } else if (tv->stream_pq_local != NULL) {
2041  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2042  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2043  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2044  }
2045  }
2046  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2047  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2048  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2049  }
2050  TmThreadDoDumpSlots(tv);
2051  tv = tv->next;
2052  }
2053  }
2056 }
2057 #endif
2058 
2059 typedef struct Thread_ {
2060  ThreadVars *tv; /**< threadvars structure */
2061  const char *name;
2062  int type;
2063  int in_use; /**< bool to indicate this is in use */
2064 
2065  SCTime_t pktts; /**< current packet time of this thread
2066  * (offline mode) */
2067  uint32_t sys_sec_stamp; /**< timestamp in seconds of the real system
2068  * time when the pktts was last updated. */
2070 
2071 typedef struct Threads_ {
2076 
2077 static Threads thread_store = { NULL, 0, 0 };
2078 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2079 
2081 {
2082  SCMutexLock(&thread_store_lock);
2083  for (size_t s = 0; s < thread_store.threads_size; s++) {
2084  Thread *t = &thread_store.threads[s];
2085  if (t == NULL || t->in_use == 0)
2086  continue;
2087 
2088  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2089  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2090  if (t->tv) {
2091  ThreadVars *tv = t->tv;
2092  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2093  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2094  tv, tv->type, tv->name, tv->tmm_flags, flags);
2095  }
2096  }
2097  SCMutexUnlock(&thread_store_lock);
2098 }
2099 
2100 #define STEP 32
2101 /**
2102  * \retval id thread id, or 0 if not found
2103  */
2105 {
2106  SCMutexLock(&thread_store_lock);
2107  if (thread_store.threads == NULL) {
2108  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2109  BUG_ON(thread_store.threads == NULL);
2110  thread_store.threads_size = STEP;
2111  }
2112 
2113  size_t s;
2114  for (s = 0; s < thread_store.threads_size; s++) {
2115  if (thread_store.threads[s].in_use == 0) {
2116  Thread *t = &thread_store.threads[s];
2117  t->name = tv->name;
2118  t->type = type;
2119  t->tv = tv;
2120  t->in_use = 1;
2121 
2122  SCMutexUnlock(&thread_store_lock);
2123  return (int)(s+1);
2124  }
2125  }
2126 
2127  /* if we get here the array is completely filled */
2128  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2129  BUG_ON(newmem == NULL);
2130  thread_store.threads = newmem;
2131  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2132 
2133  Thread *t = &thread_store.threads[thread_store.threads_size];
2134  t->name = tv->name;
2135  t->type = type;
2136  t->tv = tv;
2137  t->in_use = 1;
2138 
2139  s = thread_store.threads_size;
2140  thread_store.threads_size += STEP;
2141 
2142  SCMutexUnlock(&thread_store_lock);
2143  return (int)(s+1);
2144 }
2145 #undef STEP
2146 
2147 void TmThreadsUnregisterThread(const int id)
2148 {
2149  SCMutexLock(&thread_store_lock);
2150  if (id <= 0 || id > (int)thread_store.threads_size) {
2151  SCMutexUnlock(&thread_store_lock);
2152  return;
2153  }
2154 
2155  /* id is one higher than index */
2156  int idx = id - 1;
2157 
2158  /* reset thread_id, which serves as clearing the record */
2159  thread_store.threads[idx].in_use = 0;
2160 
2161  /* check if we have at least one registered thread left */
2162  size_t s;
2163  for (s = 0; s < thread_store.threads_size; s++) {
2164  Thread *t = &thread_store.threads[s];
2165  if (t->in_use == 1) {
2166  goto end;
2167  }
2168  }
2169 
2170  /* if we get here no threads are registered */
2171  SCFree(thread_store.threads);
2172  thread_store.threads = NULL;
2173  thread_store.threads_size = 0;
2174  thread_store.threads_cnt = 0;
2175 
2176 end:
2177  SCMutexUnlock(&thread_store_lock);
2178 }
2179 
2180 void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
2181 {
2182  SCMutexLock(&thread_store_lock);
2183  if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2184  SCMutexUnlock(&thread_store_lock);
2185  return;
2186  }
2187 
2188  int idx = id - 1;
2189  Thread *t = &thread_store.threads[idx];
2190  t->pktts = ts;
2191  struct timeval systs;
2192  gettimeofday(&systs, NULL);
2193  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2194  SCMutexUnlock(&thread_store_lock);
2195 }
2196 
2198 {
2199  bool ready = true;
2200  SCMutexLock(&thread_store_lock);
2201  for (size_t s = 0; s < thread_store.threads_size; s++) {
2202  Thread *t = &thread_store.threads[s];
2203  if (!t->in_use)
2204  break;
2205  if (t->sys_sec_stamp == 0) {
2206  ready = false;
2207  break;
2208  }
2209  }
2210  SCMutexUnlock(&thread_store_lock);
2211  return ready;
2212 }
2213 
2215 {
2216  struct timeval systs;
2217  gettimeofday(&systs, NULL);
2218  SCMutexLock(&thread_store_lock);
2219  for (size_t s = 0; s < thread_store.threads_size; s++) {
2220  Thread *t = &thread_store.threads[s];
2221  if (!t->in_use)
2222  break;
2223  t->pktts = ts;
2224  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2225  }
2226  SCMutexUnlock(&thread_store_lock);
2227 }
2228 
2229 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2230 {
2231  struct timeval local = { 0 };
2232  static struct timeval nullts;
2233  bool set = false;
2234  size_t s;
2235  struct timeval systs;
2236  gettimeofday(&systs, NULL);
2237 
2238  SCMutexLock(&thread_store_lock);
2239  for (s = 0; s < thread_store.threads_size; s++) {
2240  Thread *t = &thread_store.threads[s];
2241  if (t->in_use == 0)
2242  break;
2243  struct timeval pkttv = { .tv_sec = SCTIME_SECS(t->pktts),
2244  .tv_usec = SCTIME_USECS(t->pktts) };
2245  if (!(timercmp(&pkttv, &nullts, ==))) {
2246  /* ignore sleeping threads */
2247  if (t->sys_sec_stamp + 1 < (uint32_t)systs.tv_sec)
2248  continue;
2249 
2250  if (!set) {
2251  SCTIME_TO_TIMEVAL(&local, t->pktts);
2252  set = true;
2253  } else {
2254  if (SCTIME_CMP_LT(t->pktts, SCTIME_FROM_TIMEVAL(&local))) {
2255  SCTIME_TO_TIMEVAL(&local, t->pktts);
2256  }
2257  }
2258  }
2259  }
2260  SCMutexUnlock(&thread_store_lock);
2261  *ts = local;
2262  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2263 }
2264 
2266 {
2267  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2268  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2269  /* always create at least one thread */
2270  if (thread_max == 0)
2271  thread_max = ncpus * threading_detect_ratio;
2272  if (thread_max < 1)
2273  thread_max = 1;
2274  if (thread_max > 1024) {
2275  SCLogWarning("limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2276  thread_max = 1024;
2277  }
2278  return (uint16_t)thread_max;
2279 }
2280 
2281 /** \brief inject a flow into a threads flow queue
2282  */
2283 void TmThreadsInjectFlowById(Flow *f, const int id)
2284 {
2285  BUG_ON(id <= 0 || id > (int)thread_store.threads_size);
2286 
2287  int idx = id - 1;
2288 
2289  Thread *t = &thread_store.threads[idx];
2290  ThreadVars *tv = t->tv;
2291 
2292  BUG_ON(tv == NULL || tv->flow_queue == NULL);
2293 
2294  FlowEnqueue(tv->flow_queue, f);
2295 
2296  /* wake up listening thread(s) if necessary */
2297  if (tv->inq != NULL) {
2298  SCCondSignal(&tv->inq->pq->cond_q);
2299  } else if (tv->break_loop) {
2300  TmThreadsCaptureBreakLoop(tv);
2301  }
2302 }
thread_name_workers
const char * thread_name_workers
Definition: runmodes.c:81
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:805
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:73
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:76
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:73
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:134
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2197
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1707
tm-threads.h
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:70
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
THV_USE
#define THV_USE
Definition: threadvars.h:35
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:131
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1651
Threads
struct Threads_ Threads
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1092
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:846
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:90
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:83
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:315
TmThreadContinueThreads
void TmThreadContinueThreads(void)
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1898
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1035
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:387
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:379
TmThreadSetGroupName
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1608
CaptureStatsSetup
void CaptureStatsSetup(ThreadVars *tv)
Definition: decode.c:899
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:104
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:47
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
PKT_SRC_SHUTDOWN_FLUSH
@ PKT_SRC_SHUTDOWN_FLUSH
Definition: decode.h:66
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:99
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1771
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:175
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:56
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:44
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:86
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:64
Packet_::flags
uint32_t flags
Definition: decode.h:467
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:75
threads.h
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:347
ThreadVars_::t
pthread_t t
Definition: threadvars.h:58
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:303
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:61
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:84
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:54
Thread_::pktts
SCTime_t pktts
Definition: tm-threads.c:2065
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2002
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:103
thread_name_autofp
const char * thread_name_autofp
Definition: runmodes.c:79
StatsSetupPrivate
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1214
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:116
MIN
#define MIN(x, y)
Definition: suricata-common.h:386
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:80
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:73
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1348
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:387
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:121
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:757
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:89
m
SCMutex m
Definition: flow-hash.h:6
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:44
TmModule_::ThreadBusy
bool(* ThreadBusy)(ThreadVars *tv, void *thread_data)
Definition: tm-modules.h:63
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:141
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:356
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:85
rww_lock_contention
thread_local uint64_t rww_lock_contention
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:56
Threads_::threads
Thread * threads
Definition: tm-threads.c:2072
PacketQueueNoLock_
simple fifo queue for packets
Definition: packet-queue.h:34
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:173
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:53
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2100
Thread_::in_use
int in_use
Definition: tm-threads.c:2063
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:37
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(void)
Disable all packet threads.
Definition: tm-threads.c:1476
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:284
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:84
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a threads flow queue
Definition: tm-threads.c:2283
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:80
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2229
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmThreadsProcessDecodePseudoPackets
TmEcode TmThreadsProcessDecodePseudoPackets(ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
Definition: tm-threads.c:112
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:53
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:45
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:55
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1056
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:107
util-signal.h
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:78
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:768
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1523
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:2060
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1785
AffinityGetNextCPU
uint16_t AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
Definition: util-affinity.c:283
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:57
type
uint8_t type
Definition: decode-icmpv4.h:0
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:1939
Threads_::threads_cnt
int threads_cnt
Definition: tm-threads.c:2074
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
strlcat
size_t strlcat(char *, const char *src, size_t siz)
Definition: util-strlcatu.c:45
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:77
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:92
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:95
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:208
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:58
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
threading_set_stack_size
uint64_t threading_set_stack_size
Definition: runmodes.c:76
ThreadVars_::perf_public_ctx
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:127
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1002
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
Definition: tm-threads.c:2180
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
util-affinity.h
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:71
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1753
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:65
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1626
THV_KILL
#define THV_KILL
Definition: threadvars.h:39
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:905
FlowQueueNew
FlowQueue * FlowQueueNew(void)
Definition: flow-queue.c:35
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:740
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2147
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2104
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
threading_set_cpu_affinity
int threading_set_cpu_affinity
Definition: runmodes.c:75
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:124
ThreadVars_::id
int id
Definition: threadvars.h:86
ThreadVars_::type
uint8_t type
Definition: threadvars.h:71
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:295
SCTIME_TO_TIMEVAL
#define SCTIME_TO_TIMEVAL(tv, t)
Definition: util-time.h:89
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:110
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:83
SCReturn
#define SCReturn
Definition: util-debug.h:273
stream.h
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1554
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:139
Packet_
Definition: decode.h:430
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:132
ThreadVars_::thread_group_name
char * thread_group_name
Definition: threadvars.h:66
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:90
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1524
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:74
TmSlot_
Definition: tm-threads.h:53
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:67
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:68
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:83
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
runmodes.h
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1064
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:31
TmModule_
Definition: tm-modules.h:43
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:255
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
Definition: tm-threads.c:2214
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:115
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
SCTIME_CMP_LT
#define SCTIME_CMP_LT(a, b)
Definition: util-time.h:97
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:38
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:36
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:640
StatsPublicThreadContext_::m
SCMutex m
Definition: counters.h:75
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1525
Thread_::sys_sec_stamp
uint32_t sys_sec_stamp
Definition: tm-threads.c:2067
Thread_::type
int type
Definition: tm-threads.c:2062
Thread
struct Thread_ Thread
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:89
Packet_::flow
struct Flow_ * flow
Definition: decode.h:469
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1151
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:74
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:53
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:814
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:93
Threads_
Definition: tm-threads.c:2071
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:35
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:34
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:105
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:40
ThreadVars_::break_loop
bool break_loop
Definition: threadvars.h:135
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:100
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:88
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:830
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
SCTIME_SECS
#define SCTIME_SECS(t)
Definition: util-time.h:57
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:66
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:89
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:71
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
FatalError
#define FatalError(...)
Definition: util-debug.h:502
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax(void)
Definition: tm-threads.c:2265
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:65
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1125
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:142
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:216
threadvars.h
util-validate.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:173
ThreadsAffinityType_
Definition: util-affinity.h:65
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:37
THV_KILL_PKTACQ
#define THV_KILL_PKTACQ
Definition: threadvars.h:46
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:607
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:94
Thread_::name
const char * name
Definition: tm-threads.c:2061
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:131
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:50
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:90
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:68
thread_name_verdict
const char * thread_name_verdict
Definition: runmodes.c:82
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:67
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:318
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:83
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:111
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
suricata.h
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:453
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:140
Tmq_
Definition: tm-queues.h:29
TmThreadWaitOnThreadRunning
TmEcode TmThreadWaitOnThreadRunning(void)
Waits for all threads to be in a running state.
Definition: tm-threads.c:1796
StatsSyncCounters
#define StatsSyncCounters(tv)
Definition: counters.h:138
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2080
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1915
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:131
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:82
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:69
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:376
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:108
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:316
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:91
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:91
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:41
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:2059
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:275
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:120
Threads_::threads_size
size_t threads_size
Definition: tm-threads.c:2073
StatsThreadCleanup
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1313
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:62
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:208
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:104
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:76
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:978
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:360
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:172
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:351
SCTIME_USECS
#define SCTIME_USECS(t)
Definition: util-time.h:56