suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2022 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-affinity.h"
39 #include "util-debug.h"
40 #include "util-privs.h"
41 #include "util-cpu.h"
42 #include "util-optimize.h"
43 #include "util-profiling.h"
44 #include "util-signal.h"
45 #include "queue.h"
46 #include "util-validate.h"
47 
48 #ifdef PROFILE_LOCKING
49 thread_local uint64_t mutex_lock_contention;
50 thread_local uint64_t mutex_lock_wait_ticks;
51 thread_local uint64_t mutex_lock_cnt;
52 
53 thread_local uint64_t spin_lock_contention;
54 thread_local uint64_t spin_lock_wait_ticks;
55 thread_local uint64_t spin_lock_cnt;
56 
57 thread_local uint64_t rww_lock_contention;
58 thread_local uint64_t rww_lock_wait_ticks;
59 thread_local uint64_t rww_lock_cnt;
60 
61 thread_local uint64_t rwr_lock_contention;
62 thread_local uint64_t rwr_lock_wait_ticks;
63 thread_local uint64_t rwr_lock_cnt;
64 #endif
65 
66 #ifdef OS_FREEBSD
67 #include <sched.h>
68 #include <sys/param.h>
69 #include <sys/resource.h>
70 #include <sys/cpuset.h>
71 #include <sys/thr.h>
72 #define cpu_set_t cpuset_t
73 #endif /* OS_FREEBSD */
74 
75 /* prototypes */
76 static int SetCPUAffinity(uint16_t cpu);
77 static void TmThreadDeinitMC(ThreadVars *tv);
78 
79 /* root of the threadvars list */
80 ThreadVars *tv_root[TVT_MAX] = { NULL };
81 
82 /* lock to protect tv_root */
84 
85 /**
86  * \brief Check if a thread flag is set.
87  *
88  * \retval 1 flag is set.
89  * \retval 0 flag is not set.
90  */
91 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
92 {
93  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
94 }
95 
96 /**
97  * \brief Set a thread flag.
98  */
99 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
100 {
101  SC_ATOMIC_OR(tv->flags, flag);
102 }
103 
104 /**
105  * \brief Unset a thread flag.
106  */
107 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
108 {
109  SC_ATOMIC_AND(tv->flags, ~flag);
110 }
111 
113  ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
114 {
115  while (decode_pq->top != NULL) {
116  Packet *extra_p = PacketDequeueNoLock(decode_pq);
117  if (unlikely(extra_p == NULL))
118  continue;
119  DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
120 
121  if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) {
123  }
124  }
126 }
127 
128 /**
129  * \brief Separate run function so we can call it recursively.
130  */
132 {
133  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
134  PACKET_PROFILING_TMM_START(p, s->tm_id);
135  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
136  PACKET_PROFILING_TMM_END(p, s->tm_id);
137  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
138 
139  /* handle error */
140  if (unlikely(r == TM_ECODE_FAILED)) {
141  /* Encountered error. Return packets to packetpool and return */
142  TmThreadsSlotProcessPktFail(tv, NULL);
143  return TM_ECODE_FAILED;
144  }
145  if (s->tm_flags & TM_FLAG_DECODE_TM) {
146  if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) !=
147  TM_ECODE_OK) {
148  return TM_ECODE_FAILED;
149  }
150  }
151  }
152 
153  return TM_ECODE_OK;
154 }
155 
156 /** \internal
157  *
158  * \brief Process flow timeout packets
159  *
160  * Process flow timeout pseudo packets. During shutdown this loop
161  * is run until the flow engine kills the thread and the queue is
162  * empty.
163  */
164 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
165 {
166  TmSlot *fw_slot = tv->tm_flowworker;
167  int r = TM_ECODE_OK;
168 
169  if (tv->stream_pq == NULL || fw_slot == NULL) {
170  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
171  return r;
172  }
173 
174  SCLogDebug("flow end loop starting");
175  while (1) {
177  uint32_t len = tv->stream_pq->len;
179  if (len > 0) {
180  while (len--) {
184  if (likely(p)) {
185  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
186  r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
187  if (r == TM_ECODE_FAILED) {
188  break;
189  }
190  }
191  }
192  } else {
194  break;
195  }
196  SleepUsec(1);
197  }
198  }
199  SCLogDebug("flow end loop complete");
201 
202  return r;
203 }
204 
205 /*
206 
207  pcap/nfq
208 
209  pkt read
210  callback
211  process_pkt
212 
213  pkt read
214  process_pkt
215 
216  slot:
217  setup
218 
219  pkt_ack_loop(tv, slot_data)
220 
221  deinit
222 
223  process_pkt:
224  while(s)
225  run s;
226  queue;
227 
228  */
229 
230 static void *TmThreadsSlotPktAcqLoop(void *td)
231 {
232  ThreadVars *tv = (ThreadVars *)td;
233  TmSlot *s = tv->tm_slots;
234  char run = 1;
235  TmEcode r = TM_ECODE_OK;
236  TmSlot *slot = NULL;
237 
239 
240  if (tv->thread_setup_flags != 0)
242 
243  /* Drop the capabilities for this thread */
244  SCDropCaps(tv);
246  PacketPoolInit();
247 
248  /* check if we are setup properly */
249  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
250  SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
251  " PktAcqLoop=%p, tmqh_in=%p,"
252  " tmqh_out=%p",
253  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
255  pthread_exit((void *) -1);
256  return NULL;
257  }
258 
259  for (slot = s; slot != NULL; slot = slot->slot_next) {
260  if (slot->SlotThreadInit != NULL) {
261  void *slot_data = NULL;
262  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
263  if (r != TM_ECODE_OK) {
264  if (r == TM_ECODE_DONE) {
265  EngineDone();
267  goto error;
268  } else {
270  goto error;
271  }
272  }
273  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
274  }
275 
276  /* if the flowworker module is the first, get the threads input queue */
277  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
278  tv->stream_pq = tv->inq->pq;
279  tv->tm_flowworker = slot;
280  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
282  if (tv->flow_queue == NULL) {
284  pthread_exit((void *) -1);
285  return NULL;
286  }
287  /* setup a queue */
288  } else if (slot->tm_id == TMM_FLOWWORKER) {
289  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
290  if (tv->stream_pq_local == NULL)
291  FatalError("failed to alloc PacketQueue");
294  tv->tm_flowworker = slot;
295  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
297  if (tv->flow_queue == NULL) {
299  pthread_exit((void *) -1);
300  return NULL;
301  }
302  }
303  }
304 
306 
308 
309  while(run) {
314  }
315 
316  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
317 
318  if (r == TM_ECODE_FAILED) {
320  run = 0;
321  }
323  run = 0;
324  }
325  if (r == TM_ECODE_DONE) {
326  run = 0;
327  }
328  }
330 
332 
333  /* process all pseudo packets the flow timeout may throw at us */
334  TmThreadTimeoutLoop(tv, s);
335 
338 
340 
341  for (slot = s; slot != NULL; slot = slot->slot_next) {
342  if (slot->SlotThreadExitPrintStats != NULL) {
343  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
344  }
345 
346  if (slot->SlotThreadDeinit != NULL) {
347  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
348  if (r != TM_ECODE_OK) {
350  goto error;
351  }
352  }
353  }
354 
355  tv->stream_pq = NULL;
356  SCLogDebug("%s ending", tv->name);
358  pthread_exit((void *) 0);
359  return NULL;
360 
361 error:
362  tv->stream_pq = NULL;
363  pthread_exit((void *) -1);
364  return NULL;
365 }
366 
367 static void *TmThreadsSlotVar(void *td)
368 {
369  ThreadVars *tv = (ThreadVars *)td;
370  TmSlot *s = (TmSlot *)tv->tm_slots;
371  Packet *p = NULL;
372  char run = 1;
373  TmEcode r = TM_ECODE_OK;
374 
376  PacketPoolInit();//Empty();
377 
379 
380  if (tv->thread_setup_flags != 0)
382 
383  /* Drop the capabilities for this thread */
384  SCDropCaps(tv);
385 
386  /* check if we are setup properly */
387  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
389  pthread_exit((void *) -1);
390  return NULL;
391  }
392 
393  for (; s != NULL; s = s->slot_next) {
394  if (s->SlotThreadInit != NULL) {
395  void *slot_data = NULL;
396  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
397  if (r != TM_ECODE_OK) {
399  goto error;
400  }
401  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
402  }
403 
404  /* special case: we need to access the stream queue
405  * from the flow timeout code */
406 
407  /* if the flowworker module is the first, get the threads input queue */
408  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
409  tv->stream_pq = tv->inq->pq;
410  tv->tm_flowworker = s;
411  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
413  if (tv->flow_queue == NULL) {
415  pthread_exit((void *) -1);
416  return NULL;
417  }
418  /* setup a queue */
419  } else if (s->tm_id == TMM_FLOWWORKER) {
420  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
421  if (tv->stream_pq_local == NULL)
422  FatalError("failed to alloc PacketQueue");
425  tv->tm_flowworker = s;
426  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
428  if (tv->flow_queue == NULL) {
430  pthread_exit((void *) -1);
431  return NULL;
432  }
433  }
434  }
435 
437 
438  // Each 'worker' thread uses this func to process/decode the packet read.
439  // Each decode method is different to receive methods in that they do not
440  // enter infinite loops. They use this as the core loop. As a result, at this
441  // point the worker threads can be considered both initialized and running.
443 
444  s = (TmSlot *)tv->tm_slots;
445 
446  while (run) {
451  }
452 
453  /* input a packet */
454  p = tv->tmqh_in(tv);
455 
456  /* if we didn't get a packet see if we need to do some housekeeping */
457  if (unlikely(p == NULL)) {
458  if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
460  if (p != NULL) {
463  }
464  }
465  }
466 
467  if (p != NULL) {
468  /* run the thread module(s) */
469  r = TmThreadsSlotVarRun(tv, p, s);
470  if (r == TM_ECODE_FAILED) {
473  break;
474  }
475 
476  /* output the packet */
477  tv->tmqh_out(tv, p);
478 
479  /* now handle the stream pq packets */
480  TmThreadsHandleInjectedPackets(tv);
481  }
482 
484  run = 0;
485  }
486  } /* while (run) */
488 
491 
493 
494  s = (TmSlot *)tv->tm_slots;
495 
496  for ( ; s != NULL; s = s->slot_next) {
497  if (s->SlotThreadExitPrintStats != NULL) {
498  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
499  }
500 
501  if (s->SlotThreadDeinit != NULL) {
502  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
503  if (r != TM_ECODE_OK) {
505  goto error;
506  }
507  }
508  }
509 
510  SCLogDebug("%s ending", tv->name);
511  tv->stream_pq = NULL;
513  pthread_exit((void *) 0);
514  return NULL;
515 
516 error:
517  tv->stream_pq = NULL;
518  pthread_exit((void *) -1);
519  return NULL;
520 }
521 
522 static void *TmThreadsManagement(void *td)
523 {
524  ThreadVars *tv = (ThreadVars *)td;
525  TmSlot *s = (TmSlot *)tv->tm_slots;
526  TmEcode r = TM_ECODE_OK;
527 
528  BUG_ON(s == NULL);
529 
531 
532  if (tv->thread_setup_flags != 0)
534 
535  /* Drop the capabilities for this thread */
536  SCDropCaps(tv);
537 
538  SCLogDebug("%s starting", tv->name);
539 
540  if (s->SlotThreadInit != NULL) {
541  void *slot_data = NULL;
542  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
543  if (r != TM_ECODE_OK) {
545  pthread_exit((void *) -1);
546  return NULL;
547  }
548  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
549  }
550 
552 
554 
555  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
556  /* handle error */
557  if (r == TM_ECODE_FAILED) {
559  }
560 
563  }
564 
567 
568  if (s->SlotThreadExitPrintStats != NULL) {
569  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
570  }
571 
572  if (s->SlotThreadDeinit != NULL) {
573  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
574  if (r != TM_ECODE_OK) {
576  pthread_exit((void *) -1);
577  return NULL;
578  }
579  }
580 
582  pthread_exit((void *) 0);
583  return NULL;
584 }
585 
586 /**
587  * \brief We set the slot functions.
588  *
589  * \param tv Pointer to the TV to set the slot function for.
590  * \param name Name of the slot variant.
591  * \param fn_p Pointer to a custom slot function. Used only if slot variant
592  * "name" is "custom".
593  *
594  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
595  */
596 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
597 {
598  if (name == NULL) {
599  if (fn_p == NULL) {
600  printf("Both slot name and function pointer can't be NULL inside "
601  "TmThreadSetSlots\n");
602  goto error;
603  } else {
604  name = "custom";
605  }
606  }
607 
608  if (strcmp(name, "varslot") == 0) {
609  tv->tm_func = TmThreadsSlotVar;
610  } else if (strcmp(name, "pktacqloop") == 0) {
611  tv->tm_func = TmThreadsSlotPktAcqLoop;
612  } else if (strcmp(name, "management") == 0) {
613  tv->tm_func = TmThreadsManagement;
614  } else if (strcmp(name, "command") == 0) {
615  tv->tm_func = TmThreadsManagement;
616  } else if (strcmp(name, "custom") == 0) {
617  if (fn_p == NULL)
618  goto error;
619  tv->tm_func = fn_p;
620  } else {
621  printf("Error: Slot \"%s\" not supported\n", name);
622  goto error;
623  }
624 
625  return TM_ECODE_OK;
626 
627 error:
628  return TM_ECODE_FAILED;
629 }
630 
631 /**
632  * \brief Appends a new entry to the slots.
633  *
634  * \param tv TV the slot is attached to.
635  * \param tm TM to append.
636  * \param data Data to be passed on to the slot init function.
637  *
638  * \note Nothing is returned: if allocating the TmSlot fails, the function returns without appending anything.
639  */
640 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
641 {
642  TmSlot *slot = SCCalloc(1, sizeof(TmSlot));
643  if (unlikely(slot == NULL))
644  return;
645  SC_ATOMIC_INITPTR(slot->slot_data);
646  slot->SlotThreadInit = tm->ThreadInit;
647  slot->slot_initdata = data;
648  if (tm->Func) {
649  slot->SlotFunc = tm->Func;
650  } else if (tm->PktAcqLoop) {
651  slot->PktAcqLoop = tm->PktAcqLoop;
652  if (tm->PktAcqBreakLoop) {
653  tv->break_loop = true;
654  }
655  } else if (tm->Management) {
656  slot->Management = tm->Management;
657  }
659  slot->SlotThreadDeinit = tm->ThreadDeinit;
660  /* we don't have to check for the return value "-1". We wouldn't have
661  * received a TM as arg, if it didn't exist */
662  slot->tm_id = TmModuleGetIDForTM(tm);
663  slot->tm_flags |= tm->flags;
664 
665  tv->tmm_flags |= tm->flags;
666  tv->cap_flags |= tm->cap_flags;
667 
668  if (tv->tm_slots == NULL) {
669  tv->tm_slots = slot;
670  } else {
671  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
672 
673  /* get the last slot */
674  for ( ; a != NULL; a = a->slot_next) {
675  b = a;
676  }
677  /* append the new slot */
678  if (b != NULL) {
679  b->slot_next = slot;
680  }
681  }
682 }
683 
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
/**
 * \internal
 * \brief Apply a cpu set to the calling thread.
 *
 * Uses cpuset_setaffinity() on FreeBSD, thread_policy_set() on Darwin and
 * sched_setaffinity() elsewhere. The warning text always names
 * sched_setaffinity, whichever call was actually made.
 *
 * \param cs cpu set to apply
 *
 * \retval 0 the platform call returned 0; -1 otherwise (a warning is printed).
 */
