suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2020 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-debug.h"
39 #include "util-privs.h"
40 #include "util-cpu.h"
41 #include "util-optimize.h"
42 #include "util-profiling.h"
43 #include "util-signal.h"
44 #include "queue.h"
45 
46 #ifdef PROFILE_LOCKING
47 thread_local uint64_t mutex_lock_contention;
48 thread_local uint64_t mutex_lock_wait_ticks;
49 thread_local uint64_t mutex_lock_cnt;
50 
51 thread_local uint64_t spin_lock_contention;
52 thread_local uint64_t spin_lock_wait_ticks;
53 thread_local uint64_t spin_lock_cnt;
54 
55 thread_local uint64_t rww_lock_contention;
56 thread_local uint64_t rww_lock_wait_ticks;
57 thread_local uint64_t rww_lock_cnt;
58 
59 thread_local uint64_t rwr_lock_contention;
60 thread_local uint64_t rwr_lock_wait_ticks;
61 thread_local uint64_t rwr_lock_cnt;
62 #endif
63 
64 #ifdef OS_FREEBSD
65 #include <sched.h>
66 #include <sys/param.h>
67 #include <sys/resource.h>
68 #include <sys/cpuset.h>
69 #include <sys/thr.h>
70 #define cpu_set_t cpuset_t
71 #endif /* OS_FREEBSD */
72 
73 /* prototypes */
74 static int SetCPUAffinity(uint16_t cpu);
75 static void TmThreadDeinitMC(ThreadVars *tv);
76 
77 /* root of the threadvars list */
78 ThreadVars *tv_root[TVT_MAX] = { NULL };
79 
80 /* lock to protect tv_root */
82 
83 /**
84  * \brief Check if a thread flag is set.
85  *
86  * \retval 1 flag is set.
87  * \retval 0 flag is not set.
88  */
89 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
90 {
91  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
92 }
93 
94 /**
95  * \brief Set a thread flag.
96  */
97 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
98 {
99  SC_ATOMIC_OR(tv->flags, flag);
100 }
101 
102 /**
103  * \brief Unset a thread flag.
104  */
105 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
106 {
107  SC_ATOMIC_AND(tv->flags, ~flag);
108 }
109 
110 /**
111  * \brief Separate run function so we can call it recursively.
112  */
114 {
115  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
116  PACKET_PROFILING_TMM_START(p, s->tm_id);
117  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
118  PACKET_PROFILING_TMM_END(p, s->tm_id);
119 
120  /* handle error */
121  if (unlikely(r == TM_ECODE_FAILED)) {
122  /* Encountered error. Return packets to packetpool and return */
123  TmThreadsSlotProcessPktFail(tv, s, NULL);
124  return TM_ECODE_FAILED;
125  }
126 
127  /* handle new packets */
128  while (tv->decode_pq.top != NULL) {
129  Packet *extra_p = PacketDequeueNoLock(&tv->decode_pq);
130  if (unlikely(extra_p == NULL))
131  continue;
132 
133  /* see if we need to process the packet */
134  if (s->slot_next != NULL) {
135  r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
136  if (unlikely(r == TM_ECODE_FAILED)) {
137  TmThreadsSlotProcessPktFail(tv, s, extra_p);
138  return TM_ECODE_FAILED;
139  }
140  }
141  tv->tmqh_out(tv, extra_p);
142  }
143  }
144 
145  return TM_ECODE_OK;
146 }
147 
148 /** \internal
149  *
150  * \brief Process flow timeout packets
151  *
152  * Process flow timeout pseudo packets. During shutdown this loop
153  * is run until the flow engine kills the thread and the queue is
154  * empty.
155  */
/* NOTE(review): this listing skips embedded lines 168, 170, 173-175, 184 and
 * 191, so some statements of this function are not visible here (for example
 * the declaration of `p`). The remarks below only cover what is shown.
 *
 * Visible behaviour: returns TM_ECODE_OK immediately when tv->stream_pq or
 * tv->tm_flowworker is NULL; otherwise repeatedly reads tv->stream_pq->len and
 * hands each non-NULL packet to TmThreadsSlotProcessPkt() with the flowworker
 * slot. The parameter `s` is not referenced in the visible lines. */
