suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2022 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-affinity.h"
39 #include "util-debug.h"
40 #include "util-privs.h"
41 #include "util-cpu.h"
42 #include "util-optimize.h"
43 #include "util-profiling.h"
44 #include "util-signal.h"
45 #include "queue.h"
46 #include "util-validate.h"
47 
48 #ifdef PROFILE_LOCKING
49 thread_local uint64_t mutex_lock_contention;
50 thread_local uint64_t mutex_lock_wait_ticks;
51 thread_local uint64_t mutex_lock_cnt;
52 
53 thread_local uint64_t spin_lock_contention;
54 thread_local uint64_t spin_lock_wait_ticks;
55 thread_local uint64_t spin_lock_cnt;
56 
57 thread_local uint64_t rww_lock_contention;
58 thread_local uint64_t rww_lock_wait_ticks;
59 thread_local uint64_t rww_lock_cnt;
60 
61 thread_local uint64_t rwr_lock_contention;
62 thread_local uint64_t rwr_lock_wait_ticks;
63 thread_local uint64_t rwr_lock_cnt;
64 #endif
65 
66 #ifdef OS_FREEBSD
67 #include <sched.h>
68 #include <sys/param.h>
69 #include <sys/resource.h>
70 #include <sys/cpuset.h>
71 #include <sys/thr.h>
72 #define cpu_set_t cpuset_t
73 #endif /* OS_FREEBSD */
74 
75 /* prototypes */
76 static int SetCPUAffinity(uint16_t cpu);
77 static void TmThreadDeinitMC(ThreadVars *tv);
78 
79 /* root of the threadvars list */
80 ThreadVars *tv_root[TVT_MAX] = { NULL };
81 
82 /* lock to protect tv_root */
84 
85 /**
86  * \brief Check if a thread flag is set.
87  *
88  * \retval 1 flag is set.
89  * \retval 0 flag is not set.
90  */
91 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
92 {
93  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
94 }
95 
96 /**
97  * \brief Set a thread flag.
98  */
99 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
100 {
101  SC_ATOMIC_OR(tv->flags, flag);
102 }
103 
104 /**
105  * \brief Unset a thread flag.
106  */
107 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
108 {
109  SC_ATOMIC_AND(tv->flags, ~flag);
110 }
111 
112 /**
113  * \brief Separate run function so we can call it recursively.
114  */
116 {
117  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
118  PACKET_PROFILING_TMM_START(p, s->tm_id);
119  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
120  PACKET_PROFILING_TMM_END(p, s->tm_id);
121  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
122 
123  /* handle error */
124  if (unlikely(r == TM_ECODE_FAILED)) {
125  /* Encountered error. Return packets to packetpool and return */
126  TmThreadsSlotProcessPktFail(tv, s, NULL);
127  return TM_ECODE_FAILED;
128  }
129 
130  /* handle new pseudo packets immediately */
131  while (tv->decode_pq.top != NULL) {
132  Packet *extra_p = PacketDequeueNoLock(&tv->decode_pq);
133  if (unlikely(extra_p == NULL))
134  continue;
135  DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
136 
137  /* see if we need to process the packet */
138  if (s->slot_next != NULL) {
139  r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
140  if (unlikely(r == TM_ECODE_FAILED)) {
141  TmThreadsSlotProcessPktFail(tv, s, extra_p);
142  return TM_ECODE_FAILED;
143  }
144  }
145  tv->tmqh_out(tv, extra_p);
146  }
147  }
148 
149  return TM_ECODE_OK;
150 }
151 
152 /** \internal
153  *
154  * \brief Process flow timeout packets
155  *
156  * Process flow timeout pseudo packets. During shutdown this loop
157  * is run until the flow engine kills the thread and the queue is
158  * empty.
159  */
160 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
161 {
162  TmSlot *fw_slot = tv->tm_flowworker;
163  int r = TM_ECODE_OK;
164 
165  if (tv->stream_pq == NULL || fw_slot == NULL) {
166  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
167  return r;
168  }
169 
170  SCLogDebug("flow end loop starting");
171  while (1) {
173  uint32_t len = tv->stream_pq->len;
175  if (len > 0) {
176  while (len--) {
180  if (likely(p)) {
181  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
182  r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
183  if (r == TM_ECODE_FAILED) {
184  break;
185  }
186  }
187  }
188  } else {
190  break;
191  }
192  SleepUsec(1);
193  }
194  }
195  SCLogDebug("flow end loop complete");
197 
198  return r;
199 }
200 
201 /*
202 
203  pcap/nfq
204 
205  pkt read
206  callback
207  process_pkt
208 
209  pfring
210 
211  pkt read
212  process_pkt
213 
214  slot:
215  setup
216 
217  pkt_ack_loop(tv, slot_data)
218 
219  deinit
220 
221  process_pkt:
222  while(s)
223  run s;
224  queue;
225 
226  */
227 
228 static void *TmThreadsSlotPktAcqLoop(void *td)
229 {
230  ThreadVars *tv = (ThreadVars *)td;
231  TmSlot *s = tv->tm_slots;
232  char run = 1;
233  TmEcode r = TM_ECODE_OK;
234  TmSlot *slot = NULL;
235 
237 
238  if (tv->thread_setup_flags != 0)
240 
241  /* Drop the capabilities for this thread */
242  SCDropCaps(tv);
243 
244  PacketPoolInit();
245 
246  /* check if we are setup properly */
247  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
248  SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
249  " PktAcqLoop=%p, tmqh_in=%p,"
250  " tmqh_out=%p",
251  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
253  pthread_exit((void *) -1);
254  return NULL;
255  }
256 
257  for (slot = s; slot != NULL; slot = slot->slot_next) {
258  if (slot->SlotThreadInit != NULL) {
259  void *slot_data = NULL;
260  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
261  if (r != TM_ECODE_OK) {
262  if (r == TM_ECODE_DONE) {
263  EngineDone();
265  goto error;
266  } else {
268  goto error;
269  }
270  }
271  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
272  }
273 
274  /* if the flowworker module is the first, get the threads input queue */
275  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
276  tv->stream_pq = tv->inq->pq;
277  tv->tm_flowworker = slot;
278  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
280  if (tv->flow_queue == NULL) {
282  pthread_exit((void *) -1);
283  return NULL;
284  }
285  /* setup a queue */
286  } else if (slot->tm_id == TMM_FLOWWORKER) {
287  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
288  if (tv->stream_pq_local == NULL)
289  FatalError("failed to alloc PacketQueue");
292  tv->tm_flowworker = slot;
293  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
295  if (tv->flow_queue == NULL) {
297  pthread_exit((void *) -1);
298  return NULL;
299  }
300  }
301  }
302 
304 
306 
307  while(run) {
312  }
313 
314  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
315 
316  if (r == TM_ECODE_FAILED) {
318  run = 0;
319  }
321  run = 0;
322  }
323  if (r == TM_ECODE_DONE) {
324  run = 0;
325  }
326  }
328 
330 
331  /* process all pseudo packets the flow timeout may throw at us */
332  TmThreadTimeoutLoop(tv, s);
333 
336 
338 
339  for (slot = s; slot != NULL; slot = slot->slot_next) {
340  if (slot->SlotThreadExitPrintStats != NULL) {
341  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
342  }
343 
344  if (slot->SlotThreadDeinit != NULL) {
345  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
346  if (r != TM_ECODE_OK) {
348  goto error;
349  }
350  }
351  }
352 
353  tv->stream_pq = NULL;
354  SCLogDebug("%s ending", tv->name);
356  pthread_exit((void *) 0);
357  return NULL;
358 
359 error:
360  tv->stream_pq = NULL;
361  pthread_exit((void *) -1);
362  return NULL;
363 }
364 
365 static void *TmThreadsSlotVar(void *td)
366 {
367  ThreadVars *tv = (ThreadVars *)td;
368  TmSlot *s = (TmSlot *)tv->tm_slots;
369  Packet *p = NULL;
370  char run = 1;
371  TmEcode r = TM_ECODE_OK;
372 
373  PacketPoolInit();//Empty();
374 
376 
377  if (tv->thread_setup_flags != 0)
379 
380  /* Drop the capabilities for this thread */
381  SCDropCaps(tv);
382 
383  /* check if we are setup properly */
384  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
386  pthread_exit((void *) -1);
387  return NULL;
388  }
389 
390  for (; s != NULL; s = s->slot_next) {
391  if (s->SlotThreadInit != NULL) {
392  void *slot_data = NULL;
393  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
394  if (r != TM_ECODE_OK) {
396  goto error;
397  }
398  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
399  }
400 
401  /* special case: we need to access the stream queue
402  * from the flow timeout code */
403 
404  /* if the flowworker module is the first, get the threads input queue */
405  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
406  tv->stream_pq = tv->inq->pq;
407  tv->tm_flowworker = s;
408  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
410  if (tv->flow_queue == NULL) {
412  pthread_exit((void *) -1);
413  return NULL;
414  }
415  /* setup a queue */
416  } else if (s->tm_id == TMM_FLOWWORKER) {
417  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
418  if (tv->stream_pq_local == NULL)
419  FatalError("failed to alloc PacketQueue");
422  tv->tm_flowworker = s;
423  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
425  if (tv->flow_queue == NULL) {
427  pthread_exit((void *) -1);
428  return NULL;
429  }
430  }
431  }
432 
434 
435  // Each 'worker' thread uses this func to process/decode the packet read.
436  // Each decode method is different to receive methods in that they do not
437  // enter infinite loops. They use this as the core loop. As a result, at this
438  // point the worker threads can be considered both initialized and running.
440 
441  s = (TmSlot *)tv->tm_slots;
442 
443  while (run) {
448  }
449 
450  /* input a packet */
451  p = tv->tmqh_in(tv);
452 
453  /* if we didn't get a packet see if we need to do some housekeeping */
454  if (unlikely(p == NULL)) {
455  if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
457  if (p != NULL) {
460  }
461  }
462  }
463 
464  if (p != NULL) {
465  /* run the thread module(s) */
466  r = TmThreadsSlotVarRun(tv, p, s);
467  if (r == TM_ECODE_FAILED) {
470  break;
471  }
472 
473  /* output the packet */
474  tv->tmqh_out(tv, p);
475 
476  /* now handle the stream pq packets */
477  TmThreadsHandleInjectedPackets(tv);
478  }
479 
481  run = 0;
482  }
483  } /* while (run) */
485 
488 
490 
491  s = (TmSlot *)tv->tm_slots;
492 
493  for ( ; s != NULL; s = s->slot_next) {
494  if (s->SlotThreadExitPrintStats != NULL) {
495  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
496  }
497 
498  if (s->SlotThreadDeinit != NULL) {
499  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
500  if (r != TM_ECODE_OK) {
502  goto error;
503  }
504  }
505  }
506 
507  SCLogDebug("%s ending", tv->name);
508  tv->stream_pq = NULL;
510  pthread_exit((void *) 0);
511  return NULL;
512 
513 error:
514  tv->stream_pq = NULL;
515  pthread_exit((void *) -1);
516  return NULL;
517 }
518 
519 static void *TmThreadsManagement(void *td)
520 {
521  ThreadVars *tv = (ThreadVars *)td;
522  TmSlot *s = (TmSlot *)tv->tm_slots;
523  TmEcode r = TM_ECODE_OK;
524 
525  BUG_ON(s == NULL);
526 
528 
529  if (tv->thread_setup_flags != 0)
531 
532  /* Drop the capabilities for this thread */
533  SCDropCaps(tv);
534 
535  SCLogDebug("%s starting", tv->name);
536 
537  if (s->SlotThreadInit != NULL) {
538  void *slot_data = NULL;
539  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
540  if (r != TM_ECODE_OK) {
542  pthread_exit((void *) -1);
543  return NULL;
544  }
545  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
546  }
547 
549 
551 
552  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
553  /* handle error */
554  if (r == TM_ECODE_FAILED) {
556  }
557 
560  }
561 
564 
565  if (s->SlotThreadExitPrintStats != NULL) {
566  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
567  }
568 
569  if (s->SlotThreadDeinit != NULL) {
570  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
571  if (r != TM_ECODE_OK) {
573  pthread_exit((void *) -1);
574  return NULL;
575  }
576  }
577 
579  pthread_exit((void *) 0);
580  return NULL;
581 }
582 
583 /**
584  * \brief We set the slot functions.
585  *
586  * \param tv Pointer to the TV to set the slot function for.
587  * \param name Name of the slot variant.
588  * \param fn_p Pointer to a custom slot function. Used only if slot variant
589  * "name" is "custom".
590  *
591  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
592  */
593 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
594 {
595  if (name == NULL) {
596  if (fn_p == NULL) {
597  printf("Both slot name and function pointer can't be NULL inside "
598  "TmThreadSetSlots\n");
599  goto error;
600  } else {
601  name = "custom";
602  }
603  }
604 
605  if (strcmp(name, "varslot") == 0) {
606  tv->tm_func = TmThreadsSlotVar;
607  } else if (strcmp(name, "pktacqloop") == 0) {
608  tv->tm_func = TmThreadsSlotPktAcqLoop;
609  } else if (strcmp(name, "management") == 0) {
610  tv->tm_func = TmThreadsManagement;
611  } else if (strcmp(name, "command") == 0) {
612  tv->tm_func = TmThreadsManagement;
613  } else if (strcmp(name, "custom") == 0) {
614  if (fn_p == NULL)
615  goto error;
616  tv->tm_func = fn_p;
617  } else {
618  printf("Error: Slot \"%s\" not supported\n", name);
619  goto error;
620  }
621 
622  return TM_ECODE_OK;
623 
624 error:
625  return TM_ECODE_FAILED;
626 }
627 
628 /**
629  * \brief Appends a new entry to the slots.
630  *
631  * \param tv TV the slot is attached to.
632  * \param tm TM to append.
633  * \param data Data to be passed on to the slot init function.
634  *
635  * \retval The allocated TmSlot or NULL if there is an error
636  */
637 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
638 {
639  TmSlot *slot = SCMalloc(sizeof(TmSlot));
640  if (unlikely(slot == NULL))
641  return;
642  memset(slot, 0, sizeof(TmSlot));
643  SC_ATOMIC_INITPTR(slot->slot_data);
644  slot->SlotThreadInit = tm->ThreadInit;
645  slot->slot_initdata = data;
646  if (tm->Func) {
647  slot->SlotFunc = tm->Func;
648  } else if (tm->PktAcqLoop) {
649  slot->PktAcqLoop = tm->PktAcqLoop;
650  if (tm->PktAcqBreakLoop) {
651  tv->break_loop = true;
652  }
653  } else if (tm->Management) {
654  slot->Management = tm->Management;
655  }
657  slot->SlotThreadDeinit = tm->ThreadDeinit;
658  /* we don't have to check for the return value "-1". We wouldn't have
659  * received a TM as arg, if it didn't exist */
660  slot->tm_id = TmModuleGetIDForTM(tm);
661 
662  tv->tmm_flags |= tm->flags;
663  tv->cap_flags |= tm->cap_flags;
664 
665  if (tv->tm_slots == NULL) {
666  tv->tm_slots = slot;
667  } else {
668  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
669 
670  /* get the last slot */
671  for ( ; a != NULL; a = a->slot_next) {
672  b = a;
673  }
674  /* append the new slot */
675  if (b != NULL) {
676  b->slot_next = slot;
677  }
678  }
679  return;
680 }
681 
682 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
683 static int SetCPUAffinitySet(cpu_set_t *cs)
684 {
685 #if defined OS_FREEBSD
686  int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
687  SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
688 #elif OS_DARWIN
689  int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
690  (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
691 #else
692  pid_t tid = syscall(SYS_gettid);
693  int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
694 #endif /* OS_FREEBSD */
695 
696  if (r != 0) {
697  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
698  strerror(errno));
699  return -1;
700  }
701 
702  return 0;
703 }
704 #endif
705 
706 
707 /**
708  * \brief Set the thread affinity on the calling thread.
709  *
710  * \param cpuid Id of the core/cpu to setup the affinity.
711  *
712  * \retval 0 If all goes well; -1 if something is wrong.
713  */
714 static int SetCPUAffinity(uint16_t cpuid)
715 {
716 #if defined __OpenBSD__ || defined sun
717  return 0;
718 #else
719  int cpu = (int)cpuid;
720 
721 #if defined OS_WIN32 || defined __CYGWIN__
722  DWORD cs = 1 << cpu;
723 
724  int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
725  if (r != 0) {
726  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
727  strerror(errno));
728  return -1;
729  }
730  SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
731  SCGetThreadIdLong(), cpu);
732 
733  return 0;
734 
735 #else
736  cpu_set_t cs;
737  memset(&cs, 0, sizeof(cs));
738 
739  CPU_ZERO(&cs);
740  CPU_SET(cpu, &cs);
741  return SetCPUAffinitySet(&cs);
742 #endif /* windows */
743 #endif /* not supported */
744 }
745 
746 
747 /**
748  * \brief Set the thread options (thread priority).
749  *
750  * \param tv Pointer to the ThreadVars to setup the thread priority.
751  *
752  * \retval TM_ECODE_OK.
753  */
755 {
757  tv->thread_priority = prio;
758 
759  return TM_ECODE_OK;
760 }
761 
762 /**
763  * \brief Adjusting nice value for threads.
764  */
766 {
767  SCEnter();
768 #ifndef __CYGWIN__
769 #ifdef OS_WIN32
770  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
771  SCLogError("Error setting priority for "
772  "thread %s: %s",
773  tv->name, strerror(errno));
774  } else {
775  SCLogDebug("Priority set to %"PRId32" for thread %s",
777  }
778 #else
779  int ret = nice(tv->thread_priority);
780  if (ret == -1) {
781  SCLogError("Error setting nice value %d "
782  "for thread %s: %s",
783  tv->thread_priority, tv->name, strerror(errno));
784  } else {
785  SCLogDebug("Nice value set to %"PRId32" for thread %s",
787  }
788 #endif /* OS_WIN32 */
789 #endif
790  SCReturn;
791 }
792 
793 
794 /**
795  * \brief Set the thread options (cpu affinity).
796  *
797  * \param tv pointer to the ThreadVars to setup the affinity.
798  * \param cpu cpu on which affinity is set.
799  *
800  * \retval TM_ECODE_OK
801  */
803 {
805  tv->cpu_affinity = cpu;
806 
807  return TM_ECODE_OK;
808 }
809 
810 
812 {
814  return TM_ECODE_OK;
815 
816  if (type > MAX_CPU_SET) {
817  SCLogError("invalid cpu type family");
818  return TM_ECODE_FAILED;
819  }
820 
822  tv->cpu_affinity = type;
823 
824  return TM_ECODE_OK;
825 }
826 
828 {
829  if (type >= MAX_CPU_SET) {
830  SCLogError("invalid cpu type family");
831  return 0;
832  }
833 
835 }
836 
837 /**
838  * \brief Set the thread options (cpu affinitythread).
839  * Priority should be already set by pthread_create.
840  *
841  * \param tv pointer to the ThreadVars of the calling thread.
842  */
844 {
846  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
847  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
849  SetCPUAffinity(tv->cpu_affinity);
850  }
851 
852 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
857  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
858  uint16_t cpu = AffinityGetNextCPU(taf);
859  SetCPUAffinity(cpu);
860  /* If CPU is in a set overwrite the default thread prio */
861  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
863  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
865  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
867  } else {
868  tv->thread_priority = taf->prio;
869  }
870  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
871  "%d, thread id %lu", tv->thread_priority,
872  tv->name, cpu, SCGetThreadIdLong());
873  } else {
874  SetCPUAffinitySet(&taf->cpu_set);
875  tv->thread_priority = taf->prio;
876  SCLogPerf("Setting prio %d for thread \"%s\", "
877  "thread id %lu", tv->thread_priority,
879  }
881  }
882 #endif
883 
884  return TM_ECODE_OK;
885 }
886 
887 /**
888  * \brief Creates and returns the TV instance for a new thread.
889  *
890  * \param name Name of this TV instance
891  * \param inq_name Incoming queue name
892  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
893  * \param outq_name Outgoing queue name
894  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
895  * \param slots String representation for the slot function to be used
896  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
897  * \param mucond Flag to indicate whether to initialize the condition
898  * and the mutex variables for this newly created TV.
899  *
900  * \retval the newly created TV instance, or NULL on error
901  */
902 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
903  const char *outq_name, const char *outqh_name, const char *slots,
904  void * (*fn_p)(void *), int mucond)
905 {
906  ThreadVars *tv = NULL;
907  Tmq *tmq = NULL;
908  Tmqh *tmqh = NULL;
909 
910  SCLogDebug("creating thread \"%s\"...", name);
911 
912  /* XXX create separate function for this: allocate a thread container */
913  tv = SCMalloc(sizeof(ThreadVars));
914  if (unlikely(tv == NULL))
915  goto error;
916  memset(tv, 0, sizeof(ThreadVars));
917 
918  SC_ATOMIC_INIT(tv->flags);
919  SCMutexInit(&tv->perf_public_ctx.m, NULL);
920 
921  strlcpy(tv->name, name, sizeof(tv->name));
922 
923  /* default state for every newly created thread */
926 
927  /* set the incoming queue */
928  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
929  SCLogDebug("inq_name \"%s\"", inq_name);
930 
931  tmq = TmqGetQueueByName(inq_name);
932  if (tmq == NULL) {
933  tmq = TmqCreateQueue(inq_name);
934  if (tmq == NULL)
935  goto error;
936  }
937  SCLogDebug("tmq %p", tmq);
938 
939  tv->inq = tmq;
940  tv->inq->reader_cnt++;
941  SCLogDebug("tv->inq %p", tv->inq);
942  }
943  if (inqh_name != NULL) {
944  SCLogDebug("inqh_name \"%s\"", inqh_name);
945 
946  int id = TmqhNameToID(inqh_name);
947  if (id <= 0) {
948  goto error;
949  }
950  tmqh = TmqhGetQueueHandlerByName(inqh_name);
951  if (tmqh == NULL)
952  goto error;
953 
954  tv->tmqh_in = tmqh->InHandler;
955  tv->inq_id = (uint8_t)id;
956  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
957  }
958 
959  /* set the outgoing queue */
960  if (outqh_name != NULL) {
961  SCLogDebug("outqh_name \"%s\"", outqh_name);
962 
963  int id = TmqhNameToID(outqh_name);
964  if (id <= 0) {
965  goto error;
966  }
967 
968  tmqh = TmqhGetQueueHandlerByName(outqh_name);
969  if (tmqh == NULL)
970  goto error;
971 
972  tv->tmqh_out = tmqh->OutHandler;
973  tv->outq_id = (uint8_t)id;
974 
975  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
976  SCLogDebug("outq_name \"%s\"", outq_name);
977 
978  if (tmqh->OutHandlerCtxSetup != NULL) {
979  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
980  if (tv->outctx == NULL)
981  goto error;
982  tv->outq = NULL;
983  } else {
984  tmq = TmqGetQueueByName(outq_name);
985  if (tmq == NULL) {
986  tmq = TmqCreateQueue(outq_name);
987  if (tmq == NULL)
988  goto error;
989  }
990  SCLogDebug("tmq %p", tmq);
991 
992  tv->outq = tmq;
993  tv->outctx = NULL;
994  tv->outq->writer_cnt++;
995  }
996  }
997  }
998 
999  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1000  goto error;
1001  }
1002 
1003  if (mucond != 0)
1004  TmThreadInitMC(tv);
1005 
1006  return tv;
1007 
1008 error:
1009  SCLogError("failed to setup a thread");
1010 
1011  if (tv != NULL)
1012  SCFree(tv);
1013  return NULL;
1014 }
1015 
1016 /**
1017  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1018  * This function doesn't support custom slots, and hence shouldn't be
1019  * supplied \"custom\" as its slot type. All PPT threads are created
1020  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1021  * conditional variables are not used to kill the thread.
1022  *
1023  * \param name Name of this TV instance
1024  * \param inq_name Incoming queue name
1025  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1026  * \param outq_name Outgoing queue name
1027  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1028  * \param slots String representation for the slot function to be used
1029  *
1030  * \retval the newly created TV instance, or NULL on error
1031  */
1032 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1033  const char *inqh_name, const char *outq_name,
1034  const char *outqh_name, const char *slots)
1035 {
1036  ThreadVars *tv = NULL;
1037 
1038  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1039  slots, NULL, 0);
1040 
1041  if (tv != NULL) {
1042  tv->type = TVT_PPT;
1044  }
1045 
1046  return tv;
1047 }
1048 
1049 /**
1050  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1051  * This function supports only custom slot functions and hence a
1052  * function pointer should be sent as an argument.
1053  *
1054  * \param name Name of this TV instance
1055  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1056  * \param mucond Flag to indicate whether to initialize the condition
1057  * and the mutex variables for this newly created TV.
1058  *
1059  * \retval the newly created TV instance, or NULL on error
1060  */
1061 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1062  int mucond)
1063 {
1064  ThreadVars *tv = NULL;
1065 
1066  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1067 
1068  if (tv != NULL) {
1069  tv->type = TVT_MGMT;
1072  }
1073 
1074  return tv;
1075 }
1076 
1077 /**
1078  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1079  * This function supports only custom slot functions and hence a
1080  * function pointer should be sent as an argument.
1081  *
1082  * \param name Name of this TV instance
1083  * \param module Name of TmModule with MANAGEMENT flag set.
1084  * \param mucond Flag to indicate whether to initialize the condition
1085  * and the mutex variables for this newly created TV.
1086  *
1087  * \retval the newly created TV instance, or NULL on error
1088  */
1089 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1090  int mucond)
1091 {
1092  ThreadVars *tv = NULL;
1093 
1094  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1095 
1096  if (tv != NULL) {
1097  tv->type = TVT_MGMT;
1100 
1101  TmModule *m = TmModuleGetByName(module);
1102  if (m) {
1103  TmSlotSetFuncAppend(tv, m, NULL);
1104  }
1105  }
1106 
1107  return tv;
1108 }
1109 
1110 /**
1111  * \brief Creates and returns the TV instance for a Command thread (CMD).
1112  * This function supports only custom slot functions and hence a
1113  * function pointer should be sent as an argument.
1114  *
1115  * \param name Name of this TV instance
1116  * \param module Name of TmModule with COMMAND flag set.
1117  * \param mucond Flag to indicate whether to initialize the condition
1118  * and the mutex variables for this newly created TV.
1119  *
1120  * \retval the newly created TV instance, or NULL on error
1121  */
1122 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1123  int mucond)
1124 {
1125  ThreadVars *tv = NULL;
1126 
1127  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1128 
1129  if (tv != NULL) {
1130  tv->type = TVT_CMD;
1133 
1134  TmModule *m = TmModuleGetByName(module);
1135  if (m) {
1136  TmSlotSetFuncAppend(tv, m, NULL);
1137  }
1138  }
1139 
1140  return tv;
1141 }
1142 
1143 /**
1144  * \brief Appends this TV to tv_root based on its type
1145  *
1146  * \param type holds the type this TV belongs to.
1147  */
1149 {
1151 
1152  if (tv_root[type] == NULL) {
1153  tv_root[type] = tv;
1154  tv->next = NULL;
1155 
1157 
1158  return;
1159  }
1160 
1161  ThreadVars *t = tv_root[type];
1162 
1163  while (t) {
1164  if (t->next == NULL) {
1165  t->next = tv;
1166  tv->next = NULL;
1167  break;
1168  }
1169 
1170  t = t->next;
1171  }
1172 
1174 
1175  return;
1176 }
1177 
1178 static bool ThreadStillHasPackets(ThreadVars *tv)
1179 {
1180  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1181  /* we wait till we dry out all the inq packets, before we
1182  * kill this thread. Do note that you should have disabled
1183  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1184  PacketQueue *q = tv->inq->pq;
1185  SCMutexLock(&q->mutex_q);
1186  uint32_t len = q->len;
1187  SCMutexUnlock(&q->mutex_q);
1188  if (len != 0) {
1189  return true;
1190  }
1191  }
1192 
1193  if (tv->stream_pq != NULL) {
1195  uint32_t len = tv->stream_pq->len;
1197 
1198  if (len != 0) {
1199  return true;
1200  }
1201  }
1202  return false;
1203 }
1204 
1205 /**
1206  * \brief Kill a thread.
1207  *
1208  * \param tv A ThreadVars instance corresponding to the thread that has to be
1209  * killed.
1210  *
1211  * \retval r 1 killed succesfully
1212  * 0 not yet ready, needs another look
1213  */
1214 static int TmThreadKillThread(ThreadVars *tv)
1215 {
1216  BUG_ON(tv == NULL);
1217 
1218  /* kill only once :) */
1219  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1220  return 1;
1221  }
1222 
1223  /* set the thread flag informing the thread that it needs to be
1224  * terminated */
1227 
1228  /* to be sure, signal more */
1229  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1230  if (tv->inq_id != TMQH_NOT_SET) {
1232  if (qh != NULL && qh->InShutdownHandler != NULL) {
1233  qh->InShutdownHandler(tv);
1234  }
1235  }
1236  if (tv->inq != NULL) {
1237  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1238  SCCondSignal(&tv->inq->pq->cond_q);
1239  }
1240  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1241  }
1242 
1243  if (tv->ctrl_cond != NULL ) {
1244  pthread_cond_broadcast(tv->ctrl_cond);
1245  }
1246  return 0;
1247  }
1248 
1249  if (tv->outctx != NULL) {
1250  if (tv->outq_id != TMQH_NOT_SET) {
1252  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1253  qh->OutHandlerCtxFree(tv->outctx);
1254  tv->outctx = NULL;
1255  }
1256  }
1257  }
1258 
1259  /* join it and flag it as dead */
1260  pthread_join(tv->t, NULL);
1261  SCLogDebug("thread %s stopped", tv->name);
1263  return 1;
1264 }
1265 
1266 /** \internal
1267  *
1268  * \brief make sure that all packet threads are done processing their
1269  * in-flight packets, including 'injected' flow packets.
1270  */
1271 static void TmThreadDrainPacketThreads(void)
1272 {
1273  ThreadVars *tv = NULL;
1274  struct timeval start_ts;
1275  struct timeval cur_ts;
1276  gettimeofday(&start_ts, NULL);
1277 
1278 again:
1279  gettimeofday(&cur_ts, NULL);
1280  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1281  SCLogWarning("unable to get all packet threads "
1282  "to process their packets in time");
1283  return;
1284  }
1285 
1287 
1288  /* all receive threads are part of packet processing threads */
1289  tv = tv_root[TVT_PPT];
1290  while (tv) {
1291  if (ThreadStillHasPackets(tv)) {
1292  /* we wait till we dry out all the inq packets, before we
1293  * kill this thread. Do note that you should have disabled
1294  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1296 
1297  /* sleep outside lock */
1298  SleepMsec(1);
1299  goto again;
1300  }
1301  if (tv->flow_queue) {
1303  bool fq_done = (tv->flow_queue->qlen == 0);
1305  if (!fq_done) {
1307 
1308  Packet *p = PacketGetFromAlloc();
1309  if (p != NULL) {
1312  PacketQueue *q = tv->stream_pq;
1313  SCMutexLock(&q->mutex_q);
1314  PacketEnqueue(q, p);
1315  SCCondSignal(&q->cond_q);
1316  SCMutexUnlock(&q->mutex_q);
1317  }
1318 
1319  /* don't sleep while holding a lock */
1320  SleepMsec(1);
1321  goto again;
1322  }
1323  }
1324  tv = tv->next;
1325  }
1326 
1328  return;
1329 }
1330 
1331 /**
1332  * \brief Disable all threads having the specified TMs.
1333  *
1334  * Breaks out of the packet acquisition loop, and bumps
1335  * into the 'flow loop', where it will process packets
1336  * from the flow engine's shutdown handling.
1337  */
1339 {
1340  ThreadVars *tv = NULL;
1341  struct timeval start_ts;
1342  struct timeval cur_ts;
1343  gettimeofday(&start_ts, NULL);
1344 
1345 again:
1346  gettimeofday(&cur_ts, NULL);
1347  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1348  FatalError("Engine unable to disable detect "
1349  "thread - \"%s\". Killing engine",
1350  tv->name);
1351  }
1352 
1354 
1355  /* all receive threads are part of packet processing threads */
1356  tv = tv_root[TVT_PPT];
1357 
1358  /* we do have to keep in mind that TVs are arranged in the order
1359  * right from receive to log. The moment we fail to find a
1360  * receive TM amongst the slots in a tv, it indicates we are done
1361  * with all receive threads */
1362  while (tv) {
1363  int disable = 0;
1364  TmModule *tm = NULL;
1365  /* obtain the slots for this TV */
1366  TmSlot *slots = tv->tm_slots;
1367  while (slots != NULL) {
1368  tm = TmModuleGetById(slots->tm_id);
1369 
1370  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1371  disable = 1;
1372  break;
1373  }
1374 
1375  slots = slots->slot_next;
1376  continue;
1377  }
1378 
1379  if (disable) {
1380  if (ThreadStillHasPackets(tv)) {
1381  /* we wait till we dry out all the inq packets, before we
1382  * kill this thread. Do note that you should have disabled
1383  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1385  /* don't sleep while holding a lock */
1386  SleepMsec(1);
1387  goto again;
1388  }
1389 
1390  if (tv->flow_queue) {
1392  bool fq_done = (tv->flow_queue->qlen == 0);
1394  if (!fq_done) {
1396 
1397  Packet *p = PacketGetFromAlloc();
1398  if (p != NULL) {
1401  PacketQueue *q = tv->stream_pq;
1402  SCMutexLock(&q->mutex_q);
1403  PacketEnqueue(q, p);
1404  SCCondSignal(&q->cond_q);
1405  SCMutexUnlock(&q->mutex_q);
1406  }
1407 
1408  /* don't sleep while holding a lock */
1409  SleepMsec(1);
1410  goto again;
1411  }
1412  }
1413 
1414  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1415  if (tm && tm->PktAcqBreakLoop != NULL) {
1416  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1417  }
1419 
1420  if (tv->inq != NULL) {
1421  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1422  SCCondSignal(&tv->inq->pq->cond_q);
1423  }
1424  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1425  }
1426 
1427  /* wait for it to enter the 'flow loop' stage */
1428  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1430 
1431  SleepMsec(1);
1432  goto again;
1433  }
1434  }
1435 
1436  tv = tv->next;
1437  }
1438 
1440 
1441  /* finally wait for all packet threads to have
1442  * processed all of their 'live' packets so we
1443  * don't process the last live packets together
1444  * with FFR packets */
1445  TmThreadDrainPacketThreads();
1446  return;
1447 }
1448 
1449 #ifdef DEBUG_VALIDATION
1450 static void TmThreadDumpThreads(void);
1451 #endif
1452 
1453 static void TmThreadDebugValidateNoMorePackets(void)
1454 {
1455 #ifdef DEBUG_VALIDATION
1457  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1458  if (ThreadStillHasPackets(tv)) {
1460  TmThreadDumpThreads();
1461  abort();
1462  }
1463  }
1465 #endif
1466 }
1467 
1468 /**
1469  * \brief Disable all packet threads
1470  */
1472 {
1473  struct timeval start_ts;
1474  struct timeval cur_ts;
1475 
1476  /* first drain all packet threads of their packets */
1477  TmThreadDrainPacketThreads();
1478 
1479  /* since all the threads possibly able to produce more packets
1480  * are now gone or inactive, we should see no packets anywhere
1481  * anymore. */
1482  TmThreadDebugValidateNoMorePackets();
1483 
1484  gettimeofday(&start_ts, NULL);
1485 again:
1486  gettimeofday(&cur_ts, NULL);
1487  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1488  FatalError("Engine unable to disable packet "
1489  "threads. Killing engine");
1490  }
1491 
1492  /* loop through the packet threads and kill them */
1494  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1496 
1497  /* separate worker threads (autofp) will still wait at their
1498  * input queues. So nudge them here so they will observe the
1499  * THV_KILL flag. */
1500  if (tv->inq != NULL) {
1501  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1502  SCCondSignal(&tv->inq->pq->cond_q);
1503  }
1504  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1505  }
1506 
1509 
1510  SleepMsec(1);
1511  goto again;
1512  }
1513  }
1515  return;
1516 }
1517 
1518 #define MIN_WAIT_TIME 100
1519 #define MAX_WAIT_TIME 999999
1521 {
1522  ThreadVars *tv = NULL;
1523  unsigned int sleep_usec = MIN_WAIT_TIME;
1524 
1525  BUG_ON((family < 0) || (family >= TVT_MAX));
1526 
1527 again:
1529  tv = tv_root[family];
1530 
1531  while (tv) {
1532  int r = TmThreadKillThread(tv);
1533  if (r == 0) {
1535  SleepUsec(sleep_usec);
1536  sleep_usec *= 2; /* slowly back off */
1537  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1538  goto again;
1539  }
1540  sleep_usec = MIN_WAIT_TIME; /* reset */
1541 
1542  tv = tv->next;
1543  }
1545 }
1546 #undef MIN_WAIT_TIME
1547 #undef MAX_WAIT_TIME
1548 
1550 {
1551  int i = 0;
1552 
1553  for (i = 0; i < TVT_MAX; i++) {
1555  }
1556 
1557  return;
1558 }
1559 
1560 static void TmThreadFree(ThreadVars *tv)
1561 {
1562  TmSlot *s;
1563  TmSlot *ps;
1564  if (tv == NULL)
1565  return;
1566 
1567  SCLogDebug("Freeing thread '%s'.", tv->name);
1568 
1569  if (tv->flow_queue) {
1570  BUG_ON(tv->flow_queue->qlen != 0);
1571  SCFree(tv->flow_queue);
1572  }
1573 
1575 
1576  TmThreadDeinitMC(tv);
1577 
1578  if (tv->thread_group_name) {
1580  }
1581 
1582  if (tv->printable_name) {
1584  }
1585 
1586  if (tv->stream_pq_local) {
1590  }
1591 
1592  s = (TmSlot *)tv->tm_slots;
1593  while (s) {
1594  ps = s;
1595  s = s->slot_next;
1596  SCFree(ps);
1597  }
1598 
1600  SCFree(tv);
1601 }
1602 
1603 void TmThreadSetGroupName(ThreadVars *tv, const char *name)
1604 {
1605  char *thread_group_name = NULL;
1606 
1607  if (name == NULL)
1608  return;
1609 
1610  if (tv == NULL)
1611  return;
1612 
1613  thread_group_name = SCStrdup(name);
1614  if (unlikely(thread_group_name == NULL)) {
1615  SCLogError("error allocating memory");
1616  return;
1617  }
1618  tv->thread_group_name = thread_group_name;
1619 }
1620 
1622 {
1623  ThreadVars *tv = NULL;
1624  ThreadVars *ptv = NULL;
1625 
1626  if ((family < 0) || (family >= TVT_MAX))
1627  return;
1628 
1630  tv = tv_root[family];
1631 
1632  while (tv) {
1633  ptv = tv;
1634  tv = tv->next;
1635  TmThreadFree(ptv);
1636  }
1637  tv_root[family] = NULL;
1639 }
1640 
1641 /**
1642  * \brief Spawns a thread associated with the ThreadVars instance tv
1643  *
1644  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1645  */
1647 {
1648  pthread_attr_t attr;
1649  if (tv->tm_func == NULL) {
1650  FatalError("No thread function set");
1651  }
1652 
1653  /* Initialize and set thread detached attribute */
1654  pthread_attr_init(&attr);
1655 
1656  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1657 
1658  /* Adjust thread stack size if configured */
1660  SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
1661  if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
1662  FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
1664  }
1665  }
1666 
1667  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1668  if (rc) {
1669  FatalError("Unable to create thread with pthread_create() is %" PRId32, rc);
1670  }
1671 
1672 #if DEBUG && HAVE_PTHREAD_GETATTR_NP
1674  if (pthread_getattr_np(tv->t, &attr) == 0) {
1675  size_t stack_size;
1676  void *stack_addr;
1677  pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1678  SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1679  } else {
1680  SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1681  "pthread_getattr_np() is %" PRId32,
1682  rc);
1683  }
1684  }
1685 #endif
1686 
1688 
1689  TmThreadAppend(tv, tv->type);
1690  return TM_ECODE_OK;
1691 }
1692 
1693 /**
1694  * \brief Initializes the mutex and condition variables for this TV
1695  *
1696  * It can be used by a thread to control a wait loop that can also be
1697  * influenced by other threads.
1698  *
1699  * \param tv Pointer to a TV instance
1700  */
1702 {
1703  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1704  FatalError("Fatal error encountered in TmThreadInitMC. "
1705  "Exiting...");
1706  }
1707 
1708  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1709  printf("Error initializing the tv->m mutex\n");
1710  exit(EXIT_FAILURE);
1711  }
1712 
1713  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1714  FatalError("Fatal error encountered in TmThreadInitMC. "
1715  "Exiting...");
1716  }
1717 
1718  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1719  FatalError("Error initializing the tv->cond condition "
1720  "variable");
1721  }
1722 
1723  return;
1724 }
1725 
1726 static void TmThreadDeinitMC(ThreadVars *tv)
1727 {
1728  if (tv->ctrl_mutex) {
1730  SCFree(tv->ctrl_mutex);
1731  }
1732  if (tv->ctrl_cond) {
1734  SCFree(tv->ctrl_cond);
1735  }
1736  return;
1737 }
1738 
1739 /**
1740  * \brief Tests if the thread represented in the arg has been unpaused or not.
1741  *
1742  * The function would return if the thread tv has been unpaused or if the
1743  * kill flag for the thread has been set.
1744  *
1745  * \param tv Pointer to the TV instance.
1746  */
1748 {
1749  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1750  SleepUsec(100);
1751 
1753  break;
1754  }
1755 
1756  return;
1757 }
1758 
1759 /**
1760  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1761  * the kill flag has been set or not on the thread.
1762  *
1763  * \param tv Pointer to the TV instance.
1764  */
1766 {
1767  while (!TmThreadsCheckFlag(tv, flags)) {
1768  SleepUsec(100);
1769  }
1770 
1771  return;
1772 }
1773 
1774 /**
1775  * \brief Unpauses a thread
1776  *
1777  * \param tv Pointer to a TV instance that has to be unpaused
1778  */
1780 {
1782  return;
1783 }
1784 
1785 /**
1786  * \brief Waits for all threads to be in a running state
1787  *
1788  * \retval TM_ECODE_OK if all are running or error if a thread failed
1789  */
1791 {
1792  uint16_t RX_num = 0;
1793  uint16_t W_num = 0;
1794  uint16_t FM_num = 0;
1795  uint16_t FR_num = 0;
1796  uint16_t TX_num = 0;
1797 
1798  struct timeval start_ts;
1799  struct timeval cur_ts;
1800  gettimeofday(&start_ts, NULL);
1801 
1802 again:
1804  for (int i = 0; i < TVT_MAX; i++) {
1805  ThreadVars *tv = tv_root[i];
1806  while (tv != NULL) {
1809 
1810  SCLogError("thread \"%s\" failed to "
1811  "start: flags %04x",
1812  tv->name, SC_ATOMIC_GET(tv->flags));
1813  return TM_ECODE_FAILED;
1814  }
1815 
1818 
1819  /* 60 seconds provided for the thread to transition from
1820  * THV_INIT_DONE to THV_RUNNING */
1821  gettimeofday(&cur_ts, NULL);
1822  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1823  SCLogError("thread \"%s\" failed to "
1824  "start in time: flags %04x",
1825  tv->name, SC_ATOMIC_GET(tv->flags));
1826  return TM_ECODE_FAILED;
1827  }
1828 
1829  /* sleep a little to give the thread some
1830  * time to start running */
1831  SleepUsec(100);
1832  goto again;
1833  }
1834  tv = tv->next;
1835  }
1836  }
1837  for (int i = 0; i < TVT_MAX; i++) {
1838  for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
1839  if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1840  RX_num++;
1841  else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1842  W_num++;
1843  else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1844  TX_num++;
1845  else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1846  FM_num++;
1847  else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1848  FR_num++;
1849  }
1850  }
1852 
1853  /* Construct a welcome string displaying
1854  * initialized thread types and counts */
1855  uint16_t app_len = 32;
1856  uint16_t buf_len = 256;
1857 
1858  char append_str[app_len];
1859  char thread_counts[buf_len];
1860 
1861  strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1862  if (RX_num > 0) {
1863  snprintf(append_str, app_len, "RX: %u ", RX_num);
1864  strlcat(thread_counts, append_str, buf_len);
1865  }
1866  if (W_num > 0) {
1867  snprintf(append_str, app_len, "W: %u ", W_num);
1868  strlcat(thread_counts, append_str, buf_len);
1869  }
1870  if (TX_num > 0) {
1871  snprintf(append_str, app_len, "TX: %u ", TX_num);
1872  strlcat(thread_counts, append_str, buf_len);
1873  }
1874  if (FM_num > 0) {
1875  snprintf(append_str, app_len, "FM: %u ", FM_num);
1876  strlcat(thread_counts, append_str, buf_len);
1877  }
1878  if (FR_num > 0) {
1879  snprintf(append_str, app_len, "FR: %u ", FR_num);
1880  strlcat(thread_counts, append_str, buf_len);
1881  }
1882  snprintf(append_str, app_len, " Engine started.");
1883  strlcat(thread_counts, append_str, buf_len);
1884  SCLogNotice("%s", thread_counts);
1885 
1886  return TM_ECODE_OK;
1887 }
1888 
1889 /**
1890  * \brief Unpauses all threads present in tv_root
1891  */
1893 {
1895  for (int i = 0; i < TVT_MAX; i++) {
1896  ThreadVars *tv = tv_root[i];
1897  while (tv != NULL) {
1899  tv = tv->next;
1900  }
1901  }
1903  return;
1904 }
1905 
1906 /**
1907  * \brief Used to check the thread for certain conditions of failure.
1908  */
1910 {
1912  for (int i = 0; i < TVT_MAX; i++) {
1913  ThreadVars *tv = tv_root[i];
1914  while (tv) {
1916  FatalError("thread %s failed", tv->name);
1917  }
1918  tv = tv->next;
1919  }
1920  }
1922  return;
1923 }
1924 
1925 /**
1926  * \brief Used to check if all threads have finished their initialization. On
1927  * finding an un-initialized thread, it waits till that thread completes
1928  * its initialization, before proceeding to the next thread.
1929  *
1930  * \retval TM_ECODE_OK all initialized properly
1931  * \retval TM_ECODE_FAILED failure
1932  */
1934 {
1935  struct timeval start_ts;
1936  struct timeval cur_ts;
1937  gettimeofday(&start_ts, NULL);
1938 
1939 again:
1941  for (int i = 0; i < TVT_MAX; i++) {
1942  ThreadVars *tv = tv_root[i];
1943  while (tv != NULL) {
1946 
1947  SCLogError("thread \"%s\" failed to "
1948  "initialize: flags %04x",
1949  tv->name, SC_ATOMIC_GET(tv->flags));
1950  return TM_ECODE_FAILED;
1951  }
1952 
1953  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
1955 
1956  gettimeofday(&cur_ts, NULL);
1957  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
1958  SCLogError("thread \"%s\" failed to "
1959  "initialize in time: flags %04x",
1960  tv->name, SC_ATOMIC_GET(tv->flags));
1961  return TM_ECODE_FAILED;
1962  }
1963 
1964  /* sleep a little to give the thread some
1965  * time to finish initialization */
1966  SleepUsec(100);
1967  goto again;
1968  }
1969 
1972  SCLogError("thread \"%s\" failed to "
1973  "initialize.",
1974  tv->name);
1975  return TM_ECODE_FAILED;
1976  }
1979  SCLogError("thread \"%s\" closed on "
1980  "initialization.",
1981  tv->name);
1982  return TM_ECODE_FAILED;
1983  }
1984 
1985  tv = tv->next;
1986  }
1987  }
1989 
1990  return TM_ECODE_OK;
1991 }
1992 
1993 /**
1994  * \brief returns a count of all the threads that match the flag
1995  */
1997 {
1998  uint32_t cnt = 0;
2000  for (int i = 0; i < TVT_MAX; i++) {
2001  ThreadVars *tv = tv_root[i];
2002  while (tv != NULL) {
2003  if ((tv->tmm_flags & flags) == flags)
2004  cnt++;
2005 
2006  tv = tv->next;
2007  }
2008  }
2010  return cnt;
2011 }
2012 
2013 #ifdef DEBUG_VALIDATION
2014 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2015 {
2016  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2018  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2019  tv, s, s->tm_id, m->name);
2020  }
2021 }
2022 
2023 static void TmThreadDumpThreads(void)
2024 {
2026  for (int i = 0; i < TVT_MAX; i++) {
2027  ThreadVars *tv = tv_root[i];
2028  while (tv != NULL) {
2029  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2030  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2031  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2032  if (tv->inq && tv->stream_pq == tv->inq->pq) {
2033  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2034  } else if (tv->stream_pq_local != NULL) {
2035  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2036  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2037  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2038  }
2039  }
2040  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2041  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2042  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2043  }
2044  TmThreadDoDumpSlots(tv);
2045  tv = tv->next;
2046  }
2047  }
2050 }
2051 #endif
2052 
2053 typedef struct Thread_ {
2054  ThreadVars *tv; /**< threadvars structure */
2055  const char *name;
2056  int type;
2057  int in_use; /**< bool to indicate this is in use */
2058 
2059  SCTime_t pktts; /**< current packet time of this thread
2060  * (offline mode) */
2061  uint32_t sys_sec_stamp; /**< timestamp in seconds of the real system
2062  * time when the pktts was last updated. */
2064 
2065 typedef struct Threads_ {
2070 
2071 static Threads thread_store = { NULL, 0, 0 };
2072 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2073 
2075 {
2076  SCMutexLock(&thread_store_lock);
2077  for (size_t s = 0; s < thread_store.threads_size; s++) {
2078  Thread *t = &thread_store.threads[s];
2079  if (t == NULL || t->in_use == 0)
2080  continue;
2081 
2082  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2083  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2084  if (t->tv) {
2085  ThreadVars *tv = t->tv;
2086  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2087  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2088  tv, tv->type, tv->name, tv->tmm_flags, flags);
2089  }
2090  }
2091  SCMutexUnlock(&thread_store_lock);
2092 }
2093 
2094 #define STEP 32
2095 /**
2096  * \retval id thread id, or 0 if not found
2097  */
2099 {
2100  SCMutexLock(&thread_store_lock);
2101  if (thread_store.threads == NULL) {
2102  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2103  BUG_ON(thread_store.threads == NULL);
2104  thread_store.threads_size = STEP;
2105  }
2106 
2107  size_t s;
2108  for (s = 0; s < thread_store.threads_size; s++) {
2109  if (thread_store.threads[s].in_use == 0) {
2110  Thread *t = &thread_store.threads[s];
2111  t->name = tv->name;
2112  t->type = type;
2113  t->tv = tv;
2114  t->in_use = 1;
2115 
2116  SCMutexUnlock(&thread_store_lock);
2117  return (int)(s+1);
2118  }
2119  }
2120 
2121  /* if we get here the array is completely filled */
2122  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2123  BUG_ON(newmem == NULL);
2124  thread_store.threads = newmem;
2125  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2126 
2127  Thread *t = &thread_store.threads[thread_store.threads_size];
2128  t->name = tv->name;
2129  t->type = type;
2130  t->tv = tv;
2131  t->in_use = 1;
2132 
2133  s = thread_store.threads_size;
2134  thread_store.threads_size += STEP;
2135 
2136  SCMutexUnlock(&thread_store_lock);
2137  return (int)(s+1);
2138 }
2139 #undef STEP
2140 
2141 void TmThreadsUnregisterThread(const int id)
2142 {
2143  SCMutexLock(&thread_store_lock);
2144  if (id <= 0 || id > (int)thread_store.threads_size) {
2145  SCMutexUnlock(&thread_store_lock);
2146  return;
2147  }
2148 
2149  /* id is one higher than index */
2150  int idx = id - 1;
2151 
2152  /* reset thread_id, which serves as clearing the record */
2153  thread_store.threads[idx].in_use = 0;
2154 
2155  /* check if we have at least one registered thread left */
2156  size_t s;
2157  for (s = 0; s < thread_store.threads_size; s++) {
2158  Thread *t = &thread_store.threads[s];
2159  if (t->in_use == 1) {
2160  goto end;
2161  }
2162  }
2163 
2164  /* if we get here no threads are registered */
2165  SCFree(thread_store.threads);
2166  thread_store.threads = NULL;
2167  thread_store.threads_size = 0;
2168  thread_store.threads_cnt = 0;
2169 
2170 end:
2171  SCMutexUnlock(&thread_store_lock);
2172 }
2173 
2174 void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
2175 {
2176  SCMutexLock(&thread_store_lock);
2177  if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2178  SCMutexUnlock(&thread_store_lock);
2179  return;
2180  }
2181 
2182  int idx = id - 1;
2183  Thread *t = &thread_store.threads[idx];
2184  t->pktts = ts;
2185  struct timeval systs;
2186  gettimeofday(&systs, NULL);
2187  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2188  SCMutexUnlock(&thread_store_lock);
2189 }
2190 
2192 {
2193  bool ready = true;
2194  SCMutexLock(&thread_store_lock);
2195  for (size_t s = 0; s < thread_store.threads_size; s++) {
2196  Thread *t = &thread_store.threads[s];
2197  if (!t->in_use)
2198  break;
2199  if (t->sys_sec_stamp == 0) {
2200  ready = false;
2201  break;
2202  }
2203  }
2204  SCMutexUnlock(&thread_store_lock);
2205  return ready;
2206 }
2207 
2209 {
2210  struct timeval systs;
2211  gettimeofday(&systs, NULL);
2212  SCMutexLock(&thread_store_lock);
2213  for (size_t s = 0; s < thread_store.threads_size; s++) {
2214  Thread *t = &thread_store.threads[s];
2215  if (!t->in_use)
2216  break;
2217  t->pktts = ts;
2218  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2219  }
2220  SCMutexUnlock(&thread_store_lock);
2221 }
2222 
2223 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2224 {
2225  struct timeval local = { 0 };
2226  static struct timeval nullts;
2227  bool set = false;
2228  size_t s;
2229  struct timeval systs;
2230  gettimeofday(&systs, NULL);
2231 
2232  SCMutexLock(&thread_store_lock);
2233  for (s = 0; s < thread_store.threads_size; s++) {
2234  Thread *t = &thread_store.threads[s];
2235  if (t->in_use == 0)
2236  break;
2237  struct timeval pkttv = { .tv_sec = SCTIME_SECS(t->pktts),
2238  .tv_usec = SCTIME_USECS(t->pktts) };
2239  if (!(timercmp(&pkttv, &nullts, ==))) {
2240  /* ignore sleeping threads */
2241  if (t->sys_sec_stamp + 1 < (uint32_t)systs.tv_sec)
2242  continue;
2243 
2244  if (!set) {
2245  SCTIME_TO_TIMEVAL(&local, t->pktts);
2246  set = true;
2247  } else {
2248  if (SCTIME_CMP_LT(t->pktts, SCTIME_FROM_TIMEVAL(&local))) {
2249  SCTIME_TO_TIMEVAL(&local, t->pktts);
2250  }
2251  }
2252  }
2253  }
2254  SCMutexUnlock(&thread_store_lock);
2255  *ts = local;
2256  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2257 }
2258 
2260 {
2261  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2262  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2263  /* always create at least one thread */
2264  if (thread_max == 0)
2265  thread_max = ncpus * threading_detect_ratio;
2266  if (thread_max < 1)
2267  thread_max = 1;
2268  if (thread_max > 1024) {
2269  SCLogWarning("limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2270  thread_max = 1024;
2271  }
2272  return (uint16_t)thread_max;
2273 }
2274 
2275 static inline void ThreadBreakLoop(ThreadVars *tv)
2276 {
2277  if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
2278  return;
2279  }
2280  /* find the correct slot */
2281  TmSlot *s = tv->tm_slots;
2282  TmModule *tm = TmModuleGetById(s->tm_id);
2283  if (tm->flags & TM_FLAG_RECEIVE_TM) {
2284  /* if the method supports it, BreakLoop. Otherwise we rely on
2285  * the capture method's recv timeout */
2286  if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
2287  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
2288  }
2289  }
2290 }
2291 
2292 /** \brief inject a flow into a threads flow queue
2293  */
2294 void TmThreadsInjectFlowById(Flow *f, const int id)
2295 {
2296  BUG_ON(id <= 0 || id > (int)thread_store.threads_size);
2297 
2298  int idx = id - 1;
2299 
2300  Thread *t = &thread_store.threads[idx];
2301  ThreadVars *tv = t->tv;
2302 
2303  BUG_ON(tv == NULL || tv->flow_queue == NULL);
2304 
2305  FlowEnqueue(tv->flow_queue, f);
2306 
2307  /* wake up listening thread(s) if necessary */
2308  if (tv->inq != NULL) {
2309  SCCondSignal(&tv->inq->pq->cond_q);
2310  } else if (tv->break_loop) {
2311  ThreadBreakLoop(tv);
2312  }
2313 }
thread_name_workers
const char * thread_name_workers
Definition: runmodes.c:81
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:802
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:67
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:94
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:76
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:73
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:134
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2191
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1701
tm-threads.h
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:69
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
THV_USE
#define THV_USE
Definition: threadvars.h:35
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:115
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1646
Threads
struct Threads_ Threads
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1089
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:843
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:83
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:315
TmThreadContinueThreads
void TmThreadContinueThreads(void)
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1892
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:92
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1032
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:387
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:56
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:379
TmThreadSetGroupName
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1603
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:104
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:47
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:99
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1765
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:173
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:56
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:44
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:86
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:64
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:90
Packet_::flags
uint32_t flags
Definition: decode.h:463
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:75
threads.h
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:357
ThreadVars_::t
pthread_t t
Definition: threadvars.h:58
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:303
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:84
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:54
Thread_::pktts
SCTime_t pktts
Definition: tm-threads.c:2059
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:89
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:1996
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:103
thread_name_autofp
const char * thread_name_autofp
Definition: runmodes.c:79
StatsSetupPrivate
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1210
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:116
MIN
#define MIN(x, y)
Definition: suricata-common.h:380
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:80
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:73
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1338
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:88
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:387
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:121
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:93
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:754
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:89
m
SCMutex m
Definition: flow-hash.h:6
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:44
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:141
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:356
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:85
rww_lock_contention
thread_local uint64_t rww_lock_contention
Threads_::threads
Thread * threads
Definition: tm-threads.c:2066
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:173
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2094
Thread_::in_use
int in_use
Definition: tm-threads.c:2057
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:37
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(void)
Disable all packet threads.
Definition: tm-threads.c:1471
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:284
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:84
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a threads flow queue
Definition: tm-threads.c:2294
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:80
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2223
FQLOCK_LOCK
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:73
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:53
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:45
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1062
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:107
util-signal.h
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:78
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:765
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1518
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:2054
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1779
AffinityGetNextCPU
uint16_t AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
Definition: util-affinity.c:283
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:57
type
uint8_t type
Definition: decode-icmpv4.h:0
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:1933
Threads_::threads_cnt
int threads_cnt
Definition: tm-threads.c:2068
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
strlcat
size_t strlcat(char *, const char *src, size_t siz)
Definition: util-strlcatu.c:45
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:77
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:95
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:206
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:58
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
threading_set_stack_size
uint64_t threading_set_stack_size
Definition: runmodes.c:76
ThreadVars_::perf_public_ctx
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:127
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1005
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
Definition: tm-threads.c:2174
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
util-affinity.h
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:71
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1747
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1621
THV_KILL
#define THV_KILL
Definition: threadvars.h:39
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:902
FlowQueueNew
FlowQueue * FlowQueueNew(void)
Definition: flow-queue.c:35
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:741
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2141
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2098
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
threading_set_cpu_affinity
int threading_set_cpu_affinity
Definition: runmodes.c:75
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:124
ThreadVars_::id
int id
Definition: threadvars.h:86
ThreadVars_::type
uint8_t type
Definition: threadvars.h:71
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:289
SCTIME_TO_TIMEVAL
#define SCTIME_TO_TIMEVAL(tv, t)
Definition: util-time.h:82
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:110
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:83
SCReturn
#define SCReturn
Definition: util-debug.h:273
stream.h
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1549
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:159
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:53
Packet_
Definition: decode.h:428
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:132
ThreadVars_::thread_group_name
char * thread_group_name
Definition: threadvars.h:66
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:90
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1519
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:74
TmSlot_
Definition: tm-threads.h:53
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:67
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:68
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:83
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
runmodes.h
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1061
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:31
TmModule_
Definition: tm-modules.h:43
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:255
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
Definition: tm-threads.c:2208
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:115
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
SCTIME_CMP_LT
#define SCTIME_CMP_LT(a, b)
Definition: util-time.h:90
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:38
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:36
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:637
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:61
StatsPublicThreadContext_::m
SCMutex m
Definition: counters.h:74
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1520
Thread_::sys_sec_stamp
uint32_t sys_sec_stamp
Definition: tm-threads.c:2061
Thread_::type
int type
Definition: tm-threads.c:2056
Thread
struct Thread_ Thread
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:89
Packet_::flow
struct Flow_ * flow
Definition: decode.h:465
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1148
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:74
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:53
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:811
Threads_
Definition: tm-threads.c:2065
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:35
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:34
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:105
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:40
ThreadVars_::break_loop
bool break_loop
Definition: threadvars.h:135
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:100
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:827
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
SCTIME_SECS
#define SCTIME_SECS(t)
Definition: util-time.h:57
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:66
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:71
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
FatalError
#define FatalError(...)
Definition: util-debug.h:502
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax(void)
Definition: tm-threads.c:2259
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:65
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1122
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:142
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:212
threadvars.h
util-validate.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:173
ThreadsAffinityType_
Definition: util-affinity.h:65
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:37
THV_KILL_PKTACQ
#define THV_KILL_PKTACQ
Definition: threadvars.h:46
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:603
Thread_::name
const char * name
Definition: tm-threads.c:2055
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:151
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:50
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:90
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
PKT_SRC_DETECT_RELOAD_FLUSH
@ PKT_SRC_DETECT_RELOAD_FLUSH
Definition: decode.h:63
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:68
thread_name_verdict
const char * thread_name_verdict
Definition: runmodes.c:82
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:67
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:318
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:83
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:111
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:91
suricata.h
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:438
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:140
Tmq_
Definition: tm-queues.h:29
TmThreadWaitOnThreadRunning
TmEcode TmThreadWaitOnThreadRunning(void)
Waits for all threads to be in a running state.
Definition: tm-threads.c:1790
StatsSyncCounters
#define StatsSyncCounters(tv)
Definition: counters.h:137
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2074
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1909
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:131
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:82
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:68
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:376
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:108
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:316
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:55
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:91
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:41
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:2053
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:120
Threads_::threads_size
size_t threads_size
Definition: tm-threads.c:2067
StatsThreadCleanup
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1309
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:62
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:208
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:111
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:70
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:980
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:360
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:164
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:351
SCTIME_USECS
#define SCTIME_USECS(t)
Definition: util-time.h:56
FQLOCK_UNLOCK
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:75