static int SetCPUAffinitySet(cpu_set_t *cs)
{
    int rc;

#if defined OS_FREEBSD
    rc = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
            SCGetThreadIdLong(), sizeof(cpu_set_t), cs);
#elif OS_DARWIN
    rc = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
            (void *)cs, THREAD_AFFINITY_POLICY_COUNT);
#else
    const pid_t self_tid = syscall(SYS_gettid);
    rc = sched_setaffinity(self_tid, sizeof(cpu_set_t), cs);
#endif /* OS_FREEBSD */

    if (rc == 0) {
        return 0;
    }

    printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", rc,
            strerror(errno));
    return -1;
}
#endif
707 
708 
/**
 * \brief Set the thread affinity on the calling thread.
 *
 * On OpenBSD and Solaris this is a no-op that reports success. On Windows
 * and Cygwin a single-bit affinity mask is handed to
 * SetThreadAffinityMask(). Elsewhere a cpu set holding only \a cpuid is
 * built and applied through SetCPUAffinitySet().
 *
 * \param cpuid Id of the core/cpu to setup the affinity.
 *
 * \retval 0 If all goes well; -1 if something is wrong, including a cpu id
 *           that cannot be represented in the platform's affinity mask/set.
 */
static int SetCPUAffinity(uint16_t cpuid)
{
#if defined __OpenBSD__ || defined sun
    return 0;
#else
    int cpu = (int)cpuid;

#if defined OS_WIN32 || defined __CYGWIN__
    /* The mask has one bit per cpu and is DWORD_PTR wide. Shifting by the
     * width of the type or more is undefined behaviour, so reject ids that
     * do not fit instead of building a garbage mask. */
    if (cpu >= (int)(sizeof(DWORD_PTR) * 8)) {
        printf("Warning: cpu %d does not fit in the thread affinity mask\n", cpu);
        return -1;
    }
    DWORD_PTR cs = (DWORD_PTR)1 << cpu;

    /* SetThreadAffinityMask() returns 0 on failure */
    int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
    if (r != 0) {
        /* NOTE(review): the Win32 call reports errors through
         * GetLastError(), so errno may be unrelated here - confirm. */
        printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
                strerror(errno));
        return -1;
    }
    SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
            SCGetThreadIdLong(), cpu);

    return 0;