156 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
157 {
158  TmSlot *fw_slot = tv->tm_flowworker;
159  int r = TM_ECODE_OK;
160 
161  if (tv->stream_pq == NULL || fw_slot == NULL) {
162  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
163  return r;
164  }
165 
166  SCLogDebug("flow end loop starting");
167  while (1) {
169  uint32_t len = tv->stream_pq->len;
171  if (len > 0) {
 /* process at most the number of packets that were queued when len was read */
172  while (len--) {
176  if (likely(p)) {
 /* NOTE(review): `!=` binds tighter than `=`, so r receives the 0/1 result
  * of the comparison, not the TmEcode returned by TmThreadsSlotProcessPkt().
  * The `r == TM_ECODE_FAILED` test below and the value returned by this
  * function are therefore based on that boolean. If the intent was to keep
  * the TmEcode, this needs `(r = call(...)) != TM_ECODE_OK` - confirm. */
177  if ((r = TmThreadsSlotProcessPkt(tv, fw_slot, p) != TM_ECODE_OK)) {
178  if (r == TM_ECODE_FAILED)
 /* leaves only the inner `while (len--)` loop; the outer loop continues */
179  break;
180  }
181  }
182  }
183  } else {
185  break;
186  }
187  SleepUsec(1);
188  }
189  }
190  SCLogDebug("flow end loop complete");
192 
193  return r;
194 }
195 
196 /*
197 
198  pcap/nfq
199 
200  pkt read
201  callback
202  process_pkt
203 
204  pfring
205 
206  pkt read
207  process_pkt
208 
209  slot:
210  setup
211 
212  pkt_ack_loop(tv, slot_data)
213 
214  deinit
215 
216  process_pkt:
217  while(s)
218  run s;
219  queue;
220 
221  */
222 
223 static void *TmThreadsSlotPktAcqLoop(void *td)
224 {
225  ThreadVars *tv = (ThreadVars *)td;
226  TmSlot *s = tv->tm_slots;
227  char run = 1;
228  TmEcode r = TM_ECODE_OK;
229  TmSlot *slot = NULL;
230 
231  /* Set the thread name */
232  if (SCSetThreadName(tv->name) < 0) {
233  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
234  }
235 
236  if (tv->thread_setup_flags != 0)
238 
239  /* Drop the capabilities for this thread */
240  SCDropCaps(tv);
241 
242  PacketPoolInit();
243 
244  /* check if we are setup properly */
245  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
246  SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
247  " PktAcqLoop=%p, tmqh_in=%p,"
248  " tmqh_out=%p",
249  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
251  pthread_exit((void *) -1);
252  return NULL;
253  }
254 
255  for (slot = s; slot != NULL; slot = slot->slot_next) {
256  if (slot->SlotThreadInit != NULL) {
257  void *slot_data = NULL;
258  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
259  if (r != TM_ECODE_OK) {
260  if (r == TM_ECODE_DONE) {
261  EngineDone();
263  goto error;
264  } else {
266  goto error;
267  }
268  }
269  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
270  }
271 
272  /* if the flowworker module is the first, get the threads input queue */
273  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
274  tv->stream_pq = tv->inq->pq;
275  tv->tm_flowworker = slot;
276  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
278  if (tv->flow_queue == NULL) {
280  pthread_exit((void *) -1);
281  return NULL;
282  }
283  /* setup a queue */
284  } else if (slot->tm_id == TMM_FLOWWORKER) {
285  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
286  if (tv->stream_pq_local == NULL)
287  FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
290  tv->tm_flowworker = slot;
291  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
293  if (tv->flow_queue == NULL) {
295  pthread_exit((void *) -1);
296  return NULL;
297  }
298  }
299  }
300 
302 
304 
305  while(run) {
310  }
311 
312  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
313 
314  if (r == TM_ECODE_FAILED) {
316  run = 0;
317  }
319  run = 0;
320  }
321  if (r == TM_ECODE_DONE) {
322  run = 0;
323  }
324  }
326 
328 
329  /* process all pseudo packets the flow timeout may throw at us */
330  TmThreadTimeoutLoop(tv, s);
331 
334 
336 
337  for (slot = s; slot != NULL; slot = slot->slot_next) {
338  if (slot->SlotThreadExitPrintStats != NULL) {
339  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
340  }
341 
342  if (slot->SlotThreadDeinit != NULL) {
343  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
344  if (r != TM_ECODE_OK) {
346  goto error;
347  }
348  }
349  }
350 
351  tv->stream_pq = NULL;
352  SCLogDebug("%s ending", tv->name);
354  pthread_exit((void *) 0);
355  return NULL;
356 
357 error:
358  tv->stream_pq = NULL;
359  pthread_exit((void *) -1);
360  return NULL;
361 }
362 
363 static void *TmThreadsSlotVar(void *td)
364 {
365  ThreadVars *tv = (ThreadVars *)td;
366  TmSlot *s = (TmSlot *)tv->tm_slots;
367  Packet *p = NULL;
368  char run = 1;
369  TmEcode r = TM_ECODE_OK;
370 
371  PacketPoolInit();//Empty();
372 
373  /* Set the thread name */
374  if (SCSetThreadName(tv->name) < 0) {
375  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
376  }
377 
378  if (tv->thread_setup_flags != 0)
380 
381  /* Drop the capabilities for this thread */
382  SCDropCaps(tv);
383 
384  /* check if we are setup properly */
385  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
387  pthread_exit((void *) -1);
388  return NULL;
389  }
390 
391  for (; s != NULL; s = s->slot_next) {
392  if (s->SlotThreadInit != NULL) {
393  void *slot_data = NULL;
394  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
395  if (r != TM_ECODE_OK) {
397  goto error;
398  }
399  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
400  }
401 
402  /* special case: we need to access the stream queue
403  * from the flow timeout code */
404 
405  /* if the flowworker module is the first, get the threads input queue */
406  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
407  tv->stream_pq = tv->inq->pq;
408  tv->tm_flowworker = s;
409  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
411  if (tv->flow_queue == NULL) {
413  pthread_exit((void *) -1);
414  return NULL;
415  }
416  /* setup a queue */
417  } else if (s->tm_id == TMM_FLOWWORKER) {
418  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
419  if (tv->stream_pq_local == NULL)
420  FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
423  tv->tm_flowworker = s;
424  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
426  if (tv->flow_queue == NULL) {
428  pthread_exit((void *) -1);
429  return NULL;
430  }
431  }
432  }
433 
435 
437 
438  s = (TmSlot *)tv->tm_slots;
439 
440  while (run) {
445  }
446 
447  /* input a packet */
448  p = tv->tmqh_in(tv);
449 
450  if (p != NULL) {
451  /* run the thread module(s) */
452  r = TmThreadsSlotVarRun(tv, p, s);
453  if (r == TM_ECODE_FAILED) {
456  break;
457  }
458 
459  /* output the packet */
460  tv->tmqh_out(tv, p);
461 
462  /* now handle the stream pq packets */
463  TmThreadsHandleInjectedPackets(tv);
464  }
465 
467  run = 0;
468  }
469  } /* while (run) */
471 
474 
476 
477  s = (TmSlot *)tv->tm_slots;
478 
479  for ( ; s != NULL; s = s->slot_next) {
480  if (s->SlotThreadExitPrintStats != NULL) {
481  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
482  }
483 
484  if (s->SlotThreadDeinit != NULL) {
485  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
486  if (r != TM_ECODE_OK) {
488  goto error;
489  }
490  }
491  }
492 
493  SCLogDebug("%s ending", tv->name);
494  tv->stream_pq = NULL;
496  pthread_exit((void *) 0);
497  return NULL;
498 
499 error:
500  tv->stream_pq = NULL;
501  pthread_exit((void *) -1);
502  return NULL;
503 }
504 
505 static void *TmThreadsManagement(void *td)
506 {
507  ThreadVars *tv = (ThreadVars *)td;
508  TmSlot *s = (TmSlot *)tv->tm_slots;
509  TmEcode r = TM_ECODE_OK;
510 
511  BUG_ON(s == NULL);
512 
513  /* Set the thread name */
514  if (SCSetThreadName(tv->name) < 0) {
515  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
516  }
517 
518  if (tv->thread_setup_flags != 0)
520 
521  /* Drop the capabilities for this thread */
522  SCDropCaps(tv);
523 
524  SCLogDebug("%s starting", tv->name);
525 
526  if (s->SlotThreadInit != NULL) {
527  void *slot_data = NULL;
528  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
529  if (r != TM_ECODE_OK) {
531  pthread_exit((void *) -1);
532  return NULL;
533  }
534  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
535  }
536 
538 
540 
541  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
542  /* handle error */
543  if (r == TM_ECODE_FAILED) {
545  }
546 
549  }
550 
553 
554  if (s->SlotThreadExitPrintStats != NULL) {
555  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
556  }
557 
558  if (s->SlotThreadDeinit != NULL) {
559  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
560  if (r != TM_ECODE_OK) {
562  pthread_exit((void *) -1);
563  return NULL;
564  }
565  }
566 
568  pthread_exit((void *) 0);
569  return NULL;
570 }
571 
572 /**
573  * \brief We set the slot functions.
574  *
575  * \param tv Pointer to the TV to set the slot function for.
576  * \param name Name of the slot variant.
577  * \param fn_p Pointer to a custom slot function. Used only if slot variant
578  * "name" is "custom".
579  *
580  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
581  */
582 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
583 {
584  if (name == NULL) {
585  if (fn_p == NULL) {
586  printf("Both slot name and function pointer can't be NULL inside "
587  "TmThreadSetSlots\n");
588  goto error;
589  } else {
590  name = "custom";
591  }
592  }
593 
594  if (strcmp(name, "varslot") == 0) {
595  tv->tm_func = TmThreadsSlotVar;
596  } else if (strcmp(name, "pktacqloop") == 0) {
597  tv->tm_func = TmThreadsSlotPktAcqLoop;
598  } else if (strcmp(name, "management") == 0) {
599  tv->tm_func = TmThreadsManagement;
600  } else if (strcmp(name, "command") == 0) {
601  tv->tm_func = TmThreadsManagement;
602  } else if (strcmp(name, "custom") == 0) {
603  if (fn_p == NULL)
604  goto error;
605  tv->tm_func = fn_p;
606  } else {
607  printf("Error: Slot \"%s\" not supported\n", name);
608  goto error;
609  }
610 
611  return TM_ECODE_OK;
612 
613 error:
614  return TM_ECODE_FAILED;
615 }
616 
618 {
620  for (int i = 0; i < TVT_MAX; i++) {
621  ThreadVars *tv = tv_root[i];
622  while (tv) {
623  TmSlot *slots = tv->tm_slots;
624  while (slots != NULL) {
625  if (slots == tm_slot) {
627  return tv;
628  }
629  slots = slots->slot_next;
630  }
631  tv = tv->next;
632  }
633  }
635  return NULL;
636 }
637 
638 /**
639  * \brief Appends a new entry to the slots.
640  *
641  * \param tv TV the slot is attached to.
642  * \param tm TM to append.
643  * \param data Data to be passed on to the slot init function.
644  *
645  * \note Nothing is returned; if the TmSlot allocation fails the function returns without appending a slot.
646  */
647 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
648 {
649  TmSlot *slot = SCMalloc(sizeof(TmSlot));
650  if (unlikely(slot == NULL))
651  return;
652  memset(slot, 0, sizeof(TmSlot));
653  SC_ATOMIC_INITPTR(slot->slot_data);
654  slot->SlotThreadInit = tm->ThreadInit;
655  slot->slot_initdata = data;
656  if (tm->Func) {
657  slot->SlotFunc = tm->Func;
658  } else if (tm->PktAcqLoop) {
659  slot->PktAcqLoop = tm->PktAcqLoop;
660  } else if (tm->Management) {
661  slot->Management = tm->Management;
662  }
664  slot->SlotThreadDeinit = tm->ThreadDeinit;
665  /* we don't have to check for the return value "-1". We wouldn't have
666  * received a TM as arg, if it didn't exist */
667  slot->tm_id = TmModuleGetIDForTM(tm);
668 
669  tv->tmm_flags |= tm->flags;
670  tv->cap_flags |= tm->cap_flags;
671 
672  if (tv->tm_slots == NULL) {
673  tv->tm_slots = slot;
674  } else {
675  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
676 
677  /* get the last slot */
678  for ( ; a != NULL; a = a->slot_next) {
679  b = a;
680  }
681  /* append the new slot */
682  if (b != NULL) {
683  b->slot_next = slot;
684  }
685  }
686  return;
687 }
688 
689 /**
690  * \brief Returns the slot holding a TM with the particular tm_id.
691  *
692  * \param tm_id TM id of the TM whose slot has to be returned.
693  *
694  * \retval slots Pointer to the slot.
695  */
697 {
699  for (int i = 0; i < TVT_MAX; i++) {
700  ThreadVars *tv = tv_root[i];
701  while (tv) {
702  TmSlot *slots = tv->tm_slots;
703  while (slots != NULL) {
704  if (slots->tm_id == tm_id) {
706  return slots;
707  }
708  slots = slots->slot_next;
709  }
710  tv = tv->next;
711  }
712  }
714  return NULL;
715 }
716 
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
/**
 * \brief Apply an already populated cpu set to a thread.
 *
 * The thread is identified per platform: SCGetThreadIdLong() on FreeBSD,
 * mach_thread_self() on Darwin, and the gettid syscall otherwise.
 *
 * \param cs Cpu set handed to the platform affinity call.
 *
 * \retval 0  the platform call returned 0.
 * \retval -1 the platform call returned non-zero; a warning is printed.
 */
