suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2022 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-debug.h"
39 #include "util-privs.h"
40 #include "util-cpu.h"
41 #include "util-optimize.h"
42 #include "util-profiling.h"
43 #include "util-signal.h"
44 #include "queue.h"
45 
46 #ifdef PROFILE_LOCKING
47 thread_local uint64_t mutex_lock_contention;
48 thread_local uint64_t mutex_lock_wait_ticks;
49 thread_local uint64_t mutex_lock_cnt;
50 
51 thread_local uint64_t spin_lock_contention;
52 thread_local uint64_t spin_lock_wait_ticks;
53 thread_local uint64_t spin_lock_cnt;
54 
55 thread_local uint64_t rww_lock_contention;
56 thread_local uint64_t rww_lock_wait_ticks;
57 thread_local uint64_t rww_lock_cnt;
58 
59 thread_local uint64_t rwr_lock_contention;
60 thread_local uint64_t rwr_lock_wait_ticks;
61 thread_local uint64_t rwr_lock_cnt;
62 #endif
63 
64 #ifdef OS_FREEBSD
65 #include <sched.h>
66 #include <sys/param.h>
67 #include <sys/resource.h>
68 #include <sys/cpuset.h>
69 #include <sys/thr.h>
70 #define cpu_set_t cpuset_t
71 #endif /* OS_FREEBSD */
72 
73 /* prototypes */
74 static int SetCPUAffinity(uint16_t cpu);
75 static void TmThreadDeinitMC(ThreadVars *tv);
76 
77 /* root of the threadvars list */
78 ThreadVars *tv_root[TVT_MAX] = { NULL };
79 
80 /* lock to protect tv_root */
82 
83 /**
84  * \brief Check if a thread flag is set.
85  *
86  * \retval 1 flag is set.
87  * \retval 0 flag is not set.
88  */
89 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
90 {
91  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
92 }
93 
94 /**
95  * \brief Set a thread flag.
96  */
97 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
98 {
99  SC_ATOMIC_OR(tv->flags, flag);
100 }
101 
102 /**
103  * \brief Unset a thread flag.
104  */
105 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
106 {
107  SC_ATOMIC_AND(tv->flags, ~flag);
108 }
109 
110 /**
111  * \brief Separate run function so we can call it recursively.
112  */
114 {
115  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
116  PACKET_PROFILING_TMM_START(p, s->tm_id);
117  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
118  PACKET_PROFILING_TMM_END(p, s->tm_id);
119 
120  /* handle error */
121  if (unlikely(r == TM_ECODE_FAILED)) {
122  /* Encountered error. Return packets to packetpool and return */
123  TmThreadsSlotProcessPktFail(tv, s, NULL);
124  return TM_ECODE_FAILED;
125  }
126 
127  /* handle new packets */
128  while (tv->decode_pq.top != NULL) {
129  Packet *extra_p = PacketDequeueNoLock(&tv->decode_pq);
130  if (unlikely(extra_p == NULL))
131  continue;
132 
133  /* see if we need to process the packet */
134  if (s->slot_next != NULL) {
135  r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
136  if (unlikely(r == TM_ECODE_FAILED)) {
137  TmThreadsSlotProcessPktFail(tv, s, extra_p);
138  return TM_ECODE_FAILED;
139  }
140  }
141  tv->tmqh_out(tv, extra_p);
142  }
143  }
144 
145  return TM_ECODE_OK;
146 }
147 
148 /** \internal
149  *
150  * \brief Process flow timeout packets
151  *
152  * Process flow timeout pseudo packets. During shutdown this loop
153  * is run until the flow engine kills the thread and the queue is
154  * empty.
155  */
156 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
157 {
158  TmSlot *fw_slot = tv->tm_flowworker;
159  int r = TM_ECODE_OK;
160 
161  if (tv->stream_pq == NULL || fw_slot == NULL) {
162  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
163  return r;
164  }
165 
166  SCLogDebug("flow end loop starting");
167  while (1) {
169  uint32_t len = tv->stream_pq->len;
171  if (len > 0) {
172  while (len--) {
176  if (likely(p)) {
177  if ((r = TmThreadsSlotProcessPkt(tv, fw_slot, p) != TM_ECODE_OK)) {
178  if (r == TM_ECODE_FAILED)
179  break;
180  }
181  }
182  }
183  } else {
185  break;
186  }
187  SleepUsec(1);
188  }
189  }
190  SCLogDebug("flow end loop complete");
192 
193  return r;
194 }
195 
196 /*
197 
198  pcap/nfq
199 
200  pkt read
201  callback
202  process_pkt
203 
204  pfring
205 
206  pkt read
207  process_pkt
208 
209  slot:
210  setup
211 
212  pkt_ack_loop(tv, slot_data)
213 
214  deinit
215 
216  process_pkt:
217  while(s)
218  run s;
219  queue;
220 
221  */
222 
223 static void *TmThreadsSlotPktAcqLoop(void *td)
224 {
225  ThreadVars *tv = (ThreadVars *)td;
226  TmSlot *s = tv->tm_slots;
227  char run = 1;
228  TmEcode r = TM_ECODE_OK;
229  TmSlot *slot = NULL;
230 
232 
233  if (tv->thread_setup_flags != 0)
235 
236  /* Drop the capabilities for this thread */
237  SCDropCaps(tv);
238 
239  PacketPoolInit();
240 
241  /* check if we are setup properly */
242  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
243  SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
244  " PktAcqLoop=%p, tmqh_in=%p,"
245  " tmqh_out=%p",
246  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
248  pthread_exit((void *) -1);
249  return NULL;
250  }
251 
252  for (slot = s; slot != NULL; slot = slot->slot_next) {
253  if (slot->SlotThreadInit != NULL) {
254  void *slot_data = NULL;
255  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
256  if (r != TM_ECODE_OK) {
257  if (r == TM_ECODE_DONE) {
258  EngineDone();
260  goto error;
261  } else {
263  goto error;
264  }
265  }
266  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
267  }
268 
269  /* if the flowworker module is the first, get the threads input queue */
270  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
271  tv->stream_pq = tv->inq->pq;
272  tv->tm_flowworker = slot;
273  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
275  if (tv->flow_queue == NULL) {
277  pthread_exit((void *) -1);
278  return NULL;
279  }
280  /* setup a queue */
281  } else if (slot->tm_id == TMM_FLOWWORKER) {
282  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
283  if (tv->stream_pq_local == NULL)
284  FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
287  tv->tm_flowworker = slot;
288  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
290  if (tv->flow_queue == NULL) {
292  pthread_exit((void *) -1);
293  return NULL;
294  }
295  }
296  }
297 
299 
301 
302  while(run) {
307  }
308 
309  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
310 
311  if (r == TM_ECODE_FAILED) {
313  run = 0;
314  }
316  run = 0;
317  }
318  if (r == TM_ECODE_DONE) {
319  run = 0;
320  }
321  }
323 
325 
326  /* process all pseudo packets the flow timeout may throw at us */
327  TmThreadTimeoutLoop(tv, s);
328 
331 
333 
334  for (slot = s; slot != NULL; slot = slot->slot_next) {
335  if (slot->SlotThreadExitPrintStats != NULL) {
336  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
337  }
338 
339  if (slot->SlotThreadDeinit != NULL) {
340  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
341  if (r != TM_ECODE_OK) {
343  goto error;
344  }
345  }
346  }
347 
348  tv->stream_pq = NULL;
349  SCLogDebug("%s ending", tv->name);
351  pthread_exit((void *) 0);
352  return NULL;
353 
354 error:
355  tv->stream_pq = NULL;
356  pthread_exit((void *) -1);
357  return NULL;
358 }
359 
360 static void *TmThreadsSlotVar(void *td)
361 {
362  ThreadVars *tv = (ThreadVars *)td;
363  TmSlot *s = (TmSlot *)tv->tm_slots;
364  Packet *p = NULL;
365  char run = 1;
366  TmEcode r = TM_ECODE_OK;
367 
368  PacketPoolInit();//Empty();
369 
371 
372  if (tv->thread_setup_flags != 0)
374 
375  /* Drop the capabilities for this thread */
376  SCDropCaps(tv);
377 
378  /* check if we are setup properly */
379  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
381  pthread_exit((void *) -1);
382  return NULL;
383  }
384 
385  for (; s != NULL; s = s->slot_next) {
386  if (s->SlotThreadInit != NULL) {
387  void *slot_data = NULL;
388  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
389  if (r != TM_ECODE_OK) {
391  goto error;
392  }
393  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
394  }
395 
396  /* special case: we need to access the stream queue
397  * from the flow timeout code */
398 
399  /* if the flowworker module is the first, get the threads input queue */
400  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
401  tv->stream_pq = tv->inq->pq;
402  tv->tm_flowworker = s;
403  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
405  if (tv->flow_queue == NULL) {
407  pthread_exit((void *) -1);
408  return NULL;
409  }
410  /* setup a queue */
411  } else if (s->tm_id == TMM_FLOWWORKER) {
412  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
413  if (tv->stream_pq_local == NULL)
414  FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
417  tv->tm_flowworker = s;
418  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
420  if (tv->flow_queue == NULL) {
422  pthread_exit((void *) -1);
423  return NULL;
424  }
425  }
426  }
427 
429 
431 
432  s = (TmSlot *)tv->tm_slots;
433 
434  while (run) {
439  }
440 
441  /* input a packet */
442  p = tv->tmqh_in(tv);
443 
444  /* if we didn't get a packet see if we need to do some housekeeping */
445  if (unlikely(p == NULL)) {
446  if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
448  if (p != NULL) {
451  }
452  }
453  }
454 
455  if (p != NULL) {
456  /* run the thread module(s) */
457  r = TmThreadsSlotVarRun(tv, p, s);
458  if (r == TM_ECODE_FAILED) {
461  break;
462  }
463 
464  /* output the packet */
465  tv->tmqh_out(tv, p);
466 
467  /* now handle the stream pq packets */
468  TmThreadsHandleInjectedPackets(tv);
469  }
470 
472  run = 0;
473  }
474  } /* while (run) */
476 
479 
481 
482  s = (TmSlot *)tv->tm_slots;
483 
484  for ( ; s != NULL; s = s->slot_next) {
485  if (s->SlotThreadExitPrintStats != NULL) {
486  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
487  }
488 
489  if (s->SlotThreadDeinit != NULL) {
490  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
491  if (r != TM_ECODE_OK) {
493  goto error;
494  }
495  }
496  }
497 
498  SCLogDebug("%s ending", tv->name);
499  tv->stream_pq = NULL;
501  pthread_exit((void *) 0);
502  return NULL;
503 
504 error:
505  tv->stream_pq = NULL;
506  pthread_exit((void *) -1);
507  return NULL;
508 }
509 
510 static void *TmThreadsManagement(void *td)
511 {
512  ThreadVars *tv = (ThreadVars *)td;
513  TmSlot *s = (TmSlot *)tv->tm_slots;
514  TmEcode r = TM_ECODE_OK;
515 
516  BUG_ON(s == NULL);
517 
519 
520  if (tv->thread_setup_flags != 0)
522 
523  /* Drop the capabilities for this thread */
524  SCDropCaps(tv);
525 
526  SCLogDebug("%s starting", tv->name);
527 
528  if (s->SlotThreadInit != NULL) {
529  void *slot_data = NULL;
530  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
531  if (r != TM_ECODE_OK) {
533  pthread_exit((void *) -1);
534  return NULL;
535  }
536  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
537  }
538 
540 
542 
543  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
544  /* handle error */
545  if (r == TM_ECODE_FAILED) {
547  }
548 
551  }
552 
555 
556  if (s->SlotThreadExitPrintStats != NULL) {
557  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
558  }
559 
560  if (s->SlotThreadDeinit != NULL) {
561  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
562  if (r != TM_ECODE_OK) {
564  pthread_exit((void *) -1);
565  return NULL;
566  }
567  }
568 
570  pthread_exit((void *) 0);
571  return NULL;
572 }
573 
574 /**
575  * \brief We set the slot functions.
576  *
577  * \param tv Pointer to the TV to set the slot function for.
578  * \param name Name of the slot variant.
579  * \param fn_p Pointer to a custom slot function. Used only if slot variant
580  * "name" is "custom".
581  *
582  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
583  */
584 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
585 {
586  if (name == NULL) {
587  if (fn_p == NULL) {
588  printf("Both slot name and function pointer can't be NULL inside "
589  "TmThreadSetSlots\n");
590  goto error;
591  } else {
592  name = "custom";
593  }
594  }
595 
596  if (strcmp(name, "varslot") == 0) {
597  tv->tm_func = TmThreadsSlotVar;
598  } else if (strcmp(name, "pktacqloop") == 0) {
599  tv->tm_func = TmThreadsSlotPktAcqLoop;
600  } else if (strcmp(name, "management") == 0) {
601  tv->tm_func = TmThreadsManagement;
602  } else if (strcmp(name, "command") == 0) {
603  tv->tm_func = TmThreadsManagement;
604  } else if (strcmp(name, "custom") == 0) {
605  if (fn_p == NULL)
606  goto error;
607  tv->tm_func = fn_p;
608  } else {
609  printf("Error: Slot \"%s\" not supported\n", name);
610  goto error;
611  }
612 
613  return TM_ECODE_OK;
614 
615 error:
616  return TM_ECODE_FAILED;
617 }
618 
619 /**
620  * \brief Appends a new entry to the slots.
621  *
622  * \param tv TV the slot is attached to.
623  * \param tm TM to append.
624  * \param data Data to be passed on to the slot init function.
625  *
626  * \retval The allocated TmSlot or NULL if there is an error
627  */
628 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
629 {
630  TmSlot *slot = SCMalloc(sizeof(TmSlot));
631  if (unlikely(slot == NULL))
632  return;
633  memset(slot, 0, sizeof(TmSlot));
634  SC_ATOMIC_INITPTR(slot->slot_data);
635  slot->SlotThreadInit = tm->ThreadInit;
636  slot->slot_initdata = data;
637  if (tm->Func) {
638  slot->SlotFunc = tm->Func;
639  } else if (tm->PktAcqLoop) {
640  slot->PktAcqLoop = tm->PktAcqLoop;
641  if (tm->PktAcqBreakLoop) {
642  tv->break_loop = true;
643  }
644  } else if (tm->Management) {
645  slot->Management = tm->Management;
646  }
648  slot->SlotThreadDeinit = tm->ThreadDeinit;
649  /* we don't have to check for the return value "-1". We wouldn't have
650  * received a TM as arg, if it didn't exist */
651  slot->tm_id = TmModuleGetIDForTM(tm);
652 
653  tv->tmm_flags |= tm->flags;
654  tv->cap_flags |= tm->cap_flags;
655 
656  if (tv->tm_slots == NULL) {
657  tv->tm_slots = slot;
658  } else {
659  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
660 
661  /* get the last slot */
662  for ( ; a != NULL; a = a->slot_next) {
663  b = a;
664  }
665  /* append the new slot */
666  if (b != NULL) {
667  b->slot_next = slot;
668  }
669  }
670  return;
671 }
672 
673 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
674 static int SetCPUAffinitySet(cpu_set_t *cs)
675 {
676 #if defined OS_FREEBSD
677  int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
678  SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
679 #elif OS_DARWIN
680  int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
681  (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
682 #else
683  pid_t tid = syscall(SYS_gettid);
684  int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
685 #endif /* OS_FREEBSD */
686 
687  if (r != 0) {
688  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
689  strerror(errno));
690  return -1;
691  }
692 
693  return 0;
694 }
695 #endif
696 
697 
698 /**
699  * \brief Set the thread affinity on the calling thread.
700  *
701  * \param cpuid Id of the core/cpu to setup the affinity.
702  *
703  * \retval 0 If all goes well; -1 if something is wrong.
704  */
705 static int SetCPUAffinity(uint16_t cpuid)
706 {
707 #if defined __OpenBSD__ || defined sun
708  return 0;
709 #else
710  int cpu = (int)cpuid;
711 
712 #if defined OS_WIN32 || defined __CYGWIN__
713  DWORD cs = 1 << cpu;
714 
715  int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
716  if (r != 0) {
717  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
718  strerror(errno));
719  return -1;
720  }
721  SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
722  SCGetThreadIdLong(), cpu);
723 
724  return 0;
725 
726 #else
727  cpu_set_t cs;
728 
729  CPU_ZERO(&cs);
730  CPU_SET(cpu, &cs);
731  return SetCPUAffinitySet(&cs);
732 #endif /* windows */
733 #endif /* not supported */
734 }
735 
736 
737 /**
738  * \brief Set the thread options (thread priority).
739  *
740  * \param tv Pointer to the ThreadVars to setup the thread priority.
741  *
742  * \retval TM_ECODE_OK.
743  */
745 {
747  tv->thread_priority = prio;
748 
749  return TM_ECODE_OK;
750 }
751 
752 /**
753  * \brief Adjusting nice value for threads.
754  */
756 {
757  SCEnter();
758 #ifndef __CYGWIN__
759 #ifdef OS_WIN32
760  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
761  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for "
762  "thread %s: %s", tv->name, strerror(errno));
763  } else {
764  SCLogDebug("Priority set to %"PRId32" for thread %s",
766  }
767 #else
768  int ret = nice(tv->thread_priority);
769  if (ret == -1) {
770  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value %d "
771  "for thread %s: %s", tv->thread_priority, tv->name,
772  strerror(errno));
773  } else {
774  SCLogDebug("Nice value set to %"PRId32" for thread %s",
776  }
777 #endif /* OS_WIN32 */
778 #endif
779  SCReturn;
780 }
781 
782 
783 /**
784  * \brief Set the thread options (cpu affinity).
785  *
786  * \param tv pointer to the ThreadVars to setup the affinity.
787  * \param cpu cpu on which affinity is set.
788  *
789  * \retval TM_ECODE_OK
790  */
792 {
794  tv->cpu_affinity = cpu;
795 
796  return TM_ECODE_OK;
797 }
798 
799 
801 {
803  return TM_ECODE_OK;
804 
805  if (type > MAX_CPU_SET) {
806  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
807  return TM_ECODE_FAILED;
808  }
809 
811  tv->cpu_affinity = type;
812 
813  return TM_ECODE_OK;
814 }
815 
817 {
818  if (type >= MAX_CPU_SET) {
819  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
820  return 0;
821  }
822 
824 }
825 
826 /**
827  * \brief Set the thread options (cpu affinitythread).
828  * Priority should be already set by pthread_create.
829  *
830  * \param tv pointer to the ThreadVars of the calling thread.
831  */
833 {
835  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
836  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
838  SetCPUAffinity(tv->cpu_affinity);
839  }
840 
841 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
846  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
847  uint16_t cpu = AffinityGetNextCPU(taf);
848  SetCPUAffinity(cpu);
849  /* If CPU is in a set overwrite the default thread prio */
850  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
852  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
854  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
856  } else {
857  tv->thread_priority = taf->prio;
858  }
859  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
860  "%d, thread id %lu", tv->thread_priority,
861  tv->name, cpu, SCGetThreadIdLong());
862  } else {
863  SetCPUAffinitySet(&taf->cpu_set);
864  tv->thread_priority = taf->prio;
865  SCLogPerf("Setting prio %d for thread \"%s\", "
866  "thread id %lu", tv->thread_priority,
868  }
870  }
871 #endif
872 
873  return TM_ECODE_OK;
874 }
875 
876 /**
877  * \brief Creates and returns the TV instance for a new thread.
878  *
879  * \param name Name of this TV instance
880  * \param inq_name Incoming queue name
881  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
882  * \param outq_name Outgoing queue name
883  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
884  * \param slots String representation for the slot function to be used
885  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
886  * \param mucond Flag to indicate whether to initialize the condition
887  * and the mutex variables for this newly created TV.
888  *
889  * \retval the newly created TV instance, or NULL on error
890  */
891 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
892  const char *outq_name, const char *outqh_name, const char *slots,
893  void * (*fn_p)(void *), int mucond)
894 {
895  ThreadVars *tv = NULL;
896  Tmq *tmq = NULL;
897  Tmqh *tmqh = NULL;
898 
899  SCLogDebug("creating thread \"%s\"...", name);
900 
901  /* XXX create separate function for this: allocate a thread container */
902  tv = SCMalloc(sizeof(ThreadVars));
903  if (unlikely(tv == NULL))
904  goto error;
905  memset(tv, 0, sizeof(ThreadVars));
906 
907  SC_ATOMIC_INIT(tv->flags);
908  SCMutexInit(&tv->perf_public_ctx.m, NULL);
909 
910  strlcpy(tv->name, name, sizeof(tv->name));
911 
912  /* default state for every newly created thread */
915 
916  /* set the incoming queue */
917  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
918  SCLogDebug("inq_name \"%s\"", inq_name);
919 
920  tmq = TmqGetQueueByName(inq_name);
921  if (tmq == NULL) {
922  tmq = TmqCreateQueue(inq_name);
923  if (tmq == NULL)
924  goto error;
925  }
926  SCLogDebug("tmq %p", tmq);
927 
928  tv->inq = tmq;
929  tv->inq->reader_cnt++;
930  SCLogDebug("tv->inq %p", tv->inq);
931  }
932  if (inqh_name != NULL) {
933  SCLogDebug("inqh_name \"%s\"", inqh_name);
934 
935  int id = TmqhNameToID(inqh_name);
936  if (id <= 0) {
937  goto error;
938  }
939  tmqh = TmqhGetQueueHandlerByName(inqh_name);
940  if (tmqh == NULL)
941  goto error;
942 
943  tv->tmqh_in = tmqh->InHandler;
944  tv->inq_id = (uint8_t)id;
945  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
946  }
947 
948  /* set the outgoing queue */
949  if (outqh_name != NULL) {
950  SCLogDebug("outqh_name \"%s\"", outqh_name);
951 
952  int id = TmqhNameToID(outqh_name);
953  if (id <= 0) {
954  goto error;
955  }
956 
957  tmqh = TmqhGetQueueHandlerByName(outqh_name);
958  if (tmqh == NULL)
959  goto error;
960 
961  tv->tmqh_out = tmqh->OutHandler;
962  tv->outq_id = (uint8_t)id;
963 
964  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
965  SCLogDebug("outq_name \"%s\"", outq_name);
966 
967  if (tmqh->OutHandlerCtxSetup != NULL) {
968  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
969  if (tv->outctx == NULL)
970  goto error;
971  tv->outq = NULL;
972  } else {
973  tmq = TmqGetQueueByName(outq_name);
974  if (tmq == NULL) {
975  tmq = TmqCreateQueue(outq_name);
976  if (tmq == NULL)
977  goto error;
978  }
979  SCLogDebug("tmq %p", tmq);
980 
981  tv->outq = tmq;
982  tv->outctx = NULL;
983  tv->outq->writer_cnt++;
984  }
985  }
986  }
987 
988  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
989  goto error;
990  }
991 
992  if (mucond != 0)
994 
995  return tv;
996 
997 error:
998  SCLogError(SC_ERR_THREAD_CREATE, "failed to setup a thread");
999 
1000  if (tv != NULL)
1001  SCFree(tv);
1002  return NULL;
1003 }
1004 
1005 /**
1006  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1007  * This function doesn't support custom slots, and hence shouldn't be
1008  * supplied \"custom\" as its slot type. All PPT threads are created
1009  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1010  * conditional variables are not used to kill the thread.
1011  *
1012  * \param name Name of this TV instance
1013  * \param inq_name Incoming queue name
1014  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1015  * \param outq_name Outgoing queue name
1016  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1017  * \param slots String representation for the slot function to be used
1018  *
1019  * \retval the newly created TV instance, or NULL on error
1020  */
1021 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1022  const char *inqh_name, const char *outq_name,
1023  const char *outqh_name, const char *slots)
1024 {
1025  ThreadVars *tv = NULL;
1026 
1027  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1028  slots, NULL, 0);
1029 
1030  if (tv != NULL) {
1031  tv->type = TVT_PPT;
1033  }
1034 
1035 
1036  return tv;
1037 }
1038 
1039 /**
1040  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1041  * This function supports only custom slot functions and hence a
1042  * function pointer should be sent as an argument.
1043  *
1044  * \param name Name of this TV instance
1045  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1046  * \param mucond Flag to indicate whether to initialize the condition
1047  * and the mutex variables for this newly created TV.
1048  *
1049  * \retval the newly created TV instance, or NULL on error
1050  */
1051 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1052  int mucond)
1053 {
1054  ThreadVars *tv = NULL;
1055 
1056  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1057 
1058  if (tv != NULL) {
1059  tv->type = TVT_MGMT;
1062  }
1063 
1064  return tv;
1065 }
1066 
1067 /**
1068  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1069  * This function supports only custom slot functions and hence a
1070  * function pointer should be sent as an argument.
1071  *
1072  * \param name Name of this TV instance
1073  * \param module Name of TmModule with MANAGEMENT flag set.
1074  * \param mucond Flag to indicate whether to initialize the condition
1075  * and the mutex variables for this newly created TV.
1076  *
1077  * \retval the newly created TV instance, or NULL on error
1078  */
1079 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1080  int mucond)
1081 {
1082  ThreadVars *tv = NULL;
1083 
1084  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1085 
1086  if (tv != NULL) {
1087  tv->type = TVT_MGMT;
1090 
1091  TmModule *m = TmModuleGetByName(module);
1092  if (m) {
1093  TmSlotSetFuncAppend(tv, m, NULL);
1094  }
1095  }
1096 
1097  return tv;
1098 }
1099 
1100 /**
1101  * \brief Creates and returns the TV instance for a Command thread (CMD).
1102  * This function supports only custom slot functions and hence a
1103  * function pointer should be sent as an argument.
1104  *
1105  * \param name Name of this TV instance
1106  * \param module Name of TmModule with COMMAND flag set.
1107  * \param mucond Flag to indicate whether to initialize the condition
1108  * and the mutex variables for this newly created TV.
1109  *
1110  * \retval the newly created TV instance, or NULL on error
1111  */
1112 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1113  int mucond)
1114 {
1115  ThreadVars *tv = NULL;
1116 
1117  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1118 
1119  if (tv != NULL) {
1120  tv->type = TVT_CMD;
1123 
1124  TmModule *m = TmModuleGetByName(module);
1125  if (m) {
1126  TmSlotSetFuncAppend(tv, m, NULL);
1127  }
1128  }
1129 
1130  return tv;
1131 }
1132 
1133 /**
1134  * \brief Appends this TV to tv_root based on its type
1135  *
1136  * \param type holds the type this TV belongs to.
1137  */
1139 {
1141 
1142  if (tv_root[type] == NULL) {
1143  tv_root[type] = tv;
1144  tv->next = NULL;
1145 
1147 
1148  return;
1149  }
1150 
1151  ThreadVars *t = tv_root[type];
1152 
1153  while (t) {
1154  if (t->next == NULL) {
1155  t->next = tv;
1156  tv->next = NULL;
1157  break;
1158  }
1159 
1160  t = t->next;
1161  }
1162 
1164 
1165  return;
1166 }
1167 
1168 static bool ThreadStillHasPackets(ThreadVars *tv)
1169 {
1170  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1171  /* we wait till we dry out all the inq packets, before we
1172  * kill this thread. Do note that you should have disabled
1173  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1174  PacketQueue *q = tv->inq->pq;
1175  SCMutexLock(&q->mutex_q);
1176  uint32_t len = q->len;
1177  SCMutexUnlock(&q->mutex_q);
1178  if (len != 0) {
1179  return true;
1180  }
1181  }
1182 
1183  if (tv->stream_pq != NULL) {
1185  uint32_t len = tv->stream_pq->len;
1187 
1188  if (len != 0) {
1189  return true;
1190  }
1191  }
1192  return false;
1193 }
1194 
1195 /**
1196  * \brief Kill a thread.
1197  *
1198  * \param tv A ThreadVars instance corresponding to the thread that has to be
1199  * killed.
1200  *
1201  * \retval r 1 killed succesfully
1202  * 0 not yet ready, needs another look
1203  */
1204 static int TmThreadKillThread(ThreadVars *tv)
1205 {
1206  BUG_ON(tv == NULL);
1207 
1208  /* kill only once :) */
1209  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1210  return 1;
1211  }
1212 
1213  /* set the thread flag informing the thread that it needs to be
1214  * terminated */
1217 
1218  /* to be sure, signal more */
1219  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1220  if (tv->inq_id != TMQH_NOT_SET) {
1222  if (qh != NULL && qh->InShutdownHandler != NULL) {
1223  qh->InShutdownHandler(tv);
1224  }
1225  }
1226  if (tv->inq != NULL) {
1227  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1228  SCCondSignal(&tv->inq->pq->cond_q);
1229  }
1230  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1231  }
1232 
1233  if (tv->ctrl_cond != NULL ) {
1234  pthread_cond_broadcast(tv->ctrl_cond);
1235  }
1236  return 0;
1237  }
1238 
1239  if (tv->outctx != NULL) {
1240  if (tv->outq_id != TMQH_NOT_SET) {
1242  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1243  qh->OutHandlerCtxFree(tv->outctx);
1244  tv->outctx = NULL;
1245  }
1246  }
1247  }
1248 
1249  /* join it and flag it as dead */
1250  pthread_join(tv->t, NULL);
1251  SCLogDebug("thread %s stopped", tv->name);
1253  return 1;
1254 }
1255 
1256 /** \internal
1257  *
1258  * \brief make sure that all packet threads are done processing their
1259  * in-flight packets, including 'injected' flow packets.
1260  */
1261 static void TmThreadDrainPacketThreads(void)
1262 {
1263  ThreadVars *tv = NULL;
1264  struct timeval start_ts;
1265  struct timeval cur_ts;
1266  gettimeofday(&start_ts, NULL);
1267 
1268 again:
1269  gettimeofday(&cur_ts, NULL);
1270  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1271  SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads "
1272  "to process their packets in time");
1273  return;
1274  }
1275 
1277 
1278  /* all receive threads are part of packet processing threads */
1279  tv = tv_root[TVT_PPT];
1280  while (tv) {
1281  if (ThreadStillHasPackets(tv)) {
1282  /* we wait till we dry out all the inq packets, before we
1283  * kill this thread. Do note that you should have disabled
1284  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1286 
1287  /* sleep outside lock */
1288  SleepMsec(1);
1289  goto again;
1290  }
1291  if (tv->flow_queue) {
1293  bool fq_done = (tv->flow_queue->qlen == 0);
1295  if (!fq_done) {
1297 
1298  Packet *p = PacketGetFromAlloc();
1299  if (p != NULL) {
1302  PacketQueue *q = tv->stream_pq;
1303  SCMutexLock(&q->mutex_q);
1304  PacketEnqueue(q, p);
1305  SCCondSignal(&q->cond_q);
1306  SCMutexUnlock(&q->mutex_q);
1307  }
1308 
1309  /* don't sleep while holding a lock */
1310  SleepMsec(1);
1311  goto again;
1312  }
1313  }
1314  tv = tv->next;
1315  }
1316 
1318  return;
1319 }
1320 
1321 /**
1322  * \brief Disable all threads having the specified TMs.
1323  *
1324  * Breaks out of the packet acquisition loop, and bumps
1325  * into the 'flow loop', where it will process packets
1326  * from the flow engine's shutdown handling.
1327  */
1329 {
1330  ThreadVars *tv = NULL;
1331  struct timeval start_ts;
1332  struct timeval cur_ts;
1333  gettimeofday(&start_ts, NULL);
1334 
1335 again:
1336  gettimeofday(&cur_ts, NULL);
1337  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1338  FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1339  "thread - \"%s\". Killing engine", tv->name);
1340  }
1341 
1343 
1344  /* all receive threads are part of packet processing threads */
1345  tv = tv_root[TVT_PPT];
1346 
1347  /* we do have to keep in mind that TVs are arranged in the order
1348  * right from receive to log. The moment we fail to find a
1349  * receive TM amongst the slots in a tv, it indicates we are done
1350  * with all receive threads */
1351  while (tv) {
1352  int disable = 0;
1353  TmModule *tm = NULL;
1354  /* obtain the slots for this TV */
1355  TmSlot *slots = tv->tm_slots;
1356  while (slots != NULL) {
1357  tm = TmModuleGetById(slots->tm_id);
1358 
1359  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1360  disable = 1;
1361  break;
1362  }
1363 
1364  slots = slots->slot_next;
1365  continue;
1366  }
1367 
1368  if (disable) {
1369  if (ThreadStillHasPackets(tv)) {
1370  /* we wait till we dry out all the inq packets, before we
1371  * kill this thread. Do note that you should have disabled
1372  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1374  /* don't sleep while holding a lock */
1375  SleepMsec(1);
1376  goto again;
1377  }
1378 
1379  if (tv->flow_queue) {
1381  bool fq_done = (tv->flow_queue->qlen == 0);
1383  if (!fq_done) {
1385 
1386  Packet *p = PacketGetFromAlloc();
1387  if (p != NULL) {
1390  PacketQueue *q = tv->stream_pq;
1391  SCMutexLock(&q->mutex_q);
1392  PacketEnqueue(q, p);
1393  SCCondSignal(&q->cond_q);
1394  SCMutexUnlock(&q->mutex_q);
1395  }
1396 
1397  /* don't sleep while holding a lock */
1398  SleepMsec(1);
1399  goto again;
1400  }
1401  }
1402 
1403  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1404  if (tm && tm->PktAcqBreakLoop != NULL) {
1405  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1406  }
1408 
1409  if (tv->inq != NULL) {
1410  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1411  SCCondSignal(&tv->inq->pq->cond_q);
1412  }
1413  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1414  }
1415 
1416  /* wait for it to enter the 'flow loop' stage */
1417  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1419 
1420  SleepMsec(1);
1421  goto again;
1422  }
1423  }
1424 
1425  tv = tv->next;
1426  }
1427 
1429 
1430  /* finally wait for all packet threads to have
1431  * processed all of their 'live' packets so we
1432  * don't process the last live packets together
1433  * with FFR packets */
1434  TmThreadDrainPacketThreads();
1435  return;
1436 }
1437 
1438 #ifdef DEBUG_VALIDATION
1439 static void TmThreadDumpThreads(void);
1440 #endif
1441 
1442 static void TmThreadDebugValidateNoMorePackets(void)
1443 {
1444 #ifdef DEBUG_VALIDATION
1446  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1447  if (ThreadStillHasPackets(tv)) {
1449  TmThreadDumpThreads();
1450  abort();
1451  }
1452  }
1454 #endif
1455 }
1456 
1457 /**
1458  * \brief Disable all packet threads
1459  */
1461 {
1462  struct timeval start_ts;
1463  struct timeval cur_ts;
1464 
1465  /* first drain all packet threads of their packets */
1466  TmThreadDrainPacketThreads();
1467 
1468  /* since all the threads possibly able to produce more packets
1469  * are now gone or inactive, we should see no packets anywhere
1470  * anymore. */
1471  TmThreadDebugValidateNoMorePackets();
1472 
1473  gettimeofday(&start_ts, NULL);
1474 again:
1475  gettimeofday(&cur_ts, NULL);
1476  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1477  FatalError(SC_ERR_FATAL, "Engine unable to disable packet "
1478  "threads. Killing engine");
1479  }
1480 
1481  /* loop through the packet threads and kill them */
1483  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1485 
1486  /* separate worker threads (autofp) will still wait at their
1487  * input queues. So nudge them here so they will observe the
1488  * THV_KILL flag. */
1489  if (tv->inq != NULL) {
1490  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1491  SCCondSignal(&tv->inq->pq->cond_q);
1492  }
1493  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1494  }
1495 
1498 
1499  SleepMsec(1);
1500  goto again;
1501  }
1502  }
1504  return;
1505 }
1506 
1507 #define MIN_WAIT_TIME 100
1508 #define MAX_WAIT_TIME 999999
1510 {
1511  ThreadVars *tv = NULL;
1512  unsigned int sleep_usec = MIN_WAIT_TIME;
1513 
1514  BUG_ON((family < 0) || (family >= TVT_MAX));
1515 
1516 again:
1518  tv = tv_root[family];
1519 
1520  while (tv) {
1521  int r = TmThreadKillThread(tv);
1522  if (r == 0) {
1524  SleepUsec(sleep_usec);
1525  sleep_usec *= 2; /* slowly back off */
1526  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1527  goto again;
1528  }
1529  sleep_usec = MIN_WAIT_TIME; /* reset */
1530 
1531  tv = tv->next;
1532  }
1534 }
1535 #undef MIN_WAIT_TIME
1536 #undef MAX_WAIT_TIME
1537 
1539 {
1540  int i = 0;
1541 
1542  for (i = 0; i < TVT_MAX; i++) {
1544  }
1545 
1546  return;
1547 }
1548 
1549 static void TmThreadFree(ThreadVars *tv)
1550 {
1551  TmSlot *s;
1552  TmSlot *ps;
1553  if (tv == NULL)
1554  return;
1555 
1556  SCLogDebug("Freeing thread '%s'.", tv->name);
1557 
1558  if (tv->flow_queue) {
1559  BUG_ON(tv->flow_queue->qlen != 0);
1560  SCFree(tv->flow_queue);
1561  }
1562 
1564 
1565  TmThreadDeinitMC(tv);
1566 
1567  if (tv->thread_group_name) {
1569  }
1570 
1571  if (tv->printable_name) {
1573  }
1574 
1575  if (tv->stream_pq_local) {
1579  }
1580 
1581  s = (TmSlot *)tv->tm_slots;
1582  while (s) {
1583  ps = s;
1584  s = s->slot_next;
1585  SCFree(ps);
1586  }
1587 
1589  SCFree(tv);
1590 }
1591 
1592 void TmThreadSetGroupName(ThreadVars *tv, const char *name)
1593 {
1594  char *thread_group_name = NULL;
1595 
1596  if (name == NULL)
1597  return;
1598 
1599  if (tv == NULL)
1600  return;
1601 
1602  thread_group_name = SCStrdup(name);
1603  if (unlikely(thread_group_name == NULL)) {
1604  SCLogError(SC_ERR_RUNMODE, "error allocating memory");
1605  return;
1606  }
1607  tv->thread_group_name = thread_group_name;
1608 }
1609 
1611 {
1612  ThreadVars *tv = NULL;
1613  ThreadVars *ptv = NULL;
1614 
1615  if ((family < 0) || (family >= TVT_MAX))
1616  return;
1617 
1619  tv = tv_root[family];
1620 
1621  while (tv) {
1622  ptv = tv;
1623  tv = tv->next;
1624  TmThreadFree(ptv);
1625  }
1626  tv_root[family] = NULL;
1628 }
1629 
1630 /**
1631  * \brief Spawns a thread associated with the ThreadVars instance tv
1632  *
1633  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1634  */
1636 {
1637  pthread_attr_t attr;
1638  if (tv->tm_func == NULL) {
1639  FatalError(SC_ERR_TM_THREADS_ERROR, "No thread function set");
1640  }
1641 
1642  /* Initialize and set thread detached attribute */
1643  pthread_attr_init(&attr);
1644 
1645  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1646 
1647  /* Adjust thread stack size if configured */
1649  SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
1650  if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
1652  "Unable to increase stack size to %" PRIu64 " in thread attributes",
1654  }
1655  }
1656 
1657  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1658  if (rc) {
1660  "Unable to create thread with pthread_create() is %" PRId32, rc);
1661  }
1662 
1663 #if DEBUG && HAVE_PTHREAD_GETATTR_NP
1665  if (pthread_getattr_np(tv->t, &attr) == 0) {
1666  size_t stack_size;
1667  void *stack_addr;
1668  pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1669  SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1670  } else {
1671  SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1672  "pthread_getattr_np() is %" PRId32,
1673  rc);
1674  }
1675  }
1676 #endif
1677 
1679 
1680  TmThreadAppend(tv, tv->type);
1681  return TM_ECODE_OK;
1682 }
1683 
1684 /**
1685  * \brief Initializes the mutex and condition variables for this TV
1686  *
1687  * It can be used by a thread to control a wait loop that can also be
1688  * influenced by other threads.
1689  *
1690  * \param tv Pointer to a TV instance
1691  */
1693 {
1694  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1696  "Fatal error encountered in TmThreadInitMC. "
1697  "Exiting...");
1698  }
1699 
1700  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1701  printf("Error initializing the tv->m mutex\n");
1702  exit(EXIT_FAILURE);
1703  }
1704 
1705  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1707  "Fatal error encountered in TmThreadInitMC. "
1708  "Exiting...");
1709  }
1710 
1711  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1712  FatalError(SC_ERR_FATAL, "Error initializing the tv->cond condition "
1713  "variable");
1714  }
1715 
1716  return;
1717 }
1718 
1719 static void TmThreadDeinitMC(ThreadVars *tv)
1720 {
1721  if (tv->ctrl_mutex) {
1723  SCFree(tv->ctrl_mutex);
1724  }
1725  if (tv->ctrl_cond) {
1727  SCFree(tv->ctrl_cond);
1728  }
1729  return;
1730 }
1731 
1732 /**
1733  * \brief Tests if the thread represented in the arg has been unpaused or not.
1734  *
1735  * The function would return if the thread tv has been unpaused or if the
1736  * kill flag for the thread has been set.
1737  *
1738  * \param tv Pointer to the TV instance.
1739  */
1741 {
1742  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1743  SleepUsec(100);
1744 
1746  break;
1747  }
1748 
1749  return;
1750 }
1751 
1752 /**
1753  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1754  * the kill flag has been set or not on the thread.
1755  *
1756  * \param tv Pointer to the TV instance.
1757  */
1759 {
1760  while (!TmThreadsCheckFlag(tv, flags)) {
1761  SleepUsec(100);
1762  }
1763 
1764  return;
1765 }
1766 
1767 /**
1768  * \brief Unpauses a thread
1769  *
1770  * \param tv Pointer to a TV instance that has to be unpaused
1771  */
1773 {
1775 
1776  return;
1777 }
1778 
1779 /**
1780  * \brief Unpauses all threads present in tv_root
1781  */
1783 {
1785  for (int i = 0; i < TVT_MAX; i++) {
1786  ThreadVars *tv = tv_root[i];
1787  while (tv != NULL) {
1789  tv = tv->next;
1790  }
1791  }
1793  return;
1794 }
1795 
1796 /**
1797  * \brief Used to check the thread for certain conditions of failure.
1798  */
1800 {
1802  for (int i = 0; i < TVT_MAX; i++) {
1803  ThreadVars *tv = tv_root[i];
1804  while (tv) {
1806  FatalError(SC_ERR_FATAL, "thread %s failed", tv->name);
1807  }
1808  tv = tv->next;
1809  }
1810  }
1812  return;
1813 }
1814 
1815 /**
1816  * \brief Used to check if all threads have finished their initialization. On
1817  * finding an un-initialized thread, it waits till that thread completes
1818  * its initialization, before proceeding to the next thread.
1819  *
1820  * \retval TM_ECODE_OK all initialized properly
1821  * \retval TM_ECODE_FAILED failure
1822  */
1824 {
1825  uint16_t RX_num = 0;
1826  uint16_t W_num = 0;
1827  uint16_t FM_num = 0;
1828  uint16_t FR_num = 0;
1829  uint16_t TX_num = 0;
1830 
1831  struct timeval start_ts;
1832  struct timeval cur_ts;
1833  gettimeofday(&start_ts, NULL);
1834 
1835 again:
1837  for (int i = 0; i < TVT_MAX; i++) {
1838  ThreadVars *tv = tv_root[i];
1839  while (tv != NULL) {
1842 
1843  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1844  "initialize: flags %04x", tv->name,
1845  SC_ATOMIC_GET(tv->flags));
1846  return TM_ECODE_FAILED;
1847  }
1848 
1849  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
1851 
1852  gettimeofday(&cur_ts, NULL);
1853  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
1854  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1855  "initialize in time: flags %04x", tv->name,
1856  SC_ATOMIC_GET(tv->flags));
1857  return TM_ECODE_FAILED;
1858  }
1859 
1860  /* sleep a little to give the thread some
1861  * time to finish initialization */
1862  SleepUsec(100);
1863  goto again;
1864  }
1865 
1868  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1869  "initialize.", tv->name);
1870  return TM_ECODE_FAILED;
1871  }
1874  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" closed on "
1875  "initialization.", tv->name);
1876  return TM_ECODE_FAILED;
1877  }
1878 
1879  if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1880  RX_num++;
1881  else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1882  W_num++;
1883  else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1884  TX_num++;
1885  else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1886  FM_num++;
1887  else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1888  FR_num++;
1889 
1890  tv = tv->next;
1891  }
1892  }
1894 
1895  /* Construct a welcome string displaying
1896  * initialized thread types and counts */
1897  uint16_t app_len = 32;
1898  uint16_t buf_len = 256;
1899 
1900  char append_str[app_len];
1901  char thread_counts[buf_len];
1902 
1903  strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1904  if (RX_num > 0) {
1905  snprintf(append_str, app_len, "RX: %u ", RX_num);
1906  strlcat(thread_counts, append_str, buf_len);
1907  }
1908  if (W_num > 0) {
1909  snprintf(append_str, app_len, "W: %u ", W_num);
1910  strlcat(thread_counts, append_str, buf_len);
1911  }
1912  if (TX_num > 0) {
1913  snprintf(append_str, app_len, "TX: %u ", TX_num);
1914  strlcat(thread_counts, append_str, buf_len);
1915  }
1916  if (FM_num > 0) {
1917  snprintf(append_str, app_len, "FM: %u ", FM_num);
1918  strlcat(thread_counts, append_str, buf_len);
1919  }
1920  if (FR_num > 0) {
1921  snprintf(append_str, app_len, "FR: %u ", FR_num);
1922  strlcat(thread_counts, append_str, buf_len);
1923  }
1924  snprintf(append_str, app_len, " Engine started.");
1925  strlcat(thread_counts, append_str, buf_len);
1926  SCLogNotice("%s", thread_counts);
1927 
1928  return TM_ECODE_OK;
1929 }
1930 
1931 /**
1932  * \brief returns a count of all the threads that match the flag
1933  */
1935 {
1936  uint32_t cnt = 0;
1938  for (int i = 0; i < TVT_MAX; i++) {
1939  ThreadVars *tv = tv_root[i];
1940  while (tv != NULL) {
1941  if ((tv->tmm_flags & flags) == flags)
1942  cnt++;
1943 
1944  tv = tv->next;
1945  }
1946  }
1948  return cnt;
1949 }
1950 
1951 #ifdef DEBUG_VALIDATION
1952 static void TmThreadDoDumpSlots(const ThreadVars *tv)
1953 {
1954  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
1956  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
1957  tv, s, s->tm_id, m->name);
1958  }
1959 }
1960 
1961 static void TmThreadDumpThreads(void)
1962 {
1964  for (int i = 0; i < TVT_MAX; i++) {
1965  ThreadVars *tv = tv_root[i];
1966  while (tv != NULL) {
1967  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
1968  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
1969  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
1970  if (tv->inq && tv->stream_pq == tv->inq->pq) {
1971  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
1972  } else if (tv->stream_pq_local != NULL) {
1973  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
1974  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
1975  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
1976  }
1977  }
1978  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
1979  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
1980  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
1981  }
1982  TmThreadDoDumpSlots(tv);
1983  tv = tv->next;
1984  }
1985  }
1988 }
1989 #endif
1990 
1991 typedef struct Thread_ {
1992  ThreadVars *tv; /**< threadvars structure */
1993  const char *name;
1994  int type;
1995  int in_use; /**< bool to indicate this is in use */
1996 
1997  struct timeval pktts; /**< current packet time of this thread
1998  * (offline mode) */
1999  uint32_t sys_sec_stamp; /**< timestamp in seconds of the real system
2000  * time when the pktts was last updated. */
2002 
2003 typedef struct Threads_ {
2008 
2009 static Threads thread_store = { NULL, 0, 0 };
2010 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2011 
2013 {
2014  SCMutexLock(&thread_store_lock);
2015  for (size_t s = 0; s < thread_store.threads_size; s++) {
2016  Thread *t = &thread_store.threads[s];
2017  if (t == NULL || t->in_use == 0)
2018  continue;
2019 
2020  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2021  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2022  if (t->tv) {
2023  ThreadVars *tv = t->tv;
2024  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2025  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2026  tv, tv->type, tv->name, tv->tmm_flags, flags);
2027  }
2028  }
2029  SCMutexUnlock(&thread_store_lock);
2030 }
2031 
2032 #define STEP 32
2033 /**
2034  * \retval id thread id, or 0 if not found
2035  */
2037 {
2038  SCMutexLock(&thread_store_lock);
2039  if (thread_store.threads == NULL) {
2040  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2041  BUG_ON(thread_store.threads == NULL);
2042  thread_store.threads_size = STEP;
2043  }
2044 
2045  size_t s;
2046  for (s = 0; s < thread_store.threads_size; s++) {
2047  if (thread_store.threads[s].in_use == 0) {
2048  Thread *t = &thread_store.threads[s];
2049  t->name = tv->name;
2050  t->type = type;
2051  t->tv = tv;
2052  t->in_use = 1;
2053 
2054  SCMutexUnlock(&thread_store_lock);
2055  return (int)(s+1);
2056  }
2057  }
2058 
2059  /* if we get here the array is completely filled */
2060  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2061  BUG_ON(newmem == NULL);
2062  thread_store.threads = newmem;
2063  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2064 
2065  Thread *t = &thread_store.threads[thread_store.threads_size];
2066  t->name = tv->name;
2067  t->type = type;
2068  t->tv = tv;
2069  t->in_use = 1;
2070 
2071  s = thread_store.threads_size;
2072  thread_store.threads_size += STEP;
2073 
2074  SCMutexUnlock(&thread_store_lock);
2075  return (int)(s+1);
2076 }
2077 #undef STEP
2078 
2079 void TmThreadsUnregisterThread(const int id)
2080 {
2081  SCMutexLock(&thread_store_lock);
2082  if (id <= 0 || id > (int)thread_store.threads_size) {
2083  SCMutexUnlock(&thread_store_lock);
2084  return;
2085  }
2086 
2087  /* id is one higher than index */
2088  int idx = id - 1;
2089 
2090  /* reset thread_id, which serves as clearing the record */
2091  thread_store.threads[idx].in_use = 0;
2092 
2093  /* check if we have at least one registered thread left */
2094  size_t s;
2095  for (s = 0; s < thread_store.threads_size; s++) {
2096  Thread *t = &thread_store.threads[s];
2097  if (t->in_use == 1) {
2098  goto end;
2099  }
2100  }
2101 
2102  /* if we get here no threads are registered */
2103  SCFree(thread_store.threads);
2104  thread_store.threads = NULL;
2105  thread_store.threads_size = 0;
2106  thread_store.threads_cnt = 0;
2107 
2108 end:
2109  SCMutexUnlock(&thread_store_lock);
2110 }
2111 
2112 void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
2113 {
2114  SCMutexLock(&thread_store_lock);
2115  if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2116  SCMutexUnlock(&thread_store_lock);
2117  return;
2118  }
2119 
2120  int idx = id - 1;
2121  Thread *t = &thread_store.threads[idx];
2122  t->pktts = *ts;
2123  struct timeval systs;
2124  gettimeofday(&systs, NULL);
2125  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2126  SCMutexUnlock(&thread_store_lock);
2127 }
2128 
2130 {
2131  bool ready = true;
2132  SCMutexLock(&thread_store_lock);
2133  for (size_t s = 0; s < thread_store.threads_size; s++) {
2134  Thread *t = &thread_store.threads[s];
2135  if (!t->in_use)
2136  break;
2137  if (t->sys_sec_stamp == 0) {
2138  ready = false;
2139  break;
2140  }
2141  }
2142  SCMutexUnlock(&thread_store_lock);
2143  return ready;
2144 }
2145 
2146 void TmThreadsInitThreadsTimestamp(const struct timeval *ts)
2147 {
2148  struct timeval systs;
2149  gettimeofday(&systs, NULL);
2150  SCMutexLock(&thread_store_lock);
2151  for (size_t s = 0; s < thread_store.threads_size; s++) {
2152  Thread *t = &thread_store.threads[s];
2153  if (!t->in_use)
2154  break;
2155  t->pktts = *ts;
2156  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2157  }
2158  SCMutexUnlock(&thread_store_lock);
2159 }
2160 
2161 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2162 {
2163  struct timeval local, nullts;
2164  memset(&local, 0, sizeof(local));
2165  memset(&nullts, 0, sizeof(nullts));
2166  int set = 0;
2167  size_t s;
2168  struct timeval systs;
2169  gettimeofday(&systs, NULL);
2170 
2171  SCMutexLock(&thread_store_lock);
2172  for (s = 0; s < thread_store.threads_size; s++) {
2173  Thread *t = &thread_store.threads[s];
2174  if (t->in_use == 0)
2175  break;
2176  if (!(timercmp(&t->pktts, &nullts, ==))) {
2177  /* ignore sleeping threads */
2178  if (t->sys_sec_stamp + 1 < (uint32_t)systs.tv_sec)
2179  continue;
2180 
2181  if (!set) {
2182  local = t->pktts;
2183  set = 1;
2184  } else {
2185  if (timercmp(&t->pktts, &local, <)) {
2186  local = t->pktts;
2187  }
2188  }
2189  }
2190  }
2191  SCMutexUnlock(&thread_store_lock);
2192  *ts = local;
2193  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2194 }
2195 
2197 {
2198  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2199  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2200  /* always create at least one thread */
2201  if (thread_max == 0)
2202  thread_max = ncpus * threading_detect_ratio;
2203  if (thread_max < 1)
2204  thread_max = 1;
2205  if (thread_max > 1024) {
2206  SCLogWarning(SC_ERR_RUNMODE, "limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2207  thread_max = 1024;
2208  }
2209  return (uint16_t)thread_max;
2210 }
2211 
2212 static inline void ThreadBreakLoop(ThreadVars *tv)
2213 {
2214  if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
2215  return;
2216  }
2217  /* find the correct slot */
2218  TmSlot *s = tv->tm_slots;
2219  TmModule *tm = TmModuleGetById(s->tm_id);
2220  if (tm->flags & TM_FLAG_RECEIVE_TM) {
2221  /* if the method supports it, BreakLoop. Otherwise we rely on
2222  * the capture method's recv timeout */
2223  if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
2224  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
2225  }
2226  }
2227 }
2228 
2229 /** \brief inject a flow into a threads flow queue
2230  */
2231 void TmThreadsInjectFlowById(Flow *f, const int id)
2232 {
2233  BUG_ON(id <= 0 || id > (int)thread_store.threads_size);
2234 
2235  int idx = id - 1;
2236 
2237  Thread *t = &thread_store.threads[idx];
2238  ThreadVars *tv = t->tv;
2239 
2240  BUG_ON(tv == NULL || tv->flow_queue == NULL);
2241 
2242  FlowEnqueue(tv->flow_queue, f);
2243 
2244  /* wake up listening thread(s) if necessary */
2245  if (tv->inq != NULL) {
2246  SCCondSignal(&tv->inq->pq->cond_q);
2247  } else if (tv->break_loop) {
2248  ThreadBreakLoop(tv);
2249  }
2250 }
thread_name_workers
const char * thread_name_workers
Definition: runmodes.c:66
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:791
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:67
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:75
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:72
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:133
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2129
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1692
tm-threads.h
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:68
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:54
THV_USE
#define THV_USE
Definition: threadvars.h:37
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:113
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1635
Threads
struct Threads_ Threads
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1079
SC_ERR_TM_THREADS_ERROR
@ SC_ERR_TM_THREADS_ERROR
Definition: util-error.h:166
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:832
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:95
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:68
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:315
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1021
Thread_::pktts
struct timeval pktts
Definition: tm-threads.c:1997
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:387
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:379
TmThreadSetGroupName
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1592
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:105
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:49
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:298
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:97
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1758
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:173
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:47
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:55
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:46
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:84
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:61
Packet_::flags
uint32_t flags
Definition: decode.h:462
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:74
threads.h
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:353
ThreadVars_::t
pthread_t t
Definition: threadvars.h:59
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:308
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:60
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:69
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:1934
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:104
thread_name_autofp
const char * thread_name_autofp
Definition: runmodes.c:64
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const struct timeval *ts)
Definition: tm-threads.c:2146
StatsSetupPrivate
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1191
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:115
MIN
#define MIN(x, y)
Definition: suricata-common.h:372
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:78
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:74
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1328
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:387
SC_ERR_THREAD_CREATE
@ SC_ERR_THREAD_CREATE
Definition: util-error.h:78
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:121
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:744
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:37
m
SCMutex m
Definition: flow-hash.h:6
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:43
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:140
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:375
SC_ERR_SHUTDOWN
@ SC_ERR_SHUTDOWN
Definition: util-error.h:220
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:83
rww_lock_contention
thread_local uint64_t rww_lock_contention
SC_ERR_RUNMODE
@ SC_ERR_RUNMODE
Definition: util-error.h:219
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:55
Threads_::threads
Thread * threads
Definition: tm-threads.c:2004
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:174
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:52
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2032
Thread_::in_use
int in_use
Definition: tm-threads.c:1995
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:39
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(void)
Disable all packet threads.
Definition: tm-threads.c:1460
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:302
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:82
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a threads flow queue
Definition: tm-threads.c:2231
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:81
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2161
FQLOCK_LOCK
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:73
TmThreadContinueThreads
void TmThreadContinueThreads()
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1782
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:54
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:53
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:47
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:54
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1227
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:105
util-signal.h
SC_ERR_THREAD_INIT
@ SC_ERR_THREAD_INIT
Definition: util-error.h:79
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:79
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:755
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1507
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:1992
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1772
AffinityGetNextCPU
uint16_t AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
Definition: util-affinity.c:297
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:56
type
uint8_t type
Definition: decode-icmpv4.h:0
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:1823
Threads_::threads_cnt
int threads_cnt
Definition: tm-threads.c:2006
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
strlcat
size_t strlcat(char *, const char *src, size_t siz)
Definition: util-strlcatu.c:45
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:76
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:90
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:96
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:206
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:57
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
threading_set_stack_size
uint64_t threading_set_stack_size
Definition: runmodes.c:61
ThreadVars_::perf_public_ctx
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:126
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1177
SCEnter
#define SCEnter(...)
Definition: util-debug.h:300
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1740
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1610
THV_KILL
#define THV_KILL
Definition: threadvars.h:41
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:891
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:727
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2079
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2036
threading_set_cpu_affinity
int threading_set_cpu_affinity
Definition: runmodes.c:60
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
SC_ERR_INVALID_ARGUMENT
@ SC_ERR_INVALID_ARGUMENT
Definition: util-error.h:43
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:123
ThreadVars_::id
int id
Definition: threadvars.h:87
ThreadVars_::type
uint8_t type
Definition: threadvars.h:72
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:281
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:109
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:81
SCReturn
#define SCReturn
Definition: util-debug.h:302
stream.h
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1538
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:158
Packet_
Definition: decode.h:427
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:131
ThreadVars_::thread_group_name
char * thread_group_name
Definition: threadvars.h:67
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:90
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1508
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:73
TmSlot_
Definition: tm-threads.h:52
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:66
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:69
TmEcode
TmEcode
Definition: tm-threads-common.h:81
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
runmodes.h
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:55
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1051
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:31
TmModule_
Definition: tm-modules.h:43
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:260
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:114
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:40
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:38
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:628
StatsPublicThreadContext_::m
SCMutex m
Definition: counters.h:72
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1509
Thread_::sys_sec_stamp
uint32_t sys_sec_stamp
Definition: tm-threads.c:1999
Thread_::type
int type
Definition: tm-threads.c:1994
Thread
struct Thread_ Thread
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:90
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1138
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:75
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:44
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:55
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:800
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:91
Threads_
Definition: tm-threads.c:2003
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:33
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:35
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:106
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:42
ThreadVars_::break_loop
bool break_loop
Definition: threadvars.h:134
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:101
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:93
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:816
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:224
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:50
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:65
SCLogError
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:257
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
Definition: tm-threads.c:2112
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:94
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:70
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
FatalError
#define FatalError(x,...)
Definition: util-debug.h:532
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:66
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1112
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:29
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:141
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:212
threadvars.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:173
ThreadsAffinityType_
Definition: util-affinity.h:64
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:35
THV_KILL_PKTACQ
#define THV_KILL_PKTACQ
Definition: threadvars.h:48
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:597
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:92
Thread_::name
const char * name
Definition: tm-threads.c:1993
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:150
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:48
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:91
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCLogWarning
#define SCLogWarning(err_code,...)
Macro used to log WARNING messages.
Definition: util-debug.h:244
PKT_SRC_DETECT_RELOAD_FLUSH
@ PKT_SRC_DETECT_RELOAD_FLUSH
Definition: decode.h:60
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:67
SC_ERR_FATAL
@ SC_ERR_FATAL
Definition: util-error.h:203
thread_name_verdict
const char * thread_name_verdict
Definition: runmodes.c:67
SC_ERR_THREAD_NICE_PRIO
@ SC_ERR_THREAD_NICE_PRIO
Definition: util-error.h:77
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:66
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:318
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:84
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:110
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
SC_ERR_MEM_ALLOC
@ SC_ERR_MEM_ALLOC
Definition: util-error.h:31
FlowQueueNew
FlowQueue * FlowQueueNew()
Definition: flow-queue.c:36
suricata.h
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:459
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:139
Tmq_
Definition: tm-queues.h:29
StatsSyncCounters
#define StatsSyncCounters(tv)
Definition: counters.h:134
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2012
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:61
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1799
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:130
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:83
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:67
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:376
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:106
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax()
Definition: tm-threads.c:2196
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:335
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:89
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:89
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:232
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:43
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:1991
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:120
Threads_::threads_size
size_t threads_size
Definition: tm-threads.c:2005
StatsThreadCleanup
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1290
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:63
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:211
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:70
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:927
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:360
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:200
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:351
FQLOCK_UNLOCK
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:75