suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2024 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "thread-callbacks.h"
33 #include "threadvars.h"
34 #include "thread-storage.h"
35 #include "tm-queues.h"
36 #include "tm-queuehandlers.h"
37 #include "tm-threads.h"
38 #include "capture-hooks.h"
39 #include "tmqh-packetpool.h"
40 #include "threads.h"
41 #include "util-affinity.h"
42 #include "util-debug.h"
43 #include "util-privs.h"
44 #include "util-cpu.h"
45 #include "util-optimize.h"
46 #include "util-profiling.h"
47 #include "util-signal.h"
48 #include "queue.h"
49 #include "util-validate.h"
51 
52 #ifdef PROFILE_LOCKING
53 thread_local uint64_t mutex_lock_contention;
54 thread_local uint64_t mutex_lock_wait_ticks;
55 thread_local uint64_t mutex_lock_cnt;
56 
57 thread_local uint64_t spin_lock_contention;
58 thread_local uint64_t spin_lock_wait_ticks;
59 thread_local uint64_t spin_lock_cnt;
60 
61 thread_local uint64_t rww_lock_contention;
62 thread_local uint64_t rww_lock_wait_ticks;
63 thread_local uint64_t rww_lock_cnt;
64 
65 thread_local uint64_t rwr_lock_contention;
66 thread_local uint64_t rwr_lock_wait_ticks;
67 thread_local uint64_t rwr_lock_cnt;
68 #endif
69 
70 #ifdef OS_FREEBSD
71 #include <sched.h>
72 #include <sys/param.h>
73 #include <sys/resource.h>
74 #include <sys/cpuset.h>
75 #include <sys/thr.h>
76 #define cpu_set_t cpuset_t
77 #endif /* OS_FREEBSD */
78 
79 /* prototypes */
80 static int SetCPUAffinity(uint16_t cpu);
81 static void TmThreadDeinitMC(ThreadVars *tv);
82 
83 /* root of the threadvars list */
84 ThreadVars *tv_root[TVT_MAX] = { NULL };
85 
86 /* lock to protect tv_root */
88 
89 /**
90  * \brief Check if a thread flag is set.
91  *
92  * \retval 1 flag is set.
93  * \retval 0 flag is not set.
94  */
95 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
96 {
97  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
98 }
99 
100 /**
101  * \brief Set a thread flag.
102  */
103 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
104 {
105  SC_ATOMIC_OR(tv->flags, flag);
106 }
107 
108 /**
109  * \brief Unset a thread flag.
110  */
111 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
112 {
113  SC_ATOMIC_AND(tv->flags, ~flag);
114 }
115 
117  ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
118 {
119  while (decode_pq->top != NULL) {
120  Packet *extra_p = PacketDequeueNoLock(decode_pq);
121  if (unlikely(extra_p == NULL))
122  continue;
123  DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
124 
125  if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) {
127  }
128  }
130 }
131 
132 /**
133  * \brief Separate run function so we can call it recursively.
134  */
136 {
137  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
138  PACKET_PROFILING_TMM_START(p, s->tm_id);
139  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
140  PACKET_PROFILING_TMM_END(p, s->tm_id);
141  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
142 
143  /* handle error */
144  if (unlikely(r == TM_ECODE_FAILED)) {
145  /* Encountered error. Return packets to packetpool and return */
146  TmThreadsSlotProcessPktFail(tv, NULL);
147  return TM_ECODE_FAILED;
148  }
149  if (s->tm_flags & TM_FLAG_DECODE_TM) {
150  if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) !=
151  TM_ECODE_OK) {
152  return TM_ECODE_FAILED;
153  }
154  }
155  }
156 
157  return TM_ECODE_OK;
158 }
159 
160 /** \internal
161  *
162  * \brief Process flow timeout packets
163  *
164  * Process flow timeout pseudo packets. During shutdown this loop
165  * is run until the flow engine kills the thread and the queue is
166  * empty.
167  */
169 {
170  TmSlot *fw_slot = tv->tm_flowworker;
171  int r = TM_ECODE_OK;
172 
173  if (tv->stream_pq == NULL || fw_slot == NULL) {
174  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
175  return r;
176  }
177 
178  SCLogDebug("flow end loop starting");
179  while (1) {
181  uint32_t len = tv->stream_pq->len;
183  if (len > 0) {
184  while (len--) {
188  if (likely(p)) {
189  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
190  r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
191  if (r == TM_ECODE_FAILED) {
192  break;
193  }
194  }
195  }
196  } else {
198  break;
199  }
200  SleepUsec(1);
201  }
202  }
203  SCLogDebug("flow end loop complete");
205 
206  return r;
207 }
208 
209 static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv)
210 {
211  TmSlot *s = tv->tm_slots;
212 
214 
215  if (tv->thread_setup_flags != 0)
217 
219  PacketPoolInit();
220 
221  for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
222  if (slot->SlotThreadInit != NULL) {
223  void *slot_data = NULL;
224  TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
225  if (r != TM_ECODE_OK) {
226  if (r == TM_ECODE_DONE) {
227  EngineDone();
229  goto error;
230  } else {
232  goto error;
233  }
234  }
235  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
236  }
237 
238  /* if the flowworker module is the first, get the threads input queue */
239  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
240  tv->stream_pq = tv->inq->pq;
241  tv->tm_flowworker = slot;
242  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
244  if (tv->flow_queue == NULL) {
246  goto error;
247  }
248  /* setup a queue */
249  } else if (slot->tm_id == TMM_FLOWWORKER) {
250  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
251  if (tv->stream_pq_local == NULL)
252  FatalError("failed to alloc PacketQueue");
255  tv->tm_flowworker = slot;
256  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
258  if (tv->flow_queue == NULL) {
260  goto error;
261  }
262  }
263  }
264 
266 
268 
269  return true;
270 
271 error:
272  return false;
273 }
274 
276 {
277  TmSlot *s = tv->tm_slots;
278  bool rc = true;
279 
281 
283 
284  /* process all pseudo packets the flow timeout may throw at us */
286 
289 
291 
292  for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
293  if (slot->SlotThreadExitPrintStats != NULL) {
294  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
295  }
296 
297  if (slot->SlotThreadDeinit != NULL) {
298  TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
299  if (r != TM_ECODE_OK) {
301  rc = false;
302  break;
303  }
304  }
305  }
306 
307  tv->stream_pq = NULL;
309  return rc;
310 }
311 
312 static void *TmThreadsSlotPktAcqLoop(void *td)
313 {
314  ThreadVars *tv = (ThreadVars *)td;
315  TmSlot *s = tv->tm_slots;
316  TmEcode r = TM_ECODE_OK;
317 
318  /* check if we are setup properly */
319  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
320  SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
321  " PktAcqLoop=%p, tmqh_in=%p,"
322  " tmqh_out=%p",
323  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
325  pthread_exit(NULL);
326  return NULL;
327  }
328 
329  if (!TmThreadsSlotPktAcqLoopInit(td)) {
330  goto error;
331  }
332 
333  bool run = TmThreadsWaitForUnpause(tv);
334 
335  while (run) {
336  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
337 
338  if (r == TM_ECODE_FAILED) {
340  run = false;
341  }
343  run = false;
344  }
345  if (r == TM_ECODE_DONE) {
346  run = false;
347  }
348  }
350  goto error;
351  }
352 
353  SCLogDebug("%s ending", tv->name);
354  pthread_exit((void *) 0);
355  return NULL;
356 
357 error:
358  pthread_exit(NULL);
359  return NULL;
360 }
361 
362 /**
363  * Also returns if the kill flag is set.
364  */
366 {
369 
370  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
371  SleepUsec(100);
372 
374  return false;
375  }
376 
378  }
379 
380  return true;
381 }
382 
383 static void *TmThreadsLib(void *td)
384 {
385  ThreadVars *tv = (ThreadVars *)td;
386  TmSlot *s = tv->tm_slots;
387 
388  /* check if we are setup properly */
389  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
390  SCLogError("TmSlot or ThreadVars badly setup: s=%p, tmqh_in=%p,"
391  " tmqh_out=%p",
392  s, tv->tmqh_in, tv->tmqh_out);
394  return NULL;
395  }
396 
397  if (!TmThreadsSlotPktAcqLoopInit(tv)) {
398  goto error;
399  }
400 
401  if (!TmThreadsWaitForUnpause(tv)) {
402  goto error;
403  }
404 
405  return NULL;
406 
407 error:
408  tv->stream_pq = NULL;
409  return (void *)-1;
410 }
411 
412 static void *TmThreadsSlotVar(void *td)
413 {
414  ThreadVars *tv = (ThreadVars *)td;
415  TmSlot *s = (TmSlot *)tv->tm_slots;
416  Packet *p = NULL;
417  TmEcode r = TM_ECODE_OK;
418 
420  PacketPoolInit();//Empty();
421 
423 
424  if (tv->thread_setup_flags != 0)
426 
427  /* Drop the capabilities for this thread */
428  SCDropCaps(tv);
429 
430  /* check if we are setup properly */
431  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
433  pthread_exit(NULL);
434  return NULL;
435  }
436 
437  for (; s != NULL; s = s->slot_next) {
438  if (s->SlotThreadInit != NULL) {
439  void *slot_data = NULL;
440  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
441  if (r != TM_ECODE_OK) {
443  goto error;
444  }
445  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
446  }
447 
448  /* special case: we need to access the stream queue
449  * from the flow timeout code */
450 
451  /* if the flowworker module is the first, get the threads input queue */
452  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
453  tv->stream_pq = tv->inq->pq;
454  tv->tm_flowworker = s;
455  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
457  if (tv->flow_queue == NULL) {
459  pthread_exit(NULL);
460  return NULL;
461  }
462  /* setup a queue */
463  } else if (s->tm_id == TMM_FLOWWORKER) {
464  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
465  if (tv->stream_pq_local == NULL)
466  FatalError("failed to alloc PacketQueue");
469  tv->tm_flowworker = s;
470  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
472  if (tv->flow_queue == NULL) {
474  pthread_exit(NULL);
475  return NULL;
476  }
477  }
478  }
479 
481 
482  // Each 'worker' thread uses this func to process/decode the packet read.
483  // Each decode method is different to receive methods in that they do not
484  // enter infinite loops. They use this as the core loop. As a result, at this
485  // point the worker threads can be considered both initialized and running.
487  bool run = TmThreadsWaitForUnpause(tv);
488 
489  s = (TmSlot *)tv->tm_slots;
490 
491  while (run) {
492  /* input a packet */
493  p = tv->tmqh_in(tv);
494 
495  /* if we didn't get a packet see if we need to do some housekeeping */
496  if (unlikely(p == NULL)) {
497  if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty)) {
499  if (p != NULL) {
500  p->flags |= PKT_PSEUDO_STREAM_END;
502  }
503  }
504  }
505 
506  if (p != NULL) {
507  /* run the thread module(s) */
508  r = TmThreadsSlotVarRun(tv, p, s);
509  if (r == TM_ECODE_FAILED) {
512  break;
513  }
514 
515  /* output the packet */
516  tv->tmqh_out(tv, p);
517 
518  /* now handle the stream pq packets */
519  TmThreadsHandleInjectedPackets(tv);
520  }
521 
523  run = false;
524  }
525  }
527  goto error;
528  }
530 
531  pthread_exit(NULL);
532  return NULL;
533 
534 error:
535  tv->stream_pq = NULL;
536  pthread_exit(NULL);
537  return NULL;
538 }
539 
540 static void *TmThreadsManagement(void *td)
541 {
542  ThreadVars *tv = (ThreadVars *)td;
543  TmSlot *s = (TmSlot *)tv->tm_slots;
544  TmEcode r = TM_ECODE_OK;
545 
546  BUG_ON(s == NULL);
547 
549 
550  if (tv->thread_setup_flags != 0)
552 
553  /* Drop the capabilities for this thread */
554  SCDropCaps(tv);
555 
556  SCLogDebug("%s starting", tv->name);
557 
558  if (s->SlotThreadInit != NULL) {
559  void *slot_data = NULL;
560  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
561  if (r != TM_ECODE_OK) {
563  pthread_exit(NULL);
564  return NULL;
565  }
566  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
567  }
568 
570 
572 
573  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
574  /* handle error */
575  if (r == TM_ECODE_FAILED) {
577  }
578 
581  }
582 
585 
586  if (s->SlotThreadExitPrintStats != NULL) {
587  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
588  }
589 
590  if (s->SlotThreadDeinit != NULL) {
591  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
592  if (r != TM_ECODE_OK) {
594  pthread_exit(NULL);
595  return NULL;
596  }
597  }
598 
600  pthread_exit((void *) 0);
601  return NULL;
602 }
603 
604 /**
605  * \brief We set the slot functions.
606  *
607  * \param tv Pointer to the TV to set the slot function for.
608  * \param name Name of the slot variant.
609  * \param fn_p Pointer to a custom slot function. Used only if slot variant
610  * "name" is "custom".
611  *
612  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
613  */
614 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
615 {
616  if (name == NULL) {
617  if (fn_p == NULL) {
618  printf("Both slot name and function pointer can't be NULL inside "
619  "TmThreadSetSlots\n");
620  goto error;
621  } else {
622  name = "custom";
623  }
624  }
625 
626  if (strcmp(name, "varslot") == 0) {
627  tv->tm_func = TmThreadsSlotVar;
628  } else if (strcmp(name, "pktacqloop") == 0) {
629  tv->tm_func = TmThreadsSlotPktAcqLoop;
630  } else if (strcmp(name, "management") == 0) {
631  tv->tm_func = TmThreadsManagement;
632  } else if (strcmp(name, "command") == 0) {
633  tv->tm_func = TmThreadsManagement;
634  } else if (strcmp(name, "lib") == 0) {
635  tv->tm_func = TmThreadsLib;
636  } else if (strcmp(name, "custom") == 0) {
637  if (fn_p == NULL)
638  goto error;
639  tv->tm_func = fn_p;
640  } else {
641  printf("Error: Slot \"%s\" not supported\n", name);
642  goto error;
643  }
644 
645  return TM_ECODE_OK;
646 
647 error:
648  return TM_ECODE_FAILED;
649 }
650 
651 /**
652  * \brief Appends a new entry to the slots.
653  *
654  * \param tv TV the slot is attached to.
655  * \param tm TM to append.
656  * \param data Data to be passed on to the slot init function.
657  *
658  * \note Nothing is returned. If the slot cannot be allocated the function
658  *       returns without modifying \a tv.
659  */
660 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
661 {
662  TmSlot *slot = SCCalloc(1, sizeof(TmSlot));
663  if (unlikely(slot == NULL))
664  return;
665  SC_ATOMIC_INITPTR(slot->slot_data);
666  slot->SlotThreadInit = tm->ThreadInit;
667  slot->slot_initdata = data;
668  if (tm->Func) {
669  slot->SlotFunc = tm->Func;
670  } else if (tm->PktAcqLoop) {
671  slot->PktAcqLoop = tm->PktAcqLoop;
672  if (tm->PktAcqBreakLoop) {
673  tv->break_loop = true;
674  }
675  } else if (tm->Management) {
676  slot->Management = tm->Management;
677  }
679  slot->SlotThreadDeinit = tm->ThreadDeinit;
680  /* we don't have to check for the return value "-1". We wouldn't have
681  * received a TM as arg, if it didn't exist */
682  slot->tm_id = TmModuleGetIDForTM(tm);
683  slot->tm_flags |= tm->flags;
684 
685  tv->tmm_flags |= tm->flags;
686  tv->cap_flags |= tm->cap_flags;
687 
688  if (tv->tm_slots == NULL) {
689  tv->tm_slots = slot;
690  } else {
691  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
692 
693  /* get the last slot */
694  for ( ; a != NULL; a = a->slot_next) {
695  b = a;
696  }
697  /* append the new slot */
698  if (b != NULL) {
699  b->slot_next = slot;
700  }
701  }
702 }
703 
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
/**
 * \brief Apply a CPU set as the affinity of the calling thread.
 *
 * Uses cpuset_setaffinity() on FreeBSD, thread_policy_set() on Darwin and
 * sched_setaffinity() on the calling thread's tid otherwise.
 *
 * \param cs CPU set to apply.
 *
 * \retval 0 on success; -1 if the underlying call reported an error (a
 *         warning is printed in that case).
 */