#else
    cpu_set_t cs;
    memset(&cs, 0, sizeof(cs));

#ifdef CPU_SETSIZE
    /* CPU_SET() is not guaranteed to bounds-check its index: an id beyond
     * the set size could write outside of 'cs'. */
    if (cpu >= CPU_SETSIZE) {
        printf("Warning: cpu %d is outside the supported cpu set range\n", cpu);
        return -1;
    }
#endif

    CPU_ZERO(&cs);
    CPU_SET(cpu, &cs);
    return SetCPUAffinitySet(&cs);
#endif /* windows */
#endif /* not supported */
}
747 
748 
749 /**
750  * \brief Set the thread options (thread priority).
751  *
752  * \param tv Pointer to the ThreadVars to setup the thread priority.
753  *
754  * \retval TM_ECODE_OK.
755  */
757 {
759  tv->thread_priority = prio;
760 
761  return TM_ECODE_OK;
762 }
763 
764 /**
765  * \brief Adjusting nice value for threads.
766  */
768 {
769  SCEnter();
770 #ifndef __CYGWIN__
771 #ifdef OS_WIN32
772  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
773  SCLogError("Error setting priority for "
774  "thread %s: %s",
775  tv->name, strerror(errno));
776  } else {
777  SCLogDebug("Priority set to %"PRId32" for thread %s",
779  }
780 #else
781  int ret = nice(tv->thread_priority);
782  if (ret == -1) {
783  SCLogError("Error setting nice value %d "
784  "for thread %s: %s",
785  tv->thread_priority, tv->name, strerror(errno));
786  } else {
787  SCLogDebug("Nice value set to %"PRId32" for thread %s",
789  }
790 #endif /* OS_WIN32 */
791 #endif
792  SCReturn;
793 }
794 
795 
796 /**
797  * \brief Set the thread options (cpu affinity).
798  *
799  * \param tv pointer to the ThreadVars to setup the affinity.
800  * \param cpu cpu on which affinity is set.
801  *
802  * \retval TM_ECODE_OK
803  */
805 {
807  tv->cpu_affinity = cpu;
808 
809  return TM_ECODE_OK;
810 }
811 
812 
814 {
816  return TM_ECODE_OK;
817 
818  if (type > MAX_CPU_SET) {
819  SCLogError("invalid cpu type family");
820  return TM_ECODE_FAILED;
821  }
822 
824  tv->cpu_affinity = type;
825 
826  return TM_ECODE_OK;
827 }
828 
830 {
831  if (type >= MAX_CPU_SET) {
832  SCLogError("invalid cpu type family");
833  return 0;
834  }
835 
837 }
838 
839 /**
840  * \brief Set the thread options (cpu affinity) for the calling thread.
841  * Priority should already be set by pthread_create.
842  *
843  * \param tv pointer to the ThreadVars of the calling thread.
844  */
846 {
848  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
849  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
851  SetCPUAffinity(tv->cpu_affinity);
852  }
853 
854 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
859  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
860  uint16_t cpu = AffinityGetNextCPU(taf);
861  SetCPUAffinity(cpu);
862  /* If CPU is in a set overwrite the default thread prio */
863  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
865  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
867  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
869  } else {
870  tv->thread_priority = taf->prio;
871  }
872  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
873  "%d, thread id %lu", tv->thread_priority,
874  tv->name, cpu, SCGetThreadIdLong());
875  } else {
876  SetCPUAffinitySet(&taf->cpu_set);
877  tv->thread_priority = taf->prio;
878  SCLogPerf("Setting prio %d for thread \"%s\", "
879  "thread id %lu", tv->thread_priority,
881  }
883  }
884 #endif
885 
886  return TM_ECODE_OK;
887 }
888 
889 /**
890  * \brief Creates and returns the TV instance for a new thread.
891  *
892  * \param name Name of this TV instance
893  * \param inq_name Incoming queue name
894  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
895  * \param outq_name Outgoing queue name
896  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
897  * \param slots String representation for the slot function to be used
898  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
899  * \param mucond Flag to indicate whether to initialize the condition
900  * and the mutex variables for this newly created TV.
901  *
902  * \retval the newly created TV instance, or NULL on error
903  */
904 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
905  const char *outq_name, const char *outqh_name, const char *slots,
906  void * (*fn_p)(void *), int mucond)
907 {
908  ThreadVars *tv = NULL;
909  Tmq *tmq = NULL;
910  Tmqh *tmqh = NULL;
911 
912  SCLogDebug("creating thread \"%s\"...", name);
913 
914  /* XXX create separate function for this: allocate a thread container */
915  tv = SCCalloc(1, sizeof(ThreadVars));
916  if (unlikely(tv == NULL))
917  goto error;
918 
919  SC_ATOMIC_INIT(tv->flags);
920  SCMutexInit(&tv->perf_public_ctx.m, NULL);
921 
922  strlcpy(tv->name, name, sizeof(tv->name));
923 
924  /* default state for every newly created thread */
927 
928  /* set the incoming queue */
929  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
930  SCLogDebug("inq_name \"%s\"", inq_name);
931 
932  tmq = TmqGetQueueByName(inq_name);
933  if (tmq == NULL) {
934  tmq = TmqCreateQueue(inq_name);
935  if (tmq == NULL)
936  goto error;
937  }
938  SCLogDebug("tmq %p", tmq);
939 
940  tv->inq = tmq;
941  tv->inq->reader_cnt++;
942  SCLogDebug("tv->inq %p", tv->inq);
943  }
944  if (inqh_name != NULL) {
945  SCLogDebug("inqh_name \"%s\"", inqh_name);
946 
947  int id = TmqhNameToID(inqh_name);
948  if (id <= 0) {
949  goto error;
950  }
951  tmqh = TmqhGetQueueHandlerByName(inqh_name);
952  if (tmqh == NULL)
953  goto error;
954 
955  tv->tmqh_in = tmqh->InHandler;
956  tv->inq_id = (uint8_t)id;
957  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
958  }
959 
960  /* set the outgoing queue */
961  if (outqh_name != NULL) {
962  SCLogDebug("outqh_name \"%s\"", outqh_name);
963 
964  int id = TmqhNameToID(outqh_name);
965  if (id <= 0) {
966  goto error;
967  }
968 
969  tmqh = TmqhGetQueueHandlerByName(outqh_name);
970  if (tmqh == NULL)
971  goto error;
972 
973  tv->tmqh_out = tmqh->OutHandler;
974  tv->outq_id = (uint8_t)id;
975 
976  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
977  SCLogDebug("outq_name \"%s\"", outq_name);
978 
979  if (tmqh->OutHandlerCtxSetup != NULL) {
980  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
981  if (tv->outctx == NULL)
982  goto error;
983  tv->outq = NULL;
984  } else {
985  tmq = TmqGetQueueByName(outq_name);
986  if (tmq == NULL) {
987  tmq = TmqCreateQueue(outq_name);
988  if (tmq == NULL)
989  goto error;
990  }
991  SCLogDebug("tmq %p", tmq);
992 
993  tv->outq = tmq;
994  tv->outctx = NULL;
995  tv->outq->writer_cnt++;
996  }
997  }
998  }
999 
1000  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1001  goto error;
1002  }
1003 
1004  if (mucond != 0)
1005  TmThreadInitMC(tv);
1006 
1007  return tv;
1008 
1009 error:
1010  SCLogError("failed to setup a thread");
1011 
1012  if (tv != NULL)
1013  SCFree(tv);
1014  return NULL;
1015 }
1016 
1017 /**
1018  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1019  * This function doesn't support custom slots, and hence shouldn't be
1020  * supplied \"custom\" as its slot type. All PPT threads are created
1021  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1022  * conditional variables are not used to kill the thread.
1023  *
1024  * \param name Name of this TV instance
1025  * \param inq_name Incoming queue name
1026  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1027  * \param outq_name Outgoing queue name
1028  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1029  * \param slots String representation for the slot function to be used
1030  *
1031  * \retval the newly created TV instance, or NULL on error
1032  */
1033 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1034  const char *inqh_name, const char *outq_name,
1035  const char *outqh_name, const char *slots)
1036 {
1037  ThreadVars *tv = NULL;
1038 
1039  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1040  slots, NULL, 0);
1041 
1042  if (tv != NULL) {
1043  tv->type = TVT_PPT;
1045  }
1046 
1047  return tv;
1048 }
1049 
1050 /**
1051  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1052  * This function supports only custom slot functions and hence a
1053  * function pointer should be sent as an argument.
1054  *
1055  * \param name Name of this TV instance
1056  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1057  * \param mucond Flag to indicate whether to initialize the condition
1058  * and the mutex variables for this newly created TV.
1059  *
1060  * \retval the newly created TV instance, or NULL on error
1061  */
1062 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1063  int mucond)
1064 {
1065  ThreadVars *tv = NULL;
1066 
1067  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1068 
1069  if (tv != NULL) {
1070  tv->type = TVT_MGMT;
1073  }
1074 
1075  return tv;
1076 }
1077 
1078 /**
1079  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1080  * The thread uses the "management" slot function; the TmModule named
1081  * by \a module is looked up and, if found, appended as the thread's slot.
1082  *
1083  * \param name Name of this TV instance
1084  * \param module Name of TmModule with MANAGEMENT flag set.
1085  * \param mucond Flag to indicate whether to initialize the condition
1086  * and the mutex variables for this newly created TV.
1087  *
1088  * \retval the newly created TV instance, or NULL on error
1089  */
1090 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1091  int mucond)
1092 {
1093  ThreadVars *tv = NULL;
1094 
1095  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1096 
1097  if (tv != NULL) {
1098  tv->type = TVT_MGMT;
1101 
1102  TmModule *m = TmModuleGetByName(module);
1103  if (m) {
1104  TmSlotSetFuncAppend(tv, m, NULL);
1105  }
1106  }
1107 
1108  return tv;
1109 }
1110 
1111 /**
1112  * \brief Creates and returns the TV instance for a Command thread (CMD).
1113  * The thread uses the "command" slot function; the TmModule named
1114  * by \a module is looked up and, if found, appended as the thread's slot.
1115  *
1116  * \param name Name of this TV instance
1117  * \param module Name of TmModule with COMMAND flag set.
1118  * \param mucond Flag to indicate whether to initialize the condition
1119  * and the mutex variables for this newly created TV.
1120  *
1121  * \retval the newly created TV instance, or NULL on error
1122  */
1123 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1124  int mucond)
1125 {
1126  ThreadVars *tv = NULL;
1127 
1128  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1129 
1130  if (tv != NULL) {
1131  tv->type = TVT_CMD;
1134 
1135  TmModule *m = TmModuleGetByName(module);
1136  if (m) {
1137  TmSlotSetFuncAppend(tv, m, NULL);
1138  }
1139  }
1140 
1141  return tv;
1142 }
1143 
1144 /**
1145  * \brief Appends this TV to tv_root based on its type
1146  *
1147  * \param type holds the type this TV belongs to.
1148  */
1150 {
1152 
1153  if (tv_root[type] == NULL) {
1154  tv_root[type] = tv;
1155  tv->next = NULL;
1156 
1158 
1159  return;
1160  }
1161 
1162  ThreadVars *t = tv_root[type];
1163 
1164  while (t) {
1165  if (t->next == NULL) {
1166  t->next = tv;
1167  tv->next = NULL;
1168  break;
1169  }
1170 
1171  t = t->next;
1172  }
1173 
1175 }
1176 
1177 static bool ThreadStillHasPackets(ThreadVars *tv)
1178 {
1179  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1180  /* we wait till we dry out all the inq packets, before we
1181  * kill this thread. Do note that you should have disabled
1182  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1183  PacketQueue *q = tv->inq->pq;
1184  SCMutexLock(&q->mutex_q);
1185  uint32_t len = q->len;
1186  SCMutexUnlock(&q->mutex_q);
1187  if (len != 0) {
1188  return true;
1189  }
1190  }
1191 
1192  if (tv->stream_pq != NULL) {
1194  uint32_t len = tv->stream_pq->len;
1196 
1197  if (len != 0) {
1198  return true;
1199  }
1200  }
1201  return false;
1202 }
1203 
1204 /**
1205  * \brief Kill a thread.
1206  *
1207  * \param tv A ThreadVars instance corresponding to the thread that has to be
1208  * killed.
1209  *
1210  * \retval r 1 killed successfully
1211  * 0 not yet ready, needs another look
1212  */
1213 static int TmThreadKillThread(ThreadVars *tv)
1214 {
1215  BUG_ON(tv == NULL);
1216 
1217  /* kill only once :) */
1218  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1219  return 1;
1220  }
1221 
1222  /* set the thread flag informing the thread that it needs to be
1223  * terminated */
1226 
1227  /* to be sure, signal more */
1228  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1229  if (tv->inq_id != TMQH_NOT_SET) {
1231  if (qh != NULL && qh->InShutdownHandler != NULL) {
1232  qh->InShutdownHandler(tv);
1233  }
1234  }
1235  if (tv->inq != NULL) {
1236  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1237  SCMutexLock(&tv->inq->pq->mutex_q);
1238  SCCondSignal(&tv->inq->pq->cond_q);
1239  SCMutexUnlock(&tv->inq->pq->mutex_q);
1240  }
1241  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1242  }
1243 
1244  if (tv->ctrl_cond != NULL ) {
1246  pthread_cond_broadcast(tv->ctrl_cond);
1248  }
1249  return 0;
1250  }
1251 
1252  if (tv->outctx != NULL) {
1253  if (tv->outq_id != TMQH_NOT_SET) {
1255  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1256  qh->OutHandlerCtxFree(tv->outctx);
1257  tv->outctx = NULL;
1258  }
1259  }
1260  }
1261 
1262  /* join it and flag it as dead */
1263  pthread_join(tv->t, NULL);
1264  SCLogDebug("thread %s stopped", tv->name);
1266  return 1;
1267 }
1268 
1269 static bool ThreadBusy(ThreadVars *tv)
1270 {
1271  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
1272  TmModule *tm = TmModuleGetById(s->tm_id);
1273  if (tm && tm->ThreadBusy != NULL) {
1274  if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
1275  return true;
1276  }
1277  }
1278  return false;
1279 }
1280 
1281 /** \internal
1282  *
1283  * \brief make sure that all packet threads are done processing their
1284  * in-flight packets, including 'injected' flow packets.
1285  */
1286 static void TmThreadDrainPacketThreads(void)
1287 {
1288  ThreadVars *tv = NULL;
1289  struct timeval start_ts;
1290  struct timeval cur_ts;
1291  gettimeofday(&start_ts, NULL);
1292 
1293 again:
1294  gettimeofday(&cur_ts, NULL);
1295  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1296  SCLogWarning("unable to get all packet threads "
1297  "to process their packets in time");
1298  return;
1299  }
1300 
1302 
1303  /* all receive threads are part of packet processing threads */
1304  tv = tv_root[TVT_PPT];
1305  while (tv) {
1306  if (ThreadStillHasPackets(tv)) {
1307  /* we wait till we dry out all the inq packets, before we
1308  * kill this thread. Do note that you should have disabled
1309  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1311 
1312  /* sleep outside lock */
1313  SleepMsec(1);
1314  goto again;
1315  }
1316  if (ThreadBusy(tv)) {
1318 
1319  Packet *p = PacketGetFromAlloc();
1320  if (p != NULL) {
1323  PacketQueue *q = tv->stream_pq;
1324  SCMutexLock(&q->mutex_q);
1325  PacketEnqueue(q, p);
1326  SCCondSignal(&q->cond_q);
1327  SCMutexUnlock(&q->mutex_q);
1328  }
1329 
1330  /* don't sleep while holding a lock */
1331  SleepMsec(1);
1332  goto again;
1333  }
1334  tv = tv->next;
1335  }
1336 
1338 }
1339 
1340 /**
1341  * \brief Disable all threads having the specified TMs.
1342  *
1343  * Breaks out of the packet acquisition loop, and bumps
1344  * into the 'flow loop', where it will process packets
1345  * from the flow engine's shutdown handling.
1346  */
1348 {
1349  ThreadVars *tv = NULL;
1350  struct timeval start_ts;
1351  struct timeval cur_ts;
1352  gettimeofday(&start_ts, NULL);
1353 
1354 again:
1355  gettimeofday(&cur_ts, NULL);
1356  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1357  FatalError("Engine unable to disable detect "
1358  "thread - \"%s\". Killing engine",
1359  tv->name);
1360  }
1361 
1363 
1364  /* all receive threads are part of packet processing threads */
1365  tv = tv_root[TVT_PPT];
1366 
1367  /* we do have to keep in mind that TVs are arranged in the order
1368  * right from receive to log. The moment we fail to find a
1369  * receive TM amongst the slots in a tv, it indicates we are done
1370  * with all receive threads */
1371  while (tv) {
1372  int disable = 0;
1373  TmModule *tm = NULL;
1374  /* obtain the slots for this TV */
1375  TmSlot *slots = tv->tm_slots;
1376  while (slots != NULL) {
1377  tm = TmModuleGetById(slots->tm_id);
1378 
1379  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1380  disable = 1;
1381  break;
1382  }
1383 
1384  slots = slots->slot_next;
1385  continue;
1386  }
1387 
1388  if (disable) {
1389  if (ThreadStillHasPackets(tv)) {
1390  /* we wait till we dry out all the inq packets, before we
1391  * kill this thread. Do note that you should have disabled
1392  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1394  /* don't sleep while holding a lock */
1395  SleepMsec(1);
1396  goto again;
1397  }
1398 
1399  if (ThreadBusy(tv)) {
1401 
1402  Packet *p = PacketGetFromAlloc();
1403  if (p != NULL) {
1406  PacketQueue *q = tv->stream_pq;
1407  SCMutexLock(&q->mutex_q);
1408  PacketEnqueue(q, p);
1409  SCCondSignal(&q->cond_q);
1410  SCMutexUnlock(&q->mutex_q);
1411  }
1412 
1413  /* don't sleep while holding a lock */
1414  SleepMsec(1);
1415  goto again;
1416  }
1417 
1418  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1419  if (tm && tm->PktAcqBreakLoop != NULL) {
1420  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1421  }
1423 
1424  if (tv->inq != NULL) {
1425  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1426  SCMutexLock(&tv->inq->pq->mutex_q);
1427  SCCondSignal(&tv->inq->pq->cond_q);
1428  SCMutexUnlock(&tv->inq->pq->mutex_q);
1429  }
1430  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1431  }
1432 
1433  /* wait for it to enter the 'flow loop' stage */
1434  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1436 
1437  SleepMsec(1);
1438  goto again;
1439  }
1440  }
1441 
1442  tv = tv->next;
1443  }
1444 
1446 
1447  /* finally wait for all packet threads to have
1448  * processed all of their 'live' packets so we
1449  * don't process the last live packets together
1450  * with FFR packets */
1451  TmThreadDrainPacketThreads();
1452 }
1453 
1454 #ifdef DEBUG_VALIDATION
1455 static void TmThreadDumpThreads(void);
1456 #endif
1457 
1458 static void TmThreadDebugValidateNoMorePackets(void)
1459 {
1460 #ifdef DEBUG_VALIDATION
1462  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1463  if (ThreadStillHasPackets(tv)) {
1465  TmThreadDumpThreads();
1466  abort();
1467  }
1468  }
1470 #endif
1471 }
1472 
1473 /**
1474  * \brief Disable all packet threads
1475  */
1477 {
1478  struct timeval start_ts;
1479  struct timeval cur_ts;
1480 
1481  /* first drain all packet threads of their packets */
1482  TmThreadDrainPacketThreads();
1483 
1484  /* since all the threads possibly able to produce more packets
1485  * are now gone or inactive, we should see no packets anywhere
1486  * anymore. */
1487  TmThreadDebugValidateNoMorePackets();
1488 
1489  gettimeofday(&start_ts, NULL);
1490 again:
1491  gettimeofday(&cur_ts, NULL);
1492  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1493  FatalError("Engine unable to disable packet "
1494  "threads. Killing engine");
1495  }
1496 
1497  /* loop through the packet threads and kill them */
1499  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1501 
1502  /* separate worker threads (autofp) will still wait at their
1503  * input queues. So nudge them here so they will observe the
1504  * THV_KILL flag. */
1505  if (tv->inq != NULL) {
1506  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1507  SCMutexLock(&tv->inq->pq->mutex_q);
1508  SCCondSignal(&tv->inq->pq->cond_q);
1509  SCMutexUnlock(&tv->inq->pq->mutex_q);
1510  }
1511  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1512  }
1513 
1516 
1517  SleepMsec(1);
1518  goto again;
1519  }
1520  }
1522 }
1523 
1524 #define MIN_WAIT_TIME 100
1525 #define MAX_WAIT_TIME 999999
1527 {
1528  ThreadVars *tv = NULL;
1529  unsigned int sleep_usec = MIN_WAIT_TIME;
1530 
1531  BUG_ON((family < 0) || (family >= TVT_MAX));
1532 
1533 again:
1535  tv = tv_root[family];
1536 
1537  while (tv) {
1538  int r = TmThreadKillThread(tv);
1539  if (r == 0) {
1541  SleepUsec(sleep_usec);
1542  sleep_usec *= 2; /* slowly back off */
1543  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1544  goto again;
1545  }
1546  sleep_usec = MIN_WAIT_TIME; /* reset */
1547 
1548  tv = tv->next;
1549  }
1551 }
1552 #undef MIN_WAIT_TIME
1553 #undef MAX_WAIT_TIME
1554 
1556 {
1557  int i = 0;
1558 
1559  for (i = 0; i < TVT_MAX; i++) {
1561  }
1562 }
1563 
1564 static void TmThreadFree(ThreadVars *tv)
1565 {
1566  TmSlot *s;
1567  TmSlot *ps;
1568  if (tv == NULL)
1569  return;
1570 
1571  SCLogDebug("Freeing thread '%s'.", tv->name);
1572 
1573  if (tv->flow_queue) {
1574  BUG_ON(tv->flow_queue->qlen != 0);
1575  SCFree(tv->flow_queue);
1576  }
1577 
1579 
1580  TmThreadDeinitMC(tv);
1581 
1582  if (tv->thread_group_name) {
1584  }
1585 
1586  if (tv->printable_name) {
1588  }
1589 
1590  if (tv->stream_pq_local) {
1594  }
1595 
1596  s = (TmSlot *)tv->tm_slots;
1597  while (s) {
1598  ps = s;
1599  s = s->slot_next;
1600  SCFree(ps);
1601  }
1602 
1604  SCFree(tv);
1605 }
1606 
1607 void TmThreadSetGroupName(ThreadVars *tv, const char *name)
1608 {
1609  char *thread_group_name = NULL;
1610 
1611  if (name == NULL)
1612  return;
1613 
1614  if (tv == NULL)
1615  return;
1616 
1617  thread_group_name = SCStrdup(name);
1618  if (unlikely(thread_group_name == NULL)) {
1619  SCLogError("error allocating memory");
1620  return;
1621  }
1622  tv->thread_group_name = thread_group_name;
1623 }
1624 
1626 {
1627  ThreadVars *tv = NULL;
1628  ThreadVars *ptv = NULL;
1629 
1630  if ((family < 0) || (family >= TVT_MAX))
1631  return;
1632 
1634  tv = tv_root[family];
1635 
1636  while (tv) {
1637  ptv = tv;
1638  tv = tv->next;
1639  TmThreadFree(ptv);
1640  }
1641  tv_root[family] = NULL;
1643 }
1644 
1645 /**
1646  * \brief Spawns a thread associated with the ThreadVars instance tv
1647  *
1648  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1649  */
1651 {
1652  pthread_attr_t attr;
1653  if (tv->tm_func == NULL) {
1654  FatalError("No thread function set");
1655  }
1656 
1657  /* Initialize the thread attributes and mark the thread joinable */
1658  pthread_attr_init(&attr);
1659 
1660  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1661 
1662  /* Adjust thread stack size if configured */
1664  SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
1665  if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
1666  FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
1668  }
1669  }
1670 
1671  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1672  if (rc) {
1673  FatalError("Unable to create thread with pthread_create(): retval %d: %s", rc,
1674  strerror(errno));
1675  }
1676 
1677 #if DEBUG && HAVE_PTHREAD_GETATTR_NP
1679  if (pthread_getattr_np(tv->t, &attr) == 0) {
1680  size_t stack_size;
1681  void *stack_addr;
1682  pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1683  SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1684  } else {
1685  SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1686  "pthread_getattr_np() is %" PRId32,
1687  rc);
1688  }
1689  }
1690 #endif
1691 
1693 
1694  TmThreadAppend(tv, tv->type);
1695  return TM_ECODE_OK;
1696 }
1697 
1698 /**
1699  * \brief Initializes the mutex and condition variables for this TV
1700  *
1701  * It can be used by a thread to control a wait loop that can also be
1702  * influenced by other threads.
1703  *
1704  * \param tv Pointer to a TV instance
1705  */
1707 {
1708  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1709  FatalError("Fatal error encountered in TmThreadInitMC. "
1710  "Exiting...");
1711  }
1712 
1713  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1714  printf("Error initializing the tv->m mutex\n");
1715  exit(EXIT_FAILURE);
1716  }
1717 
1718  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1719  FatalError("Fatal error encountered in TmThreadInitMC. "
1720  "Exiting...");
1721  }
1722 
1723  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1724  FatalError("Error initializing the tv->cond condition "
1725  "variable");
1726  }
1727 }
1728 
1729 static void TmThreadDeinitMC(ThreadVars *tv)
1730 {
1731  if (tv->ctrl_mutex) {
1733  SCFree(tv->ctrl_mutex);
1734  }
1735  if (tv->ctrl_cond) {
1737  SCFree(tv->ctrl_cond);
1738  }
1739 }
1740 
1741 /**
1742  * \brief Tests if the thread represented in the arg has been unpaused or not.
1743  *
1744  * The function would return if the thread tv has been unpaused or if the
1745  * kill flag for the thread has been set.
1746  *
1747  * \param tv Pointer to the TV instance.
1748  */
1750 {
1751  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1752  SleepUsec(100);
1753 
1755  break;
1756  }
1757 }
1758 
1759 /**
1760  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1761  * the kill flag has been set or not on the thread.
1762  *
1763  * \param tv Pointer to the TV instance.
1764  */
1766 {
1767  while (!TmThreadsCheckFlag(tv, flags)) {
1768  SleepUsec(100);
1769  }
1770 }
1771 
1772 /**
1773  * \brief Unpauses a thread
1774  *
1775  * \param tv Pointer to a TV instance that has to be unpaused
1776  */
1778 {
1780 }
1781 
1782 static TmEcode WaitOnThreadsRunningByType(const int t)
1783 {
1784  struct timeval start_ts;
1785  struct timeval cur_ts;
1786  uint32_t thread_cnt = 0;
1787 
1788  /* on retries, this will init to the last thread that started up already */
1789  ThreadVars *tv_start = tv_root[t];
1791  for (ThreadVars *tv = tv_start; tv != NULL; tv = tv->next) {
1792  thread_cnt++;
1793  }
1795 
1796  /* give threads a second each to start up, plus a margin of a minute. */
1797  uint32_t time_budget = 60 + thread_cnt;
1798 
1799  gettimeofday(&start_ts, NULL);
1800 again:
1802  ThreadVars *tv = tv_start;
1803  while (tv != NULL) {
1806 
1807  SCLogError("thread \"%s\" failed to "
1808  "start: flags %04x",
1809  tv->name, SC_ATOMIC_GET(tv->flags));
1810  return TM_ECODE_FAILED;
1811  }
1812 
1815 
1816  /* time_budget seconds (60 plus one per thread) are provided for
1817  * the thread to transition from THV_INIT_DONE to THV_RUNNING */
1818  gettimeofday(&cur_ts, NULL);
1819  if (((uint32_t)cur_ts.tv_sec - (uint32_t)start_ts.tv_sec) > time_budget) {
1820  SCLogError("thread \"%s\" failed to "
1821  "start in time: flags %04x. Total threads: %u. Time budget %us",
1822  tv->name, SC_ATOMIC_GET(tv->flags), thread_cnt, time_budget);
1823  return TM_ECODE_FAILED;
1824  }
1825 
1826  /* sleep a little to give the thread some
1827  * time to start running */
1828  SleepUsec(100);
1829  goto again;
1830  }
1831  tv_start = tv;
1832 
1833  tv = tv->next;
1834  }
1836  return TM_ECODE_OK;
1837 }
1838 
1839 /**
1840  * \brief Waits for all threads to be in a running state
1841  *
1842  * \retval TM_ECODE_OK if all are running or error if a thread failed
1843  */
1845 {
1846  uint16_t RX_num = 0;
1847  uint16_t W_num = 0;
1848  uint16_t FM_num = 0;
1849  uint16_t FR_num = 0;
1850  uint16_t TX_num = 0;
1851 
1852  for (int i = 0; i < TVT_MAX; i++) {
1853  if (WaitOnThreadsRunningByType(i) != TM_ECODE_OK)
1854  return TM_ECODE_FAILED;
1855  }
1856 
1858  for (int i = 0; i < TVT_MAX; i++) {
1859  for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
1860  if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1861  RX_num++;
1862  else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1863  W_num++;
1864  else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1865  TX_num++;
1866  else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1867  FM_num++;
1868  else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1869  FR_num++;
1870  }
1871  }
1873 
1874  /* Construct a welcome string displaying
1875  * initialized thread types and counts */
1876  uint16_t app_len = 32;
1877  uint16_t buf_len = 256;
1878 
1879  char append_str[app_len];
1880  char thread_counts[buf_len];
1881 
1882  strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1883  if (RX_num > 0) {
1884  snprintf(append_str, app_len, "RX: %u ", RX_num);
1885  strlcat(thread_counts, append_str, buf_len);
1886  }
1887  if (W_num > 0) {
1888  snprintf(append_str, app_len, "W: %u ", W_num);
1889  strlcat(thread_counts, append_str, buf_len);
1890  }
1891  if (TX_num > 0) {
1892  snprintf(append_str, app_len, "TX: %u ", TX_num);
1893  strlcat(thread_counts, append_str, buf_len);
1894  }
1895  if (FM_num > 0) {
1896  snprintf(append_str, app_len, "FM: %u ", FM_num);
1897  strlcat(thread_counts, append_str, buf_len);
1898  }
1899  if (FR_num > 0) {
1900  snprintf(append_str, app_len, "FR: %u ", FR_num);
1901  strlcat(thread_counts, append_str, buf_len);
1902  }
1903  snprintf(append_str, app_len, " Engine started.");
1904  strlcat(thread_counts, append_str, buf_len);
1905  SCLogNotice("%s", thread_counts);
1906 
1907  return TM_ECODE_OK;
1908 }
1909 
1910 /**
1911  * \brief Unpauses all threads present in tv_root
1912  */
1914 {
1916  for (int i = 0; i < TVT_MAX; i++) {
1917  ThreadVars *tv = tv_root[i];
1918  while (tv != NULL) {
1920  tv = tv->next;
1921  }
1922  }
1924 }
1925 
1926 /**
1927  * \brief Used to check the thread for certain conditions of failure.
1928  */
1930 {
1932  for (int i = 0; i < TVT_MAX; i++) {
1933  ThreadVars *tv = tv_root[i];
1934  while (tv) {
1936  FatalError("thread %s failed", tv->name);
1937  }
1938  tv = tv->next;
1939  }
1940  }
1942 }
1943 
1944 /**
1945  * \brief Used to check if all threads have finished their initialization. On
1946  * finding an un-initialized thread, it waits till that thread completes
1947  * its initialization, before proceeding to the next thread.
1948  *
1949  * \retval TM_ECODE_OK all initialized properly
1950  * \retval TM_ECODE_FAILED failure
1951  */
1953 {
1954  struct timeval start_ts;
1955  struct timeval cur_ts;
1956  gettimeofday(&start_ts, NULL);
1957 
1958 again:
1960  for (int i = 0; i < TVT_MAX; i++) {
1961  ThreadVars *tv = tv_root[i];
1962  while (tv != NULL) {
1965 
1966  SCLogError("thread \"%s\" failed to "
1967  "initialize: flags %04x",
1968  tv->name, SC_ATOMIC_GET(tv->flags));
1969  return TM_ECODE_FAILED;
1970  }
1971 
1972  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
1974 
1975  gettimeofday(&cur_ts, NULL);
1976  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
1977  SCLogError("thread \"%s\" failed to "
1978  "initialize in time: flags %04x",
1979  tv->name, SC_ATOMIC_GET(tv->flags));
1980  return TM_ECODE_FAILED;
1981  }
1982 
1983  /* sleep a little to give the thread some
1984  * time to finish initialization */
1985  SleepUsec(100);
1986  goto again;
1987  }
1988 
1991  SCLogError("thread \"%s\" failed to "
1992  "initialize.",
1993  tv->name);
1994  return TM_ECODE_FAILED;
1995  }
1998  SCLogError("thread \"%s\" closed on "
1999  "initialization.",
2000  tv->name);
2001  return TM_ECODE_FAILED;
2002  }
2003 
2004  tv = tv->next;
2005  }
2006  }
2008 
2009  return TM_ECODE_OK;
2010 }
2011 
2012 /**
2013  * \brief returns a count of all the threads that match the flag
2014  */
2016 {
2017  uint32_t cnt = 0;
2019  for (int i = 0; i < TVT_MAX; i++) {
2020  ThreadVars *tv = tv_root[i];
2021  while (tv != NULL) {
2022  if ((tv->tmm_flags & flags) == flags)
2023  cnt++;
2024 
2025  tv = tv->next;
2026  }
2027  }
2029  return cnt;
2030 }
2031 
2032 #ifdef DEBUG_VALIDATION
2033 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2034 {
2035  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2037  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2038  tv, s, s->tm_id, m->name);
2039  }
2040 }
2041 
2042 static void TmThreadDumpThreads(void)
2043 {
2045  for (int i = 0; i < TVT_MAX; i++) {
2046  ThreadVars *tv = tv_root[i];
2047  while (tv != NULL) {
2048  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2049  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2050  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2051  if (tv->inq && tv->stream_pq == tv->inq->pq) {
2052  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2053  } else if (tv->stream_pq_local != NULL) {
2054  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2055  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2056  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2057  }
2058  }
2059  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2060  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2061  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2062  }
2063  TmThreadDoDumpSlots(tv);
2064  tv = tv->next;
2065  }
2066  }
2069 }
2070 #endif
2071 
2072 typedef struct Thread_ {
2073  ThreadVars *tv; /**< threadvars structure */
2074  const char *name;
2075  int type;
2076  int in_use; /**< bool to indicate this is in use */
2077 
2078  SCTime_t pktts; /**< current packet time of this thread
2079  * (offline mode) */
2080  uint32_t sys_sec_stamp; /**< timestamp in seconds of the real system
2081  * time when the pktts was last updated. */
2083 
2084 typedef struct Threads_ {
2089 
2090 static Threads thread_store = { NULL, 0, 0 };
2091 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2092 
2094 {
2095  SCMutexLock(&thread_store_lock);
2096  for (size_t s = 0; s < thread_store.threads_size; s++) {
2097  Thread *t = &thread_store.threads[s];
2098  if (t == NULL || t->in_use == 0)
2099  continue;
2100 
2101  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2102  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2103  if (t->tv) {
2104  ThreadVars *tv = t->tv;
2105  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2106  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2107  tv, tv->type, tv->name, tv->tmm_flags, flags);
2108  }
2109  }
2110  SCMutexUnlock(&thread_store_lock);
2111 }
2112 
2113 #define STEP 32
2114 /**
2115  * \retval id thread id: the index of the claimed slot plus one, so never 0
2116  */
2118 {
2119  SCMutexLock(&thread_store_lock);
2120  if (thread_store.threads == NULL) {
2121  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2122  BUG_ON(thread_store.threads == NULL);
2123  thread_store.threads_size = STEP;
2124  }
2125 
2126  size_t s;
2127  for (s = 0; s < thread_store.threads_size; s++) {
2128  if (thread_store.threads[s].in_use == 0) {
2129  Thread *t = &thread_store.threads[s];
2130  t->name = tv->name;
2131  t->type = type;
2132  t->tv = tv;
2133  t->in_use = 1;
2134 
2135  SCMutexUnlock(&thread_store_lock);
2136  return (int)(s+1);
2137  }
2138  }
2139 
2140  /* if we get here the array is completely filled */
2141  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2142  BUG_ON(newmem == NULL);
2143  thread_store.threads = newmem;
2144  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2145 
2146  Thread *t = &thread_store.threads[thread_store.threads_size];
2147  t->name = tv->name;
2148  t->type = type;
2149  t->tv = tv;
2150  t->in_use = 1;
2151 
2152  s = thread_store.threads_size;
2153  thread_store.threads_size += STEP;
2154 
2155  SCMutexUnlock(&thread_store_lock);
2156  return (int)(s+1);
2157 }
2158 #undef STEP
2159 
2160 void TmThreadsUnregisterThread(const int id)
2161 {
2162  SCMutexLock(&thread_store_lock);
2163  if (id <= 0 || id > (int)thread_store.threads_size) {
2164  SCMutexUnlock(&thread_store_lock);
2165  return;
2166  }
2167 
2168  /* id is one higher than index */
2169  int idx = id - 1;
2170 
2171  /* reset thread_id, which serves as clearing the record */
2172  thread_store.threads[idx].in_use = 0;
2173 
2174  /* check if we have at least one registered thread left */
2175  size_t s;
2176  for (s = 0; s < thread_store.threads_size; s++) {
2177  Thread *t = &thread_store.threads[s];
2178  if (t->in_use == 1) {
2179  goto end;
2180  }
2181  }
2182 
2183  /* if we get here no threads are registered */
2184  SCFree(thread_store.threads);
2185  thread_store.threads = NULL;
2186  thread_store.threads_size = 0;
2187  thread_store.threads_cnt = 0;
2188 
2189 end:
2190  SCMutexUnlock(&thread_store_lock);
2191 }
2192 
2193 void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
2194 {
2195  SCMutexLock(&thread_store_lock);
2196  if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2197  SCMutexUnlock(&thread_store_lock);
2198  return;
2199  }
2200 
2201  int idx = id - 1;
2202  Thread *t = &thread_store.threads[idx];
2203  t->pktts = ts;
2204  struct timeval systs;
2205  gettimeofday(&systs, NULL);
2206  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2207  SCMutexUnlock(&thread_store_lock);
2208 }
2209 
2211 {
2212  bool ready = true;
2213  SCMutexLock(&thread_store_lock);
2214  for (size_t s = 0; s < thread_store.threads_size; s++) {
2215  Thread *t = &thread_store.threads[s];
2216  if (!t->in_use)
2217  break;
2218  if (t->type != TVT_PPT)
2219  continue;
2220  if (t->sys_sec_stamp == 0) {
2221  ready = false;
2222  break;
2223  }
2224  }
2225  SCMutexUnlock(&thread_store_lock);
2226  return ready;
2227 }
2228 
2230 {
2231  struct timeval systs;
2232  gettimeofday(&systs, NULL);
2233  SCMutexLock(&thread_store_lock);
2234  for (size_t s = 0; s < thread_store.threads_size; s++) {
2235  Thread *t = &thread_store.threads[s];
2236  if (!t->in_use)
2237  break;
2238  if (t->type != TVT_PPT)
2239  continue;
2240  t->pktts = ts;
2241  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2242  }
2243  SCMutexUnlock(&thread_store_lock);
2244 }
2245 
2246 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2247 {
2248  struct timeval local = { 0 };
2249  static struct timeval nullts;
2250  bool set = false;
2251  size_t s;
2252  struct timeval systs;
2253  gettimeofday(&systs, NULL);
2254 
2255  SCMutexLock(&thread_store_lock);
2256  for (s = 0; s < thread_store.threads_size; s++) {
2257  Thread *t = &thread_store.threads[s];
2258  if (t->in_use == 0)
2259  break;
2260  /* only packet threads set timestamps based on packets */
2261  if (t->type != TVT_PPT)
2262  continue;
2263  struct timeval pkttv = { .tv_sec = SCTIME_SECS(t->pktts),
2264  .tv_usec = SCTIME_USECS(t->pktts) };
2265  if (!(timercmp(&pkttv, &nullts, ==))) {
2266  /* ignore sleeping threads */
2267  if (t->sys_sec_stamp + 1 < (uint32_t)systs.tv_sec)
2268  continue;
2269 
2270  if (!set) {
2271  SCTIME_TO_TIMEVAL(&local, t->pktts);
2272  set = true;
2273  } else {
2274  if (SCTIME_CMP_LT(t->pktts, SCTIME_FROM_TIMEVAL(&local))) {
2275  SCTIME_TO_TIMEVAL(&local, t->pktts);
2276  }
2277  }
2278  }
2279  }
2280  SCMutexUnlock(&thread_store_lock);
2281  *ts = local;
2282  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2283 }
2284 
2286 {
2287  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2288  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2289  /* always create at least one thread */
2290  if (thread_max == 0)
2291  thread_max = ncpus * threading_detect_ratio;
2292  if (thread_max < 1)
2293  thread_max = 1;
2294  if (thread_max > 1024) {
2295  SCLogWarning("limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2296  thread_max = 1024;
2297  }
2298  return (uint16_t)thread_max;
2299 }
2300 
2301 /** \brief inject a flow into a threads flow queue
2302  */
2303 void TmThreadsInjectFlowById(Flow *f, const int id)
2304 {
2305  BUG_ON(id <= 0 || id > (int)thread_store.threads_size);
2306 
2307  int idx = id - 1;
2308 
2309  Thread *t = &thread_store.threads[idx];
2310  ThreadVars *tv = t->tv;
2311 
2312  BUG_ON(tv == NULL || tv->flow_queue == NULL);
2313 
2314  FlowEnqueue(tv->flow_queue, f);
2315 
2316  /* wake up listening thread(s) if necessary */
2317  if (tv->inq != NULL) {
2318  SCMutexLock(&tv->inq->pq->mutex_q);
2319  SCCondSignal(&tv->inq->pq->cond_q);
2320  SCMutexUnlock(&tv->inq->pq->mutex_q);
2321  } else if (tv->break_loop) {
2322  TmThreadsCaptureBreakLoop(tv);
2323  }
2324 }
thread_name_workers
const char * thread_name_workers
Definition: runmodes.c:67
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:804
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:74
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:76
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:70
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:134
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2210
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1706
tm-threads.h
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:70
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
THV_USE
#define THV_USE
Definition: threadvars.h:35
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:131
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1650
Threads
struct Threads_ Threads
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:61
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1090
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
threading_set_cpu_affinity
bool threading_set_cpu_affinity
Definition: runmodes.c:61
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:845
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:69
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:314
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:55
TmThreadContinueThreads
void TmThreadContinueThreads(void)
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1913
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1033
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:56
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:379
TmThreadSetGroupName
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1607
CaptureStatsSetup
void CaptureStatsSetup(ThreadVars *tv)
Definition: decode.c:983
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:104
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:47
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
PKT_SRC_SHUTDOWN_FLUSH
@ PKT_SRC_SHUTDOWN_FLUSH
Definition: decode.h:67
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:99
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1765
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:175
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:56
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:44
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:84
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:65
Packet_::flags
uint32_t flags
Definition: decode.h:516
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:75
threads.h
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:360
ThreadVars_::t
pthread_t t
Definition: threadvars.h:58
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:303
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:70
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:54
Thread_::pktts
SCTime_t pktts
Definition: tm-threads.c:2078
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2015
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:103
thread_name_autofp
const char * thread_name_autofp
Definition: runmodes.c:65
StatsSetupPrivate
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1214
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:116
MIN
#define MIN(x, y)
Definition: suricata-common.h:391
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:80
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:73
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1347
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:387
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:121
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:756
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:89
m
SCMutex m
Definition: flow-hash.h:6
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:44
TmModule_::ThreadBusy
bool(* ThreadBusy)(ThreadVars *tv, void *thread_data)
Definition: tm-modules.h:64
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:141
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:314
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:83
rww_lock_contention
thread_local uint64_t rww_lock_contention
Threads_::threads
Thread * threads
Definition: tm-threads.c:2085
PacketQueueNoLock_
simple fifo queue for packets
Definition: packet-queue.h:34
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:173
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2113
Thread_::in_use
int in_use
Definition: tm-threads.c:2076
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:37
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(void)
Disable all packet threads.
Definition: tm-threads.c:1476
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:55
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:244
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:82
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
Inject a flow into a thread's flow queue.
Definition: tm-threads.c:2303
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:80
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2246
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:88
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmThreadsProcessDecodePseudoPackets
TmEcode TmThreadsProcessDecodePseudoPackets(ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
Definition: tm-threads.c:112
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:50
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
Get a TM module pointer by name.
Definition: tm-modules.c:53
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:45
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1329
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:107
util-signal.h
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:78
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:767
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1524
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:89
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:2073
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1777
AffinityGetNextCPU
uint16_t AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
Definition: util-affinity.c:282
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:57
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:1952
Threads_::threads_cnt
int threads_cnt
Definition: tm-threads.c:2087
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:58
strlcat
size_t strlcat(char *, const char *src, size_t siz)
Definition: util-strlcatu.c:45
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:77
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:95
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:208
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:58
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
threading_set_stack_size
uint64_t threading_set_stack_size
Definition: runmodes.c:62
ThreadVars_::perf_public_ctx
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:127
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1275
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
Definition: tm-threads.c:2193
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
util-affinity.h
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:79
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1749
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:66
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:53
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1625
THV_KILL
#define THV_KILL
Definition: threadvars.h:39
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:904
FlowQueueNew
FlowQueue * FlowQueueNew(void)
Definition: flow-queue.c:35
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:824
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2160
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2117
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:92
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:90
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:124
ThreadVars_::id
int id
Definition: threadvars.h:86
ThreadVars_::type
uint8_t type
Definition: threadvars.h:71
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:300
SCTIME_TO_TIMEVAL
#define SCTIME_TO_TIMEVAL(tv, t)
Definition: util-time.h:97
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:99
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:83
SCReturn
#define SCReturn
Definition: util-debug.h:273
stream.h
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:376
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1555
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:139
Packet_
Definition: decode.h:479
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
type
uint16_t type
Definition: decode-vlan.c:107
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:132
ThreadVars_::thread_group_name
char * thread_group_name
Definition: threadvars.h:66
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:79
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1525
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:74
TmSlot_
Definition: tm-threads.h:53
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:67
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:68
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:81
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:91
runmodes.h
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread (MGMT). This function supports only custom...
Definition: tm-threads.c:1062
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:90
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
TmModule_
Definition: tm-modules.h:44
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:255
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
Definition: tm-threads.c:2229
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:115
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
SCTIME_CMP_LT
#define SCTIME_CMP_LT(a, b)
Definition: util-time.h:105
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:38
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:36
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:640
StatsPublicThreadContext_::m
SCMutex m
Definition: counters.h:75
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:378
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1526
Thread_::sys_sec_stamp
uint32_t sys_sec_stamp
Definition: tm-threads.c:2080
Thread_::type
int type
Definition: tm-threads.c:2075
cnt
uint32_t cnt
Definition: tmqh-packetpool.h:7
Thread
struct Thread_ Thread
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:89
Packet_::flow
struct Flow_ * flow
Definition: decode.h:518
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1149
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:74
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:53
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:813
Threads_
Definition: tm-threads.c:2084
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:35
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:34
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:105
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:40
ThreadVars_::break_loop
bool break_loop
Definition: threadvars.h:135
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:100
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:829
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
SCTIME_SECS
#define SCTIME_SECS(t)
Definition: util-time.h:57
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:72
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:48
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:77
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
FatalError
#define FatalError(...)
Definition: util-debug.h:502
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax(void)
Definition: tm-threads.c:2285
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:65
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1123
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
StatsSyncCounters
void StatsSyncCounters(ThreadVars *tv)
Definition: counters.c:449
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:142
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:49
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:216
threadvars.h
util-validate.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:232
ThreadsAffinityType_
Definition: util-affinity.h:65
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:37
THV_KILL_PKTACQ
#define THV_KILL_PKTACQ
Definition: threadvars.h:46
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:610
TmSlot_::tm_flags
uint8_t tm_flags
Definition: tm-threads.h:67
Thread_::name
const char * name
Definition: tm-threads.c:2074
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:131
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:50
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:90
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:74
thread_name_verdict
const char * thread_name_verdict
Definition: runmodes.c:68
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:73
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:317
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:83
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:111
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
suricata.h
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:89
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:458
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:140
Tmq_
Definition: tm-queues.h:29
TmThreadWaitOnThreadRunning
TmEcode TmThreadWaitOnThreadRunning(void)
Waits for all threads to be in a running state.
Definition: tm-threads.c:1844
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2093
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1929
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:131
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:82
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:53
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:69
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:108
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:274
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:91
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:41
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:2072
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:275
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:120
Threads_::threads_size
size_t threads_size
Definition: tm-threads.c:2086
StatsThreadCleanup
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1313
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:62
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:267
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:102
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:77
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:934
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:359
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:170
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:350
SCTIME_USECS
#define SCTIME_USECS(t)
Definition: util-time.h:56