static int SetCPUAffinitySet(cpu_set_t *cs)
{
    int rc;

#if defined OS_FREEBSD
    rc = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
            SCGetThreadIdLong(), sizeof(cpu_set_t), cs);
#elif OS_DARWIN
    rc = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
            (void *)cs, THREAD_AFFINITY_POLICY_COUNT);
#else
    const pid_t tid = syscall(SYS_gettid);
    rc = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
#endif /* OS_FREEBSD */

    if (rc == 0) {
        return 0;
    }

    printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", rc,
            strerror(errno));
    return -1;
}
#endif
740 
741 
/**
 * \brief Set the thread affinity on the calling thread.
 *
 * On OpenBSD and Solaris this is a no-op that reports success. On Windows and
 * Cygwin a single-bit mask is passed to SetThreadAffinityMask(). Elsewhere a
 * cpu set containing only \a cpuid is built and handed to SetCPUAffinitySet().
 *
 * \param cpuid Id of the core/cpu to setup the affinity.
 *
 * \retval 0 If all goes well; -1 if something is wrong, including a \a cpuid
 *           that cannot be represented in the platform's affinity mask.
 */
static int SetCPUAffinity(uint16_t cpuid)
{
#if defined __OpenBSD__ || defined sun
    return 0;
#else
    int cpu = (int)cpuid;

#if defined OS_WIN32 || defined __CYGWIN__
    /* SetThreadAffinityMask() takes a DWORD_PTR mask. Shifting by the mask
     * width or more is undefined, so reject ids that have no bit in it. */
    if ((size_t)cpu >= sizeof(DWORD_PTR) * 8) {
        printf("Warning: cpu %" PRId32 " is outside the supported affinity range\n",
                cpu);
        return -1;
    }
    DWORD_PTR cs = (DWORD_PTR)1 << cpu;

    int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
    if (r != 0) {
        printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
                strerror(errno));
        return -1;
    }
    SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
            SCGetThreadIdLong(), cpu);

    return 0;

#else
    cpu_set_t cs;

#ifdef CPU_SETSIZE
    /* cpuid can be as large as 65535; not every CPU_SET implementation
     * bounds-checks the index, so refuse ids the set cannot hold. */
    if (cpu >= CPU_SETSIZE) {
        printf("Warning: cpu %" PRId32 " is outside the supported affinity range\n",
                cpu);
        return -1;
    }
#endif

    CPU_ZERO(&cs);
    CPU_SET(cpu, &cs);
    return SetCPUAffinitySet(&cs);