static int SetCPUAffinitySet(cpu_set_t *cs)
{
    int rc;

#if defined OS_FREEBSD
    rc = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
            SCGetThreadIdLong(), sizeof(cpu_set_t), cs);
#elif OS_DARWIN
    rc = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
            (void *)cs, THREAD_AFFINITY_POLICY_COUNT);
#else
    const pid_t self_tid = (pid_t)syscall(SYS_gettid);
    rc = sched_setaffinity(self_tid, sizeof(cpu_set_t), cs);
#endif /* OS_FREEBSD */

    if (rc == 0) {
        return 0;
    }

    printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", rc,
            strerror(errno));
    return -1;
}
#endif
727 
728 
/**
 * \brief Set the thread affinity on the calling thread.
 *
 * On OpenBSD and Solaris this is a no-op. On Windows/Cygwin the affinity is
 * applied with SetThreadAffinityMask(); elsewhere a single-CPU set is built
 * and handed to SetCPUAffinitySet().
 *
 * \param cpuid Id of the core/cpu to setup the affinity.
 *
 * \retval 0 If all goes well; -1 if something is wrong (including, on
 *         Windows/Cygwin, a cpu id that does not fit in the affinity mask).
 */
static int SetCPUAffinity(uint16_t cpuid)
{
#if defined __OpenBSD__ || defined sun
    return 0;
#else
    int cpu = (int)cpuid;

#if defined OS_WIN32 || defined __CYGWIN__
    /* The affinity mask carries one bit per logical CPU. An id at or beyond
     * the mask width cannot be expressed, and shifting by it would be
     * undefined behaviour, so reject it up front. */
    if (cpu >= (int)(sizeof(DWORD_PTR) * 8)) {
        printf("Warning: cpu %" PRId32 " does not fit in the thread affinity mask\n", cpu);
        return -1;
    }

    /* build the mask in the unsigned type SetThreadAffinityMask() takes, so
     * the shift is never performed on a signed int */
    DWORD_PTR cs = (DWORD_PTR)1 << cpu;

    /* SetThreadAffinityMask() returns 0 on failure and reports the reason
     * through GetLastError(), not errno */
    if (SetThreadAffinityMask(GetCurrentThread(), cs) == 0) {
        printf("Warning: SetThreadAffinityMask failed (%lu)\n",
                (unsigned long)GetLastError());
        return -1;
    }
    SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
            SCGetThreadIdLong(), cpu);

    return 0;

#else
    cpu_set_t cs;
    memset(&cs, 0, sizeof(cs));

    CPU_ZERO(&cs);
    CPU_SET(cpu, &cs);
    return SetCPUAffinitySet(&cs);
