suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2022 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-affinity.h"
39 #include "util-debug.h"
40 #include "util-privs.h"
41 #include "util-cpu.h"
42 #include "util-optimize.h"
43 #include "util-profiling.h"
44 #include "util-signal.h"
45 #include "queue.h"
46 #include "util-validate.h"
47 
48 #ifdef PROFILE_LOCKING
49 thread_local uint64_t mutex_lock_contention;
50 thread_local uint64_t mutex_lock_wait_ticks;
51 thread_local uint64_t mutex_lock_cnt;
52 
53 thread_local uint64_t spin_lock_contention;
54 thread_local uint64_t spin_lock_wait_ticks;
55 thread_local uint64_t spin_lock_cnt;
56 
57 thread_local uint64_t rww_lock_contention;
58 thread_local uint64_t rww_lock_wait_ticks;
59 thread_local uint64_t rww_lock_cnt;
60 
61 thread_local uint64_t rwr_lock_contention;
62 thread_local uint64_t rwr_lock_wait_ticks;
63 thread_local uint64_t rwr_lock_cnt;
64 #endif
65 
66 #ifdef OS_FREEBSD
67 #include <sched.h>
68 #include <sys/param.h>
69 #include <sys/resource.h>
70 #include <sys/cpuset.h>
71 #include <sys/thr.h>
72 #define cpu_set_t cpuset_t
73 #endif /* OS_FREEBSD */
74 
75 /* prototypes */
76 static int SetCPUAffinity(uint16_t cpu);
77 static void TmThreadDeinitMC(ThreadVars *tv);
78 
79 /* root of the threadvars list */
80 ThreadVars *tv_root[TVT_MAX] = { NULL };
81 
82 /* lock to protect tv_root */
84 
85 /**
86  * \brief Check if a thread flag is set.
87  *
88  * \retval 1 flag is set.
89  * \retval 0 flag is not set.
90  */
91 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
92 {
93  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
94 }
95 
96 /**
97  * \brief Set a thread flag.
98  */
99 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
100 {
101  SC_ATOMIC_OR(tv->flags, flag);
102 }
103 
104 /**
105  * \brief Unset a thread flag.
106  */
107 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
108 {
109  SC_ATOMIC_AND(tv->flags, ~flag);
110 }
111 
113  ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
114 {
115  while (decode_pq->top != NULL) {
116  Packet *extra_p = PacketDequeueNoLock(decode_pq);
117  if (unlikely(extra_p == NULL))
118  continue;
119  DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
120 
121  if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) {
123  }
124  }
126 }
127 
128 /**
129  * \brief Separate run function so we can call it recursively.
130  */
132 {
133  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
134  PACKET_PROFILING_TMM_START(p, s->tm_id);
135  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
136  PACKET_PROFILING_TMM_END(p, s->tm_id);
137  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
138 
139  /* handle error */
140  if (unlikely(r == TM_ECODE_FAILED)) {
141  /* Encountered error. Return packets to packetpool and return */
142  TmThreadsSlotProcessPktFail(tv, s, NULL);
143  return TM_ECODE_FAILED;
144  }
145  if (s->tm_flags & TM_FLAG_DECODE_TM) {
146  if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) !=
147  TM_ECODE_OK) {
148  return TM_ECODE_FAILED;
149  }
150  }
151  }
152 
153  return TM_ECODE_OK;
154 }
155 
156 /** \internal
157  *
158  * \brief Process flow timeout packets
159  *
160  * Process flow timeout pseudo packets. During shutdown this loop
161  * is run until the flow engine kills the thread and the queue is
162  * empty.
163  */
164 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
165 {
166  TmSlot *fw_slot = tv->tm_flowworker;
167  int r = TM_ECODE_OK;
168 
169  if (tv->stream_pq == NULL || fw_slot == NULL) {
170  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
171  return r;
172  }
173 
174  SCLogDebug("flow end loop starting");
175  while (1) {
177  uint32_t len = tv->stream_pq->len;
179  if (len > 0) {
180  while (len--) {
184  if (likely(p)) {
185  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
186  r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
187  if (r == TM_ECODE_FAILED) {
188  break;
189  }
190  }
191  }
192  } else {
194  break;
195  }
196  SleepUsec(1);
197  }
198  }
199  SCLogDebug("flow end loop complete");
201 
202  return r;
203 }
204 
205 /*
206 
207  pcap/nfq
208 
209  pkt read
210  callback
211  process_pkt
212 
213  pfring
214 
215  pkt read
216  process_pkt
217 
218  slot:
219  setup
220 
221  pkt_ack_loop(tv, slot_data)
222 
223  deinit
224 
225  process_pkt:
226  while(s)
227  run s;
228  queue;
229 
230  */
231 
232 static void *TmThreadsSlotPktAcqLoop(void *td)
233 {
234  ThreadVars *tv = (ThreadVars *)td;
235  TmSlot *s = tv->tm_slots;
236  char run = 1;
237  TmEcode r = TM_ECODE_OK;
238  TmSlot *slot = NULL;
239 
241 
242  if (tv->thread_setup_flags != 0)
244 
245  /* Drop the capabilities for this thread */
246  SCDropCaps(tv);
248  PacketPoolInit();
249 
250  /* check if we are setup properly */
251  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
252  SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
253  " PktAcqLoop=%p, tmqh_in=%p,"
254  " tmqh_out=%p",
255  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
257  pthread_exit((void *) -1);
258  return NULL;
259  }
260 
261  for (slot = s; slot != NULL; slot = slot->slot_next) {
262  if (slot->SlotThreadInit != NULL) {
263  void *slot_data = NULL;
264  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
265  if (r != TM_ECODE_OK) {
266  if (r == TM_ECODE_DONE) {
267  EngineDone();
269  goto error;
270  } else {
272  goto error;
273  }
274  }
275  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
276  }
277 
278  /* if the flowworker module is the first, get the threads input queue */
279  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
280  tv->stream_pq = tv->inq->pq;
281  tv->tm_flowworker = slot;
282  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
284  if (tv->flow_queue == NULL) {
286  pthread_exit((void *) -1);
287  return NULL;
288  }
289  /* setup a queue */
290  } else if (slot->tm_id == TMM_FLOWWORKER) {
291  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
292  if (tv->stream_pq_local == NULL)
293  FatalError("failed to alloc PacketQueue");
296  tv->tm_flowworker = slot;
297  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
299  if (tv->flow_queue == NULL) {
301  pthread_exit((void *) -1);
302  return NULL;
303  }
304  }
305  }
306 
308 
310 
311  while(run) {
316  }
317 
318  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
319 
320  if (r == TM_ECODE_FAILED) {
322  run = 0;
323  }
325  run = 0;
326  }
327  if (r == TM_ECODE_DONE) {
328  run = 0;
329  }
330  }
332 
334 
335  /* process all pseudo packets the flow timeout may throw at us */
336  TmThreadTimeoutLoop(tv, s);
337 
340 
342 
343  for (slot = s; slot != NULL; slot = slot->slot_next) {
344  if (slot->SlotThreadExitPrintStats != NULL) {
345  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
346  }
347 
348  if (slot->SlotThreadDeinit != NULL) {
349  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
350  if (r != TM_ECODE_OK) {
352  goto error;
353  }
354  }
355  }
356 
357  tv->stream_pq = NULL;
358  SCLogDebug("%s ending", tv->name);
360  pthread_exit((void *) 0);
361  return NULL;
362 
363 error:
364  tv->stream_pq = NULL;
365  pthread_exit((void *) -1);
366  return NULL;
367 }
368 
369 static void *TmThreadsSlotVar(void *td)
370 {
371  ThreadVars *tv = (ThreadVars *)td;
372  TmSlot *s = (TmSlot *)tv->tm_slots;
373  Packet *p = NULL;
374  char run = 1;
375  TmEcode r = TM_ECODE_OK;
376 
378  PacketPoolInit();//Empty();
379 
381 
382  if (tv->thread_setup_flags != 0)
384 
385  /* Drop the capabilities for this thread */
386  SCDropCaps(tv);
387 
388  /* check if we are setup properly */
389  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
391  pthread_exit((void *) -1);
392  return NULL;
393  }
394 
395  for (; s != NULL; s = s->slot_next) {
396  if (s->SlotThreadInit != NULL) {
397  void *slot_data = NULL;
398  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
399  if (r != TM_ECODE_OK) {
401  goto error;
402  }
403  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
404  }
405 
406  /* special case: we need to access the stream queue
407  * from the flow timeout code */
408 
409  /* if the flowworker module is the first, get the threads input queue */
410  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
411  tv->stream_pq = tv->inq->pq;
412  tv->tm_flowworker = s;
413  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
415  if (tv->flow_queue == NULL) {
417  pthread_exit((void *) -1);
418  return NULL;
419  }
420  /* setup a queue */
421  } else if (s->tm_id == TMM_FLOWWORKER) {
422  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
423  if (tv->stream_pq_local == NULL)
424  FatalError("failed to alloc PacketQueue");
427  tv->tm_flowworker = s;
428  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
430  if (tv->flow_queue == NULL) {
432  pthread_exit((void *) -1);
433  return NULL;
434  }
435  }
436  }
437 
439 
440  // Each 'worker' thread uses this func to process/decode the packet read.
441  // Each decode method is different to receive methods in that they do not
442  // enter infinite loops. They use this as the core loop. As a result, at this
443  // point the worker threads can be considered both initialized and running.
445 
446  s = (TmSlot *)tv->tm_slots;
447 
448  while (run) {
453  }
454 
455  /* input a packet */
456  p = tv->tmqh_in(tv);
457 
458  /* if we didn't get a packet see if we need to do some housekeeping */
459  if (unlikely(p == NULL)) {
460  if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
462  if (p != NULL) {
465  }
466  }
467  }
468 
469  if (p != NULL) {
470  /* run the thread module(s) */
471  r = TmThreadsSlotVarRun(tv, p, s);
472  if (r == TM_ECODE_FAILED) {
475  break;
476  }
477 
478  /* output the packet */
479  tv->tmqh_out(tv, p);
480 
481  /* now handle the stream pq packets */
482  TmThreadsHandleInjectedPackets(tv);
483  }
484 
486  run = 0;
487  }
488  } /* while (run) */
490 
493 
495 
496  s = (TmSlot *)tv->tm_slots;
497 
498  for ( ; s != NULL; s = s->slot_next) {
499  if (s->SlotThreadExitPrintStats != NULL) {
500  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
501  }
502 
503  if (s->SlotThreadDeinit != NULL) {
504  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
505  if (r != TM_ECODE_OK) {
507  goto error;
508  }
509  }
510  }
511 
512  SCLogDebug("%s ending", tv->name);
513  tv->stream_pq = NULL;
515  pthread_exit((void *) 0);
516  return NULL;
517 
518 error:
519  tv->stream_pq = NULL;
520  pthread_exit((void *) -1);
521  return NULL;
522 }
523 
524 static void *TmThreadsManagement(void *td)
525 {
526  ThreadVars *tv = (ThreadVars *)td;
527  TmSlot *s = (TmSlot *)tv->tm_slots;
528  TmEcode r = TM_ECODE_OK;
529 
530  BUG_ON(s == NULL);
531 
533 
534  if (tv->thread_setup_flags != 0)
536 
537  /* Drop the capabilities for this thread */
538  SCDropCaps(tv);
539 
540  SCLogDebug("%s starting", tv->name);
541 
542  if (s->SlotThreadInit != NULL) {
543  void *slot_data = NULL;
544  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
545  if (r != TM_ECODE_OK) {
547  pthread_exit((void *) -1);
548  return NULL;
549  }
550  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
551  }
552 
554 
556 
557  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
558  /* handle error */
559  if (r == TM_ECODE_FAILED) {
561  }
562 
565  }
566 
569 
570  if (s->SlotThreadExitPrintStats != NULL) {
571  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
572  }
573 
574  if (s->SlotThreadDeinit != NULL) {
575  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
576  if (r != TM_ECODE_OK) {
578  pthread_exit((void *) -1);
579  return NULL;
580  }
581  }
582 
584  pthread_exit((void *) 0);
585  return NULL;
586 }
587 
588 /**
589  * \brief We set the slot functions.
590  *
591  * \param tv Pointer to the TV to set the slot function for.
592  * \param name Name of the slot variant.
593  * \param fn_p Pointer to a custom slot function. Used only if slot variant
594  * "name" is "custom".
595  *
596  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
597  */
598 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
599 {
600  if (name == NULL) {
601  if (fn_p == NULL) {
602  printf("Both slot name and function pointer can't be NULL inside "
603  "TmThreadSetSlots\n");
604  goto error;
605  } else {
606  name = "custom";
607  }
608  }
609 
610  if (strcmp(name, "varslot") == 0) {
611  tv->tm_func = TmThreadsSlotVar;
612  } else if (strcmp(name, "pktacqloop") == 0) {
613  tv->tm_func = TmThreadsSlotPktAcqLoop;
614  } else if (strcmp(name, "management") == 0) {
615  tv->tm_func = TmThreadsManagement;
616  } else if (strcmp(name, "command") == 0) {
617  tv->tm_func = TmThreadsManagement;
618  } else if (strcmp(name, "custom") == 0) {
619  if (fn_p == NULL)
620  goto error;
621  tv->tm_func = fn_p;
622  } else {
623  printf("Error: Slot \"%s\" not supported\n", name);
624  goto error;
625  }
626 
627  return TM_ECODE_OK;
628 
629 error:
630  return TM_ECODE_FAILED;
631 }
632 
633 /**
634  * \brief Appends a new entry to the slots.
635  *
636  * \param tv TV the slot is attached to.
637  * \param tm TM to append.
638  * \param data Data to be passed on to the slot init function.
639  *
640  * \retval The allocated TmSlot or NULL if there is an error
641  */
642 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
643 {
644  TmSlot *slot = SCCalloc(1, sizeof(TmSlot));
645  if (unlikely(slot == NULL))
646  return;
647  SC_ATOMIC_INITPTR(slot->slot_data);
648  slot->SlotThreadInit = tm->ThreadInit;
649  slot->slot_initdata = data;
650  if (tm->Func) {
651  slot->SlotFunc = tm->Func;
652  } else if (tm->PktAcqLoop) {
653  slot->PktAcqLoop = tm->PktAcqLoop;
654  if (tm->PktAcqBreakLoop) {
655  tv->break_loop = true;
656  }
657  } else if (tm->Management) {
658  slot->Management = tm->Management;
659  }
661  slot->SlotThreadDeinit = tm->ThreadDeinit;
662  /* we don't have to check for the return value "-1". We wouldn't have
663  * received a TM as arg, if it didn't exist */
664  slot->tm_id = TmModuleGetIDForTM(tm);
665  slot->tm_flags |= tm->flags;
666 
667  tv->tmm_flags |= tm->flags;
668  tv->cap_flags |= tm->cap_flags;
669 
670  if (tv->tm_slots == NULL) {
671  tv->tm_slots = slot;
672  } else {
673  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
674 
675  /* get the last slot */
676  for ( ; a != NULL; a = a->slot_next) {
677  b = a;
678  }
679  /* append the new slot */
680  if (b != NULL) {
681  b->slot_next = slot;
682  }
683  }
684  return;
685 }
686 
687 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
688 static int SetCPUAffinitySet(cpu_set_t *cs)
689 {
690 #if defined OS_FREEBSD
691  int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
692  SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
693 #elif OS_DARWIN
694  int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
695  (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
696 #else
697  pid_t tid = syscall(SYS_gettid);
698  int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
699 #endif /* OS_FREEBSD */
700 
701  if (r != 0) {
702  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
703  strerror(errno));
704  return -1;
705  }
706 
707  return 0;
708 }
709 #endif
710 
711 
712 /**
713  * \brief Set the thread affinity on the calling thread.
714  *
715  * \param cpuid Id of the core/cpu to setup the affinity.
716  *
717  * \retval 0 If all goes well; -1 if something is wrong.
718  */
719 static int SetCPUAffinity(uint16_t cpuid)
720 {
721 #if defined __OpenBSD__ || defined sun
722  return 0;
723 #else
724  int cpu = (int)cpuid;
725 
726 #if defined OS_WIN32 || defined __CYGWIN__
727  DWORD cs = 1 << cpu;
728 
729  int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
730  if (r != 0) {
731  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
732  strerror(errno));
733  return -1;
734  }
735  SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
736  SCGetThreadIdLong(), cpu);
737 
738  return 0;
739 
740 #else
741  cpu_set_t cs;
742  memset(&cs, 0, sizeof(cs));
743 
744  CPU_ZERO(&cs);
745  CPU_SET(cpu, &cs);
746  return SetCPUAffinitySet(&cs);
747 #endif /* windows */
748 #endif /* not supported */
749 }
750 
751 
752 /**
753  * \brief Set the thread options (thread priority).
754  *
755  * \param tv Pointer to the ThreadVars to setup the thread priority.
756  *
757  * \retval TM_ECODE_OK.
758  */
760 {
762  tv->thread_priority = prio;
763 
764  return TM_ECODE_OK;
765 }
766 
767 /**
768  * \brief Adjusting nice value for threads.
769  */
771 {
772  SCEnter();
773 #ifndef __CYGWIN__
774 #ifdef OS_WIN32
775  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
776  SCLogError("Error setting priority for "
777  "thread %s: %s",
778  tv->name, strerror(errno));
779  } else {
780  SCLogDebug("Priority set to %"PRId32" for thread %s",
782  }
783 #else
784  int ret = nice(tv->thread_priority);
785  if (ret == -1) {
786  SCLogError("Error setting nice value %d "
787  "for thread %s: %s",
788  tv->thread_priority, tv->name, strerror(errno));
789  } else {
790  SCLogDebug("Nice value set to %"PRId32" for thread %s",
792  }
793 #endif /* OS_WIN32 */
794 #endif
795  SCReturn;
796 }
797 
798 
799 /**
800  * \brief Set the thread options (cpu affinity).
801  *
802  * \param tv pointer to the ThreadVars to setup the affinity.
803  * \param cpu cpu on which affinity is set.
804  *
805  * \retval TM_ECODE_OK
806  */
808 {
810  tv->cpu_affinity = cpu;
811 
812  return TM_ECODE_OK;
813 }
814 
815 
817 {
819  return TM_ECODE_OK;
820 
821  if (type > MAX_CPU_SET) {
822  SCLogError("invalid cpu type family");
823  return TM_ECODE_FAILED;
824  }
825 
827  tv->cpu_affinity = type;
828 
829  return TM_ECODE_OK;
830 }
831 
833 {
834  if (type >= MAX_CPU_SET) {
835  SCLogError("invalid cpu type family");
836  return 0;
837  }
838 
840 }
841 
842 /**
843  * \brief Set the thread options (cpu affinitythread).
844  * Priority should be already set by pthread_create.
845  *
846  * \param tv pointer to the ThreadVars of the calling thread.
847  */
849 {
851  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
852  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
854  SetCPUAffinity(tv->cpu_affinity);
855  }
856 
857 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
862  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
863  uint16_t cpu = AffinityGetNextCPU(taf);
864  SetCPUAffinity(cpu);
865  /* If CPU is in a set overwrite the default thread prio */
866  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
868  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
870  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
872  } else {
873  tv->thread_priority = taf->prio;
874  }
875  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
876  "%d, thread id %lu", tv->thread_priority,
877  tv->name, cpu, SCGetThreadIdLong());
878  } else {
879  SetCPUAffinitySet(&taf->cpu_set);
880  tv->thread_priority = taf->prio;
881  SCLogPerf("Setting prio %d for thread \"%s\", "
882  "thread id %lu", tv->thread_priority,
884  }
886  }
887 #endif
888 
889  return TM_ECODE_OK;
890 }
891 
892 /**
893  * \brief Creates and returns the TV instance for a new thread.
894  *
895  * \param name Name of this TV instance
896  * \param inq_name Incoming queue name
897  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
898  * \param outq_name Outgoing queue name
899  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
900  * \param slots String representation for the slot function to be used
901  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
902  * \param mucond Flag to indicate whether to initialize the condition
903  * and the mutex variables for this newly created TV.
904  *
905  * \retval the newly created TV instance, or NULL on error
906  */
907 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
908  const char *outq_name, const char *outqh_name, const char *slots,
909  void * (*fn_p)(void *), int mucond)
910 {
911  ThreadVars *tv = NULL;
912  Tmq *tmq = NULL;
913  Tmqh *tmqh = NULL;
914 
915  SCLogDebug("creating thread \"%s\"...", name);
916 
917  /* XXX create separate function for this: allocate a thread container */
918  tv = SCCalloc(1, sizeof(ThreadVars));
919  if (unlikely(tv == NULL))
920  goto error;
921 
922  SC_ATOMIC_INIT(tv->flags);
923  SCMutexInit(&tv->perf_public_ctx.m, NULL);
924 
925  strlcpy(tv->name, name, sizeof(tv->name));
926 
927  /* default state for every newly created thread */
930 
931  /* set the incoming queue */
932  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
933  SCLogDebug("inq_name \"%s\"", inq_name);
934 
935  tmq = TmqGetQueueByName(inq_name);
936  if (tmq == NULL) {
937  tmq = TmqCreateQueue(inq_name);
938  if (tmq == NULL)
939  goto error;
940  }
941  SCLogDebug("tmq %p", tmq);
942 
943  tv->inq = tmq;
944  tv->inq->reader_cnt++;
945  SCLogDebug("tv->inq %p", tv->inq);
946  }
947  if (inqh_name != NULL) {
948  SCLogDebug("inqh_name \"%s\"", inqh_name);
949 
950  int id = TmqhNameToID(inqh_name);
951  if (id <= 0) {
952  goto error;
953  }
954  tmqh = TmqhGetQueueHandlerByName(inqh_name);
955  if (tmqh == NULL)
956  goto error;
957 
958  tv->tmqh_in = tmqh->InHandler;
959  tv->inq_id = (uint8_t)id;
960  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
961  }
962 
963  /* set the outgoing queue */
964  if (outqh_name != NULL) {
965  SCLogDebug("outqh_name \"%s\"", outqh_name);
966 
967  int id = TmqhNameToID(outqh_name);
968  if (id <= 0) {
969  goto error;
970  }
971 
972  tmqh = TmqhGetQueueHandlerByName(outqh_name);
973  if (tmqh == NULL)
974  goto error;
975 
976  tv->tmqh_out = tmqh->OutHandler;
977  tv->outq_id = (uint8_t)id;
978 
979  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
980  SCLogDebug("outq_name \"%s\"", outq_name);
981 
982  if (tmqh->OutHandlerCtxSetup != NULL) {
983  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
984  if (tv->outctx == NULL)
985  goto error;
986  tv->outq = NULL;
987  } else {
988  tmq = TmqGetQueueByName(outq_name);
989  if (tmq == NULL) {
990  tmq = TmqCreateQueue(outq_name);
991  if (tmq == NULL)
992  goto error;
993  }
994  SCLogDebug("tmq %p", tmq);
995 
996  tv->outq = tmq;
997  tv->outctx = NULL;
998  tv->outq->writer_cnt++;
999  }
1000  }
1001  }
1002 
1003  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1004  goto error;
1005  }
1006 
1007  if (mucond != 0)
1008  TmThreadInitMC(tv);
1009 
1010  return tv;
1011 
1012 error:
1013  SCLogError("failed to setup a thread");
1014 
1015  if (tv != NULL)
1016  SCFree(tv);
1017  return NULL;
1018 }
1019 
1020 /**
1021  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1022  * This function doesn't support custom slots, and hence shouldn't be
1023  * supplied \"custom\" as its slot type. All PPT threads are created
1024  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1025  * conditional variables are not used to kill the thread.
1026  *
1027  * \param name Name of this TV instance
1028  * \param inq_name Incoming queue name
1029  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1030  * \param outq_name Outgoing queue name
1031  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1032  * \param slots String representation for the slot function to be used
1033  *
1034  * \retval the newly created TV instance, or NULL on error
1035  */
1036 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1037  const char *inqh_name, const char *outq_name,
1038  const char *outqh_name, const char *slots)
1039 {
1040  ThreadVars *tv = NULL;
1041 
1042  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1043  slots, NULL, 0);
1044 
1045  if (tv != NULL) {
1046  tv->type = TVT_PPT;
1048  }
1049 
1050  return tv;
1051 }
1052 
1053 /**
1054  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1055  * This function supports only custom slot functions and hence a
1056  * function pointer should be sent as an argument.
1057  *
1058  * \param name Name of this TV instance
1059  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1060  * \param mucond Flag to indicate whether to initialize the condition
1061  * and the mutex variables for this newly created TV.
1062  *
1063  * \retval the newly created TV instance, or NULL on error
1064  */
1065 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1066  int mucond)
1067 {
1068  ThreadVars *tv = NULL;
1069 
1070  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1071 
1072  if (tv != NULL) {
1073  tv->type = TVT_MGMT;
1076  }
1077 
1078  return tv;
1079 }
1080 
1081 /**
1082  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1083  * This function supports only custom slot functions and hence a
1084  * function pointer should be sent as an argument.
1085  *
1086  * \param name Name of this TV instance
1087  * \param module Name of TmModule with MANAGEMENT flag set.
1088  * \param mucond Flag to indicate whether to initialize the condition
1089  * and the mutex variables for this newly created TV.
1090  *
1091  * \retval the newly created TV instance, or NULL on error
1092  */
1093 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1094  int mucond)
1095 {
1096  ThreadVars *tv = NULL;
1097 
1098  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1099 
1100  if (tv != NULL) {
1101  tv->type = TVT_MGMT;
1104 
1105  TmModule *m = TmModuleGetByName(module);
1106  if (m) {
1107  TmSlotSetFuncAppend(tv, m, NULL);
1108  }
1109  }
1110 
1111  return tv;
1112 }
1113 
1114 /**
1115  * \brief Creates and returns the TV instance for a Command thread (CMD).
1116  * This function supports only custom slot functions and hence a
1117  * function pointer should be sent as an argument.
1118  *
1119  * \param name Name of this TV instance
1120  * \param module Name of TmModule with COMMAND flag set.
1121  * \param mucond Flag to indicate whether to initialize the condition
1122  * and the mutex variables for this newly created TV.
1123  *
1124  * \retval the newly created TV instance, or NULL on error
1125  */
1126 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1127  int mucond)
1128 {
1129  ThreadVars *tv = NULL;
1130 
1131  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1132 
1133  if (tv != NULL) {
1134  tv->type = TVT_CMD;
1137 
1138  TmModule *m = TmModuleGetByName(module);
1139  if (m) {
1140  TmSlotSetFuncAppend(tv, m, NULL);
1141  }
1142  }
1143 
1144  return tv;
1145 }
1146 
1147 /**
1148  * \brief Appends this TV to tv_root based on its type
1149  *
1150  * \param type holds the type this TV belongs to.
1151  */
1153 {
1155 
1156  if (tv_root[type] == NULL) {
1157  tv_root[type] = tv;
1158  tv->next = NULL;
1159 
1161 
1162  return;
1163  }
1164 
1165  ThreadVars *t = tv_root[type];
1166 
1167  while (t) {
1168  if (t->next == NULL) {
1169  t->next = tv;
1170  tv->next = NULL;
1171  break;
1172  }
1173 
1174  t = t->next;
1175  }
1176 
1178 
1179  return;
1180 }
1181 
1182 static bool ThreadStillHasPackets(ThreadVars *tv)
1183 {
1184  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1185  /* we wait till we dry out all the inq packets, before we
1186  * kill this thread. Do note that you should have disabled
1187  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1188  PacketQueue *q = tv->inq->pq;
1189  SCMutexLock(&q->mutex_q);
1190  uint32_t len = q->len;
1191  SCMutexUnlock(&q->mutex_q);
1192  if (len != 0) {
1193  return true;
1194  }
1195  }
1196 
1197  if (tv->stream_pq != NULL) {
1199  uint32_t len = tv->stream_pq->len;
1201 
1202  if (len != 0) {
1203  return true;
1204  }
1205  }
1206  return false;
1207 }
1208 
1209 /**
1210  * \brief Kill a thread.
1211  *
1212  * \param tv A ThreadVars instance corresponding to the thread that has to be
1213  * killed.
1214  *
1215  * \retval r 1 killed successfully
1216  * 0 not yet ready, needs another look
1217  */
1218 static int TmThreadKillThread(ThreadVars *tv)
1219 {
1220  BUG_ON(tv == NULL);
1221 
1222  /* kill only once :) */
1223  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1224  return 1;
1225  }
1226 
1227  /* set the thread flag informing the thread that it needs to be
1228  * terminated */
1231 
1232  /* to be sure, signal more */
1233  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1234  if (tv->inq_id != TMQH_NOT_SET) {
1236  if (qh != NULL && qh->InShutdownHandler != NULL) {
1237  qh->InShutdownHandler(tv);
1238  }
1239  }
1240  if (tv->inq != NULL) {
1241  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1242  SCMutexLock(&tv->inq->pq->mutex_q);
1243  SCCondSignal(&tv->inq->pq->cond_q);
1244  SCMutexUnlock(&tv->inq->pq->mutex_q);
1245  }
1246  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1247  }
1248 
1249  if (tv->ctrl_cond != NULL ) {
1251  pthread_cond_broadcast(tv->ctrl_cond);
1253  }
1254  return 0;
1255  }
1256 
1257  if (tv->outctx != NULL) {
1258  if (tv->outq_id != TMQH_NOT_SET) {
1260  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1261  qh->OutHandlerCtxFree(tv->outctx);
1262  tv->outctx = NULL;
1263  }
1264  }
1265  }
1266 
1267  /* join it and flag it as dead */
1268  pthread_join(tv->t, NULL);
1269  SCLogDebug("thread %s stopped", tv->name);
1271  return 1;
1272 }
1273 
1274 static bool ThreadBusy(ThreadVars *tv)
1275 {
1276  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
1277  TmModule *tm = TmModuleGetById(s->tm_id);
1278  if (tm && tm->ThreadBusy != NULL) {
1279  if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
1280  return true;
1281  }
1282  }
1283  return false;
1284 }
1285 
1286 /** \internal
1287  *
1288  * \brief make sure that all packet threads are done processing their
1289  * in-flight packets, including 'injected' flow packets.
1290  */
1291 static void TmThreadDrainPacketThreads(void)
1292 {
1293  ThreadVars *tv = NULL;
1294  struct timeval start_ts;
1295  struct timeval cur_ts;
1296  gettimeofday(&start_ts, NULL);
1297 
1298 again:
1299  gettimeofday(&cur_ts, NULL);
1300  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1301  SCLogWarning("unable to get all packet threads "
1302  "to process their packets in time");
1303  return;
1304  }
1305 
1307 
1308  /* all receive threads are part of packet processing threads */
1309  tv = tv_root[TVT_PPT];
1310  while (tv) {
1311  if (ThreadStillHasPackets(tv)) {
1312  /* we wait till we dry out all the inq packets, before we
1313  * kill this thread. Do note that you should have disabled
1314  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1316 
1317  /* sleep outside lock */
1318  SleepMsec(1);
1319  goto again;
1320  }
1321  if (ThreadBusy(tv)) {
1323 
1324  Packet *p = PacketGetFromAlloc();
1325  if (p != NULL) {
1328  PacketQueue *q = tv->stream_pq;
1329  SCMutexLock(&q->mutex_q);
1330  PacketEnqueue(q, p);
1331  SCCondSignal(&q->cond_q);
1332  SCMutexUnlock(&q->mutex_q);
1333  }
1334 
1335  /* don't sleep while holding a lock */
1336  SleepMsec(1);
1337  goto again;
1338  }
1339  tv = tv->next;
1340  }
1341 
1343  return;
1344 }
1345 
1346 /**
1347  * \brief Disable all threads having the specified TMs.
1348  *
1349  * Breaks out of the packet acquisition loop, and bumps
1350  * into the 'flow loop', where it will process packets
1351  * from the flow engine's shutdown handling.
1352  */
1354 {
1355  ThreadVars *tv = NULL;
1356  struct timeval start_ts;
1357  struct timeval cur_ts;
1358  gettimeofday(&start_ts, NULL);
1359 
1360 again:
1361  gettimeofday(&cur_ts, NULL);
1362  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1363  FatalError("Engine unable to disable detect "
1364  "thread - \"%s\". Killing engine",
1365  tv->name);
1366  }
1367 
1369 
1370  /* all receive threads are part of packet processing threads */
1371  tv = tv_root[TVT_PPT];
1372 
1373  /* we do have to keep in mind that TVs are arranged in the order
1374  * right from receive to log. The moment we fail to find a
1375  * receive TM amongst the slots in a tv, it indicates we are done
1376  * with all receive threads */
1377  while (tv) {
1378  int disable = 0;
1379  TmModule *tm = NULL;
1380  /* obtain the slots for this TV */
1381  TmSlot *slots = tv->tm_slots;
1382  while (slots != NULL) {
1383  tm = TmModuleGetById(slots->tm_id);
1384 
1385  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1386  disable = 1;
1387  break;
1388  }
1389 
1390  slots = slots->slot_next;
1391  continue;
1392  }
1393 
1394  if (disable) {
1395  if (ThreadStillHasPackets(tv)) {
1396  /* we wait till we dry out all the inq packets, before we
1397  * kill this thread. Do note that you should have disabled
1398  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1400  /* don't sleep while holding a lock */
1401  SleepMsec(1);
1402  goto again;
1403  }
1404 
1405  if (ThreadBusy(tv)) {
1407 
1408  Packet *p = PacketGetFromAlloc();
1409  if (p != NULL) {
1412  PacketQueue *q = tv->stream_pq;
1413  SCMutexLock(&q->mutex_q);
1414  PacketEnqueue(q, p);
1415  SCCondSignal(&q->cond_q);
1416  SCMutexUnlock(&q->mutex_q);
1417  }
1418 
1419  /* don't sleep while holding a lock */
1420  SleepMsec(1);
1421  goto again;
1422  }
1423 
1424  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1425  if (tm && tm->PktAcqBreakLoop != NULL) {
1426  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1427  }
1429 
1430  if (tv->inq != NULL) {
1431  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1432  SCMutexLock(&tv->inq->pq->mutex_q);
1433  SCCondSignal(&tv->inq->pq->cond_q);
1434  SCMutexUnlock(&tv->inq->pq->mutex_q);
1435  }
1436  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1437  }
1438 
1439  /* wait for it to enter the 'flow loop' stage */
1440  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1442 
1443  SleepMsec(1);
1444  goto again;
1445  }
1446  }
1447 
1448  tv = tv->next;
1449  }
1450 
1452 
1453  /* finally wait for all packet threads to have
1454  * processed all of their 'live' packets so we
1455  * don't process the last live packets together
1456  * with FFR packets */
1457  TmThreadDrainPacketThreads();
1458  return;
1459 }
1460 
1461 #ifdef DEBUG_VALIDATION
1462 static void TmThreadDumpThreads(void);
1463 #endif
1464 
1465 static void TmThreadDebugValidateNoMorePackets(void)
1466 {
1467 #ifdef DEBUG_VALIDATION
1469  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1470  if (ThreadStillHasPackets(tv)) {
1472  TmThreadDumpThreads();
1473  abort();
1474  }
1475  }
1477 #endif
1478 }
1479 
1480 /**
1481  * \brief Disable all packet threads
1482  */
1484 {
1485  struct timeval start_ts;
1486  struct timeval cur_ts;
1487 
1488  /* first drain all packet threads of their packets */
1489  TmThreadDrainPacketThreads();
1490 
1491  /* since all the threads possibly able to produce more packets
1492  * are now gone or inactive, we should see no packets anywhere
1493  * anymore. */
1494  TmThreadDebugValidateNoMorePackets();
1495 
1496  gettimeofday(&start_ts, NULL);
1497 again:
1498  gettimeofday(&cur_ts, NULL);
1499  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1500  FatalError("Engine unable to disable packet "
1501  "threads. Killing engine");
1502  }
1503 
1504  /* loop through the packet threads and kill them */
1506  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1508 
1509  /* separate worker threads (autofp) will still wait at their
1510  * input queues. So nudge them here so they will observe the
1511  * THV_KILL flag. */
1512  if (tv->inq != NULL) {
1513  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1514  SCMutexLock(&tv->inq->pq->mutex_q);
1515  SCCondSignal(&tv->inq->pq->cond_q);
1516  SCMutexUnlock(&tv->inq->pq->mutex_q);
1517  }
1518  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1519  }
1520 
1523 
1524  SleepMsec(1);
1525  goto again;
1526  }
1527  }
1529  return;
1530 }
1531 
1532 #define MIN_WAIT_TIME 100
1533 #define MAX_WAIT_TIME 999999
1535 {
1536  ThreadVars *tv = NULL;
1537  unsigned int sleep_usec = MIN_WAIT_TIME;
1538 
1539  BUG_ON((family < 0) || (family >= TVT_MAX));
1540 
1541 again:
1543  tv = tv_root[family];
1544 
1545  while (tv) {
1546  int r = TmThreadKillThread(tv);
1547  if (r == 0) {
1549  SleepUsec(sleep_usec);
1550  sleep_usec *= 2; /* slowly back off */
1551  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1552  goto again;
1553  }
1554  sleep_usec = MIN_WAIT_TIME; /* reset */
1555 
1556  tv = tv->next;
1557  }
1559 }
1560 #undef MIN_WAIT_TIME
1561 #undef MAX_WAIT_TIME
1562 
1564 {
1565  int i = 0;
1566 
1567  for (i = 0; i < TVT_MAX; i++) {
1569  }
1570 
1571  return;
1572 }
1573 
1574 static void TmThreadFree(ThreadVars *tv)
1575 {
1576  TmSlot *s;
1577  TmSlot *ps;
1578  if (tv == NULL)
1579  return;
1580 
1581  SCLogDebug("Freeing thread '%s'.", tv->name);
1582 
1583  if (tv->flow_queue) {
1584  BUG_ON(tv->flow_queue->qlen != 0);
1585  SCFree(tv->flow_queue);
1586  }
1587 
1589 
1590  TmThreadDeinitMC(tv);
1591 
1592  if (tv->thread_group_name) {
1594  }
1595 
1596  if (tv->printable_name) {
1598  }
1599 
1600  if (tv->stream_pq_local) {
1604  }
1605 
1606  s = (TmSlot *)tv->tm_slots;
1607  while (s) {
1608  ps = s;
1609  s = s->slot_next;
1610  SCFree(ps);
1611  }
1612 
1614  SCFree(tv);
1615 }
1616 
1617 void TmThreadSetGroupName(ThreadVars *tv, const char *name)
1618 {
1619  char *thread_group_name = NULL;
1620 
1621  if (name == NULL)
1622  return;
1623 
1624  if (tv == NULL)
1625  return;
1626 
1627  thread_group_name = SCStrdup(name);
1628  if (unlikely(thread_group_name == NULL)) {
1629  SCLogError("error allocating memory");
1630  return;
1631  }
1632  tv->thread_group_name = thread_group_name;
1633 }
1634 
1636 {
1637  ThreadVars *tv = NULL;
1638  ThreadVars *ptv = NULL;
1639 
1640  if ((family < 0) || (family >= TVT_MAX))
1641  return;
1642 
1644  tv = tv_root[family];
1645 
1646  while (tv) {
1647  ptv = tv;
1648  tv = tv->next;
1649  TmThreadFree(ptv);
1650  }
1651  tv_root[family] = NULL;
1653 }
1654 
1655 /**
1656  * \brief Spawns a thread associated with the ThreadVars instance tv
1657  *
1658  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1659  */
1661 {
1662  pthread_attr_t attr;
1663  if (tv->tm_func == NULL) {
1664  FatalError("No thread function set");
1665  }
1666 
1667  /* Initialize and set thread detached attribute */
1668  pthread_attr_init(&attr);
1669 
1670  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1671 
1672  /* Adjust thread stack size if configured */
1674  SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
1675  if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
1676  FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
1678  }
1679  }
1680 
1681  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1682  if (rc) {
1683  FatalError("Unable to create thread with pthread_create(): retval %d: %s", rc,
1684  strerror(errno));
1685  }
1686 
1687 #if DEBUG && HAVE_PTHREAD_GETATTR_NP
1689  if (pthread_getattr_np(tv->t, &attr) == 0) {
1690  size_t stack_size;
1691  void *stack_addr;
1692  pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1693  SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1694  } else {
1695  SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1696  "pthread_getattr_np() is %" PRId32,
1697  rc);
1698  }
1699  }
1700 #endif
1701 
1703 
1704  TmThreadAppend(tv, tv->type);
1705  return TM_ECODE_OK;
1706 }
1707 
1708 /**
1709  * \brief Initializes the mutex and condition variables for this TV
1710  *
1711  * It can be used by a thread to control a wait loop that can also be
1712  * influenced by other threads.
1713  *
1714  * \param tv Pointer to a TV instance
1715  */
1717 {
1718  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1719  FatalError("Fatal error encountered in TmThreadInitMC. "
1720  "Exiting...");
1721  }
1722 
1723  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1724  printf("Error initializing the tv->m mutex\n");
1725  exit(EXIT_FAILURE);
1726  }
1727 
1728  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1729  FatalError("Fatal error encountered in TmThreadInitMC. "
1730  "Exiting...");
1731  }
1732 
1733  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1734  FatalError("Error initializing the tv->cond condition "
1735  "variable");
1736  }
1737 
1738  return;
1739 }
1740 
1741 static void TmThreadDeinitMC(ThreadVars *tv)
1742 {
1743  if (tv->ctrl_mutex) {
1745  SCFree(tv->ctrl_mutex);
1746  }
1747  if (tv->ctrl_cond) {
1749  SCFree(tv->ctrl_cond);
1750  }
1751  return;
1752 }
1753 
1754 /**
1755  * \brief Tests if the thread represented in the arg has been unpaused or not.
1756  *
1757  * The function would return if the thread tv has been unpaused or if the
1758  * kill flag for the thread has been set.
1759  *
1760  * \param tv Pointer to the TV instance.
1761  */
1763 {
1764  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1765  SleepUsec(100);
1766 
1768  break;
1769  }
1770 
1771  return;
1772 }
1773 
1774 /**
1775  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1776  * the kill flag has been set or not on the thread.
1777  *
1778  * \param tv Pointer to the TV instance.
1779  */
1781 {
1782  while (!TmThreadsCheckFlag(tv, flags)) {
1783  SleepUsec(100);
1784  }
1785 
1786  return;
1787 }
1788 
1789 /**
1790  * \brief Unpauses a thread
1791  *
1792  * \param tv Pointer to a TV instance that has to be unpaused
1793  */
1795 {
1797  return;
1798 }
1799 
1800 /**
1801  * \brief Waits for all threads to be in a running state
1802  *
1803  * \retval TM_ECODE_OK if all are running or error if a thread failed
1804  */
1806 {
1807  uint16_t RX_num = 0;
1808  uint16_t W_num = 0;
1809  uint16_t FM_num = 0;
1810  uint16_t FR_num = 0;
1811  uint16_t TX_num = 0;
1812 
1813  struct timeval start_ts;
1814  struct timeval cur_ts;
1815  gettimeofday(&start_ts, NULL);
1816 
1817 again:
1819  for (int i = 0; i < TVT_MAX; i++) {
1820  ThreadVars *tv = tv_root[i];
1821  while (tv != NULL) {
1824 
1825  SCLogError("thread \"%s\" failed to "
1826  "start: flags %04x",
1827  tv->name, SC_ATOMIC_GET(tv->flags));
1828  return TM_ECODE_FAILED;
1829  }
1830 
1833 
1834  /* 60 seconds provided for the thread to transition from
1835  * THV_INIT_DONE to THV_RUNNING */
1836  gettimeofday(&cur_ts, NULL);
1837  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1838  SCLogError("thread \"%s\" failed to "
1839  "start in time: flags %04x",
1840  tv->name, SC_ATOMIC_GET(tv->flags));
1841  return TM_ECODE_FAILED;
1842  }
1843 
1844  /* sleep a little to give the thread some
1845  * time to start running */
1846  SleepUsec(100);
1847  goto again;
1848  }
1849  tv = tv->next;
1850  }
1851  }
1852  for (int i = 0; i < TVT_MAX; i++) {
1853  for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
1854  if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1855  RX_num++;
1856  else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1857  W_num++;
1858  else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1859  TX_num++;
1860  else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1861  FM_num++;
1862  else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1863  FR_num++;
1864  }
1865  }
1867 
1868  /* Construct a welcome string displaying
1869  * initialized thread types and counts */
1870  uint16_t app_len = 32;
1871  uint16_t buf_len = 256;
1872 
1873  char append_str[app_len];
1874  char thread_counts[buf_len];
1875 
1876  strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1877  if (RX_num > 0) {
1878  snprintf(append_str, app_len, "RX: %u ", RX_num);
1879  strlcat(thread_counts, append_str, buf_len);
1880  }
1881  if (W_num > 0) {
1882  snprintf(append_str, app_len, "W: %u ", W_num);
1883  strlcat(thread_counts, append_str, buf_len);
1884  }
1885  if (TX_num > 0) {
1886  snprintf(append_str, app_len, "TX: %u ", TX_num);
1887  strlcat(thread_counts, append_str, buf_len);
1888  }
1889  if (FM_num > 0) {
1890  snprintf(append_str, app_len, "FM: %u ", FM_num);
1891  strlcat(thread_counts, append_str, buf_len);
1892  }
1893  if (FR_num > 0) {
1894  snprintf(append_str, app_len, "FR: %u ", FR_num);
1895  strlcat(thread_counts, append_str, buf_len);
1896  }
1897  snprintf(append_str, app_len, " Engine started.");
1898  strlcat(thread_counts, append_str, buf_len);
1899  SCLogNotice("%s", thread_counts);
1900 
1901  return TM_ECODE_OK;
1902 }
1903 
1904 /**
1905  * \brief Unpauses all threads present in tv_root
1906  */
1908 {
1910  for (int i = 0; i < TVT_MAX; i++) {
1911  ThreadVars *tv = tv_root[i];
1912  while (tv != NULL) {
1914  tv = tv->next;
1915  }
1916  }
1918  return;
1919 }
1920 
1921 /**
1922  * \brief Used to check the thread for certain conditions of failure.
1923  */
1925 {
1927  for (int i = 0; i < TVT_MAX; i++) {
1928  ThreadVars *tv = tv_root[i];
1929  while (tv) {
1931  FatalError("thread %s failed", tv->name);
1932  }
1933  tv = tv->next;
1934  }
1935  }
1937  return;
1938 }
1939 
1940 /**
1941  * \brief Used to check if all threads have finished their initialization. On
1942  * finding an un-initialized thread, it waits till that thread completes
1943  * its initialization, before proceeding to the next thread.
1944  *
1945  * \retval TM_ECODE_OK all initialized properly
1946  * \retval TM_ECODE_FAILED failure
1947  */
1949 {
1950  struct timeval start_ts;
1951  struct timeval cur_ts;
1952  gettimeofday(&start_ts, NULL);
1953 
1954 again:
1956  for (int i = 0; i < TVT_MAX; i++) {
1957  ThreadVars *tv = tv_root[i];
1958  while (tv != NULL) {
1961 
1962  SCLogError("thread \"%s\" failed to "
1963  "initialize: flags %04x",
1964  tv->name, SC_ATOMIC_GET(tv->flags));
1965  return TM_ECODE_FAILED;
1966  }
1967 
1968  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
1970 
1971  gettimeofday(&cur_ts, NULL);
1972  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
1973  SCLogError("thread \"%s\" failed to "
1974  "initialize in time: flags %04x",
1975  tv->name, SC_ATOMIC_GET(tv->flags));
1976  return TM_ECODE_FAILED;
1977  }
1978 
1979  /* sleep a little to give the thread some
1980  * time to finish initialization */
1981  SleepUsec(100);
1982  goto again;
1983  }
1984 
1987  SCLogError("thread \"%s\" failed to "
1988  "initialize.",
1989  tv->name);
1990  return TM_ECODE_FAILED;
1991  }
1994  SCLogError("thread \"%s\" closed on "
1995  "initialization.",
1996  tv->name);
1997  return TM_ECODE_FAILED;
1998  }
1999 
2000  tv = tv->next;
2001  }
2002  }
2004 
2005  return TM_ECODE_OK;
2006 }
2007 
2008 /**
2009  * \brief returns a count of all the threads that match the flag
2010  */
2012 {
2013  uint32_t cnt = 0;
2015  for (int i = 0; i < TVT_MAX; i++) {
2016  ThreadVars *tv = tv_root[i];
2017  while (tv != NULL) {
2018  if ((tv->tmm_flags & flags) == flags)
2019  cnt++;
2020 
2021  tv = tv->next;
2022  }
2023  }
2025  return cnt;
2026 }
2027 
2028 #ifdef DEBUG_VALIDATION
2029 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2030 {
2031  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2033  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2034  tv, s, s->tm_id, m->name);
2035  }
2036 }
2037 
2038 static void TmThreadDumpThreads(void)
2039 {
2041  for (int i = 0; i < TVT_MAX; i++) {
2042  ThreadVars *tv = tv_root[i];
2043  while (tv != NULL) {
2044  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2045  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2046  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2047  if (tv->inq && tv->stream_pq == tv->inq->pq) {
2048  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2049  } else if (tv->stream_pq_local != NULL) {
2050  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2051  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2052  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2053  }
2054  }
2055  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2056  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2057  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2058  }
2059  TmThreadDoDumpSlots(tv);
2060  tv = tv->next;
2061  }
2062  }
2065 }
2066 #endif
2067 
2068 typedef struct Thread_ {
2069  ThreadVars *tv; /**< threadvars structure */
2070  const char *name;
2071  int type;
2072  int in_use; /**< bool to indicate this is in use */
2073 
2074  SCTime_t pktts; /**< current packet time of this thread
2075  * (offline mode) */
2076  uint32_t sys_sec_stamp; /**< timestamp in seconds of the real system
2077  * time when the pktts was last updated. */
2079 
2080 typedef struct Threads_ {
2085 
2086 static Threads thread_store = { NULL, 0, 0 };
2087 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2088 
2090 {
2091  SCMutexLock(&thread_store_lock);
2092  for (size_t s = 0; s < thread_store.threads_size; s++) {
2093  Thread *t = &thread_store.threads[s];
2094  if (t == NULL || t->in_use == 0)
2095  continue;
2096 
2097  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2098  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2099  if (t->tv) {
2100  ThreadVars *tv = t->tv;
2101  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2102  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2103  tv, tv->type, tv->name, tv->tmm_flags, flags);
2104  }
2105  }
2106  SCMutexUnlock(&thread_store_lock);
2107 }
2108 
2109 #define STEP 32
2110 /**
2111  * \retval id thread id, or 0 if not found
2112  */
2114 {
2115  SCMutexLock(&thread_store_lock);
2116  if (thread_store.threads == NULL) {
2117  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2118  BUG_ON(thread_store.threads == NULL);
2119  thread_store.threads_size = STEP;
2120  }
2121 
2122  size_t s;
2123  for (s = 0; s < thread_store.threads_size; s++) {
2124  if (thread_store.threads[s].in_use == 0) {
2125  Thread *t = &thread_store.threads[s];
2126  t->name = tv->name;
2127  t->type = type;
2128  t->tv = tv;
2129  t->in_use = 1;
2130 
2131  SCMutexUnlock(&thread_store_lock);
2132  return (int)(s+1);
2133  }
2134  }
2135 
2136  /* if we get here the array is completely filled */
2137  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2138  BUG_ON(newmem == NULL);
2139  thread_store.threads = newmem;
2140  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2141 
2142  Thread *t = &thread_store.threads[thread_store.threads_size];
2143  t->name = tv->name;
2144  t->type = type;
2145  t->tv = tv;
2146  t->in_use = 1;
2147 
2148  s = thread_store.threads_size;
2149  thread_store.threads_size += STEP;
2150 
2151  SCMutexUnlock(&thread_store_lock);
2152  return (int)(s+1);
2153 }
2154 #undef STEP
2155 
2156 void TmThreadsUnregisterThread(const int id)
2157 {
2158  SCMutexLock(&thread_store_lock);
2159  if (id <= 0 || id > (int)thread_store.threads_size) {
2160  SCMutexUnlock(&thread_store_lock);
2161  return;
2162  }
2163 
2164  /* id is one higher than index */
2165  int idx = id - 1;
2166 
2167  /* reset thread_id, which serves as clearing the record */
2168  thread_store.threads[idx].in_use = 0;
2169 
2170  /* check if we have at least one registered thread left */
2171  size_t s;
2172  for (s = 0; s < thread_store.threads_size; s++) {
2173  Thread *t = &thread_store.threads[s];
2174  if (t->in_use == 1) {
2175  goto end;
2176  }
2177  }
2178 
2179  /* if we get here no threads are registered */
2180  SCFree(thread_store.threads);
2181  thread_store.threads = NULL;
2182  thread_store.threads_size = 0;
2183  thread_store.threads_cnt = 0;
2184 
2185 end:
2186  SCMutexUnlock(&thread_store_lock);
2187 }
2188 
2189 void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
2190 {
2191  SCMutexLock(&thread_store_lock);
2192  if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2193  SCMutexUnlock(&thread_store_lock);
2194  return;
2195  }
2196 
2197  int idx = id - 1;
2198  Thread *t = &thread_store.threads[idx];
2199  t->pktts = ts;
2200  struct timeval systs;
2201  gettimeofday(&systs, NULL);
2202  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2203  SCMutexUnlock(&thread_store_lock);
2204 }
2205 
2207 {
2208  bool ready = true;
2209  SCMutexLock(&thread_store_lock);
2210  for (size_t s = 0; s < thread_store.threads_size; s++) {
2211  Thread *t = &thread_store.threads[s];
2212  if (!t->in_use)
2213  break;
2214  if (t->sys_sec_stamp == 0) {
2215  ready = false;
2216  break;
2217  }
2218  }
2219  SCMutexUnlock(&thread_store_lock);
2220  return ready;
2221 }
2222 
2224 {
2225  struct timeval systs;
2226  gettimeofday(&systs, NULL);
2227  SCMutexLock(&thread_store_lock);
2228  for (size_t s = 0; s < thread_store.threads_size; s++) {
2229  Thread *t = &thread_store.threads[s];
2230  if (!t->in_use)
2231  break;
2232  t->pktts = ts;
2233  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2234  }
2235  SCMutexUnlock(&thread_store_lock);
2236 }
2237 
2238 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2239 {
2240  struct timeval local = { 0 };
2241  static struct timeval nullts;
2242  bool set = false;
2243  size_t s;
2244  struct timeval systs;
2245  gettimeofday(&systs, NULL);
2246 
2247  SCMutexLock(&thread_store_lock);
2248  for (s = 0; s < thread_store.threads_size; s++) {
2249  Thread *t = &thread_store.threads[s];
2250  if (t->in_use == 0)
2251  break;
2252  struct timeval pkttv = { .tv_sec = SCTIME_SECS(t->pktts),
2253  .tv_usec = SCTIME_USECS(t->pktts) };
2254  if (!(timercmp(&pkttv, &nullts, ==))) {
2255  /* ignore sleeping threads */
2256  if (t->sys_sec_stamp + 1 < (uint32_t)systs.tv_sec)
2257  continue;
2258 
2259  if (!set) {
2260  SCTIME_TO_TIMEVAL(&local, t->pktts);
2261  set = true;
2262  } else {
2263  if (SCTIME_CMP_LT(t->pktts, SCTIME_FROM_TIMEVAL(&local))) {
2264  SCTIME_TO_TIMEVAL(&local, t->pktts);
2265  }
2266  }
2267  }
2268  }
2269  SCMutexUnlock(&thread_store_lock);
2270  *ts = local;
2271  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2272 }
2273 
2275 {
2276  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2277  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2278  /* always create at least one thread */
2279  if (thread_max == 0)
2280  thread_max = ncpus * threading_detect_ratio;
2281  if (thread_max < 1)
2282  thread_max = 1;
2283  if (thread_max > 1024) {
2284  SCLogWarning("limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2285  thread_max = 1024;
2286  }
2287  return (uint16_t)thread_max;
2288 }
2289 
2290 /** \brief inject a flow into a threads flow queue
2291  */
2292 void TmThreadsInjectFlowById(Flow *f, const int id)
2293 {
2294  BUG_ON(id <= 0 || id > (int)thread_store.threads_size);
2295 
2296  int idx = id - 1;
2297 
2298  Thread *t = &thread_store.threads[idx];
2299  ThreadVars *tv = t->tv;
2300 
2301  BUG_ON(tv == NULL || tv->flow_queue == NULL);
2302 
2303  FlowEnqueue(tv->flow_queue, f);
2304 
2305  /* wake up listening thread(s) if necessary */
2306  if (tv->inq != NULL) {
2307  SCMutexLock(&tv->inq->pq->mutex_q);
2308  SCCondSignal(&tv->inq->pq->cond_q);
2309  SCMutexUnlock(&tv->inq->pq->mutex_q);
2310  } else if (tv->break_loop) {
2311  TmThreadsCaptureBreakLoop(tv);
2312  }
2313 }
thread_name_workers
const char * thread_name_workers
Definition: runmodes.c:81
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:807
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:74
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:76
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:70
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:134
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2206
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1716
tm-threads.h
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:70
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
THV_USE
#define THV_USE
Definition: threadvars.h:35
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:131
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1660
Threads
struct Threads_ Threads
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1093
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
threading_set_cpu_affinity
bool threading_set_cpu_affinity
Definition: runmodes.c:75
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:848
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:83
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:314
TmThreadContinueThreads
void TmThreadContinueThreads(void)
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1907
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1036
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:379
TmThreadSetGroupName
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1617
CaptureStatsSetup
void CaptureStatsSetup(ThreadVars *tv)
Definition: decode.c:979
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:104
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:47
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
PKT_SRC_SHUTDOWN_FLUSH
@ PKT_SRC_SHUTDOWN_FLUSH
Definition: decode.h:67
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:99
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1780
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:175
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:56
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:44
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:86
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:65
Packet_::flags
uint32_t flags
Definition: decode.h:474
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:75
threads.h
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:351
ThreadVars_::t
pthread_t t
Definition: threadvars.h:58
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:303
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:84
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:54
Thread_::pktts
SCTime_t pktts
Definition: tm-threads.c:2074
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2011
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:103
thread_name_autofp
const char * thread_name_autofp
Definition: runmodes.c:79
StatsSetupPrivate
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1224
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:116
MIN
#define MIN(x, y)
Definition: suricata-common.h:391
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:80
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:73
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1353
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:387
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:121
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:759
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:89
m
SCMutex m
Definition: flow-hash.h:6
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:44
TmModule_::ThreadBusy
bool(* ThreadBusy)(ThreadVars *tv, void *thread_data)
Definition: tm-modules.h:64
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:141
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:317
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:85
rww_lock_contention
thread_local uint64_t rww_lock_contention
Threads_::threads
Thread * threads
Definition: tm-threads.c:2081
PacketQueueNoLock_
simple fifo queue for packets
Definition: packet-queue.h:34
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:173
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:92
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2109
Thread_::in_use
int in_use
Definition: tm-threads.c:2072
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:37
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(void)
Disable all packet threads.
Definition: tm-threads.c:1483
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:55
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:61
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:246
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:84
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a threads flow queue
Definition: tm-threads.c:2292
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:80
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2238
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmThreadsProcessDecodePseudoPackets
TmEcode TmThreadsProcessDecodePseudoPackets(ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
Definition: tm-threads.c:112
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:50
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:53
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:45
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1078
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:107
util-signal.h
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:78
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:770
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1532
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:2069
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1794
AffinityGetNextCPU
uint16_t AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
Definition: util-affinity.c:283
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:57
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:1948
Threads_::threads_cnt
int threads_cnt
Definition: tm-threads.c:2083
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:58
strlcat
size_t strlcat(char *, const char *src, size_t siz)
Definition: util-strlcatu.c:45
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:77
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:95
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:208
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:58
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
threading_set_stack_size
uint64_t threading_set_stack_size
Definition: runmodes.c:76
ThreadVars_::perf_public_ctx
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:127
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1024
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
Definition: tm-threads.c:2189
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
util-affinity.h
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:79
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1762
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:66
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:53
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1635
THV_KILL
#define THV_KILL
Definition: threadvars.h:39
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:91
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:907
FlowQueueNew
FlowQueue * FlowQueueNew(void)
Definition: flow-queue.c:35
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:820
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2156
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2113
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:124
ThreadVars_::id
int id
Definition: threadvars.h:86
ThreadVars_::type
uint8_t type
Definition: threadvars.h:71
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:300
SCTIME_TO_TIMEVAL
#define SCTIME_TO_TIMEVAL(tv, t)
Definition: util-time.h:97
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:99
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:83
SCReturn
#define SCReturn
Definition: util-debug.h:273
stream.h
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:376
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1563
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:139
Packet_
Definition: decode.h:437
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
type
uint16_t type
Definition: decode-vlan.c:107
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:56
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:132
ThreadVars_::thread_group_name
char * thread_group_name
Definition: threadvars.h:66
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:79
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1533
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:74
TmSlot_
Definition: tm-threads.h:53
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:67
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:68
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:83
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
runmodes.h
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1065
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
TmModule_
Definition: tm-modules.h:44
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:255
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
Definition: tm-threads.c:2223
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:115
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
SCTIME_CMP_LT
#define SCTIME_CMP_LT(a, b)
Definition: util-time.h:105
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:38
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:36
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:642
StatsPublicThreadContext_::m
SCMutex m
Definition: counters.h:75
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:378
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1534
Thread_::sys_sec_stamp
uint32_t sys_sec_stamp
Definition: tm-threads.c:2076
Thread_::type
int type
Definition: tm-threads.c:2071
cnt
uint32_t cnt
Definition: tmqh-packetpool.h:7
Thread
struct Thread_ Thread
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:89
Packet_::flow
struct Flow_ * flow
Definition: decode.h:476
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1152
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:74
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:53
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:816
Threads_
Definition: tm-threads.c:2080
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:35
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:34
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:105
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:40
ThreadVars_::break_loop
bool break_loop
Definition: threadvars.h:135
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:100
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:832
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
SCTIME_SECS
#define SCTIME_SECS(t)
Definition: util-time.h:57
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:72
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:48
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:77
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
FatalError
#define FatalError(...)
Definition: util-debug.h:502
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax(void)
Definition: tm-threads.c:2274
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:65
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1126
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
StatsSyncCounters
void StatsSyncCounters(ThreadVars *tv)
Definition: counters.c:456
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:142
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:49
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:216
threadvars.h
util-validate.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:229
ThreadsAffinityType_
Definition: util-affinity.h:65
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:37
THV_KILL_PKTACQ
#define THV_KILL_PKTACQ
Definition: threadvars.h:46
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:616
TmSlot_::tm_flags
uint8_t tm_flags
Definition: tm-threads.h:67
Thread_::name
const char * name
Definition: tm-threads.c:2070
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:131
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:50
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:90
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:74
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:89
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:94
thread_name_verdict
const char * thread_name_verdict
Definition: runmodes.c:82
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:73
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:317
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:83
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:55
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:88
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:111
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:93
suricata.h
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:457
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:140
Tmq_
Definition: tm-queues.h:29
TmThreadWaitOnThreadRunning
TmEcode TmThreadWaitOnThreadRunning(void)
Waits for all threads to be in a running state.
Definition: tm-threads.c:1805
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2089
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1924
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:131
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:82
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:69
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:108
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:277
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:91
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:90
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:41
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:2068
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:275
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:120
Threads_::threads_size
size_t threads_size
Definition: tm-threads.c:2082
StatsThreadCleanup
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1323
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:62
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:264
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:103
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:77
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:973
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:359
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:172
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:350
SCTIME_USECS
#define SCTIME_USECS(t)
Definition: util-time.h:56
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:53