#endif /* windows */
#endif /* not supported */
}
779 
780 
781 /**
782  * \brief Set the thread options (thread priority).
783  *
784  * \param tv Pointer to the ThreadVars to setup the thread priority.
785  *
786  * \retval TM_ECODE_OK.
787  */
789 {
791  tv->thread_priority = prio;
792 
793  return TM_ECODE_OK;
794 }
795 
796 /**
797  * \brief Adjusting nice value for threads.
798  */
800 {
801  SCEnter();
802 #ifndef __CYGWIN__
803 #ifdef OS_WIN32
804  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
805  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for "
806  "thread %s: %s", tv->name, strerror(errno));
807  } else {
808  SCLogDebug("Priority set to %"PRId32" for thread %s",
810  }
811 #else
812  int ret = nice(tv->thread_priority);
813  if (ret == -1) {
814  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value %d "
815  "for thread %s: %s", tv->thread_priority, tv->name,
816  strerror(errno));
817  } else {
818  SCLogDebug("Nice value set to %"PRId32" for thread %s",
820  }
821 #endif /* OS_WIN32 */
822 #endif
823  SCReturn;
824 }
825 
826 
827 /**
828  * \brief Set the thread options (cpu affinity).
829  *
830  * \param tv pointer to the ThreadVars to setup the affinity.
831  * \param cpu cpu on which affinity is set.
832  *
833  * \retval TM_ECODE_OK
834  */
836 {
838  tv->cpu_affinity = cpu;
839 
840  return TM_ECODE_OK;
841 }
842 
843 
845 {
 /* NOTE(review): the listing skips embedded lines 844, 846 and 854, so the
  * signature, the statement guarding the return below, and one statement
  * before the cpu_affinity assignment are not visible. As shown, everything
  * after this return would be unreachable - confirm against the real file. */
847  return TM_ECODE_OK;
848 
 /* NOTE(review): this check uses `>`, so type == MAX_CPU_SET is accepted and
  * stored in tv->cpu_affinity. The neighbouring range check a few lines below
  * (same "invalid cpu type family" message) uses `>=`. If MAX_CPU_SET is a
  * count rather than the last valid value, this is an off-by-one - confirm. */
849  if (type > MAX_CPU_SET) {
850  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
851  return TM_ECODE_FAILED;
852  }
853 
855  tv->cpu_affinity = type;
856 
857  return TM_ECODE_OK;
858 }
859 
861 {
862  if (type >= MAX_CPU_SET) {
863  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
864  return 0;
865  }
866 
868 }
869 
870 /**
871  * \brief Set the thread options (cpu affinity).
872  * Priority should be already set by pthread_create.
873  *
874  * \param tv pointer to the ThreadVars of the calling thread.
875  */
877 {
879  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
880  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
882  SetCPUAffinity(tv->cpu_affinity);
883  }
884 
885 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
890  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
891  int cpu = AffinityGetNextCPU(taf);
892  SetCPUAffinity(cpu);
893  /* If CPU is in a set overwrite the default thread prio */
894  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
896  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
898  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
900  } else {
901  tv->thread_priority = taf->prio;
902  }
903  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
904  "%d, thread id %lu", tv->thread_priority,
905  tv->name, cpu, SCGetThreadIdLong());
906  } else {
907  SetCPUAffinitySet(&taf->cpu_set);
908  tv->thread_priority = taf->prio;
909  SCLogPerf("Setting prio %d for thread \"%s\", "
910  "thread id %lu", tv->thread_priority,
912  }
914  }
915 #endif
916 
917  return TM_ECODE_OK;
918 }
919 
920 /**
921  * \brief Creates and returns the TV instance for a new thread.
922  *
923  * \param name Name of this TV instance
924  * \param inq_name Incoming queue name
925  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
926  * \param outq_name Outgoing queue name
927  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
928  * \param slots String representation for the slot function to be used
929  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
930  * \param mucond Flag to indicate whether to initialize the condition
931  * and the mutex variables for this newly created TV.
932  *
933  * \retval the newly created TV instance, or NULL on error
934  */
935 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
936  const char *outq_name, const char *outqh_name, const char *slots,
937  void * (*fn_p)(void *), int mucond)
938 {
939  ThreadVars *tv = NULL;
940  Tmq *tmq = NULL;
941  Tmqh *tmqh = NULL;
942 
943  SCLogDebug("creating thread \"%s\"...", name);
944 
945  /* XXX create separate function for this: allocate a thread container */
946  tv = SCMalloc(sizeof(ThreadVars));
947  if (unlikely(tv == NULL))
948  goto error;
949  memset(tv, 0, sizeof(ThreadVars));
950 
951  SC_ATOMIC_INIT(tv->flags);
952  SCMutexInit(&tv->perf_public_ctx.m, NULL);
953 
954  strlcpy(tv->name, name, sizeof(tv->name));
955 
956  /* default state for every newly created thread */
959 
960  /* set the incoming queue */
961  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
962  SCLogDebug("inq_name \"%s\"", inq_name);
963 
964  tmq = TmqGetQueueByName(inq_name);
965  if (tmq == NULL) {
966  tmq = TmqCreateQueue(inq_name);
967  if (tmq == NULL)
968  goto error;
969  }
970  SCLogDebug("tmq %p", tmq);
971 
972  tv->inq = tmq;
973  tv->inq->reader_cnt++;
974  SCLogDebug("tv->inq %p", tv->inq);
975  }
976  if (inqh_name != NULL) {
977  SCLogDebug("inqh_name \"%s\"", inqh_name);
978 
979  int id = TmqhNameToID(inqh_name);
980  if (id <= 0) {
981  goto error;
982  }
983  tmqh = TmqhGetQueueHandlerByName(inqh_name);
984  if (tmqh == NULL)
985  goto error;
986 
987  tv->tmqh_in = tmqh->InHandler;
988  tv->inq_id = (uint8_t)id;
989  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
990  }
991 
992  /* set the outgoing queue */
993  if (outqh_name != NULL) {
994  SCLogDebug("outqh_name \"%s\"", outqh_name);
995 
996  int id = TmqhNameToID(outqh_name);
997  if (id <= 0) {
998  goto error;
999  }
1000 
1001  tmqh = TmqhGetQueueHandlerByName(outqh_name);
1002  if (tmqh == NULL)
1003  goto error;
1004 
1005  tv->tmqh_out = tmqh->OutHandler;
1006  tv->outq_id = (uint8_t)id;
1007 
1008  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
1009  SCLogDebug("outq_name \"%s\"", outq_name);
1010 
1011  if (tmqh->OutHandlerCtxSetup != NULL) {
1012  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1013  if (tv->outctx == NULL)
1014  goto error;
1015  tv->outq = NULL;
1016  } else {
1017  tmq = TmqGetQueueByName(outq_name);
1018  if (tmq == NULL) {
1019  tmq = TmqCreateQueue(outq_name);
1020  if (tmq == NULL)
1021  goto error;
1022  }
1023  SCLogDebug("tmq %p", tmq);
1024 
1025  tv->outq = tmq;
1026  tv->outctx = NULL;
1027  tv->outq->writer_cnt++;
1028  }
1029  }
1030  }
1031 
1032  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1033  goto error;
1034  }
1035 
1036  if (mucond != 0)
1037  TmThreadInitMC(tv);
1038 
1039  return tv;
1040 
1041 error:
1042  SCLogError(SC_ERR_THREAD_CREATE, "failed to setup a thread");
1043 
1044  if (tv != NULL)
1045  SCFree(tv);
1046  return NULL;
1047 }
1048 
1049 /**
1050  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1051  * This function doesn't support custom slots, and hence shouldn't be
1052  * supplied \"custom\" as its slot type. All PPT threads are created
1053  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1054  * conditional variables are not used to kill the thread.
1055  *
1056  * \param name Name of this TV instance
1057  * \param inq_name Incoming queue name
1058  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1059  * \param outq_name Outgoing queue name
1060  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1061  * \param slots String representation for the slot function to be used
1062  *
1063  * \retval the newly created TV instance, or NULL on error
1064  */
1065 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1066  const char *inqh_name, const char *outq_name,
1067  const char *outqh_name, const char *slots)
1068 {
1069  ThreadVars *tv = NULL;
1070 
1071  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1072  slots, NULL, 0);
1073 
1074  if (tv != NULL) {
1075  tv->type = TVT_PPT;
1077  }
1078 
1079 
1080  return tv;
1081 }
1082 
1083 /**
1084  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1085  * This function supports only custom slot functions and hence a
1086  * function pointer should be sent as an argument.
1087  *
1088  * \param name Name of this TV instance
1089  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1090  * \param mucond Flag to indicate whether to initialize the condition
1091  * and the mutex variables for this newly created TV.
1092  *
1093  * \retval the newly created TV instance, or NULL on error
1094  */
1095 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1096  int mucond)
1097 {
1098  ThreadVars *tv = NULL;
1099 
1100  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1101 
1102  if (tv != NULL) {
1103  tv->type = TVT_MGMT;
1106  }
1107 
1108  return tv;
1109 }
1110 
1111 /**
1112  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1113  * The thread is created with the \"management\" slot type and no custom
1114  * function pointer; the TmModule named by \a module, if found, is appended as its slot.
1115  *
1116  * \param name Name of this TV instance
1117  * \param module Name of TmModule with MANAGEMENT flag set.
1118  * \param mucond Flag to indicate whether to initialize the condition
1119  * and the mutex variables for this newly created TV.
1120  *
1121  * \retval the newly created TV instance, or NULL on error
1122  */
1123 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1124  int mucond)
1125 {
1126  ThreadVars *tv = NULL;
1127 
1128  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1129 
1130  if (tv != NULL) {
1131  tv->type = TVT_MGMT;
1134 
1135  TmModule *m = TmModuleGetByName(module);
1136  if (m) {
1137  TmSlotSetFuncAppend(tv, m, NULL);
1138  }
1139  }
1140 
1141  return tv;
1142 }
1143 
1144 /**
1145  * \brief Creates and returns the TV instance for a Command thread (CMD).
1146  * The thread is created with the \"command\" slot type and no custom
1147  * function pointer; the TmModule named by \a module, if found, is appended as its slot.
1148  *
1149  * \param name Name of this TV instance
1150  * \param module Name of TmModule with COMMAND flag set.
1151  * \param mucond Flag to indicate whether to initialize the condition
1152  * and the mutex variables for this newly created TV.
1153  *
1154  * \retval the newly created TV instance, or NULL on error
1155  */
1156 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1157  int mucond)
1158 {
1159  ThreadVars *tv = NULL;
1160 
1161  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1162 
1163  if (tv != NULL) {
1164  tv->type = TVT_CMD;
1167 
1168  TmModule *m = TmModuleGetByName(module);
1169  if (m) {
1170  TmSlotSetFuncAppend(tv, m, NULL);
1171  }
1172  }
1173 
1174  return tv;
1175 }
1176 
1177 /**
1178  * \brief Appends this TV to tv_root based on its type
1179  *
1180  * \param type holds the type this TV belongs to.
1181  */
1183 {
1185 
1186  if (tv_root[type] == NULL) {
1187  tv_root[type] = tv;
1188  tv->next = NULL;
1189 
1191 
1192  return;
1193  }
1194 
1195  ThreadVars *t = tv_root[type];
1196 
1197  while (t) {
1198  if (t->next == NULL) {
1199  t->next = tv;
1200  tv->next = NULL;
1201  break;
1202  }
1203 
1204  t = t->next;
1205  }
1206 
1208 
1209  return;
1210 }
1211 
1212 static bool ThreadStillHasPackets(ThreadVars *tv)
1213 {
1214  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1215  /* we wait till we dry out all the inq packets, before we
1216  * kill this thread. Do note that you should have disabled
1217  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1218  PacketQueue *q = tv->inq->pq;
1219  SCMutexLock(&q->mutex_q);
1220  uint32_t len = q->len;
1221  SCMutexUnlock(&q->mutex_q);
1222  if (len != 0) {
1223  return true;
1224  }
1225  }
1226 
1227  if (tv->stream_pq != NULL) {
1229  uint32_t len = tv->stream_pq->len;
1231 
1232  if (len != 0) {
1233  return true;
1234  }
1235  }
1236  return false;
1237 }
1238 
1239 /**
1240  * \brief Kill a thread.
1241  *
1242  * \param tv A ThreadVars instance corresponding to the thread that has to be
1243  * killed.
1244  *
1245  * \retval r 1 killed successfully
1246  * 0 not yet ready, needs another look
1247  */
1248 static int TmThreadKillThread(ThreadVars *tv)
1249 {
1250  BUG_ON(tv == NULL);
1251 
1252  /* kill only once :) */
1253  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1254  return 1;
1255  }
1256 
1257  /* set the thread flag informing the thread that it needs to be
1258  * terminated */
1261 
1262  /* to be sure, signal more */
1263  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1264  if (tv->inq_id != TMQH_NOT_SET) {
1266  if (qh != NULL && qh->InShutdownHandler != NULL) {
1267  qh->InShutdownHandler(tv);
1268  }
1269  }
1270  if (tv->inq != NULL) {
1271  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1272  SCCondSignal(&tv->inq->pq->cond_q);
1273  }
1274  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1275  }
1276 
1277  if (tv->ctrl_cond != NULL ) {
1278  pthread_cond_broadcast(tv->ctrl_cond);
1279  }
1280  return 0;
1281  }
1282 
1283  if (tv->outctx != NULL) {
1284  if (tv->outq_id != TMQH_NOT_SET) {
1286  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1287  qh->OutHandlerCtxFree(tv->outctx);
1288  tv->outctx = NULL;
1289  }
1290  }
1291  }
1292 
1293  /* join it and flag it as dead */
1294  pthread_join(tv->t, NULL);
1295  SCLogDebug("thread %s stopped", tv->name);
1297  return 1;
1298 }
1299 
1300 /** \internal
1301  *
1302  * \brief make sure that all packet threads are done processing their
1303  * in-flight packets, including 'injected' flow packets.
1304  */
1305 static void TmThreadDrainPacketThreads(void)
1306 {
1307  ThreadVars *tv = NULL;
1308  struct timeval start_ts;
1309  struct timeval cur_ts;
1310  gettimeofday(&start_ts, NULL);
1311 
1312 again:
1313  gettimeofday(&cur_ts, NULL);
1314  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1315  SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads "
1316  "to process their packets in time");
1317  return;
1318  }
1319 
1321 
1322  /* all receive threads are part of packet processing threads */
1323  tv = tv_root[TVT_PPT];
1324  while (tv) {
1325  if (ThreadStillHasPackets(tv)) {
1326  /* we wait till we dry out all the inq packets, before we
1327  * kill this thread. Do note that you should have disabled
1328  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1330 
1331  /* sleep outside lock */
1332  SleepMsec(1);
1333  goto again;
1334  }
1335  if (tv->flow_queue) {
1337  bool fq_done = (tv->flow_queue->qlen == 0);
1339  if (!fq_done) {
1341 
1342  Packet *p = PacketGetFromAlloc();
1343  if (p != NULL) {
1344  //SCLogNotice("flush packet created");
1347  PacketQueue *q = tv->stream_pq;
1348  SCMutexLock(&q->mutex_q);
1349  PacketEnqueue(q, p);
1350  SCCondSignal(&q->cond_q);
1351  SCMutexUnlock(&q->mutex_q);
1352  }
1353 
1354  /* don't sleep while holding a lock */
1355  SleepMsec(1);
1356  goto again;
1357  }
1358  }
1359  tv = tv->next;
1360  }
1361 
1363  return;
1364 }
1365 
1366 /**
1367  * \brief Disable all threads having the specified TMs.
1368  *
1369  * Breaks out of the packet acquisition loop, and bumps
1370  * into the 'flow loop', where it will process packets
1371  * from the flow engine's shutdown handling.
1372  */
1374 {
1375  ThreadVars *tv = NULL;
1376  struct timeval start_ts;
1377  struct timeval cur_ts;
1378  gettimeofday(&start_ts, NULL);
1379 
1380 again:
1381  gettimeofday(&cur_ts, NULL);
1382  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1383  FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1384  "thread - \"%s\". Killing engine", tv->name);
1385  }
1386 
1388 
1389  /* all receive threads are part of packet processing threads */
1390  tv = tv_root[TVT_PPT];
1391 
1392  /* we do have to keep in mind that TVs are arranged in the order
1393  * right from receive to log. The moment we fail to find a
1394  * receive TM amongst the slots in a tv, it indicates we are done
1395  * with all receive threads */
1396  while (tv) {
1397  int disable = 0;
1398  TmModule *tm = NULL;
1399  /* obtain the slots for this TV */
1400  TmSlot *slots = tv->tm_slots;
1401  while (slots != NULL) {
1402  tm = TmModuleGetById(slots->tm_id);
1403 
1404  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1405  disable = 1;
1406  break;
1407  }
1408 
1409  slots = slots->slot_next;
1410  continue;
1411  }
1412 
1413  if (disable) {
1414  if (ThreadStillHasPackets(tv)) {
1415  /* we wait till we dry out all the inq packets, before we
1416  * kill this thread. Do note that you should have disabled
1417  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1419  /* don't sleep while holding a lock */
1420  SleepMsec(1);
1421  goto again;
1422  }
1423 
1424  if (tv->flow_queue) {
1426  bool fq_done = (tv->flow_queue->qlen == 0);
1428  if (!fq_done) {
1430 
1431  Packet *p = PacketGetFromAlloc();
1432  if (p != NULL) {
1433  //SCLogNotice("flush packet created");
1436  PacketQueue *q = tv->stream_pq;
1437  SCMutexLock(&q->mutex_q);
1438  PacketEnqueue(q, p);
1439  SCCondSignal(&q->cond_q);
1440  SCMutexUnlock(&q->mutex_q);
1441  }
1442 
1443  /* don't sleep while holding a lock */
1444  SleepMsec(1);
1445  goto again;
1446  }
1447  }
1448 
1449  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1450  if (tm && tm->PktAcqBreakLoop != NULL) {
1451  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1452  }
1454 
1455  if (tv->inq != NULL) {
1456  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1457  SCCondSignal(&tv->inq->pq->cond_q);
1458  }
1459  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1460  }
1461 
1462  /* wait for it to enter the 'flow loop' stage */
1463  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1465 
1466  SleepMsec(1);
1467  goto again;
1468  }
1469  }
1470 
1471  tv = tv->next;
1472  }
1473 
1475 
1476  /* finally wait for all packet threads to have
1477  * processed all of their 'live' packets so we
1478  * don't process the last live packets together
1479  * with FFR packets */
1480  TmThreadDrainPacketThreads();
1481  return;
1482 }
1483 
1484 static void TmThreadDebugValidateNoMorePackets(void)
1485 {
1486 #ifdef DEBUG_VALIDATION
1488  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1489  if (ThreadStillHasPackets(tv)) {
1492  abort();
1493  }
1494  }
1496 #endif
1497 }
1498 
1499 /**
1500  * \brief Disable all packet threads
1501  */
1503 {
1504  struct timeval start_ts;
1505  struct timeval cur_ts;
1506 
1507  /* first drain all packet threads of their packets */
1508  TmThreadDrainPacketThreads();
1509 
1510  /* since all the threads possibly able to produce more packets
1511  * are now gone or inactive, we should see no packets anywhere
1512  * anymore. */
1513  TmThreadDebugValidateNoMorePackets();
1514 
1515  gettimeofday(&start_ts, NULL);
1516 again:
1517  gettimeofday(&cur_ts, NULL);
1518  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1519  FatalError(SC_ERR_FATAL, "Engine unable to disable packet "
1520  "threads. Killing engine");
1521  }
1522 
1523  /* loop through the packet threads and kill them */
1525  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1527 
1528  /* separate worker threads (autofp) will still wait at their
1529  * input queues. So nudge them here so they will observe the
1530  * THV_KILL flag. */
1531  if (tv->inq != NULL) {
1532  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1533  SCCondSignal(&tv->inq->pq->cond_q);
1534  }
1535  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1536  }
1537 
1540 
1541  SleepMsec(1);
1542  goto again;
1543  }
1544  }
1546  return;
1547 }
1548 
1550 {
1551  ThreadVars *tv = NULL;
1552  TmSlot *slots = NULL;
1553 
1555 
1556  /* all receive threads are part of packet processing threads */
1557  tv = tv_root[TVT_PPT];
1558 
1559  while (tv) {
1560  slots = tv->tm_slots;
1561 
1562  while (slots != NULL) {
1563  TmModule *tm = TmModuleGetById(slots->tm_id);
1564 
1565  char *found = strstr(tm->name, tm_name);
1566  if (found != NULL)
1567  goto end;
1568 
1569  slots = slots->slot_next;
1570  }
1571 
1572  tv = tv->next;
1573  }
1574 
1575  end:
1577  return slots;
1578 }
1579 
1580 #define MIN_WAIT_TIME 100
1581 #define MAX_WAIT_TIME 999999
1583 {
1584  ThreadVars *tv = NULL;
1585  unsigned int sleep_usec = MIN_WAIT_TIME;
1586 
1587  BUG_ON((family < 0) || (family >= TVT_MAX));
1588 
1589 again:
1591  tv = tv_root[family];
1592 
1593  while (tv) {
1594  int r = TmThreadKillThread(tv);
1595  if (r == 0) {
1597  SleepUsec(sleep_usec);
1598  sleep_usec *= 2; /* slowly back off */
1599  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1600  goto again;
1601  }
1602  sleep_usec = MIN_WAIT_TIME; /* reset */
1603 
1604  tv = tv->next;
1605  }
1607 }
1608 #undef MIN_WAIT_TIME
1609 #undef MAX_WAIT_TIME
1610 
1612 {
1613  int i = 0;
1614 
1615  for (i = 0; i < TVT_MAX; i++) {
1617  }
1618 
1619  return;
1620 }
1621 
1622 static void TmThreadFree(ThreadVars *tv)
1623 {
1624  TmSlot *s;
1625  TmSlot *ps;
1626  if (tv == NULL)
1627  return;
1628 
1629  SCLogDebug("Freeing thread '%s'.", tv->name);
1630 
1631  if (tv->flow_queue) {
1632  BUG_ON(tv->flow_queue->qlen != 0);
1633  SCFree(tv->flow_queue);
1634  }
1635 
1637 
1638  TmThreadDeinitMC(tv);
1639 
1640  if (tv->thread_group_name) {
1642  }
1643 
1644  if (tv->printable_name) {
1646  }
1647 
1648  if (tv->stream_pq_local) {
1652  }
1653 
1654  s = (TmSlot *)tv->tm_slots;
1655  while (s) {
1656  ps = s;
1657  s = s->slot_next;
1658  SCFree(ps);
1659  }
1660 
1662  SCFree(tv);
1663 }
1664 
1665 void TmThreadSetGroupName(ThreadVars *tv, const char *name)
1666 {
1667  char *thread_group_name = NULL;
1668 
1669  if (name == NULL)
1670  return;
1671 
1672  if (tv == NULL)
1673  return;
1674 
1675  thread_group_name = SCStrdup(name);
1676  if (unlikely(thread_group_name == NULL)) {
1677  SCLogError(SC_ERR_RUNMODE, "error allocating memory");
1678  return;
1679  }
1680  tv->thread_group_name = thread_group_name;
1681 }
1682 
1684 {
1685  ThreadVars *tv = NULL;
1686  ThreadVars *ptv = NULL;
1687 
1688  if ((family < 0) || (family >= TVT_MAX))
1689  return;
1690 
1692  tv = tv_root[family];
1693 
1694  while (tv) {
1695  ptv = tv;
1696  tv = tv->next;
1697  TmThreadFree(ptv);
1698  }
1699  tv_root[family] = NULL;
1701 }
1702 
1703 /**
1704  * \brief Spawns a thread associated with the ThreadVars instance tv
1705  *
1706  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1707  */
1709 {
1710  pthread_attr_t attr;
1711  if (tv->tm_func == NULL) {
1712  printf("ERROR: no thread function set\n");
1713  return TM_ECODE_FAILED;
1714  }
1715 
1716  /* Initialize and set thread detached attribute */
1717  pthread_attr_init(&attr);
1718 
1719  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1720 
1721  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1722  if (rc) {
1723  printf("ERROR; return code from pthread_create() is %" PRId32 "\n", rc);
1724  return TM_ECODE_FAILED;
1725  }
1726 
1728 
1729  TmThreadAppend(tv, tv->type);
1730  return TM_ECODE_OK;
1731 }
1732 
1733 /**
1734  * \brief Initializes the mutex and condition variables for this TV
1735  *
1736  * It can be used by a thread to control a wait loop that can also be
1737  * influenced by other threads.
1738  *
1739  * \param tv Pointer to a TV instance
1740  */
1742 {
1743  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1745  "Fatal error encountered in TmThreadInitMC. "
1746  "Exiting...");
1747  }
1748 
1749  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1750  printf("Error initializing the tv->m mutex\n");
1751  exit(EXIT_FAILURE);
1752  }
1753 
1754  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1756  "Fatal error encountered in TmThreadInitMC. "
1757  "Exiting...");
1758  }
1759 
1760  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1761  FatalError(SC_ERR_FATAL, "Error initializing the tv->cond condition "
1762  "variable");
1763  }
1764 
1765  return;
1766 }
1767 
1768 static void TmThreadDeinitMC(ThreadVars *tv)
1769 {
1770  if (tv->ctrl_mutex) {
1772  SCFree(tv->ctrl_mutex);
1773  }
1774  if (tv->ctrl_cond) {
1776  SCFree(tv->ctrl_cond);
1777  }
1778  return;
1779 }
1780 
1781 /**
1782  * \brief Tests if the thread represented in the arg has been unpaused or not.
1783  *
1784  * The function would return if the thread tv has been unpaused or if the
1785  * kill flag for the thread has been set.
1786  *
1787  * \param tv Pointer to the TV instance.
1788  */
1790 {
1791  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1792  SleepUsec(100);
1793 
1795  break;
1796  }
1797 
1798  return;
1799 }
1800 
1801 /**
1802  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1803  * the kill flag has been set or not on the thread.
1804  *
1805  * \param tv Pointer to the TV instance.
1806  */
1808 {
1809  while (!TmThreadsCheckFlag(tv, flags)) {
1810  SleepUsec(100);
1811  }
1812 
1813  return;
1814 }
1815 
1816 /**
1817  * \brief Unpauses a thread
1818  *
1819  * \param tv Pointer to a TV instance that has to be unpaused
1820  */
1822 {
1824 
1825  return;
1826 }
1827 
1828 /**
1829  * \brief Unpauses all threads present in tv_root
1830  */
1832 {
1834  for (int i = 0; i < TVT_MAX; i++) {
1835  ThreadVars *tv = tv_root[i];
1836  while (tv != NULL) {
1838  tv = tv->next;
1839  }
1840  }
1842  return;
1843 }
1844 
1845 /**
1846  * \brief Pauses a thread
1847  *
1848  * \param tv Pointer to a TV instance that has to be paused
1849  */
1851 {
1853  return;
1854 }
1855 
1856 /**
1857  * \brief Pauses all threads present in tv_root
1858  */
1860 {
1862 
1864  for (int i = 0; i < TVT_MAX; i++) {
1865  ThreadVars *tv = tv_root[i];
1866  while (tv != NULL) {
1867  TmThreadPause(tv);
1868  tv = tv->next;
1869  }
1870  }
1872 }
1873 
1874 /**
1875  * \brief Used to check the thread for certain conditions of failure.
1876  */
1878 {
1880  for (int i = 0; i < TVT_MAX; i++) {
1881  ThreadVars *tv = tv_root[i];
1882  while (tv) {
1884  FatalError(SC_ERR_FATAL, "thread %s failed", tv->name);
1885  }
1886  tv = tv->next;
1887  }
1888  }
1890  return;
1891 }
1892 
1893 /**
1894  * \brief Used to check if all threads have finished their initialization. On
1895  * finding an un-initialized thread, it waits till that thread completes
1896  * its initialization, before proceeding to the next thread.
1897  *
1898  * \retval TM_ECODE_OK all initialized properly
1899  * \retval TM_ECODE_FAILED failure
1900  */
1902 {
1903  uint16_t mgt_num = 0;
1904  uint16_t ppt_num = 0;
1905 
1906  struct timeval start_ts;
1907  struct timeval cur_ts;
1908  gettimeofday(&start_ts, NULL);
1909 
1910 again:
1912  for (int i = 0; i < TVT_MAX; i++) {
1913  ThreadVars *tv = tv_root[i];
1914  while (tv != NULL) {
1917 
1918  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1919  "initialize: flags %04x", tv->name,
1920  SC_ATOMIC_GET(tv->flags));
1921  return TM_ECODE_FAILED;
1922  }
1923 
1924  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
1926 
1927  gettimeofday(&cur_ts, NULL);
1928  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
1929  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1930  "initialize in time: flags %04x", tv->name,
1931  SC_ATOMIC_GET(tv->flags));
1932  return TM_ECODE_FAILED;
1933  }
1934 
1935  /* sleep a little to give the thread some
1936  * time to finish initialization */
1937  SleepUsec(100);
1938  goto again;
1939  }
1940 
1943  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1944  "initialize.", tv->name);
1945  return TM_ECODE_FAILED;
1946  }
1949  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" closed on "
1950  "initialization.", tv->name);
1951  return TM_ECODE_FAILED;
1952  }
1953 
1954  if (i == TVT_MGMT)
1955  mgt_num++;
1956  else if (i == TVT_PPT)
1957  ppt_num++;
1958 
1959  tv = tv->next;
1960  }
1961  }
1963 
1964  SCLogNotice("all %"PRIu16" packet processing threads, %"PRIu16" management "
1965  "threads initialized, engine started.", ppt_num, mgt_num);
1966 
1967  return TM_ECODE_OK;
1968 }
1969 
1970 /**
1971  * \brief Returns the TV for the calling thread.
1972  *
1973  * \retval tv Pointer to the ThreadVars instance for the calling thread;
1974  * NULL on no match
1975  */
1977 {
1978  pthread_t self = pthread_self();
1979 
1981  for (int i = 0; i < TVT_MAX; i++) {
1982  ThreadVars *tv = tv_root[i];
1983  while (tv) {
1984  if (pthread_equal(self, tv->t)) {
1986  return tv;
1987  }
1988  tv = tv->next;
1989  }
1990  }
1992  return NULL;
1993 }
1994 
1995 /**
1996  * \brief returns a count of all the threads that match the flag
1997  */
1999 {
2000  uint32_t cnt = 0;
2002  for (int i = 0; i < TVT_MAX; i++) {
2003  ThreadVars *tv = tv_root[i];
2004  while (tv != NULL) {
2005  if ((tv->tmm_flags & flags) == flags)
2006  cnt++;
2007 
2008  tv = tv->next;
2009  }
2010  }
2012  return cnt;
2013 }
2014 
2015 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2016 {
2017  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2019  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2020  tv, s, s->tm_id, m->name);
2021  }
2022 }
2023 
2025 {
2027  for (int i = 0; i < TVT_MAX; i++) {
2028  ThreadVars *tv = tv_root[i];
2029  while (tv != NULL) {
2030  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2031  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2032  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2033  if (tv->inq && tv->stream_pq == tv->inq->pq) {
2034  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2035  } else if (tv->stream_pq_local != NULL) {
2036  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2037  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2038  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2039  }
2040  }
2041  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2042  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2043  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2044  }
2045  TmThreadDoDumpSlots(tv);
2046  tv = tv->next;
2047  }
2048  }
2051 }
2052 
2053 typedef struct Thread_ {
2054  ThreadVars *tv; /**< threadvars structure */
2055  const char *name;
2056  int type;
2057  int in_use; /**< bool to indicate this is in use */
2058 
2059  struct timeval pktts; /**< current packet time of this thread
2060  * (offline mode) */
2061  uint32_t sys_sec_stamp; /**< timestamp in seconds of the real system
2062  * time when the pktts was last updated. */
2064 
2065 typedef struct Threads_ {
2070 
2071 static Threads thread_store = { NULL, 0, 0 };
2072 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2073 
2075 {
2076  SCMutexLock(&thread_store_lock);
2077  for (size_t s = 0; s < thread_store.threads_size; s++) {
2078  Thread *t = &thread_store.threads[s];
2079  if (t == NULL || t->in_use == 0)
2080  continue;
2081 
2082  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2083  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2084  if (t->tv) {
2085  ThreadVars *tv = t->tv;
2086  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2087  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2088  tv, tv->type, tv->name, tv->tmm_flags, flags);
2089  }
2090  }
2091  SCMutexUnlock(&thread_store_lock);
2092 }
2093 
2094 #define STEP 32
2095 /**
2096  * \retval id thread id, or 0 if not found
2097  */
2099 {
2100  SCMutexLock(&thread_store_lock);
2101  if (thread_store.threads == NULL) {
2102  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2103  BUG_ON(thread_store.threads == NULL);
2104  thread_store.threads_size = STEP;
2105  }
2106 
2107  size_t s;
2108  for (s = 0; s < thread_store.threads_size; s++) {
2109  if (thread_store.threads[s].in_use == 0) {
2110  Thread *t = &thread_store.threads[s];
2111  t->name = tv->name;
2112  t->type = type;
2113  t->tv = tv;
2114  t->in_use = 1;
2115 
2116  SCMutexUnlock(&thread_store_lock);
2117  return (int)(s+1);
2118  }
2119  }
2120 
2121  /* if we get here the array is completely filled */
2122  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2123  BUG_ON(newmem == NULL);
2124  thread_store.threads = newmem;
2125  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2126 
2127  Thread *t = &thread_store.threads[thread_store.threads_size];
2128  t->name = tv->name;
2129  t->type = type;
2130  t->tv = tv;
2131  t->in_use = 1;
2132 
2133  s = thread_store.threads_size;
2134  thread_store.threads_size += STEP;
2135 
2136  SCMutexUnlock(&thread_store_lock);
2137  return (int)(s+1);
2138 }
2139 #undef STEP
2140 
2141 void TmThreadsUnregisterThread(const int id)
2142 {
2143  SCMutexLock(&thread_store_lock);
2144  if (id <= 0 || id > (int)thread_store.threads_size) {
2145  SCMutexUnlock(&thread_store_lock);
2146  return;
2147  }
2148 
2149  /* id is one higher than index */
2150  int idx = id - 1;
2151 
2152  /* reset thread_id, which serves as clearing the record */
2153  thread_store.threads[idx].in_use = 0;
2154 
2155  /* check if we have at least one registered thread left */
2156  size_t s;
2157  for (s = 0; s < thread_store.threads_size; s++) {
2158  Thread *t = &thread_store.threads[s];
2159  if (t->in_use == 1) {
2160  goto end;
2161  }
2162  }
2163 
2164  /* if we get here no threads are registered */
2165  SCFree(thread_store.threads);
2166  thread_store.threads = NULL;
2167  thread_store.threads_size = 0;
2168  thread_store.threads_cnt = 0;
2169 
2170 end:
2171  SCMutexUnlock(&thread_store_lock);
2172 }
2173 
2174 void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
2175 {
2176  SCMutexLock(&thread_store_lock);
2177  if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2178  SCMutexUnlock(&thread_store_lock);
2179  return;
2180  }
2181 
2182  int idx = id - 1;
2183  Thread *t = &thread_store.threads[idx];
2184  t->pktts = *ts;
2185  struct timeval systs;
2186  gettimeofday(&systs, NULL);
2187  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2188  SCMutexUnlock(&thread_store_lock);
2189 }
2190 
2192 {
2193  bool ready = true;
2194  SCMutexLock(&thread_store_lock);
2195  for (size_t s = 0; s < thread_store.threads_size; s++) {
2196  Thread *t = &thread_store.threads[s];
2197  if (!t->in_use)
2198  break;
2199  if (t->sys_sec_stamp == 0) {
2200  ready = false;
2201  break;
2202  }
2203  }
2204  SCMutexUnlock(&thread_store_lock);
2205  return ready;
2206 }
2207 
2208 void TmThreadsInitThreadsTimestamp(const struct timeval *ts)
2209 {
2210  struct timeval systs;
2211  gettimeofday(&systs, NULL);
2212  SCMutexLock(&thread_store_lock);
2213  for (size_t s = 0; s < thread_store.threads_size; s++) {
2214  Thread *t = &thread_store.threads[s];
2215  if (!t->in_use)
2216  break;
2217  t->pktts = *ts;
2218  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2219  }
2220  SCMutexUnlock(&thread_store_lock);
2221 }
2222 
2223 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2224 {
2225  struct timeval local, nullts;
2226  memset(&local, 0, sizeof(local));
2227  memset(&nullts, 0, sizeof(nullts));
2228  int set = 0;
2229  size_t s;
2230  struct timeval systs;
2231  gettimeofday(&systs, NULL);
2232 
2233  SCMutexLock(&thread_store_lock);
2234  for (s = 0; s < thread_store.threads_size; s++) {
2235  Thread *t = &thread_store.threads[s];
2236  if (t->in_use == 0)
2237  break;
2238  if (!(timercmp(&t->pktts, &nullts, ==))) {
2239  /* ignore sleeping threads */
2240  if (t->sys_sec_stamp + 1 < (uint32_t)systs.tv_sec)
2241  continue;
2242 
2243  if (!set) {
2244  local = t->pktts;
2245  set = 1;
2246  } else {
2247  if (timercmp(&t->pktts, &local, <)) {
2248  local = t->pktts;
2249  }
2250  }
2251  }
2252  }
2253  SCMutexUnlock(&thread_store_lock);
2254  *ts = local;
2255  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2256 }
2257 
2259 {
2260  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2261  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2262  /* always create at least one thread */
2263  if (thread_max == 0)
2264  thread_max = ncpus * threading_detect_ratio;
2265  if (thread_max < 1)
2266  thread_max = 1;
2267  if (thread_max > 1024) {
2268  SCLogWarning(SC_ERR_RUNMODE, "limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2269  thread_max = 1024;
2270  }
2271  return thread_max;
2272 }
2273 
2274 /**
2275  * \retval r 1 if packet was accepted, 0 otherwise
2276  * \note if packet was not accepted, it's still the responsibility
2277  * of the caller.
2278  */
2279 int TmThreadsInjectPacketsById(Packet **packets, const int id)
2280 {
2281  if (id <= 0 || id > (int)thread_store.threads_size)
2282  return 0;
2283 
2284  int idx = id - 1;
2285 
2286  Thread *t = &thread_store.threads[idx];
2287  ThreadVars *tv = t->tv;
2288 
2289  if (tv == NULL || tv->stream_pq == NULL)
2290  return 0;
2291 
2293  while (*packets != NULL) {
2294  PacketEnqueue(tv->stream_pq, *packets);
2295  packets++;
2296  }
2298 
2299  /* wake up listening thread(s) if necessary */
2300  if (tv->inq != NULL) {
2301  SCCondSignal(&tv->inq->pq->cond_q);
2302  }
2303  return 1;
2304 }
2305 
2306 /** \brief inject a flow into a threads flow queue
2307  */
2308 void TmThreadsInjectFlowById(Flow *f, const int id)
2309 {
2310  BUG_ON(id <= 0 || id > (int)thread_store.threads_size);
2311 
2312  int idx = id - 1;
2313 
2314  Thread *t = &thread_store.threads[idx];
2315  ThreadVars *tv = t->tv;
2316 
2317  BUG_ON(tv == NULL || tv->flow_queue == NULL);
2318 
2319  FlowEnqueue(tv->flow_queue, f);
2320 
2321  /* wake up listening thread(s) if necessary */
2322  if (tv->inq != NULL) {
2323  SCCondSignal(&tv->inq->pq->cond_q);
2324  }
2325 }
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:835
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:67
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:75
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:72
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:133
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2191
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1741
tm-threads.h
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:68
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:54
THV_USE
#define THV_USE
Definition: threadvars.h:37
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:113
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1708
Threads
struct Threads_ Threads
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1123
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinity). Priority should already be set by pthread_create.
Definition: tm-threads.c:876
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:315
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1065
Thread_::pktts
struct timeval pktts
Definition: tm-threads.c:2059
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:387
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:54
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:379
TmThreadSetGroupName
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1665
TmThreadsGetCallingThread
ThreadVars * TmThreadsGetCallingThread(void)
Returns the TV for the calling thread.
Definition: tm-threads.c:1976
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:105
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:49
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:298
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:97
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:87
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1807
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:173
TmThreadsGetTVContainingSlot
ThreadVars * TmThreadsGetTVContainingSlot(TmSlot *tm_slot)
Definition: tm-threads.c:617
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:47
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:55
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:46
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:82
Packet_::flags
uint32_t flags
Definition: decode.h:449
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:74
threads.h
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:347
ThreadVars_::t
pthread_t t
Definition: threadvars.h:59
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:303
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
AffinityGetNextCPU
int AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
Definition: util-affinity.c:297
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:60
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:1998
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:104
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const struct timeval *ts)
Definition: tm-threads.c:2208
StatsSetupPrivate
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1194
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:52
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:115
MIN
#define MIN(x, y)
Definition: suricata-common.h:377
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:78
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:74
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1373
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:387
SC_ERR_THREAD_CREATE
@ SC_ERR_THREAD_CREATE
Definition: util-error.h:78
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:121
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:788
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:37
m
SCMutex m
Definition: flow-hash.h:6
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:43
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:139
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:375
SC_ERR_SHUTDOWN
@ SC_ERR_SHUTDOWN
Definition: util-error.h:220
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:81
rww_lock_contention
thread_local uint64_t rww_lock_contention
SC_ERR_RUNMODE
@ SC_ERR_RUNMODE
Definition: util-error.h:219
Threads_::threads
Thread * threads
Definition: tm-threads.c:2066
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:174
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2094
Thread_::in_use
int in_use
Definition: tm-threads.c:2057
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:39
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(void)
Disable all packet threads.
Definition: tm-threads.c:1502
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:88
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:302
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:80
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
Inject a flow into a thread's flow queue.
Definition: tm-threads.c:2308
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:81
TmThreadsInjectPacketsById
int TmThreadsInjectPacketsById(Packet **packets, const int id)
Definition: tm-threads.c:2279
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2223
FQLOCK_LOCK
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:73
TmThreadContinueThreads
void TmThreadContinueThreads()
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1831
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:54
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
Get a TM module pointer by name.
Definition: tm-modules.c:53
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:47
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1150
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:105
util-signal.h
SC_ERR_THREAD_INIT
@ SC_ERR_THREAD_INIT
Definition: util-error.h:79
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:79
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:799
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1580
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:2054
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1821
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:92
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:56
type
uint8_t type
Definition: decode-icmpv4.h:0
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:1901
Threads_::threads_cnt
int threads_cnt
Definition: tm-threads.c:2068
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:76
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:96
TmSlotGetSlotForTM
TmSlot * TmSlotGetSlotForTM(int tm_id)
Returns the slot holding a TM with the particular tm_id.
Definition: tm-threads.c:696
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:206
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:57
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
TmThreadPauseThreads
void TmThreadPauseThreads()
Pauses all threads present in tv_root.
Definition: tm-threads.c:1859
ThreadVars_::perf_public_ctx
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:126
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1110
TmThreadDumpThreads
void TmThreadDumpThreads(void)
Definition: tm-threads.c:2024
SCEnter
#define SCEnter(...)
Definition: util-debug.h:300
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1789
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1683
THV_KILL
#define THV_KILL
Definition: threadvars.h:41
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:935
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:691
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2141
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2098
threading_set_cpu_affinity
int threading_set_cpu_affinity
Definition: runmodes.c:61
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
SC_ERR_INVALID_ARGUMENT
@ SC_ERR_INVALID_ARGUMENT
Definition: util-error.h:43
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:123
ThreadVars_::id
int id
Definition: threadvars.h:87
ThreadVars_::type
uint8_t type
Definition: threadvars.h:72
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:282
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:109
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:81
SCReturn
#define SCReturn
Definition: util-debug.h:302
stream.h
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1611
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:158
Packet_
Definition: decode.h:414
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:131
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:90
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:55
ThreadVars_::thread_group_name
char * thread_group_name
Definition: threadvars.h:67
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:90
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1581
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:73
TmSlot_
Definition: tm-threads.h:52
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:66
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:69
TmEcode
TmEcode
Definition: tm-threads-common.h:79
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:89
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
TmModule_::name
const char * name
Definition: tm-modules.h:44
runmodes.h
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:55
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread (MGMT). This function supports only custom...
Definition: tm-threads.c:1095
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:31
TmThreadPause
void TmThreadPause(ThreadVars *tv)
Pauses a thread.
Definition: tm-threads.c:1850
TmModule_
Definition: tm-modules.h:43
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:257
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:114
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:40
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:38
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:647
StatsPublicThreadContext_::m
SCMutex m
Definition: counters.h:72
TmThreadGetFirstTmSlotForPartialPattern
TmSlot * TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name)
Definition: tm-threads.c:1549
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1582
Thread_::sys_sec_stamp
uint32_t sys_sec_stamp
Definition: tm-threads.c:2061
Thread_::type
int type
Definition: tm-threads.c:2056
Thread
struct Thread_ Thread
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:90
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1182
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:75
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:44
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:55
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:844
Threads_
Definition: tm-threads.c:2065
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:33
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:35
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:106
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:42
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:101
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:860
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:224
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:50
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:65
SCLogError
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:257
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
Definition: tm-threads.c:2174
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:70
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
FatalError
#define FatalError(x,...)
Definition: util-debug.h:532
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:66
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1156
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:29
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:140
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:212
threadvars.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:144
ThreadsAffinityType_
Definition: util-affinity.h:64
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:35
THV_KILL_PKTACQ
#define THV_KILL_PKTACQ
Definition: threadvars.h:48
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:579
Thread_::name
const char * name
Definition: tm-threads.c:2055
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:150
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:48
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:91
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCLogWarning
#define SCLogWarning(err_code,...)
Macro used to log WARNING messages.
Definition: util-debug.h:244
PKT_SRC_DETECT_RELOAD_FLUSH
@ PKT_SRC_DETECT_RELOAD_FLUSH
Definition: decode.h:60
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:67
SC_ERR_FATAL
@ SC_ERR_FATAL
Definition: util-error.h:203
SC_ERR_THREAD_NICE_PRIO
@ SC_ERR_THREAD_NICE_PRIO
Definition: util-error.h:77
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:66
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:318
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:84
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:110
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:90
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:91
SC_ERR_MEM_ALLOC
@ SC_ERR_MEM_ALLOC
Definition: util-error.h:31
FlowQueueNew
FlowQueue * FlowQueueNew()
Definition: flow-queue.c:36
suricata.h
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:402
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:138
Tmq_
Definition: tm-queues.h:29
StatsSyncCounters
#define StatsSyncCounters(tv)
Definition: counters.h:134
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2074
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:61
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1877
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:130
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:83
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:67
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:376
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:106
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax()
Definition: tm-threads.c:2258
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:335
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:89
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:232
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:43
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:2053
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:120
Threads_::threads_size
size_t threads_size
Definition: tm-threads.c:2067
StatsThreadCleanup
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1293
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:63
SCMutex
#define SCMutex
Definition: threads-debug.h:114
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:70
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:916
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:360
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:198
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:351
FQLOCK_UNLOCK
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:75