#endif /* windows */
#endif /* not supported */
}
767 
768 
769 /**
770  * \brief Set the thread options (thread priority).
771  *
772  * \param tv Pointer to the ThreadVars to setup the thread priority.
773  *
774  * \retval TM_ECODE_OK.
775  */
777 {
779  tv->thread_priority = prio;
780 
781  return TM_ECODE_OK;
782 }
783 
784 /**
785  * \brief Adjusting nice value for threads.
786  */
788 {
789  SCEnter();
790 #ifndef __CYGWIN__
791 #ifdef OS_WIN32
792  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
793  SCLogError("Error setting priority for "
794  "thread %s: %s",
795  tv->name, strerror(errno));
796  } else {
797  SCLogDebug("Priority set to %"PRId32" for thread %s",
799  }
800 #else
801  int ret = nice(tv->thread_priority);
802  if (ret == -1) {
803  SCLogError("Error setting nice value %d "
804  "for thread %s: %s",
805  tv->thread_priority, tv->name, strerror(errno));
806  } else {
807  SCLogDebug("Nice value set to %"PRId32" for thread %s",
809  }
810 #endif /* OS_WIN32 */
811 #endif
812  SCReturn;
813 }
814 
815 
816 /**
817  * \brief Set the thread options (cpu affinity).
818  *
819  * \param tv pointer to the ThreadVars to setup the affinity.
820  * \param cpu cpu on which affinity is set.
821  *
822  * \retval TM_ECODE_OK
823  */
825 {
827  tv->cpu_affinity = cpu;
828 
829  return TM_ECODE_OK;
830 }
831 
832 
834 {
836  return TM_ECODE_OK;
837 
838  if (type > MAX_CPU_SET) {
839  SCLogError("invalid cpu type family");
840  return TM_ECODE_FAILED;
841  }
842 
844  tv->cpu_affinity = type;
845 
846  return TM_ECODE_OK;
847 }
848 
850 {
851  if (type >= MAX_CPU_SET) {
852  SCLogError("invalid cpu type family");
853  return 0;
854  }
855 
857 }
858 
859 /**
860  * \brief Set the thread options (cpu affinity).
861  * Priority should be already set by pthread_create.
862  *
863  * \param tv pointer to the ThreadVars of the calling thread.
864  */
866 {
868  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
869  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
871  SetCPUAffinity(tv->cpu_affinity);
872  }
873 
874 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
879  bool use_iface_affinity = RunmodeIsAutofp() && tv->cpu_affinity == RECEIVE_CPU_SET &&
880  FindAffinityByInterface(taf, tv->iface_name) != NULL;
881  use_iface_affinity |= RunmodeIsWorkers() && tv->cpu_affinity == WORKER_CPU_SET &&
882  FindAffinityByInterface(taf, tv->iface_name) != NULL;
883 
884  if (use_iface_affinity) {
886  }
887 
888  if (UtilAffinityGetAffinedCPUNum(taf) == 0) {
889  if (!taf->nocpu_warned) {
890  SCLogWarning("No CPU affinity set for %s", AffinityGetYamlPath(taf));
891  taf->nocpu_warned = true;
892  }
893  }
894 
895  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
896  uint16_t cpu = AffinityGetNextCPU(tv, taf);
897  SetCPUAffinity(cpu);
898  /* If CPU is in a set overwrite the default thread prio */
899  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
901  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
903  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
905  } else {
906  tv->thread_priority = taf->prio;
907  }
908  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
909  "%d, thread id %lu", tv->thread_priority,
910  tv->name, cpu, SCGetThreadIdLong());
911  } else {
912  SetCPUAffinitySet(&taf->cpu_set);
913  tv->thread_priority = taf->prio;
914  SCLogPerf("Setting prio %d for thread \"%s\", "
915  "thread id %lu", tv->thread_priority,
917  }
919  }
920 #endif
921 
922  return TM_ECODE_OK;
923 }
924 
925 /**
926  * \brief Creates and returns the TV instance for a new thread.
927  *
928  * \param name Name of this TV instance
929  * \param inq_name Incoming queue name
930  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
931  * \param outq_name Outgoing queue name
932  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
933  * \param slots String representation for the slot function to be used
934  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
935  * \param mucond Flag to indicate whether to initialize the condition
936  * and the mutex variables for this newly created TV.
937  *
938  * \retval the newly created TV instance, or NULL on error
939  */
940 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
941  const char *outq_name, const char *outqh_name, const char *slots,
942  void * (*fn_p)(void *), int mucond)
943 {
944  ThreadVars *tv = NULL;
945  Tmq *tmq = NULL;
946  Tmqh *tmqh = NULL;
947 
948  SCLogDebug("creating thread \"%s\"...", name);
949 
950  /* XXX create separate function for this: allocate a thread container */
951  tv = SCCalloc(1, sizeof(ThreadVars) + ThreadStorageSize());
952  if (unlikely(tv == NULL))
953  goto error;
954 
955  SC_ATOMIC_INIT(tv->flags);
957 
958  strlcpy(tv->name, name, sizeof(tv->name));
959 
960  /* default state for every newly created thread */
962 
963  /* set the incoming queue */
964  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
965  SCLogDebug("inq_name \"%s\"", inq_name);
966 
967  tmq = TmqGetQueueByName(inq_name);
968  if (tmq == NULL) {
969  tmq = TmqCreateQueue(inq_name);
970  if (tmq == NULL)
971  goto error;
972  }
973  SCLogDebug("tmq %p", tmq);
974 
975  tv->inq = tmq;
976  tv->inq->reader_cnt++;
977  SCLogDebug("tv->inq %p", tv->inq);
978  }
979  if (inqh_name != NULL) {
980  SCLogDebug("inqh_name \"%s\"", inqh_name);
981 
982  int id = TmqhNameToID(inqh_name);
983  if (id <= 0) {
984  goto error;
985  }
986  tmqh = TmqhGetQueueHandlerByName(inqh_name);
987  if (tmqh == NULL)
988  goto error;
989 
990  tv->tmqh_in = tmqh->InHandler;
991  tv->inq_id = (uint8_t)id;
992  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
993  }
994 
995  /* set the outgoing queue */
996  if (outqh_name != NULL) {
997  SCLogDebug("outqh_name \"%s\"", outqh_name);
998 
999  int id = TmqhNameToID(outqh_name);
1000  if (id <= 0) {
1001  goto error;
1002  }
1003 
1004  tmqh = TmqhGetQueueHandlerByName(outqh_name);
1005  if (tmqh == NULL)
1006  goto error;
1007 
1008  tv->tmqh_out = tmqh->OutHandler;
1009  tv->outq_id = (uint8_t)id;
1010 
1011  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
1012  SCLogDebug("outq_name \"%s\"", outq_name);
1013 
1014  if (tmqh->OutHandlerCtxSetup != NULL) {
1015  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1016  if (tv->outctx == NULL)
1017  goto error;
1018  tv->outq = NULL;
1019  } else {
1020  tmq = TmqGetQueueByName(outq_name);
1021  if (tmq == NULL) {
1022  tmq = TmqCreateQueue(outq_name);
1023  if (tmq == NULL)
1024  goto error;
1025  }
1026  SCLogDebug("tmq %p", tmq);
1027 
1028  tv->outq = tmq;
1029  tv->outctx = NULL;
1030  tv->outq->writer_cnt++;
1031  }
1032  }
1033  }
1034 
1035  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1036  goto error;
1037  }
1038 
1039  if (mucond != 0)
1040  TmThreadInitMC(tv);
1041 
1043 
1044  return tv;
1045 
1046 error:
1047  SCLogError("failed to setup a thread");
1048 
1049  if (tv != NULL)
1050  SCFree(tv);
1051  return NULL;
1052 }
1053 
1054 /**
1055  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1056  * This function doesn't support custom slots, and hence shouldn't be
1057  * supplied \"custom\" as its slot type. All PPT threads are created
1058  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1059  * conditional variables are not used to kill the thread.
1060  *
1061  * \param name Name of this TV instance
1062  * \param inq_name Incoming queue name
1063  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1064  * \param outq_name Outgoing queue name
1065  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1066  * \param slots String representation for the slot function to be used
1067  *
1068  * \retval the newly created TV instance, or NULL on error
1069  */
1070 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1071  const char *inqh_name, const char *outq_name,
1072  const char *outqh_name, const char *slots)
1073 {
1074  ThreadVars *tv = NULL;
1075 
1076  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1077  slots, NULL, 0);
1078 
1079  if (tv != NULL) {
1080  tv->type = TVT_PPT;
1082  }
1083 
1084  return tv;
1085 }
1086 
1087 /**
1088  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1089  * This function supports only custom slot functions and hence a
1090  * function pointer should be sent as an argument.
1091  *
1092  * \param name Name of this TV instance
1093  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1094  * \param mucond Flag to indicate whether to initialize the condition
1095  * and the mutex variables for this newly created TV.
1096  *
1097  * \retval the newly created TV instance, or NULL on error
1098  */
1099 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1100  int mucond)
1101 {
1102  ThreadVars *tv = NULL;
1103 
1104  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1105 
1106  if (tv != NULL) {
1107  tv->type = TVT_MGMT;
1110  }
1111 
1112  return tv;
1113 }
1114 
1115 /**
1116  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1117  * The thread is created with the "management" slot function and
1118  * runs the TmModule looked up by \a module.
1119  *
1120  * \param name Name of this TV instance
1121  * \param module Name of TmModule with MANAGEMENT flag set.
1122  * \param mucond Flag to indicate whether to initialize the condition
1123  * and the mutex variables for this newly created TV.
1124  *
1125  * \retval the newly created TV instance, or NULL on error
1126  */
1127 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1128  int mucond)
1129 {
1130  ThreadVars *tv = NULL;
1131 
1132  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1133 
1134  if (tv != NULL) {
1135  tv->type = TVT_MGMT;
1138 
1139  TmModule *m = TmModuleGetByName(module);
1140  if (m) {
1141  TmSlotSetFuncAppend(tv, m, NULL);
1142  }
1143  }
1144 
1145  return tv;
1146 }
1147 
1148 /**
1149  * \brief Creates and returns the TV instance for a Command thread (CMD).
1150  * The thread is created with the "command" slot function and
1151  * runs the TmModule looked up by \a module.
1152  *
1153  * \param name Name of this TV instance
1154  * \param module Name of TmModule with COMMAND flag set.
1155  * \param mucond Flag to indicate whether to initialize the condition
1156  * and the mutex variables for this newly created TV.
1157  *
1158  * \retval the newly created TV instance, or NULL on error
1159  */
1160 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1161  int mucond)
1162 {
1163  ThreadVars *tv = NULL;
1164 
1165  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1166 
1167  if (tv != NULL) {
1168  tv->type = TVT_CMD;
1171 
1172  TmModule *m = TmModuleGetByName(module);
1173  if (m) {
1174  TmSlotSetFuncAppend(tv, m, NULL);
1175  }
1176  }
1177 
1178  return tv;
1179 }
1180 
1181 /**
1182  * \brief Appends this TV to tv_root based on its type
1183  *
1184  * \param type holds the type this TV belongs to.
1185  */
1187 {
1189 
1190  if (tv_root[type] == NULL) {
1191  tv_root[type] = tv;
1192  tv->next = NULL;
1193 
1195 
1196  return;
1197  }
1198 
1199  ThreadVars *t = tv_root[type];
1200 
1201  while (t) {
1202  if (t->next == NULL) {
1203  t->next = tv;
1204  tv->next = NULL;
1205  break;
1206  }
1207 
1208  t = t->next;
1209  }
1210 
1212 }
1213 
1214 static bool ThreadStillHasPackets(ThreadVars *tv)
1215 {
1216  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1217  /* we wait till we dry out all the inq packets, before we
1218  * kill this thread. Do note that you should have disabled
1219  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1220  PacketQueue *q = tv->inq->pq;
1221  SCMutexLock(&q->mutex_q);
1222  uint32_t len = q->len;
1223  SCMutexUnlock(&q->mutex_q);
1224  if (len != 0) {
1225  return true;
1226  }
1227  }
1228 
1229  if (tv->stream_pq != NULL) {
1231  uint32_t len = tv->stream_pq->len;
1233 
1234  if (len != 0) {
1235  return true;
1236  }
1237  }
1238  return false;
1239 }
1240 
1241 /**
1242  * \brief Kill a thread.
1243  *
1244  * \param tv A ThreadVars instance corresponding to the thread that has to be
1245  * killed.
1246  *
1247  * \retval r 1 killed successfully
1248  * 0 not yet ready, needs another look
1249  */
1250 static int TmThreadKillThread(ThreadVars *tv)
1251 {
1252  BUG_ON(tv == NULL);
1253 
1254  /* kill only once :) */
1255  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1256  return 1;
1257  }
1258 
1259  /* set the thread flag informing the thread that it needs to be
1260  * terminated */
1263 
1264  /* to be sure, signal more */
1265  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1266  if (tv->inq_id != TMQH_NOT_SET) {
1268  if (qh != NULL && qh->InShutdownHandler != NULL) {
1269  qh->InShutdownHandler(tv);
1270  }
1271  }
1272  if (tv->inq != NULL) {
1273  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1274  SCMutexLock(&tv->inq->pq->mutex_q);
1275  SCCondSignal(&tv->inq->pq->cond_q);
1276  SCMutexUnlock(&tv->inq->pq->mutex_q);
1277  }
1278  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1279  }
1280 
1281  if (tv->ctrl_cond != NULL ) {
1283  pthread_cond_broadcast(tv->ctrl_cond);
1285  }
1286  return 0;
1287  }
1288 
1289  if (tv->outctx != NULL) {
1290  if (tv->outq_id != TMQH_NOT_SET) {
1292  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1293  qh->OutHandlerCtxFree(tv->outctx);
1294  tv->outctx = NULL;
1295  }
1296  }
1297  }
1298 
1299  /* Join the thread and flag as dead, unless the thread ID is 0 as
1300  * its not a thread created by Suricata. */
1301  if (tv->t) {
1302  pthread_join(tv->t, NULL);
1303  SCLogDebug("thread %s stopped", tv->name);
1304  }
1306  return 1;
1307 }
1308 
1309 static bool ThreadBusy(ThreadVars *tv)
1310 {
1311  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
1312  TmModule *tm = TmModuleGetById(s->tm_id);
1313  if (tm && tm->ThreadBusy != NULL) {
1314  if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
1315  return true;
1316  }
1317  }
1318  return false;
1319 }
1320 
1321 /** \internal
1322  *
1323  * \brief make sure that all packet threads are done processing their
1324  * in-flight packets, including 'injected' flow packets.
1325  */
1326 static void TmThreadDrainPacketThreads(void)
1327 {
1328  ThreadVars *tv = NULL;
1329  struct timeval start_ts;
1330  struct timeval cur_ts;
1331  gettimeofday(&start_ts, NULL);
1332 
1333 again:
1334  gettimeofday(&cur_ts, NULL);
1335  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1336  SCLogWarning("unable to get all packet threads "
1337  "to process their packets in time");
1338  return;
1339  }
1340 
1342 
1343  /* all receive threads are part of packet processing threads */
1344  tv = tv_root[TVT_PPT];
1345  while (tv) {
1346  if (ThreadStillHasPackets(tv)) {
1347  /* we wait till we dry out all the inq packets, before we
1348  * kill this thread. Do note that you should have disabled
1349  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1351 
1352  /* sleep outside lock */
1353  SleepMsec(1);
1354  goto again;
1355  }
1356  if (ThreadBusy(tv)) {
1358 
1359  Packet *p = PacketGetFromAlloc();
1360  if (p != NULL) {
1364  PacketQueue *q = tv->stream_pq;
1365  SCMutexLock(&q->mutex_q);
1366  PacketEnqueue(q, p);
1367  SCCondSignal(&q->cond_q);
1368  SCMutexUnlock(&q->mutex_q);
1369  }
1370 
1371  /* don't sleep while holding a lock */
1372  SleepMsec(1);
1373  goto again;
1374  }
1375  tv = tv->next;
1376  }
1377 
1379 }
1380 
1381 /**
1382  * \brief Disable all threads having the specified TMs.
1383  *
1384  * Breaks out of the packet acquisition loop, and bumps
1385  * into the 'flow loop', where it will process packets
1386  * from the flow engine's shutdown handling.
1387  */
1389 {
1390  ThreadVars *tv = NULL;
1391  struct timeval start_ts;
1392  struct timeval cur_ts;
1393  gettimeofday(&start_ts, NULL);
1394 
1395 again:
1396  gettimeofday(&cur_ts, NULL);
1397  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1398  FatalError("Engine unable to disable detect "
1399  "thread - \"%s\". Killing engine",
1400  tv->name);
1401  }
1402 
1404 
1405  /* all receive threads are part of packet processing threads */
1406  tv = tv_root[TVT_PPT];
1407 
1408  /* we do have to keep in mind that TVs are arranged in the order
1409  * right from receive to log. The moment we fail to find a
1410  * receive TM amongst the slots in a tv, it indicates we are done
1411  * with all receive threads */
1412  while (tv) {
1413  int disable = 0;
1414  TmModule *tm = NULL;
1415  /* obtain the slots for this TV */
1416  TmSlot *slots = tv->tm_slots;
1417  while (slots != NULL) {
1418  tm = TmModuleGetById(slots->tm_id);
1419 
1420  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1421  disable = 1;
1422  break;
1423  }
1424 
1425  slots = slots->slot_next;
1426  continue;
1427  }
1428 
1429  if (disable) {
1430  if (ThreadStillHasPackets(tv)) {
1431  /* we wait till we dry out all the inq packets, before we
1432  * kill this thread. Do note that you should have disabled
1433  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1435  /* don't sleep while holding a lock */
1436  SleepMsec(1);
1437  goto again;
1438  }
1439 
1440  if (ThreadBusy(tv)) {
1442 
1443  Packet *p = PacketGetFromAlloc();
1444  if (p != NULL) {
1448  PacketQueue *q = tv->stream_pq;
1449  SCMutexLock(&q->mutex_q);
1450  PacketEnqueue(q, p);
1451  SCCondSignal(&q->cond_q);
1452  SCMutexUnlock(&q->mutex_q);
1453  }
1454 
1455  /* don't sleep while holding a lock */
1456  SleepMsec(1);
1457  goto again;
1458  }
1459 
1460  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1461  if (tm && tm->PktAcqBreakLoop != NULL) {
1462  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1463  }
1465 
1466  if (tv->inq != NULL) {
1467  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1468  SCMutexLock(&tv->inq->pq->mutex_q);
1469  SCCondSignal(&tv->inq->pq->cond_q);
1470  SCMutexUnlock(&tv->inq->pq->mutex_q);
1471  }
1472  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1473  }
1474 
1475  /* wait for it to enter the 'flow loop' stage */
1476  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1478 
1479  SleepMsec(1);
1480  goto again;
1481  }
1482  }
1483 
1484  tv = tv->next;
1485  }
1486 
1488 
1489  /* finally wait for all packet threads to have
1490  * processed all of their 'live' packets so we
1491  * don't process the last live packets together
1492  * with FFR packets */
1493  TmThreadDrainPacketThreads();
1494 }
1495 
1496 #ifdef DEBUG_VALIDATION
1497 static void TmThreadDumpThreads(void);
1498 #endif
1499 
1500 static void TmThreadDebugValidateNoMorePackets(void)
1501 {
1502 #ifdef DEBUG_VALIDATION
1504  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1505  if (ThreadStillHasPackets(tv)) {
1507  TmThreadDumpThreads();
1509  }
1510  }
1512 #endif
1513 }
1514 
1515 /** \internal
1516  * \brief check if a thread has any of the modules indicated by TM_FLAG_*
1517  * \param tv thread
1518  * \param flags TM_FLAG_*'s
1519  * \retval bool true if at least on of the flags is present */
1520 static inline bool CheckModuleFlags(const ThreadVars *tv, const uint8_t flags)
1521 {
1522  return (tv->tmm_flags & flags) != 0;
1523 }
1524 
1525 /**
1526  * \brief Disable all packet threads
1527  * \param set flag to set
1528  * \param check flag to check
1529  * \param module_flags bitflags of TmModule's to apply the `set` flag to.
1530  *
1531  * Support 2 stages in shutting down the packet threads:
1532  * 1. set THV_REQ_FLOW_LOOP and wait for THV_FLOW_LOOP
1533  * 2. set THV_KILL and wait for THV_RUNNING_DONE
1534  *
1535  * During step 1 the main loop is exited, and the flow loop logic is entered.
1536  * During step 2, the flow loop logic is done and the thread closes.
1537  *
1538  * `module_flags` limits which threads are disabled
1539  */
1541  const uint16_t set, const uint16_t check, const uint8_t module_flags)
1542 {
1543  struct timeval start_ts;
1544  struct timeval cur_ts;
1545 
1546  /* first drain all packet threads of their packets */
1547  TmThreadDrainPacketThreads();
1548 
1549  /* since all the threads possibly able to produce more packets
1550  * are now gone or inactive, we should see no packets anywhere
1551  * anymore. */
1552  TmThreadDebugValidateNoMorePackets();
1553 
1554  gettimeofday(&start_ts, NULL);
1555 again:
1556  gettimeofday(&cur_ts, NULL);
1557  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1558  FatalError("Engine unable to disable packet "
1559  "threads. Killing engine");
1560  }
1561 
1562  /* loop through the packet threads and kill them */
1564  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1565  /* only set flow worker threads to THV_REQ_FLOW_LOOP */
1566  if (!CheckModuleFlags(tv, module_flags)) {
1567  SCLogDebug("%s does not have any of the modules %02x, skip", tv->name, module_flags);
1568  continue;
1569  }
1570  TmThreadsSetFlag(tv, set);
1571 
1572  /* separate worker threads (autofp) will still wait at their
1573  * input queues. So nudge them here so they will observe the
1574  * THV_KILL flag. */
1575  if (tv->inq != NULL) {
1576  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1577  SCMutexLock(&tv->inq->pq->mutex_q);
1578  SCCondSignal(&tv->inq->pq->cond_q);
1579  SCMutexUnlock(&tv->inq->pq->mutex_q);
1580  }
1581  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1582  }
1583 
1584  /* wait for it to reach the expected state */
1585  if (!TmThreadsCheckFlag(tv, check)) {
1587  SCLogDebug("%s did not reach state %u, again", tv->name, check);
1588 
1589  SleepMsec(1);
1590  goto again;
1591  }
1592  }
1594 }
1595 
1596 #define MIN_WAIT_TIME 100
1597 #define MAX_WAIT_TIME 999999
1599 {
1600  ThreadVars *tv = NULL;
1601  unsigned int sleep_usec = MIN_WAIT_TIME;
1602 
1603  BUG_ON((family < 0) || (family >= TVT_MAX));
1604 
1605 again:
1607  tv = tv_root[family];
1608 
1609  while (tv) {
1610  int r = TmThreadKillThread(tv);
1611  if (r == 0) {
1613  SleepUsec(sleep_usec);
1614  sleep_usec *= 2; /* slowly back off */
1615  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1616  goto again;
1617  }
1618  sleep_usec = MIN_WAIT_TIME; /* reset */
1619 
1620  tv = tv->next;
1621  }
1623 }
1624 #undef MIN_WAIT_TIME
1625 #undef MAX_WAIT_TIME
1626 
1628 {
1629  int i = 0;
1630 
1631  for (i = 0; i < TVT_MAX; i++) {
1633  }
1634 }
1635 
1636 static void TmThreadFree(ThreadVars *tv)
1637 {
1638  TmSlot *s;
1639  TmSlot *ps;
1640  if (tv == NULL)
1641  return;
1642 
1643  SCLogDebug("Freeing thread '%s'.", tv->name);
1644 
1646 
1647  if (tv->flow_queue) {
1648  BUG_ON(tv->flow_queue->qlen != 0);
1649  SCFree(tv->flow_queue);
1650  }
1651 
1653 
1654  TmThreadDeinitMC(tv);
1655 
1656  if (tv->printable_name) {
1658  }
1659 
1660  if (tv->iface_name) {
1661  SCFree(tv->iface_name);
1662  }
1663 
1664  if (tv->stream_pq_local) {
1668  }
1669 
1670  s = (TmSlot *)tv->tm_slots;
1671  while (s) {
1672  ps = s;
1673  s = s->slot_next;
1674  SCFree(ps);
1675  }
1676 
1678  SCFree(tv);
1679 }
1680 
1682 {
1683  ThreadVars *tv = NULL;
1684  ThreadVars *ptv = NULL;
1685 
1686  if ((family < 0) || (family >= TVT_MAX))
1687  return;
1688 
1690  tv = tv_root[family];
1691 
1692  while (tv) {
1693  ptv = tv;
1694  tv = tv->next;
1695  TmThreadFree(ptv);
1696  }
1697  tv_root[family] = NULL;
1699 }
1700 
1701 /**
1702  * \brief Spawns a thread associated with the ThreadVars instance tv
1703  *
1704  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1705  */
1707 {
1708  pthread_attr_t attr;
1709  if (tv->tm_func == NULL) {
1710  FatalError("No thread function set");
1711  }
1712 
1713  /* Initialize the thread attributes and create the thread joinable (not detached) */
1714  pthread_attr_init(&attr);
1715 
1716  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1717 
1718  /* Adjust thread stack size if configured */
1720  SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
1721  if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
1722  FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
1724  }
1725  }
1726 
1727  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1728  if (rc) {
1729  FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc,
1730  strerror(errno));
1731  }
1732 
1733 #if DEBUG && HAVE_PTHREAD_GETATTR_NP
1735  if (pthread_getattr_np(tv->t, &attr) == 0) {
1736  size_t stack_size;
1737  void *stack_addr;
1738  pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1739  SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1740  } else {
1741  SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1742  "pthread_getattr_np() is %" PRId32,
1743  rc);
1744  }
1745  }
1746 #endif
1747 
1749 
1750  TmThreadAppend(tv, tv->type);
1751  return TM_ECODE_OK;
1752 }
1753 
1754 /**
1755  * \brief Spawns a "fake" lib thread associated with the ThreadVars instance tv
1756  *
1757  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1758  */
1760 {
1761  if (tv->tm_func == NULL) {
1762  printf("ERROR: no thread function set\n");
1763  return TM_ECODE_FAILED;
1764  }
1765 
1766  if (tv->tm_func((void *)tv) == (void *)-1) {
1767  return TM_ECODE_FAILED;
1768  }
1769 
1771 
1772  return TM_ECODE_OK;
1773 }
1774 
1775 /**
1776  * \brief Initializes the mutex and condition variables for this TV
1777  *
1778  * It can be used by a thread to control a wait loop that can also be
1779  * influenced by other threads.
1780  *
1781  * \param tv Pointer to a TV instance
1782  */
1784 {
1785  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1786  FatalError("Fatal error encountered in TmThreadInitMC. "
1787  "Exiting...");
1788  }
1789 
1790  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1791  printf("Error initializing the tv->m mutex\n");
1792  exit(EXIT_FAILURE);
1793  }
1794 
1795  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1796  FatalError("Fatal error encountered in TmThreadInitMC. "
1797  "Exiting...");
1798  }
1799 
1800  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1801  FatalError("Error initializing the tv->cond condition "
1802  "variable");
1803  }
1804 }
1805 
1806 static void TmThreadDeinitMC(ThreadVars *tv)
1807 {
1808  if (tv->ctrl_mutex) {
1810  SCFree(tv->ctrl_mutex);
1811  }
1812  if (tv->ctrl_cond) {
1814  SCFree(tv->ctrl_cond);
1815  }
1816 }
1817 
1818 /**
1819  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1820  * the kill flag has been set or not on the thread.
1821  *
1822  * \param tv Pointer to the TV instance.
1823  */
1825 {
1826  while (!TmThreadsCheckFlag(tv, flags)) {
1827  SleepUsec(100);
1828  }
1829 }
1830 
1831 /**
1832  * \brief Unpauses a thread
1833  *
1834  * \param tv Pointer to a TV instance that has to be unpaused
1835  */
1837 {
1839 }
1840 
1841 static TmEcode WaitOnThreadsRunningByType(const int t)
1842 {
1843  struct timeval start_ts;
1844  struct timeval cur_ts;
1845  uint32_t thread_cnt = 0;
1846 
1847  /* on retries, this will init to the last thread that started up already */
1848  ThreadVars *tv_start = tv_root[t];
1850  for (ThreadVars *tv = tv_start; tv != NULL; tv = tv->next) {
1851  thread_cnt++;
1852  }
1854 
1855  /* give threads a second each to start up, plus a margin of a minute. */
1856  uint32_t time_budget = 60 + thread_cnt;
1857 
1858  gettimeofday(&start_ts, NULL);
1859 again:
1861  ThreadVars *tv = tv_start;
1862  while (tv != NULL) {
1865 
1866  SCLogError("thread \"%s\" failed to "
1867  "start: flags %04x",
1868  tv->name, SC_ATOMIC_GET(tv->flags));
1869  return TM_ECODE_FAILED;
1870  }
1871 
1874 
1875  /* 60 seconds provided for the thread to transition from
1876  * THV_INIT_DONE to THV_RUNNING */
1877  gettimeofday(&cur_ts, NULL);
1878  if (((uint32_t)cur_ts.tv_sec - (uint32_t)start_ts.tv_sec) > time_budget) {
1879  SCLogError("thread \"%s\" failed to "
1880  "start in time: flags %04x. Total threads: %u. Time budget %us",
1881  tv->name, SC_ATOMIC_GET(tv->flags), thread_cnt, time_budget);
1882  return TM_ECODE_FAILED;
1883  }
1884 
1885  /* sleep a little to give the thread some
1886  * time to start running */
1887  SleepUsec(100);
1888  goto again;
1889  }
1890  tv_start = tv;
1891 
1892  tv = tv->next;
1893  }
1895  return TM_ECODE_OK;
1896 }
1897 
1898 /**
1899  * \brief Waits for all threads to be in a running state
1900  *
1901  * \retval TM_ECODE_OK if all are running or error if a thread failed
1902  */
1904 {
1905  uint16_t RX_num = 0;
1906  uint16_t W_num = 0;
1907  uint16_t FM_num = 0;
1908  uint16_t FR_num = 0;
1909  uint16_t TX_num = 0;
1910 
1911  for (int i = 0; i < TVT_MAX; i++) {
1912  if (WaitOnThreadsRunningByType(i) != TM_ECODE_OK)
1913  return TM_ECODE_FAILED;
1914  }
1915 
1917  for (int i = 0; i < TVT_MAX; i++) {
1918  for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
1919  if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1920  RX_num++;
1921  else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1922  W_num++;
1923  else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1924  TX_num++;
1925  else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1926  FM_num++;
1927  else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1928  FR_num++;
1929  }
1930  }
1932 
1933  /* Construct a welcome string displaying
1934  * initialized thread types and counts */
1935  uint16_t app_len = 32;
1936  uint16_t buf_len = 256;
1937 
1938  char append_str[app_len];
1939  char thread_counts[buf_len];
1940 
1941  strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1942  if (RX_num > 0) {
1943  snprintf(append_str, app_len, "RX: %u ", RX_num);
1944  strlcat(thread_counts, append_str, buf_len);
1945  }
1946  if (W_num > 0) {
1947  snprintf(append_str, app_len, "W: %u ", W_num);
1948  strlcat(thread_counts, append_str, buf_len);
1949  }
1950  if (TX_num > 0) {
1951  snprintf(append_str, app_len, "TX: %u ", TX_num);
1952  strlcat(thread_counts, append_str, buf_len);
1953  }
1954  if (FM_num > 0) {
1955  snprintf(append_str, app_len, "FM: %u ", FM_num);
1956  strlcat(thread_counts, append_str, buf_len);
1957  }
1958  if (FR_num > 0) {
1959  snprintf(append_str, app_len, "FR: %u ", FR_num);
1960  strlcat(thread_counts, append_str, buf_len);
1961  }
1962  snprintf(append_str, app_len, " Engine started.");
1963  strlcat(thread_counts, append_str, buf_len);
1964  SCLogNotice("%s", thread_counts);
1965 
1966  return TM_ECODE_OK;
1967 }
1968 
1969 /**
1970  * \brief Unpauses all threads present in tv_root
1971  */
1973 {
1975  for (int i = 0; i < TVT_MAX; i++) {
1976  ThreadVars *tv = tv_root[i];
1977  while (tv != NULL) {
1979  tv = tv->next;
1980  }
1981  }
1983 }
1984 
1985 /**
1986  * \brief Used to check the thread for certain conditions of failure.
1987  */
1989 {
1991  for (int i = 0; i < TVT_MAX; i++) {
1992  ThreadVars *tv = tv_root[i];
1993  while (tv) {
1995  FatalError("thread %s failed", tv->name);
1996  }
1997  tv = tv->next;
1998  }
1999  }
2001 }
2002 
2003 /**
2004  * \brief Used to check if all threads have finished their initialization. On
2005  * finding an un-initialized thread, it waits till that thread completes
2006  * its initialization, before proceeding to the next thread.
2007  *
2008  * \retval TM_ECODE_OK all initialized properly
2009  * \retval TM_ECODE_FAILED failure
2010  */
2012 {
2013  struct timeval start_ts;
2014  struct timeval cur_ts;
2015  gettimeofday(&start_ts, NULL);
2016 
2017 again:
2019  for (int i = 0; i < TVT_MAX; i++) {
2020  ThreadVars *tv = tv_root[i];
2021  while (tv != NULL) {
2024 
2025  SCLogError("thread \"%s\" failed to "
2026  "initialize: flags %04x",
2027  tv->name, SC_ATOMIC_GET(tv->flags));
2028  return TM_ECODE_FAILED;
2029  }
2030 
2031  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
2033 
2034  gettimeofday(&cur_ts, NULL);
2035  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
2036  SCLogError("thread \"%s\" failed to "
2037  "initialize in time: flags %04x",
2038  tv->name, SC_ATOMIC_GET(tv->flags));
2039  return TM_ECODE_FAILED;
2040  }
2041 
2042  /* sleep a little to give the thread some
2043  * time to finish initialization */
2044  SleepUsec(100);
2045  goto again;
2046  }
2047 
2050  SCLogError("thread \"%s\" failed to "
2051  "initialize.",
2052  tv->name);
2053  return TM_ECODE_FAILED;
2054  }
2057  SCLogError("thread \"%s\" closed on "
2058  "initialization.",
2059  tv->name);
2060  return TM_ECODE_FAILED;
2061  }
2062 
2063  tv = tv->next;
2064  }
2065  }
2067 
2068  return TM_ECODE_OK;
2069 }
2070 
2071 /**
2072  * \brief returns a count of all the threads that match the flag
2073  */
2075 {
2076  uint32_t cnt = 0;
2078  for (int i = 0; i < TVT_MAX; i++) {
2079  ThreadVars *tv = tv_root[i];
2080  while (tv != NULL) {
2081  if ((tv->tmm_flags & flags) == flags)
2082  cnt++;
2083 
2084  tv = tv->next;
2085  }
2086  }
2088  return cnt;
2089 }
2090 
2091 #ifdef DEBUG_VALIDATION
2092 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2093 {
2094  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2096  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2097  tv, s, s->tm_id, m->name);
2098  }
2099 }
2100 
2101 static void TmThreadDumpThreads(void)
2102 {
2104  for (int i = 0; i < TVT_MAX; i++) {
2105  ThreadVars *tv = tv_root[i];
2106  while (tv != NULL) {
2107  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2108  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2109  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2110  if (tv->inq && tv->stream_pq == tv->inq->pq) {
2111  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2112  } else if (tv->stream_pq_local != NULL) {
2113  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2114  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2115  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2116  }
2117  }
2118  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2119  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2120  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2121  }
2122  TmThreadDoDumpSlots(tv);
2123  tv = tv->next;
2124  }
2125  }
2128 }
2129 #endif
2130 
2131 /* Aligned to CLS to avoid false sharing between atomic ops. */
2132 typedef struct Thread_ {
2133  ThreadVars *tv; /**< threadvars structure */
2134  const char *name;
2135  int type;
2136  int in_use; /**< bool to indicate this is in use */
2137 
2138  SC_ATOMIC_DECLARE(SCTime_t, pktts); /**< current packet time of this thread
2139  * (offline mode) */
2140  SCTime_t sys_sec_stamp; /**< timestamp in real system
2141  * time when the pktts was last updated. */
2143 } __attribute__((aligned(CLS))) Thread;
2145 typedef struct Threads_ {
2146  Thread *threads;
2147  size_t threads_size;
2148  int threads_cnt;
2150 
2151 static bool thread_store_sealed = false;
2152 static Threads thread_store = { NULL, 0, 0 };
2153 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2154 
2156 {
2157  SCMutexLock(&thread_store_lock);
2158  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2159  thread_store_sealed = true;
2160  SCMutexUnlock(&thread_store_lock);
2161 }
2162 
2164 {
2165  SCMutexLock(&thread_store_lock);
2166  DEBUG_VALIDATE_BUG_ON(!thread_store_sealed);
2167  thread_store_sealed = false;
2168  SCMutexUnlock(&thread_store_lock);
2169 }
2170 
2172 {
2173  SCMutexLock(&thread_store_lock);
2174  for (size_t s = 0; s < thread_store.threads_size; s++) {
2175  Thread *t = &thread_store.threads[s];
2176  if (t == NULL || t->in_use == 0)
2177  continue;
2178 
2179  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2180  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2181  if (t->tv) {
2182  ThreadVars *tv = t->tv;
2183  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2184  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2185  tv, tv->type, tv->name, tv->tmm_flags, flags);
2186  }
2187  }
2188  SCMutexUnlock(&thread_store_lock);
2189 }
2190 
2191 #define STEP 32
2192 /**
2193  * \retval id thread id, or 0 if not found
2194  */
2196 {
2197  SCMutexLock(&thread_store_lock);
2198  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2199  if (thread_store.threads == NULL) {
2200  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2201  BUG_ON(thread_store.threads == NULL);
2202  thread_store.threads_size = STEP;
2203  }
2204 
2205  size_t s;
2206  for (s = 0; s < thread_store.threads_size; s++) {
2207  if (thread_store.threads[s].in_use == 0) {
2208  Thread *t = &thread_store.threads[s];
2209  SCSpinInit(&t->spin, 0);
2210  SCSpinLock(&t->spin);
2211  t->name = tv->name;
2212  t->type = type;
2213  t->tv = tv;
2214  t->in_use = 1;
2215  SCSpinUnlock(&t->spin);
2216 
2217  SCMutexUnlock(&thread_store_lock);
2218  return (int)(s+1);
2219  }
2220  }
2221 
2222  /* if we get here the array is completely filled */
2223  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2224  BUG_ON(newmem == NULL);
2225  thread_store.threads = newmem;
2226  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2227 
2228  Thread *t = &thread_store.threads[thread_store.threads_size];
2229  SCSpinInit(&t->spin, 0);
2230  SCSpinLock(&t->spin);
2231  t->name = tv->name;
2232  t->type = type;
2233  t->tv = tv;
2234  t->in_use = 1;
2235  SCSpinUnlock(&t->spin);
2236 
2237  s = thread_store.threads_size;
2238  thread_store.threads_size += STEP;
2239 
2240  SCMutexUnlock(&thread_store_lock);
2241  return (int)(s+1);
2242 }
2243 #undef STEP
2244 
2245 void TmThreadsUnregisterThread(const int id)
2246 {
2247  SCMutexLock(&thread_store_lock);
2248  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2249  if (id <= 0 || id > (int)thread_store.threads_size) {
2250  SCMutexUnlock(&thread_store_lock);
2251  return;
2252  }
2253 
2254  /* id is one higher than index */
2255  int idx = id - 1;
2256 
2257  /* reset thread_id, which serves as clearing the record */
2258  thread_store.threads[idx].in_use = 0;
2259 
2260  /* check if we have at least one registered thread left */
2261  size_t s;
2262  for (s = 0; s < thread_store.threads_size; s++) {
2263  Thread *t = &thread_store.threads[s];
2264  if (t->in_use == 1) {
2265  goto end;
2266  }
2267  }
2268 
2269  /* if we get here no threads are registered */
2270  SCFree(thread_store.threads);
2271  thread_store.threads = NULL;
2272  thread_store.threads_size = 0;
2273  thread_store.threads_cnt = 0;
2274 
2275 end:
2276  SCMutexUnlock(&thread_store_lock);
2277 }
2278 
2279 void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
2280 {
2281  SCTime_t now = SCTimeGetTime();
2282  int idx = id - 1;
2283  Thread *t = &thread_store.threads[idx];
2284  SCSpinLock(&t->spin);
2285  SC_ATOMIC_SET(t->pktts, ts);
2286 
2287 #ifdef DEBUG
2288  if (t->sys_sec_stamp.secs != 0) {
2289  SCTime_t tmpts = SCTIME_ADD_SECS(t->sys_sec_stamp, 3);
2290  if (SCTIME_CMP_LT(tmpts, now)) {
2291  SCLogDebug("%s: thread slept for %u secs", t->name, (uint32_t)(now.secs - tmpts.secs));
2292  }
2293  }
2294 #endif
2295 
2296  t->sys_sec_stamp = now;
2297  SCSpinUnlock(&t->spin);
2298 }
2299 
2301 {
2302  static SCTime_t nullts = SCTIME_INITIALIZER;
2303  bool ready = true;
2304  for (size_t s = 0; s < thread_store.threads_size; s++) {
2305  Thread *t = &thread_store.threads[s];
2306  if (!t->in_use) {
2307  break;
2308  }
2309  SCSpinLock(&t->spin);
2310  if (t->type != TVT_PPT) {
2311  SCSpinUnlock(&t->spin);
2312  continue;
2313  }
2314  if (SCTIME_CMP_EQ(t->sys_sec_stamp, nullts)) {
2315  ready = false;
2316  SCSpinUnlock(&t->spin);
2317  break;
2318  }
2319  SCSpinUnlock(&t->spin);
2320  }
2321  return ready;
2322 }
2323 
2325 {
2326  SCTime_t now = SCTimeGetTime();
2327  for (size_t s = 0; s < thread_store.threads_size; s++) {
2328  Thread *t = &thread_store.threads[s];
2329  if (!t->in_use) {
2330  break;
2331  }
2332  SCSpinLock(&t->spin);
2333  if (t->type != TVT_PPT) {
2334  SCSpinUnlock(&t->spin);
2335  continue;
2336  }
2337  SC_ATOMIC_SET(t->pktts, ts);
2338  t->sys_sec_stamp = now;
2339  SCSpinUnlock(&t->spin);
2340  }
2341 }
2342 
2344 {
2345  DEBUG_VALIDATE_BUG_ON(idx == 0);
2346  const int i = idx - 1;
2347  Thread *t = &thread_store.threads[i];
2348  return SC_ATOMIC_GET(t->pktts);
2349 }
2350 
2351 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2352 {
2353  struct timeval local = { 0 };
2354  static SCTime_t nullts = SCTIME_INITIALIZER;
2355  bool set = false;
2356  SCTime_t now = SCTimeGetTime();
2357 
2358  for (size_t s = 0; s < thread_store.threads_size; s++) {
2359  Thread *t = &thread_store.threads[s];
2360  if (t->in_use == 0) {
2361  break;
2362  }
2363  SCSpinLock(&t->spin);
2364  /* only packet threads set timestamps based on packets */
2365  if (t->type != TVT_PPT) {
2366  SCSpinUnlock(&t->spin);
2367  continue;
2368  }
2369  SCTime_t pktts = SC_ATOMIC_GET(t->pktts);
2370  if (SCTIME_CMP_NEQ(pktts, nullts)) {
2371  SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 5);
2372  /* ignore sleeping threads */
2373  if (SCTIME_CMP_LT(sys_sec_stamp, now)) {
2374  SCSpinUnlock(&t->spin);
2375  continue;
2376  }
2377  if (!set) {
2378  SCTIME_TO_TIMEVAL(&local, pktts);
2379  set = true;
2380  } else {
2381  if (SCTIME_CMP_LT(pktts, SCTIME_FROM_TIMEVAL(&local))) {
2382  SCTIME_TO_TIMEVAL(&local, pktts);
2383  }
2384  }
2385  }
2386  SCSpinUnlock(&t->spin);
2387  }
2388  *ts = local;
2389  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2390 }
2391 
2393 {
2394  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2395  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2396  /* always create at least one thread */
2397  if (thread_max == 0)
2398  thread_max = ncpus * threading_detect_ratio;
2399  if (thread_max < 1)
2400  thread_max = 1;
2401  if (thread_max > 1024) {
2402  SCLogWarning("limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2403  thread_max = 1024;
2404  }
2405  return (uint16_t)thread_max;
2406 }
2407 
2408 /** \brief inject a flow into a threads flow queue
2409  */
2410 void TmThreadsInjectFlowById(Flow *f, const int id)
2411 {
2412  if (id > 0 && id <= (int)thread_store.threads_size) {
2413  int idx = id - 1;
2414  Thread *t = &thread_store.threads[idx];
2415  ThreadVars *tv = t->tv;
2416  if (tv != NULL && tv->flow_queue != NULL) {
2417  FlowEnqueue(tv->flow_queue, f);
2418 
2419  /* wake up listening thread(s) if necessary */
2420  if (tv->inq != NULL) {
2421  SCMutexLock(&tv->inq->pq->mutex_q);
2422  SCCondSignal(&tv->inq->pq->cond_q);
2423  SCMutexUnlock(&tv->inq->pq->mutex_q);
2424  } else if (tv->break_loop) {
2425  TmThreadsCaptureBreakLoop(tv);
2426  }
2427  return;
2428  }
2429  }
2430  BUG_ON(1);
2431 }
thread_name_workers
const char * thread_name_workers
Definition: runmodes.c:68
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:824
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:77
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:81
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:70
SCTmThreadsSlotPacketLoopFinish
bool SCTmThreadsSlotPacketLoopFinish(ThreadVars *tv)
Definition: tm-threads.c:275
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:131
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2300
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1783
tm-threads.h
TmThreadsSealThreads
void TmThreadsSealThreads(void)
Definition: tm-threads.c:2155
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:85
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
TmThreadLibSpawn
TmEcode TmThreadLibSpawn(ThreadVars *tv)
Spawns a "fake" lib thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1759
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:135
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1706
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1127
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(const uint16_t set, const uint16_t check, const uint8_t module_flags)
Disable all packet threads.
Definition: tm-threads.c:1540
ThreadVars_::iface_name
char * iface_name
Definition: threadvars.h:135
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
threading_set_cpu_affinity
bool threading_set_cpu_affinity
Definition: runmodes.c:62
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:865
SCTIME_CMP_NEQ
#define SCTIME_CMP_NEQ(a, b)
Definition: util-time.h:107
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:70
Thread_::SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(SCTime_t, pktts)
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:314
TmThreadContinueThreads
void TmThreadContinueThreads(void)
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1972
CLS
#define CLS
Definition: suricata-common.h:69
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1070
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:380
AffinityGetYamlPath
char * AffinityGetYamlPath(ThreadsAffinityType *taf)
Get the YAML path for the given affinity type. The path is built using the parent name (if available)...
Definition: util-affinity.c:429
CaptureStatsSetup
void CaptureStatsSetup(ThreadVars *tv)
Definition: decode.c:1054
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:104
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:48
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:282
PKT_SRC_SHUTDOWN_FLUSH
@ PKT_SRC_SHUTDOWN_FLUSH
Definition: decode.h:64
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:103
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1824
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:175
thread-callbacks.h
PacketQueue_
Simple FIFO queue for packets with mutex and cond. Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:56
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:45
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:83
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:62
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:90
Packet_::flags
uint32_t flags
Definition: decode.h:547
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:80
threads.h
UtilAffinityGetAffinedCPUNum
uint16_t UtilAffinityGetAffinedCPUNum(ThreadsAffinityType *taf)
Return the total number of CPUs in a given affinity.
Definition: util-affinity.c:1043
Threads
Threads
Definition: tm-threads.c:2149
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:347
TmThreadsUnsealThreads
void TmThreadsUnsealThreads(void)
Definition: tm-threads.c:2163
ThreadVars_::t
pthread_t t
Definition: threadvars.h:59
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:305
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:71
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
Returns a count of all the threads that match the flag.
Definition: tm-threads.c:2074
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:103
thread_name_autofp
const char * thread_name_autofp
Definition: runmodes.c:66
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:91
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:116
MIN
#define MIN(x, y)
Definition: suricata-common.h:408
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:84
SCSpinLock
#define SCSpinLock
Definition: threads-debug.h:236
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:73
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1388
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:388
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:122
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:776
SCTIME_CMP_EQ
#define SCTIME_CMP_EQ(a, b)
Definition: util-time.h:108
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:89
m
SCMutex m
Definition: flow-hash.h:6
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:90
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:44
TmModule_::ThreadBusy
bool(* ThreadBusy)(ThreadVars *tv, void *thread_data)
Definition: tm-modules.h:67
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:142
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:305
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:82
rww_lock_contention
thread_local uint64_t rww_lock_contention
PacketQueueNoLock_
simple fifo queue for packets
Definition: packet-queue.h:34
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:89
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:173
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2191
Thread_::in_use
int in_use
Definition: tm-threads.c:2136
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:38
TmThreadsGetThreadTime
SCTime_t TmThreadsGetThreadTime(const int idx)
Definition: tm-threads.c:2343
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:58
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:235
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:81
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a threads flow queue
Definition: tm-threads.c:2410
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:80
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2351
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmThreadsProcessDecodePseudoPackets
TmEcode TmThreadsProcessDecodePseudoPackets(ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
Definition: tm-threads.c:116
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:53
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:46
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:46
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1327
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:111
util-signal.h
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:384
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:78
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:787
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1596
capture-hooks.h
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:2133
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1836
RunmodeIsWorkers
bool RunmodeIsWorkers(void)
Definition: runmodes.c:204
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:57
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:88
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:2011
RunmodeIsAutofp
bool RunmodeIsAutofp(void)
Definition: runmodes.c:209
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:61
tv
ThreadVars * tv
Definition: tm-threads.c:2144
strlcat
size_t strlcat(char *, const char *src, size_t siz)
Definition: util-strlcatu.c:45
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:82
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:95
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:208
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:58
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:120
threading_set_stack_size
uint64_t threading_set_stack_size
Definition: runmodes.c:63
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1270
SCTime_t::secs
uint64_t secs
Definition: util-time.h:41
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
Definition: tm-threads.c:2279
SCEnter
#define SCEnter(...)
Definition: util-debug.h:284
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
util-affinity.h
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:79
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:69
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:56
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1681
THV_KILL
#define THV_KILL
Definition: threadvars.h:40
THV_REQ_FLOW_LOOP
#define THV_REQ_FLOW_LOOP
Definition: threadvars.h:47
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:940
FlowQueueNew
FlowQueue * FlowQueueNew(void)
Definition: flow-queue.c:35
SCSpinUnlock
#define SCSpinUnlock
Definition: threads-debug.h:238
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:875
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2245
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2195
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:60
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:61
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:262
thread-storage.h
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
ThreadsAffinityType_::nocpu_warned
bool nocpu_warned
Definition: util-affinity.h:91
__attribute__
struct Thread_ __attribute__((aligned(CLS)))
Definition: tm-threads.c:2143
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:124
ThreadVars_::id
int id
Definition: threadvars.h:86
ThreadVars_::type
uint8_t type
Definition: threadvars.h:71
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:317
SCTIME_TO_TIMEVAL
#define SCTIME_TO_TIMEVAL(tv, t)
Definition: util-time.h:97
TmThreadTimeoutLoop
int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
Definition: tm-threads.c:168
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:88
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:87
SCReturn
#define SCReturn
Definition: util-debug.h:286
stream.h
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:377
Thread_::sys_sec_stamp
SCTime_t sys_sec_stamp
Definition: tm-threads.c:2140
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1627
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:139
source-pcap-file-helper.h
Packet_
Definition: decode.h:501
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:129
CaptureHooksOnPseudoPacketCreated
void CaptureHooksOnPseudoPacketCreated(Packet *p)
Definition: capture-hooks.c:60
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:69
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1597
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:79
TmSlot_
Definition: tm-threads.h:53
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:89
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:68
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:80
name
const char * name
Definition: tm-threads.c:2145
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
runmodes.h
StatsSetupPrivate
int StatsSetupPrivate(StatsThreadContext *stats, const char *thread_name)
Definition: counters.c:1311
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread (MGMT). This function supports only custom...
Definition: tm-threads.c:1099
AffinityGetNextCPU
uint16_t AffinityGetNextCPU(ThreadVars *tv, ThreadsAffinityType *taf)
Definition: util-affinity.c:1006
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
TmModule_
Definition: tm-modules.h:47
StatsThreadInit
void StatsThreadInit(StatsThreadContext *stats)
Definition: counters.c:1331
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:256
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:58
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
Definition: tm-threads.c:2324
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:115
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
SCTIME_CMP_LT
#define SCTIME_CMP_LT(a, b)
Definition: util-time.h:105
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:39
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:37
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:660
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:379
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1598
Thread_::type
int type
Definition: tm-threads.c:2135
cnt
uint32_t cnt
Definition: tmqh-packetpool.h:7
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:140
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:89
Packet_::flow
struct Flow_ * flow
Definition: decode.h:549
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1186
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:74
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:54
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:833
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:35
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:38
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:105
TmThreadsWaitForUnpause
bool TmThreadsWaitForUnpause(ThreadVars *tv)
Wait for a thread to become unpaused.
Definition: tm-threads.c:365
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:41
ThreadVars_::break_loop
bool break_loop
Definition: threadvars.h:132
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:100
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:849
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:241
StatsSyncCounters
void StatsSyncCounters(StatsThreadContext *stats)
Definition: counters.c:477
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:72
sys_sec_stamp
SCTime_t sys_sec_stamp
Definition: tm-threads.c:2151
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:51
RECEIVE_CPU_SET
@ RECEIVE_CPU_SET
Definition: util-affinity.h:57
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:77
FatalError
#define FatalError(...)
Definition: util-debug.h:517
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax(void)
Definition: tm-threads.c:2392
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:66
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1160
ThreadStorageSize
unsigned int ThreadStorageSize(void)
Definition: thread-storage.c:25
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:143
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:52
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:216
threadvars.h
util-validate.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:264
SCSpinInit
#define SCSpinInit
Definition: threads-debug.h:239
SCThreadRunInitCallbacks
void SCThreadRunInitCallbacks(ThreadVars *tv)
Definition: thread-callbacks.c:48
ThreadsAffinityType_
Definition: util-affinity.h:72
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:37
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:631
TmSlot_::tm_flags
uint8_t tm_flags
Definition: tm-threads.h:67
Thread_::name
const char * name
Definition: tm-threads.c:2134
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:131
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:50
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:90
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:274
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:74
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:88
thread_name_verdict
const char * thread_name_verdict
Definition: runmodes.c:69
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:73
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:317
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:83
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:111
SCSpinlock
#define SCSpinlock
Definition: threads-debug.h:235
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:376
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:89
suricata.h
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:485
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:141
Tmq_
Definition: tm-queues.h:29
Thread_::spin
SCSpinlock spin
Definition: tm-threads.c:2142
TmThreadWaitOnThreadRunning
TmEcode TmThreadWaitOnThreadRunning(void)
Waits for all threads to be in a running state.
Definition: tm-threads.c:1903
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2171
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1988
type
int type
Definition: tm-threads.c:2146
ThreadFreeStorage
void ThreadFreeStorage(ThreadVars *tv)
Definition: thread-storage.c:50
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:128
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:82
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:84
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
FindAffinityByInterface
ThreadsAffinityType * FindAffinityByInterface(ThreadsAffinityType *parent, const char *interface_name)
Definition: util-affinity.c:113
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:108
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:265
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:95
SCTIME_INITIALIZER
#define SCTIME_INITIALIZER
Definition: util-time.h:51
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:250
SCTIME_ADD_SECS
#define SCTIME_ADD_SECS(ts, s)
Definition: util-time.h:64
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:42
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:2132
ThreadVars_::stats
StatsThreadContext stats
Definition: threadvars.h:121
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:288
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:121
StatsThreadCleanup
void StatsThreadCleanup(StatsThreadContext *stats)
Definition: counters.c:1427
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:63
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:299
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:102
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:80
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:952
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:359
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:176
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:350
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:66