suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2024 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "thread-callbacks.h"
33 #include "threadvars.h"
34 #include "thread-storage.h"
35 #include "tm-queues.h"
36 #include "tm-queuehandlers.h"
37 #include "tm-threads.h"
38 #include "capture-hooks.h"
39 #include "tmqh-packetpool.h"
40 #include "threads.h"
41 #include "util-affinity.h"
42 #include "util-debug.h"
43 #include "util-privs.h"
44 #include "util-cpu.h"
45 #include "util-optimize.h"
46 #include "util-profiling.h"
47 #include "util-signal.h"
48 #include "queue.h"
49 #include "util-validate.h"
51 
52 #ifdef PROFILE_LOCKING
53 thread_local uint64_t mutex_lock_contention;
54 thread_local uint64_t mutex_lock_wait_ticks;
55 thread_local uint64_t mutex_lock_cnt;
56 
57 thread_local uint64_t spin_lock_contention;
58 thread_local uint64_t spin_lock_wait_ticks;
59 thread_local uint64_t spin_lock_cnt;
60 
61 thread_local uint64_t rww_lock_contention;
62 thread_local uint64_t rww_lock_wait_ticks;
63 thread_local uint64_t rww_lock_cnt;
64 
65 thread_local uint64_t rwr_lock_contention;
66 thread_local uint64_t rwr_lock_wait_ticks;
67 thread_local uint64_t rwr_lock_cnt;
68 #endif
69 
70 #ifdef OS_FREEBSD
71 #include <sched.h>
72 #include <sys/param.h>
73 #include <sys/resource.h>
74 #include <sys/cpuset.h>
75 #include <sys/thr.h>
76 #define cpu_set_t cpuset_t
77 #endif /* OS_FREEBSD */
78 
79 /* prototypes */
80 static int SetCPUAffinity(uint16_t cpu);
81 static void TmThreadDeinitMC(ThreadVars *tv);
82 
83 /* root of the threadvars list */
84 ThreadVars *tv_root[TVT_MAX] = { NULL };
85 
86 /* lock to protect tv_root */
88 
89 /**
90  * \brief Check if a thread flag is set.
91  *
92  * \retval 1 flag is set.
93  * \retval 0 flag is not set.
94  */
95 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
96 {
97  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
98 }
99 
100 /**
101  * \brief Set a thread flag.
102  */
103 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
104 {
105  SC_ATOMIC_OR(tv->flags, flag);
106 }
107 
108 /**
109  * \brief Unset a thread flag.
110  */
111 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
112 {
113  SC_ATOMIC_AND(tv->flags, ~flag);
114 }
115 
117  ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
118 {
119  while (decode_pq->top != NULL) {
120  Packet *extra_p = PacketDequeueNoLock(decode_pq);
121  if (unlikely(extra_p == NULL))
122  continue;
123  DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
124 
125  if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) {
127  }
128  }
130 }
131 
132 /**
133  * \brief Separate run function so we can call it recursively.
134  */
136 {
137  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
138  PACKET_PROFILING_TMM_START(p, s->tm_id);
139  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
140  PACKET_PROFILING_TMM_END(p, s->tm_id);
141  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
142 
143  /* handle error */
144  if (unlikely(r == TM_ECODE_FAILED)) {
145  /* Encountered error. Return packets to packetpool and return */
146  TmThreadsSlotProcessPktFail(tv, NULL);
147  return TM_ECODE_FAILED;
148  }
149  if (s->tm_flags & TM_FLAG_DECODE_TM) {
150  if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) !=
151  TM_ECODE_OK) {
152  return TM_ECODE_FAILED;
153  }
154  }
155  }
156 
157  return TM_ECODE_OK;
158 }
159 
160 /** \internal
161  *
162  * \brief Process flow timeout packets
163  *
164  * Process flow timeout pseudo packets. During shutdown this loop
165  * is run until the flow engine kills the thread and the queue is
166  * empty.
167  */
169 {
170  TmSlot *fw_slot = tv->tm_flowworker;
171  int r = TM_ECODE_OK;
172 
173  if (tv->stream_pq == NULL || fw_slot == NULL) {
174  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
175  return r;
176  }
177 
178  SCLogDebug("flow end loop starting");
179  while (1) {
181  uint32_t len = tv->stream_pq->len;
183  if (len > 0) {
184  while (len--) {
188  if (likely(p)) {
189  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
190  r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
191  if (r == TM_ECODE_FAILED) {
192  break;
193  }
194  }
195  }
196  } else {
198  break;
199  }
200  SleepUsec(1);
201  }
202  }
203  SCLogDebug("flow end loop complete");
205 
206  return r;
207 }
208 
209 static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv)
210 {
211  TmSlot *s = tv->tm_slots;
212 
214 
215  if (tv->thread_setup_flags != 0)
217 
219  PacketPoolInit();
220 
221  for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
222  if (slot->SlotThreadInit != NULL) {
223  void *slot_data = NULL;
224  TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
225  if (r != TM_ECODE_OK) {
226  if (r == TM_ECODE_DONE) {
227  EngineDone();
229  goto error;
230  } else {
232  goto error;
233  }
234  }
235  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
236  }
237 
238  /* if the flowworker module is the first, get the threads input queue */
239  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
240  tv->stream_pq = tv->inq->pq;
241  tv->tm_flowworker = slot;
242  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
244  if (tv->flow_queue == NULL) {
246  goto error;
247  }
248  /* setup a queue */
249  } else if (slot->tm_id == TMM_FLOWWORKER) {
250  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
251  if (tv->stream_pq_local == NULL)
252  FatalError("failed to alloc PacketQueue");
255  tv->tm_flowworker = slot;
256  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
258  if (tv->flow_queue == NULL) {
260  goto error;
261  }
262  }
263  }
264 
266 
268 
269  return true;
270 
271 error:
272  return false;
273 }
274 
276 {
277  TmSlot *s = tv->tm_slots;
278  bool rc = true;
279 
281 
283 
284  /* process all pseudo packets the flow timeout may throw at us */
286 
289 
291 
292  for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
293  if (slot->SlotThreadExitPrintStats != NULL) {
294  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
295  }
296 
297  if (slot->SlotThreadDeinit != NULL) {
298  TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
299  if (r != TM_ECODE_OK) {
301  rc = false;
302  break;
303  }
304  }
305  }
306 
307  tv->stream_pq = NULL;
309  return rc;
310 }
311 
312 static void *TmThreadsSlotPktAcqLoop(void *td)
313 {
314  ThreadVars *tv = (ThreadVars *)td;
315  TmSlot *s = tv->tm_slots;
316  TmEcode r = TM_ECODE_OK;
317 
318  /* check if we are setup properly */
319  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
320  SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
321  " PktAcqLoop=%p, tmqh_in=%p,"
322  " tmqh_out=%p",
323  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
325  pthread_exit(NULL);
326  return NULL;
327  }
328 
329  if (!TmThreadsSlotPktAcqLoopInit(td)) {
330  goto error;
331  }
332 
333  bool run = TmThreadsWaitForUnpause(tv);
334 
335  while (run) {
336  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
337 
338  if (r == TM_ECODE_FAILED) {
340  run = false;
341  }
343  run = false;
344  }
345  if (r == TM_ECODE_DONE) {
346  run = false;
347  }
348  }
350  goto error;
351  }
352 
353  SCLogDebug("%s ending", tv->name);
354  pthread_exit((void *) 0);
355  return NULL;
356 
357 error:
358  pthread_exit(NULL);
359  return NULL;
360 }
361 
362 /**
363  * Also returns if the kill flag is set.
364  */
366 {
369 
370  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
371  SleepUsec(100);
372 
374  return false;
375  }
376 
378  }
379 
380  return true;
381 }
382 
383 static void *TmThreadsLib(void *td)
384 {
385  ThreadVars *tv = (ThreadVars *)td;
386  TmSlot *s = tv->tm_slots;
387 
388  /* check if we are setup properly */
389  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
390  SCLogError("TmSlot or ThreadVars badly setup: s=%p, tmqh_in=%p,"
391  " tmqh_out=%p",
392  s, tv->tmqh_in, tv->tmqh_out);
394  return NULL;
395  }
396 
397  if (!TmThreadsSlotPktAcqLoopInit(tv)) {
398  goto error;
399  }
400 
401  if (!TmThreadsWaitForUnpause(tv)) {
402  goto error;
403  }
404 
405  return NULL;
406 
407 error:
408  tv->stream_pq = NULL;
409  return (void *)-1;
410 }
411 
412 static void *TmThreadsSlotVar(void *td)
413 {
414  ThreadVars *tv = (ThreadVars *)td;
415  TmSlot *s = (TmSlot *)tv->tm_slots;
416  Packet *p = NULL;
417  TmEcode r = TM_ECODE_OK;
418 
420  PacketPoolInit();//Empty();
421 
423 
424  if (tv->thread_setup_flags != 0)
426 
427  /* Drop the capabilities for this thread */
428  SCDropCaps(tv);
429 
430  /* check if we are setup properly */
431  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
433  pthread_exit(NULL);
434  return NULL;
435  }
436 
437  for (; s != NULL; s = s->slot_next) {
438  if (s->SlotThreadInit != NULL) {
439  void *slot_data = NULL;
440  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
441  if (r != TM_ECODE_OK) {
443  goto error;
444  }
445  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
446  }
447 
448  /* special case: we need to access the stream queue
449  * from the flow timeout code */
450 
451  /* if the flowworker module is the first, get the threads input queue */
452  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
453  tv->stream_pq = tv->inq->pq;
454  tv->tm_flowworker = s;
455  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
457  if (tv->flow_queue == NULL) {
459  pthread_exit(NULL);
460  return NULL;
461  }
462  /* setup a queue */
463  } else if (s->tm_id == TMM_FLOWWORKER) {
464  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
465  if (tv->stream_pq_local == NULL)
466  FatalError("failed to alloc PacketQueue");
469  tv->tm_flowworker = s;
470  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
472  if (tv->flow_queue == NULL) {
474  pthread_exit(NULL);
475  return NULL;
476  }
477  }
478  }
479 
481 
482  // Each 'worker' thread uses this func to process/decode the packet read.
483  // Each decode method is different to receive methods in that they do not
484  // enter infinite loops. They use this as the core loop. As a result, at this
485  // point the worker threads can be considered both initialized and running.
487  bool run = TmThreadsWaitForUnpause(tv);
488 
489  s = (TmSlot *)tv->tm_slots;
490 
491  while (run) {
492  /* input a packet */
493  p = tv->tmqh_in(tv);
494 
495  /* if we didn't get a packet see if we need to do some housekeeping */
496  if (unlikely(p == NULL)) {
497  if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty)) {
499  if (p != NULL) {
502  }
503  }
504  }
505 
506  if (p != NULL) {
507  /* run the thread module(s) */
508  r = TmThreadsSlotVarRun(tv, p, s);
509  if (r == TM_ECODE_FAILED) {
512  break;
513  }
514 
515  /* output the packet */
516  tv->tmqh_out(tv, p);
517 
518  /* now handle the stream pq packets */
519  TmThreadsHandleInjectedPackets(tv);
520  }
521 
523  run = false;
524  }
525  }
527  goto error;
528  }
530 
531  pthread_exit(NULL);
532  return NULL;
533 
534 error:
535  tv->stream_pq = NULL;
536  pthread_exit(NULL);
537  return NULL;
538 }
539 
540 static void *TmThreadsManagement(void *td)
541 {
542  ThreadVars *tv = (ThreadVars *)td;
543  TmSlot *s = (TmSlot *)tv->tm_slots;
544  TmEcode r = TM_ECODE_OK;
545 
546  BUG_ON(s == NULL);
547 
549 
550  if (tv->thread_setup_flags != 0)
552 
553  /* Drop the capabilities for this thread */
554  SCDropCaps(tv);
555 
556  SCLogDebug("%s starting", tv->name);
557 
558  if (s->SlotThreadInit != NULL) {
559  void *slot_data = NULL;
560  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
561  if (r != TM_ECODE_OK) {
563  pthread_exit(NULL);
564  return NULL;
565  }
566  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
567  }
568 
570 
572 
573  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
574  /* handle error */
575  if (r == TM_ECODE_FAILED) {
577  }
578 
581  }
582 
585 
586  if (s->SlotThreadExitPrintStats != NULL) {
587  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
588  }
589 
590  if (s->SlotThreadDeinit != NULL) {
591  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
592  if (r != TM_ECODE_OK) {
594  pthread_exit(NULL);
595  return NULL;
596  }
597  }
598 
600  pthread_exit((void *) 0);
601  return NULL;
602 }
603 
604 /**
605  * \brief We set the slot functions.
606  *
607  * \param tv Pointer to the TV to set the slot function for.
608  * \param name Name of the slot variant.
609  * \param fn_p Pointer to a custom slot function. Used only if slot variant
610  * "name" is "custom".
611  *
612  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
613  */
614 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
615 {
616  if (name == NULL) {
617  if (fn_p == NULL) {
618  printf("Both slot name and function pointer can't be NULL inside "
619  "TmThreadSetSlots\n");
620  goto error;
621  } else {
622  name = "custom";
623  }
624  }
625 
626  if (strcmp(name, "varslot") == 0) {
627  tv->tm_func = TmThreadsSlotVar;
628  } else if (strcmp(name, "pktacqloop") == 0) {
629  tv->tm_func = TmThreadsSlotPktAcqLoop;
630  } else if (strcmp(name, "management") == 0) {
631  tv->tm_func = TmThreadsManagement;
632  } else if (strcmp(name, "command") == 0) {
633  tv->tm_func = TmThreadsManagement;
634  } else if (strcmp(name, "lib") == 0) {
635  tv->tm_func = TmThreadsLib;
636  } else if (strcmp(name, "custom") == 0) {
637  if (fn_p == NULL)
638  goto error;
639  tv->tm_func = fn_p;
640  } else {
641  printf("Error: Slot \"%s\" not supported\n", name);
642  goto error;
643  }
644 
645  return TM_ECODE_OK;
646 
647 error:
648  return TM_ECODE_FAILED;
649 }
650 
651 /**
652  * \brief Appends a new entry to the slots.
653  *
654  * \param tv TV the slot is attached to.
655  * \param tm TM to append.
656  * \param data Data to be passed on to the slot init function.
657  *
658  * \retval The allocated TmSlot or NULL if there is an error
659  */
660 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
661 {
662  TmSlot *slot = SCCalloc(1, sizeof(TmSlot));
663  if (unlikely(slot == NULL))
664  return;
665  SC_ATOMIC_INITPTR(slot->slot_data);
666  slot->SlotThreadInit = tm->ThreadInit;
667  slot->slot_initdata = data;
668  if (tm->Func) {
669  slot->SlotFunc = tm->Func;
670  } else if (tm->PktAcqLoop) {
671  slot->PktAcqLoop = tm->PktAcqLoop;
672  if (tm->PktAcqBreakLoop) {
673  tv->break_loop = true;
674  }
675  } else if (tm->Management) {
676  slot->Management = tm->Management;
677  }
679  slot->SlotThreadDeinit = tm->ThreadDeinit;
680  /* we don't have to check for the return value "-1". We wouldn't have
681  * received a TM as arg, if it didn't exist */
682  slot->tm_id = TmModuleGetIDForTM(tm);
683  slot->tm_flags |= tm->flags;
684 
685  tv->tmm_flags |= tm->flags;
686  tv->cap_flags |= tm->cap_flags;
687 
688  if (tv->tm_slots == NULL) {
689  tv->tm_slots = slot;
690  } else {
691  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
692 
693  /* get the last slot */
694  for ( ; a != NULL; a = a->slot_next) {
695  b = a;
696  }
697  /* append the new slot */
698  if (b != NULL) {
699  b->slot_next = slot;
700  }
701  }
702 }
703 
704 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
705 static int SetCPUAffinitySet(cpu_set_t *cs)
706 {
707 #if defined OS_FREEBSD
708  int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
709  SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
710 #elif OS_DARWIN
711  int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
712  (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
713 #else
714  pid_t tid = (pid_t)syscall(SYS_gettid);
715  int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
716 #endif /* OS_FREEBSD */
717 
718  if (r != 0) {
719  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
720  strerror(errno));
721  return -1;
722  }
723 
724  return 0;
725 }
726 #endif
727 
728 
729 /**
730  * \brief Set the thread affinity on the calling thread.
731  *
732  * \param cpuid Id of the core/cpu to setup the affinity.
733  *
734  * \retval 0 If all goes well; -1 if something is wrong.
735  */
736 static int SetCPUAffinity(uint16_t cpuid)
737 {
738 #if defined __OpenBSD__ || defined sun
739  return 0;
740 #else
741  int cpu = (int)cpuid;
742 
743 #if defined OS_WIN32 || defined __CYGWIN__
744  DWORD cs = 1 << cpu;
745 
746  int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
747  if (r != 0) {
748  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
749  strerror(errno));
750  return -1;
751  }
752  SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
753  SCGetThreadIdLong(), cpu);
754 
755  return 0;
756 
757 #else
758  cpu_set_t cs;
759  memset(&cs, 0, sizeof(cs));
760 
761  CPU_ZERO(&cs);
762  CPU_SET(cpu, &cs);
763  return SetCPUAffinitySet(&cs);
764 #endif /* windows */
765 #endif /* not supported */
766 }
767 
768 
769 /**
770  * \brief Set the thread options (thread priority).
771  *
772  * \param tv Pointer to the ThreadVars to setup the thread priority.
773  *
774  * \retval TM_ECODE_OK.
775  */
777 {
779  tv->thread_priority = prio;
780 
781  return TM_ECODE_OK;
782 }
783 
784 /**
785  * \brief Adjusting nice value for threads.
786  */
788 {
789  SCEnter();
790 #ifndef __CYGWIN__
791 #ifdef OS_WIN32
792  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
793  SCLogError("Error setting priority for "
794  "thread %s: %s",
795  tv->name, strerror(errno));
796  } else {
797  SCLogDebug("Priority set to %"PRId32" for thread %s",
799  }
800 #else
801  int ret = nice(tv->thread_priority);
802  if (ret == -1) {
803  SCLogError("Error setting nice value %d "
804  "for thread %s: %s",
805  tv->thread_priority, tv->name, strerror(errno));
806  } else {
807  SCLogDebug("Nice value set to %"PRId32" for thread %s",
809  }
810 #endif /* OS_WIN32 */
811 #endif
812  SCReturn;
813 }
814 
815 
816 /**
817  * \brief Set the thread options (cpu affinity).
818  *
819  * \param tv pointer to the ThreadVars to setup the affinity.
820  * \param cpu cpu on which affinity is set.
821  *
822  * \retval TM_ECODE_OK
823  */
825 {
827  tv->cpu_affinity = cpu;
828 
829  return TM_ECODE_OK;
830 }
831 
832 
834 {
836  return TM_ECODE_OK;
837 
838  if (type > MAX_CPU_SET) {
839  SCLogError("invalid cpu type family");
840  return TM_ECODE_FAILED;
841  }
842 
844  tv->cpu_affinity = type;
845 
846  return TM_ECODE_OK;
847 }
848 
850 {
851  if (type >= MAX_CPU_SET) {
852  SCLogError("invalid cpu type family");
853  return 0;
854  }
855 
857 }
858 
859 /**
860  * \brief Set the thread options (cpu affinitythread).
861  * Priority should be already set by pthread_create.
862  *
863  * \param tv pointer to the ThreadVars of the calling thread.
864  */
866 {
868  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
869  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
871  SetCPUAffinity(tv->cpu_affinity);
872  }
873 
874 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
879  bool use_iface_affinity = RunmodeIsAutofp() && tv->cpu_affinity == RECEIVE_CPU_SET &&
880  FindAffinityByInterface(taf, tv->iface_name) != NULL;
881  use_iface_affinity |= RunmodeIsWorkers() && tv->cpu_affinity == WORKER_CPU_SET &&
882  FindAffinityByInterface(taf, tv->iface_name) != NULL;
883 
884  if (use_iface_affinity) {
886  }
887 
888  if (UtilAffinityGetAffinedCPUNum(taf) == 0) {
889  if (!taf->nocpu_warned) {
890  SCLogWarning("No CPU affinity set for %s", AffinityGetYamlPath(taf));
891  taf->nocpu_warned = true;
892  }
893  }
894 
895  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
896  uint16_t cpu = AffinityGetNextCPU(tv, taf);
897  SetCPUAffinity(cpu);
898  /* If CPU is in a set overwrite the default thread prio */
899  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
901  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
903  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
905  } else {
906  tv->thread_priority = taf->prio;
907  }
908  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
909  "%d, thread id %lu", tv->thread_priority,
910  tv->name, cpu, SCGetThreadIdLong());
911  } else {
912  SetCPUAffinitySet(&taf->cpu_set);
913  tv->thread_priority = taf->prio;
914  SCLogPerf("Setting prio %d for thread \"%s\", "
915  "thread id %lu", tv->thread_priority,
917  }
919  }
920 #endif
921 
922  return TM_ECODE_OK;
923 }
924 
925 /**
926  * \brief Creates and returns the TV instance for a new thread.
927  *
928  * \param name Name of this TV instance
929  * \param inq_name Incoming queue name
930  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
931  * \param outq_name Outgoing queue name
932  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
933  * \param slots String representation for the slot function to be used
934  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
935  * \param mucond Flag to indicate whether to initialize the condition
936  * and the mutex variables for this newly created TV.
937  *
938  * \retval the newly created TV instance, or NULL on error
939  */
940 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
941  const char *outq_name, const char *outqh_name, const char *slots,
942  void * (*fn_p)(void *), int mucond)
943 {
944  Tmq *tmq = NULL;
945  Tmqh *tmqh = NULL;
946 
947  SCLogDebug("creating thread \"%s\"...", name);
948 
950  if (unlikely(tv == NULL))
951  goto error;
952 
954 
955  strlcpy(tv->name, name, sizeof(tv->name));
956 
957  /* default state for every newly created thread */
959 
960  /* set the incoming queue */
961  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
962  SCLogDebug("inq_name \"%s\"", inq_name);
963 
964  tmq = TmqGetQueueByName(inq_name);
965  if (tmq == NULL) {
966  tmq = TmqCreateQueue(inq_name);
967  if (tmq == NULL)
968  goto error;
969  }
970  SCLogDebug("tmq %p", tmq);
971 
972  tv->inq = tmq;
973  tv->inq->reader_cnt++;
974  SCLogDebug("tv->inq %p", tv->inq);
975  }
976  if (inqh_name != NULL) {
977  SCLogDebug("inqh_name \"%s\"", inqh_name);
978 
979  int id = TmqhNameToID(inqh_name);
980  if (id <= 0) {
981  goto error;
982  }
983  tmqh = TmqhGetQueueHandlerByName(inqh_name);
984  if (tmqh == NULL)
985  goto error;
986 
987  tv->tmqh_in = tmqh->InHandler;
988  tv->inq_id = (uint8_t)id;
989  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
990  }
991 
992  /* set the outgoing queue */
993  if (outqh_name != NULL) {
994  SCLogDebug("outqh_name \"%s\"", outqh_name);
995 
996  int id = TmqhNameToID(outqh_name);
997  if (id <= 0) {
998  goto error;
999  }
1000 
1001  tmqh = TmqhGetQueueHandlerByName(outqh_name);
1002  if (tmqh == NULL)
1003  goto error;
1004 
1005  tv->tmqh_out = tmqh->OutHandler;
1006  tv->outq_id = (uint8_t)id;
1007 
1008  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
1009  SCLogDebug("outq_name \"%s\"", outq_name);
1010 
1011  if (tmqh->OutHandlerCtxSetup != NULL) {
1012  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1013  if (tv->outctx == NULL)
1014  goto error;
1015  tv->outq = NULL;
1016  } else {
1017  tmq = TmqGetQueueByName(outq_name);
1018  if (tmq == NULL) {
1019  tmq = TmqCreateQueue(outq_name);
1020  if (tmq == NULL)
1021  goto error;
1022  }
1023  SCLogDebug("tmq %p", tmq);
1024 
1025  tv->outq = tmq;
1026  tv->outctx = NULL;
1027  tv->outq->writer_cnt++;
1028  }
1029  }
1030  }
1031 
1032  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1033  goto error;
1034  }
1035 
1036  if (mucond != 0)
1037  TmThreadInitMC(tv);
1038 
1040 
1041  return tv;
1042 
1043 error:
1044  SCLogError("failed to setup a thread");
1045 
1046  ThreadVarsFree(tv);
1047  return NULL;
1048 }
1049 
1050 /**
1051  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1052  * This function doesn't support custom slots, and hence shouldn't be
1053  * supplied \"custom\" as its slot type. All PPT threads are created
1054  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1055  * conditional variables are not used to kill the thread.
1056  *
1057  * \param name Name of this TV instance
1058  * \param inq_name Incoming queue name
1059  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1060  * \param outq_name Outgoing queue name
1061  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1062  * \param slots String representation for the slot function to be used
1063  *
1064  * \retval the newly created TV instance, or NULL on error
1065  */
1066 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1067  const char *inqh_name, const char *outq_name,
1068  const char *outqh_name, const char *slots)
1069 {
1070  ThreadVars *tv = NULL;
1071 
1072  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1073  slots, NULL, 0);
1074 
1075  if (tv != NULL) {
1076  tv->type = TVT_PPT;
1078  }
1079 
1080  return tv;
1081 }
1082 
1083 /**
1084  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1085  * This function supports only custom slot functions and hence a
1086  * function pointer should be sent as an argument.
1087  *
1088  * \param name Name of this TV instance
1089  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1090  * \param mucond Flag to indicate whether to initialize the condition
1091  * and the mutex variables for this newly created TV.
1092  *
1093  * \retval the newly created TV instance, or NULL on error
1094  */
1095 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1096  int mucond)
1097 {
1098  ThreadVars *tv = NULL;
1099 
1100  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1101 
1102  if (tv != NULL) {
1103  tv->type = TVT_MGMT;
1106  }
1107 
1108  return tv;
1109 }
1110 
1111 /**
1112  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1113  * This function supports only custom slot functions and hence a
1114  * function pointer should be sent as an argument.
1115  *
1116  * \param name Name of this TV instance
1117  * \param module Name of TmModule with MANAGEMENT flag set.
1118  * \param mucond Flag to indicate whether to initialize the condition
1119  * and the mutex variables for this newly created TV.
1120  *
1121  * \retval the newly created TV instance, or NULL on error
1122  */
1123 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1124  int mucond)
1125 {
1126  ThreadVars *tv = NULL;
1127 
1128  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1129 
1130  if (tv != NULL) {
1131  tv->type = TVT_MGMT;
1134 
1135  TmModule *m = TmModuleGetByName(module);
1136  if (m) {
1137  TmSlotSetFuncAppend(tv, m, NULL);
1138  }
1139  }
1140 
1141  return tv;
1142 }
1143 
1144 /**
1145  * \brief Creates and returns the TV instance for a Command thread (CMD).
1146  * This function supports only custom slot functions and hence a
1147  * function pointer should be sent as an argument.
1148  *
1149  * \param name Name of this TV instance
1150  * \param module Name of TmModule with COMMAND flag set.
1151  * \param mucond Flag to indicate whether to initialize the condition
1152  * and the mutex variables for this newly created TV.
1153  *
1154  * \retval the newly created TV instance, or NULL on error
1155  */
1156 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1157  int mucond)
1158 {
1159  ThreadVars *tv = NULL;
1160 
1161  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1162 
1163  if (tv != NULL) {
1164  tv->type = TVT_CMD;
1167 
1168  TmModule *m = TmModuleGetByName(module);
1169  if (m) {
1170  TmSlotSetFuncAppend(tv, m, NULL);
1171  }
1172  }
1173 
1174  return tv;
1175 }
1176 
1177 /**
1178  * \brief Appends this TV to tv_root based on its type
1179  *
1180  * \param type holds the type this TV belongs to.
1181  */
1183 {
1185 
1186  if (tv_root[type] == NULL) {
1187  tv_root[type] = tv;
1188  tv->next = NULL;
1189 
1191 
1192  return;
1193  }
1194 
1195  ThreadVars *t = tv_root[type];
1196 
1197  while (t) {
1198  if (t->next == NULL) {
1199  t->next = tv;
1200  tv->next = NULL;
1201  break;
1202  }
1203 
1204  t = t->next;
1205  }
1206 
1208 }
1209 
1210 static bool ThreadStillHasPackets(ThreadVars *tv)
1211 {
1212  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1213  /* we wait till we dry out all the inq packets, before we
1214  * kill this thread. Do note that you should have disabled
1215  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1216  PacketQueue *q = tv->inq->pq;
1217  SCMutexLock(&q->mutex_q);
1218  uint32_t len = q->len;
1219  SCMutexUnlock(&q->mutex_q);
1220  if (len != 0) {
1221  return true;
1222  }
1223  }
1224 
1225  if (tv->stream_pq != NULL) {
1227  uint32_t len = tv->stream_pq->len;
1229 
1230  if (len != 0) {
1231  return true;
1232  }
1233  }
1234  return false;
1235 }
1236 
1237 /**
1238  * \brief Kill a thread.
1239  *
1240  * \param tv A ThreadVars instance corresponding to the thread that has to be
1241  * killed.
1242  *
1243  * \retval r 1 killed successfully
1244  * 0 not yet ready, needs another look
1245  */
1246 static int TmThreadKillThread(ThreadVars *tv)
1247 {
1248  BUG_ON(tv == NULL);
1249 
1250  /* kill only once :) */
1251  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1252  return 1;
1253  }
1254 
1255  /* set the thread flag informing the thread that it needs to be
1256  * terminated */
1259 
1260  /* to be sure, signal more */
1261  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1262  if (tv->inq_id != TMQH_NOT_SET) {
1264  if (qh != NULL && qh->InShutdownHandler != NULL) {
1265  qh->InShutdownHandler(tv);
1266  }
1267  }
1268  if (tv->inq != NULL) {
1269  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1270  SCMutexLock(&tv->inq->pq->mutex_q);
1271  SCCondSignal(&tv->inq->pq->cond_q);
1272  SCMutexUnlock(&tv->inq->pq->mutex_q);
1273  }
1274  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1275  }
1276 
1277  if (tv->ctrl_cond != NULL ) {
1279  pthread_cond_broadcast(tv->ctrl_cond);
1281  }
1282  return 0;
1283  }
1284 
1285  if (tv->outctx != NULL) {
1286  if (tv->outq_id != TMQH_NOT_SET) {
1288  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1289  qh->OutHandlerCtxFree(tv->outctx);
1290  tv->outctx = NULL;
1291  }
1292  }
1293  }
1294 
1295  /* Join the thread and flag as dead, unless the thread ID is 0 as
1296  * its not a thread created by Suricata. */
1297  if (tv->t) {
1298  pthread_join(tv->t, NULL);
1299  SCLogDebug("thread %s stopped", tv->name);
1300  }
1302  return 1;
1303 }
1304 
1305 static bool ThreadBusy(ThreadVars *tv)
1306 {
1307  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
1308  TmModule *tm = TmModuleGetById(s->tm_id);
1309  if (tm && tm->ThreadBusy != NULL) {
1310  if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
1311  return true;
1312  }
1313  }
1314  return false;
1315 }
1316 
1317 /** \internal
1318  *
1319  * \brief make sure that all packet threads are done processing their
1320  * in-flight packets, including 'injected' flow packets.
1321  */
1322 static void TmThreadDrainPacketThreads(void)
1323 {
1324  ThreadVars *tv = NULL;
1325  struct timeval start_ts;
1326  struct timeval cur_ts;
1327  gettimeofday(&start_ts, NULL);
1328 
1329 again:
1330  gettimeofday(&cur_ts, NULL);
1331  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1332  SCLogWarning("unable to get all packet threads "
1333  "to process their packets in time");
1334  return;
1335  }
1336 
1338 
1339  /* all receive threads are part of packet processing threads */
1340  tv = tv_root[TVT_PPT];
1341  while (tv) {
1342  if (ThreadStillHasPackets(tv)) {
1343  /* we wait till we dry out all the inq packets, before we
1344  * kill this thread. Do note that you should have disabled
1345  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1347 
1348  /* sleep outside lock */
1349  SleepMsec(1);
1350  goto again;
1351  }
1352  if (ThreadBusy(tv)) {
1354 
1356  if (p != NULL) {
1360  PacketQueue *q = tv->stream_pq;
1361  SCMutexLock(&q->mutex_q);
1362  PacketEnqueue(q, p);
1363  SCCondSignal(&q->cond_q);
1364  SCMutexUnlock(&q->mutex_q);
1365  }
1366 
1367  /* don't sleep while holding a lock */
1368  SleepMsec(1);
1369  goto again;
1370  }
1371  tv = tv->next;
1372  }
1373 
1375 }
1376 
1377 /**
1378  * \brief Disable all threads having the specified TMs.
1379  *
1380  * Breaks out of the packet acquisition loop, and bumps
1381  * into the 'flow loop', where it will process packets
1382  * from the flow engine's shutdown handling.
1383  */
1385 {
1386  ThreadVars *tv = NULL;
1387  struct timeval start_ts;
1388  struct timeval cur_ts;
1389  gettimeofday(&start_ts, NULL);
1390 
1391 again:
1392  gettimeofday(&cur_ts, NULL);
1393  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1394  FatalError("Engine unable to disable detect "
1395  "thread - \"%s\". Killing engine",
1396  tv->name);
1397  }
1398 
1400 
1401  /* all receive threads are part of packet processing threads */
1402  tv = tv_root[TVT_PPT];
1403 
1404  /* we do have to keep in mind that TVs are arranged in the order
1405  * right from receive to log. The moment we fail to find a
1406  * receive TM amongst the slots in a tv, it indicates we are done
1407  * with all receive threads */
1408  while (tv) {
1409  int disable = 0;
1410  TmModule *tm = NULL;
1411  /* obtain the slots for this TV */
1412  TmSlot *slots = tv->tm_slots;
1413  while (slots != NULL) {
1414  tm = TmModuleGetById(slots->tm_id);
1415 
1416  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1417  disable = 1;
1418  break;
1419  }
1420 
1421  slots = slots->slot_next;
1422  continue;
1423  }
1424 
1425  if (disable) {
1426  if (ThreadStillHasPackets(tv)) {
1427  /* we wait till we dry out all the inq packets, before we
1428  * kill this thread. Do note that you should have disabled
1429  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1431  /* don't sleep while holding a lock */
1432  SleepMsec(1);
1433  goto again;
1434  }
1435 
1436  if (ThreadBusy(tv)) {
1438 
1440  if (p != NULL) {
1444  PacketQueue *q = tv->stream_pq;
1445  SCMutexLock(&q->mutex_q);
1446  PacketEnqueue(q, p);
1447  SCCondSignal(&q->cond_q);
1448  SCMutexUnlock(&q->mutex_q);
1449  }
1450 
1451  /* don't sleep while holding a lock */
1452  SleepMsec(1);
1453  goto again;
1454  }
1455 
1456  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1457  if (tm && tm->PktAcqBreakLoop != NULL) {
1458  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1459  }
1461 
1462  if (tv->inq != NULL) {
1463  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1464  SCMutexLock(&tv->inq->pq->mutex_q);
1465  SCCondSignal(&tv->inq->pq->cond_q);
1466  SCMutexUnlock(&tv->inq->pq->mutex_q);
1467  }
1468  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1469  }
1470 
1471  /* wait for it to enter the 'flow loop' stage */
1472  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1474 
1475  SleepMsec(1);
1476  goto again;
1477  }
1478  }
1479 
1480  tv = tv->next;
1481  }
1482 
1484 
1485  /* finally wait for all packet threads to have
1486  * processed all of their 'live' packets so we
1487  * don't process the last live packets together
1488  * with FFR packets */
1489  TmThreadDrainPacketThreads();
1490 }
1491 
1492 #ifdef DEBUG_VALIDATION
1493 static void TmThreadDumpThreads(void);
1494 #endif
1495 
1496 static void TmThreadDebugValidateNoMorePackets(void)
1497 {
1498 #ifdef DEBUG_VALIDATION
1500  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1501  if (ThreadStillHasPackets(tv)) {
1503  TmThreadDumpThreads();
1505  }
1506  }
1508 #endif
1509 }
1510 
1511 /** \internal
1512  * \brief check if a thread has any of the modules indicated by TM_FLAG_*
1513  * \param tv thread
1514  * \param flags TM_FLAG_*'s
1515  * \retval bool true if at least on of the flags is present */
1516 static inline bool CheckModuleFlags(const ThreadVars *tv, const uint8_t flags)
1517 {
1518  return (tv->tmm_flags & flags) != 0;
1519 }
1520 
1521 /**
1522  * \brief Disable all packet threads
1523  * \param set flag to set
1524  * \param check flag to check
1525  * \param module_flags bitflags of TmModule's to apply the `set` flag to.
1526  *
1527  * Support 2 stages in shutting down the packet threads:
1528  * 1. set THV_REQ_FLOW_LOOP and wait for THV_FLOW_LOOP
1529  * 2. set THV_KILL and wait for THV_RUNNING_DONE
1530  *
1531  * During step 1 the main loop is exited, and the flow loop logic is entered.
1532  * During step 2, the flow loop logic is done and the thread closes.
1533  *
1534  * `module_flags` limits which threads are disabled
1535  */
1537  const uint16_t set, const uint16_t check, const uint8_t module_flags)
1538 {
1539  struct timeval start_ts;
1540  struct timeval cur_ts;
1541 
1542  /* first drain all packet threads of their packets */
1543  TmThreadDrainPacketThreads();
1544 
1545  /* since all the threads possibly able to produce more packets
1546  * are now gone or inactive, we should see no packets anywhere
1547  * anymore. */
1548  TmThreadDebugValidateNoMorePackets();
1549 
1550  gettimeofday(&start_ts, NULL);
1551 again:
1552  gettimeofday(&cur_ts, NULL);
1553  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1554  FatalError("Engine unable to disable packet "
1555  "threads. Killing engine");
1556  }
1557 
1558  /* loop through the packet threads and kill them */
1560  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1561  /* only set flow worker threads to THV_REQ_FLOW_LOOP */
1562  if (!CheckModuleFlags(tv, module_flags)) {
1563  SCLogDebug("%s does not have any of the modules %02x, skip", tv->name, module_flags);
1564  continue;
1565  }
1566  TmThreadsSetFlag(tv, set);
1567 
1568  /* separate worker threads (autofp) will still wait at their
1569  * input queues. So nudge them here so they will observe the
1570  * THV_KILL flag. */
1571  if (tv->inq != NULL) {
1572  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1573  SCMutexLock(&tv->inq->pq->mutex_q);
1574  SCCondSignal(&tv->inq->pq->cond_q);
1575  SCMutexUnlock(&tv->inq->pq->mutex_q);
1576  }
1577  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1578  }
1579 
1580  /* wait for it to reach the expected state */
1581  if (!TmThreadsCheckFlag(tv, check)) {
1583  SCLogDebug("%s did not reach state %u, again", tv->name, check);
1584 
1585  SleepMsec(1);
1586  goto again;
1587  }
1588  }
1590 }
1591 
1592 #define MIN_WAIT_TIME 100
1593 #define MAX_WAIT_TIME 999999
1595 {
1596  ThreadVars *tv = NULL;
1597  unsigned int sleep_usec = MIN_WAIT_TIME;
1598 
1599  BUG_ON((family < 0) || (family >= TVT_MAX));
1600 
1601 again:
1603  tv = tv_root[family];
1604 
1605  while (tv) {
1606  int r = TmThreadKillThread(tv);
1607  if (r == 0) {
1609  SleepUsec(sleep_usec);
1610  sleep_usec *= 2; /* slowly back off */
1611  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1612  goto again;
1613  }
1614  sleep_usec = MIN_WAIT_TIME; /* reset */
1615 
1616  tv = tv->next;
1617  }
1619 }
1620 #undef MIN_WAIT_TIME
1621 #undef MAX_WAIT_TIME
1622 
1624 {
1625  int i = 0;
1626 
1627  for (i = 0; i < TVT_MAX; i++) {
1629  }
1630 }
1631 
1632 static void TmThreadFree(ThreadVars *tv)
1633 {
1634  TmSlot *s;
1635  TmSlot *ps;
1636  if (tv == NULL)
1637  return;
1638 
1639  SCLogDebug("Freeing thread '%s'.", tv->name);
1640 
1642 
1643  if (tv->flow_queue) {
1644  BUG_ON(tv->flow_queue->qlen != 0);
1645  SCFree(tv->flow_queue);
1646  }
1647 
1649 
1650  TmThreadDeinitMC(tv);
1651 
1652  if (tv->printable_name) {
1654  }
1655 
1656  if (tv->iface_name) {
1657  SCFree(tv->iface_name);
1658  }
1659 
1660  if (tv->stream_pq_local) {
1664  }
1665 
1666  s = (TmSlot *)tv->tm_slots;
1667  while (s) {
1668  ps = s;
1669  s = s->slot_next;
1670  SCFree(ps);
1671  }
1672 
1674  ThreadVarsFree(tv);
1675 }
1676 
1678 {
1679  ThreadVars *tv = NULL;
1680  ThreadVars *ptv = NULL;
1681 
1682  if ((family < 0) || (family >= TVT_MAX))
1683  return;
1684 
1686  tv = tv_root[family];
1687 
1688  while (tv) {
1689  ptv = tv;
1690  tv = tv->next;
1691  TmThreadFree(ptv);
1692  }
1693  tv_root[family] = NULL;
1695 }
1696 
1697 /**
1698  * \brief Spawns a thread associated with the ThreadVars instance tv
1699  *
1700  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1701  */
1703 {
1704  pthread_attr_t attr;
1705  if (tv->tm_func == NULL) {
1706  FatalError("No thread function set");
1707  }
1708 
1709  /* Initialize and set thread detached attribute */
1710  pthread_attr_init(&attr);
1711 
1712  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1713 
1714  /* Adjust thread stack size if configured */
1716  SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
1717  if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
1718  FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
1720  }
1721  }
1722 
1723  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1724  if (rc) {
1725  FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc,
1726  strerror(errno));
1727  }
1728 
1729 #if DEBUG && HAVE_PTHREAD_GETATTR_NP
1731  if (pthread_getattr_np(tv->t, &attr) == 0) {
1732  size_t stack_size;
1733  void *stack_addr;
1734  pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1735  SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1736  } else {
1737  SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1738  "pthread_getattr_np() is %" PRId32,
1739  rc);
1740  }
1741  }
1742 #endif
1743 
1745 
1746  TmThreadAppend(tv, tv->type);
1747  return TM_ECODE_OK;
1748 }
1749 
1750 /**
1751  * \brief Spawns a "fake" lib thread associated with the ThreadVars instance tv
1752  *
1753  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1754  */
1756 {
1757  if (tv->tm_func == NULL) {
1758  printf("ERROR: no thread function set\n");
1759  return TM_ECODE_FAILED;
1760  }
1761 
1762  if (tv->tm_func((void *)tv) == (void *)-1) {
1763  return TM_ECODE_FAILED;
1764  }
1765 
1767 
1768  return TM_ECODE_OK;
1769 }
1770 
1771 /**
1772  * \brief Initializes the mutex and condition variables for this TV
1773  *
1774  * It can be used by a thread to control a wait loop that can also be
1775  * influenced by other threads.
1776  *
1777  * \param tv Pointer to a TV instance
1778  */
1780 {
1781  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1782  FatalError("Fatal error encountered in TmThreadInitMC. "
1783  "Exiting...");
1784  }
1785 
1786  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1787  printf("Error initializing the tv->m mutex\n");
1788  exit(EXIT_FAILURE);
1789  }
1790 
1791  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1792  FatalError("Fatal error encountered in TmThreadInitMC. "
1793  "Exiting...");
1794  }
1795 
1796  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1797  FatalError("Error initializing the tv->cond condition "
1798  "variable");
1799  }
1800 }
1801 
1802 static void TmThreadDeinitMC(ThreadVars *tv)
1803 {
1804  if (tv->ctrl_mutex) {
1806  SCFree(tv->ctrl_mutex);
1807  }
1808  if (tv->ctrl_cond) {
1810  SCFree(tv->ctrl_cond);
1811  }
1812 }
1813 
1814 /**
1815  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1816  * the kill flag has been set or not on the thread.
1817  *
1818  * \param tv Pointer to the TV instance.
1819  */
1821 {
1822  while (!TmThreadsCheckFlag(tv, flags)) {
1823  SleepUsec(100);
1824  }
1825 }
1826 
1827 /**
1828  * \brief Unpauses a thread
1829  *
1830  * \param tv Pointer to a TV instance that has to be unpaused
1831  */
1833 {
1835 }
1836 
1837 static TmEcode WaitOnThreadsRunningByType(const int t)
1838 {
1839  struct timeval start_ts;
1840  struct timeval cur_ts;
1841  uint32_t thread_cnt = 0;
1842 
1843  /* on retries, this will init to the last thread that started up already */
1844  ThreadVars *tv_start = tv_root[t];
1846  for (ThreadVars *tv = tv_start; tv != NULL; tv = tv->next) {
1847  thread_cnt++;
1848  }
1850 
1851  /* give threads a second each to start up, plus a margin of a minute. */
1852  uint32_t time_budget = 60 + thread_cnt;
1853 
1854  gettimeofday(&start_ts, NULL);
1855 again:
1857  ThreadVars *tv = tv_start;
1858  while (tv != NULL) {
1861 
1862  SCLogError("thread \"%s\" failed to "
1863  "start: flags %04x",
1864  tv->name, SC_ATOMIC_GET(tv->flags));
1865  return TM_ECODE_FAILED;
1866  }
1867 
1870 
1871  /* 60 seconds provided for the thread to transition from
1872  * THV_INIT_DONE to THV_RUNNING */
1873  gettimeofday(&cur_ts, NULL);
1874  if (((uint32_t)cur_ts.tv_sec - (uint32_t)start_ts.tv_sec) > time_budget) {
1875  SCLogError("thread \"%s\" failed to "
1876  "start in time: flags %04x. Total threads: %u. Time budget %us",
1877  tv->name, SC_ATOMIC_GET(tv->flags), thread_cnt, time_budget);
1878  return TM_ECODE_FAILED;
1879  }
1880 
1881  /* sleep a little to give the thread some
1882  * time to start running */
1883  SleepUsec(100);
1884  goto again;
1885  }
1886  tv_start = tv;
1887 
1888  tv = tv->next;
1889  }
1891  return TM_ECODE_OK;
1892 }
1893 
1894 /**
1895  * \brief Waits for all threads to be in a running state
1896  *
1897  * \retval TM_ECODE_OK if all are running or error if a thread failed
1898  */
1900 {
1901  uint16_t RX_num = 0;
1902  uint16_t W_num = 0;
1903  uint16_t FM_num = 0;
1904  uint16_t FR_num = 0;
1905  uint16_t TX_num = 0;
1906 
1907  for (int i = 0; i < TVT_MAX; i++) {
1908  if (WaitOnThreadsRunningByType(i) != TM_ECODE_OK)
1909  return TM_ECODE_FAILED;
1910  }
1911 
1913  for (int i = 0; i < TVT_MAX; i++) {
1914  for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
1915  if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1916  RX_num++;
1917  else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1918  W_num++;
1919  else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1920  TX_num++;
1921  else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1922  FM_num++;
1923  else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1924  FR_num++;
1925  }
1926  }
1928 
1929  /* Construct a welcome string displaying
1930  * initialized thread types and counts */
1931  uint16_t app_len = 32;
1932  uint16_t buf_len = 256;
1933 
1934  char append_str[app_len];
1935  char thread_counts[buf_len];
1936 
1937  strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1938  if (RX_num > 0) {
1939  snprintf(append_str, app_len, "RX: %u ", RX_num);
1940  strlcat(thread_counts, append_str, buf_len);
1941  }
1942  if (W_num > 0) {
1943  snprintf(append_str, app_len, "W: %u ", W_num);
1944  strlcat(thread_counts, append_str, buf_len);
1945  }
1946  if (TX_num > 0) {
1947  snprintf(append_str, app_len, "TX: %u ", TX_num);
1948  strlcat(thread_counts, append_str, buf_len);
1949  }
1950  if (FM_num > 0) {
1951  snprintf(append_str, app_len, "FM: %u ", FM_num);
1952  strlcat(thread_counts, append_str, buf_len);
1953  }
1954  if (FR_num > 0) {
1955  snprintf(append_str, app_len, "FR: %u ", FR_num);
1956  strlcat(thread_counts, append_str, buf_len);
1957  }
1958  snprintf(append_str, app_len, " Engine started.");
1959  strlcat(thread_counts, append_str, buf_len);
1960  SCLogNotice("%s", thread_counts);
1961 
1962  return TM_ECODE_OK;
1963 }
1964 
1965 /**
1966  * \brief Unpauses all threads present in tv_root
1967  */
1969 {
1971  for (int i = 0; i < TVT_MAX; i++) {
1972  ThreadVars *tv = tv_root[i];
1973  while (tv != NULL) {
1975  tv = tv->next;
1976  }
1977  }
1979 }
1980 
1981 /**
1982  * \brief Used to check the thread for certain conditions of failure.
1983  */
1985 {
1987  for (int i = 0; i < TVT_MAX; i++) {
1988  ThreadVars *tv = tv_root[i];
1989  while (tv) {
1991  FatalError("thread %s failed", tv->name);
1992  }
1993  tv = tv->next;
1994  }
1995  }
1997 }
1998 
1999 /**
2000  * \brief Used to check if all threads have finished their initialization. On
2001  * finding an un-initialized thread, it waits till that thread completes
2002  * its initialization, before proceeding to the next thread.
2003  *
2004  * \retval TM_ECODE_OK all initialized properly
2005  * \retval TM_ECODE_FAILED failure
2006  */
2008 {
2009  struct timeval start_ts;
2010  struct timeval cur_ts;
2011  gettimeofday(&start_ts, NULL);
2012 
2013 again:
2015  for (int i = 0; i < TVT_MAX; i++) {
2016  ThreadVars *tv = tv_root[i];
2017  while (tv != NULL) {
2020 
2021  SCLogError("thread \"%s\" failed to "
2022  "initialize: flags %04x",
2023  tv->name, SC_ATOMIC_GET(tv->flags));
2024  return TM_ECODE_FAILED;
2025  }
2026 
2027  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
2029 
2030  gettimeofday(&cur_ts, NULL);
2031  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
2032  SCLogError("thread \"%s\" failed to "
2033  "initialize in time: flags %04x",
2034  tv->name, SC_ATOMIC_GET(tv->flags));
2035  return TM_ECODE_FAILED;
2036  }
2037 
2038  /* sleep a little to give the thread some
2039  * time to finish initialization */
2040  SleepUsec(100);
2041  goto again;
2042  }
2043 
2046  SCLogError("thread \"%s\" failed to "
2047  "initialize.",
2048  tv->name);
2049  return TM_ECODE_FAILED;
2050  }
2053  SCLogError("thread \"%s\" closed on "
2054  "initialization.",
2055  tv->name);
2056  return TM_ECODE_FAILED;
2057  }
2058 
2059  tv = tv->next;
2060  }
2061  }
2063 
2064  return TM_ECODE_OK;
2065 }
2066 
2067 /**
2068  * \brief returns a count of all the threads that match the flag
2069  */
2071 {
2072  uint32_t cnt = 0;
2074  for (int i = 0; i < TVT_MAX; i++) {
2075  ThreadVars *tv = tv_root[i];
2076  while (tv != NULL) {
2077  if ((tv->tmm_flags & flags) == flags)
2078  cnt++;
2079 
2080  tv = tv->next;
2081  }
2082  }
2084  return cnt;
2085 }
2086 
2087 #ifdef DEBUG_VALIDATION
2088 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2089 {
2090  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2092  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2093  tv, s, s->tm_id, m->name);
2094  }
2095 }
2096 
2097 static void TmThreadDumpThreads(void)
2098 {
2100  for (int i = 0; i < TVT_MAX; i++) {
2101  ThreadVars *tv = tv_root[i];
2102  while (tv != NULL) {
2103  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2104  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2105  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2106  if (tv->inq && tv->stream_pq == tv->inq->pq) {
2107  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2108  } else if (tv->stream_pq_local != NULL) {
2109  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2110  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2111  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2112  }
2113  }
2114  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2115  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2116  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2117  }
2118  TmThreadDoDumpSlots(tv);
2119  tv = tv->next;
2120  }
2121  }
2124 }
2125 #endif
2126 
2127 /* Aligned to CLS to avoid false sharing between atomic ops. */
2128 typedef struct Thread_ {
2129  ThreadVars *tv; /**< threadvars structure */
2130  const char *name;
2131  int type;
2132  int in_use; /**< bool to indicate this is in use */
2133 
2134  SC_ATOMIC_DECLARE(SCTime_t, pktts); /**< current packet time of this thread
2135  * (offline mode) */
2136  SCTime_t sys_sec_stamp; /**< timestamp in real system
2137  * time when the pktts was last updated. */
2139 } __attribute__((aligned(CLS))) Thread;
2141 typedef struct Threads_ {
2142  Thread *threads;
2143  size_t threads_size;
2144  int threads_cnt;
2146 
2147 static bool thread_store_sealed = false;
2148 static Threads thread_store = { NULL, 0, 0 };
2149 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2150 
2152 {
2153  SCMutexLock(&thread_store_lock);
2154  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2155  thread_store_sealed = true;
2156  SCMutexUnlock(&thread_store_lock);
2157 }
2158 
2160 {
2161  SCMutexLock(&thread_store_lock);
2162  DEBUG_VALIDATE_BUG_ON(!thread_store_sealed);
2163  thread_store_sealed = false;
2164  SCMutexUnlock(&thread_store_lock);
2165 }
2166 
2168 {
2169  SCMutexLock(&thread_store_lock);
2170  for (size_t s = 0; s < thread_store.threads_size; s++) {
2171  Thread *t = &thread_store.threads[s];
2172  if (t == NULL || t->in_use == 0)
2173  continue;
2174 
2175  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2176  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2177  if (t->tv) {
2178  ThreadVars *tv = t->tv;
2179  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2180  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2181  tv, tv->type, tv->name, tv->tmm_flags, flags);
2182  }
2183  }
2184  SCMutexUnlock(&thread_store_lock);
2185 }
2186 
2187 #define STEP 32
2188 /**
2189  * \retval id thread id, or 0 if not found
2190  */
2192 {
2193  SCMutexLock(&thread_store_lock);
2194  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2195  if (thread_store.threads == NULL) {
2196  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2197  BUG_ON(thread_store.threads == NULL);
2198  thread_store.threads_size = STEP;
2199  }
2200 
2201  size_t s;
2202  for (s = 0; s < thread_store.threads_size; s++) {
2203  if (thread_store.threads[s].in_use == 0) {
2204  Thread *t = &thread_store.threads[s];
2205  SCSpinInit(&t->spin, 0);
2206  SCSpinLock(&t->spin);
2207  t->name = tv->name;
2208  t->type = type;
2209  t->tv = tv;
2210  t->in_use = 1;
2211  SCSpinUnlock(&t->spin);
2212 
2213  SCMutexUnlock(&thread_store_lock);
2214  return (int)(s+1);
2215  }
2216  }
2217 
2218  /* if we get here the array is completely filled */
2219  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2220  BUG_ON(newmem == NULL);
2221  thread_store.threads = newmem;
2222  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2223 
2224  Thread *t = &thread_store.threads[thread_store.threads_size];
2225  SCSpinInit(&t->spin, 0);
2226  SCSpinLock(&t->spin);
2227  t->name = tv->name;
2228  t->type = type;
2229  t->tv = tv;
2230  t->in_use = 1;
2231  SCSpinUnlock(&t->spin);
2232 
2233  s = thread_store.threads_size;
2234  thread_store.threads_size += STEP;
2235 
2236  SCMutexUnlock(&thread_store_lock);
2237  return (int)(s+1);
2238 }
2239 #undef STEP
2240 
2241 void TmThreadsUnregisterThread(const int id)
2242 {
2243  SCMutexLock(&thread_store_lock);
2244  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2245  if (id <= 0 || id > (int)thread_store.threads_size) {
2246  SCMutexUnlock(&thread_store_lock);
2247  return;
2248  }
2249 
2250  /* id is one higher than index */
2251  int idx = id - 1;
2252 
2253  /* reset thread_id, which serves as clearing the record */
2254  thread_store.threads[idx].in_use = 0;
2255 
2256  /* check if we have at least one registered thread left */
2257  size_t s;
2258  for (s = 0; s < thread_store.threads_size; s++) {
2259  Thread *t = &thread_store.threads[s];
2260  if (t->in_use == 1) {
2261  goto end;
2262  }
2263  }
2264 
2265  /* if we get here no threads are registered */
2266  SCFree(thread_store.threads);
2267  thread_store.threads = NULL;
2268  thread_store.threads_size = 0;
2269  thread_store.threads_cnt = 0;
2270 
2271 end:
2272  SCMutexUnlock(&thread_store_lock);
2273 }
2274 
2275 void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
2276 {
2277  SCTime_t now = SCTimeGetTime();
2278  int idx = id - 1;
2279  Thread *t = &thread_store.threads[idx];
2280  SCSpinLock(&t->spin);
2281  SC_ATOMIC_SET(t->pktts, ts);
2282 
2283 #ifdef DEBUG
2284  if (t->sys_sec_stamp.secs != 0) {
2285  SCTime_t tmpts = SCTIME_ADD_SECS(t->sys_sec_stamp, 3);
2286  if (SCTIME_CMP_LT(tmpts, now)) {
2287  SCLogDebug("%s: thread slept for %u secs", t->name, (uint32_t)(now.secs - tmpts.secs));
2288  }
2289  }
2290 #endif
2291 
2292  t->sys_sec_stamp = now;
2293  SCSpinUnlock(&t->spin);
2294 }
2295 
2297 {
2298  static SCTime_t nullts = SCTIME_INITIALIZER;
2299  bool ready = true;
2300  for (size_t s = 0; s < thread_store.threads_size; s++) {
2301  Thread *t = &thread_store.threads[s];
2302  if (!t->in_use) {
2303  break;
2304  }
2305  SCSpinLock(&t->spin);
2306  if (t->type != TVT_PPT) {
2307  SCSpinUnlock(&t->spin);
2308  continue;
2309  }
2310  if (SCTIME_CMP_EQ(t->sys_sec_stamp, nullts)) {
2311  ready = false;
2312  SCSpinUnlock(&t->spin);
2313  break;
2314  }
2315  SCSpinUnlock(&t->spin);
2316  }
2317  return ready;
2318 }
2319 
2321 {
2322  SCTime_t now = SCTimeGetTime();
2323  for (size_t s = 0; s < thread_store.threads_size; s++) {
2324  Thread *t = &thread_store.threads[s];
2325  if (!t->in_use) {
2326  break;
2327  }
2328  SCSpinLock(&t->spin);
2329  if (t->type != TVT_PPT) {
2330  SCSpinUnlock(&t->spin);
2331  continue;
2332  }
2333  SC_ATOMIC_SET(t->pktts, ts);
2334  t->sys_sec_stamp = now;
2335  SCSpinUnlock(&t->spin);
2336  }
2337 }
2338 
2340 {
2341  DEBUG_VALIDATE_BUG_ON(idx == 0);
2342  const int i = idx - 1;
2343  Thread *t = &thread_store.threads[i];
2344  return SC_ATOMIC_GET(t->pktts);
2345 }
2346 
2347 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2348 {
2349  struct timeval local = { 0 };
2350  static SCTime_t nullts = SCTIME_INITIALIZER;
2351  bool set = false;
2352  SCTime_t now = SCTimeGetTime();
2353 
2354  for (size_t s = 0; s < thread_store.threads_size; s++) {
2355  Thread *t = &thread_store.threads[s];
2356  if (t->in_use == 0) {
2357  break;
2358  }
2359  SCSpinLock(&t->spin);
2360  /* only packet threads set timestamps based on packets */
2361  if (t->type != TVT_PPT) {
2362  SCSpinUnlock(&t->spin);
2363  continue;
2364  }
2365  SCTime_t pktts = SC_ATOMIC_GET(t->pktts);
2366  if (SCTIME_CMP_NEQ(pktts, nullts)) {
2367  SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 5);
2368  /* ignore sleeping threads */
2369  if (SCTIME_CMP_LT(sys_sec_stamp, now)) {
2370  SCSpinUnlock(&t->spin);
2371  continue;
2372  }
2373  if (!set) {
2374  SCTIME_TO_TIMEVAL(&local, pktts);
2375  set = true;
2376  } else {
2377  if (SCTIME_CMP_LT(pktts, SCTIME_FROM_TIMEVAL(&local))) {
2378  SCTIME_TO_TIMEVAL(&local, pktts);
2379  }
2380  }
2381  }
2382  SCSpinUnlock(&t->spin);
2383  }
2384  *ts = local;
2385  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2386 }
2387 
2389 {
2390  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2391  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2392  /* always create at least one thread */
2393  if (thread_max == 0)
2394  thread_max = ncpus * threading_detect_ratio;
2395  if (thread_max < 1)
2396  thread_max = 1;
2397  if (thread_max > 1024) {
2398  SCLogWarning("limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2399  thread_max = 1024;
2400  }
2401  return (uint16_t)thread_max;
2402 }
2403 
2404 /** \brief inject a flow into a threads flow queue
2405  */
2406 void TmThreadsInjectFlowById(Flow *f, const int id)
2407 {
2408  if (id > 0 && id <= (int)thread_store.threads_size) {
2409  int idx = id - 1;
2410  Thread *t = &thread_store.threads[idx];
2411  ThreadVars *tv = t->tv;
2412  if (tv != NULL && tv->flow_queue != NULL) {
2413  FlowEnqueue(tv->flow_queue, f);
2414 
2415  /* wake up listening thread(s) if necessary */
2416  if (tv->inq != NULL) {
2417  SCMutexLock(&tv->inq->pq->mutex_q);
2418  SCCondSignal(&tv->inq->pq->cond_q);
2419  SCMutexUnlock(&tv->inq->pq->mutex_q);
2420  } else if (tv->break_loop) {
2421  TmThreadsCaptureBreakLoop(tv);
2422  }
2423  return;
2424  }
2425  }
2426  BUG_ON(1);
2427 }
thread_name_workers
const char * thread_name_workers
Definition: runmodes.c:68
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:824
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:77
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:81
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:70
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:89
SCTmThreadsSlotPacketLoopFinish
bool SCTmThreadsSlotPacketLoopFinish(ThreadVars *tv)
Definition: tm-threads.c:275
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:131
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2296
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1779
tm-threads.h
TmThreadsSealThreads
void TmThreadsSealThreads(void)
Definition: tm-threads.c:2151
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:85
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
TmThreadLibSpawn
TmEcode TmThreadLibSpawn(ThreadVars *tv)
Spawns a "fake" lib thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1755
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:135
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1702
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1123
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(const uint16_t set, const uint16_t check, const uint8_t module_flags)
Disable all packet threads.
Definition: tm-threads.c:1536
ThreadVars_::iface_name
char * iface_name
Definition: threadvars.h:135
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
threading_set_cpu_affinity
bool threading_set_cpu_affinity
Definition: runmodes.c:62
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:865
SCTIME_CMP_NEQ
#define SCTIME_CMP_NEQ(a, b)
Definition: util-time.h:107
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:70
Thread_::SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(SCTime_t, pktts)
TmThreadContinueThreads
void TmThreadContinueThreads(void)
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1968
RECEIVE_CPU_SET
@ RECEIVE_CPU_SET
Definition: util-affinity.h:57
CLS
#define CLS
Definition: suricata-common.h:77
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1066
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:380
AffinityGetYamlPath
char * AffinityGetYamlPath(ThreadsAffinityType *taf)
Get the YAML path for the given affinity type. The path is built using the parent name (if available)...
Definition: util-affinity.c:429
CaptureStatsSetup
void CaptureStatsSetup(ThreadVars *tv)
Definition: decode.c:1113
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:104
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:48
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:282
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
PKT_SRC_SHUTDOWN_FLUSH
@ PKT_SRC_SHUTDOWN_FLUSH
Definition: decode.h:64
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:103
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1820
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:175
thread-callbacks.h
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:56
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:45
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:83
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:62
Packet_::flags
uint32_t flags
Definition: decode.h:560
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:80
threads.h
UtilAffinityGetAffinedCPUNum
uint16_t UtilAffinityGetAffinedCPUNum(ThreadsAffinityType *taf)
Return the total number of CPUs in a given affinity.
Definition: util-affinity.c:1043
Threads
Threads
Definition: tm-threads.c:2145
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:354
TmThreadsUnsealThreads
void TmThreadsUnsealThreads(void)
Definition: tm-threads.c:2159
ThreadVars_::t
pthread_t t
Definition: threadvars.h:59
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:305
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:71
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2070
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:103
thread_name_autofp
const char * thread_name_autofp
Definition: runmodes.c:66
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:58
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:116
MIN
#define MIN(x, y)
Definition: suricata-common.h:416
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:84
SCSpinLock
#define SCSpinLock
Definition: threads-debug.h:236
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:73
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1384
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:388
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:60
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:122
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:776
SCTIME_CMP_EQ
#define SCTIME_CMP_EQ(a, b)
Definition: util-time.h:108
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:89
m
SCMutex m
Definition: flow-hash.h:6
p
Packet * p
Definition: fuzz_iprep.c:21
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:44
TmModule_::ThreadBusy
bool(* ThreadBusy)(ThreadVars *tv, void *thread_data)
Definition: tm-modules.h:67
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:142
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:305
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:82
rww_lock_contention
thread_local uint64_t rww_lock_contention
PacketQueueNoLock_
simple fifo queue for packets
Definition: packet-queue.h:34
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:173
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2187
Thread_::in_use
int in_use
Definition: tm-threads.c:2132
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:38
TmThreadsGetThreadTime
SCTime_t TmThreadsGetThreadTime(const int idx)
Definition: tm-threads.c:2339
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:58
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:235
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:81
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a threads flow queue
Definition: tm-threads.c:2406
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:80
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2347
ThreadVarsAlloc
ThreadVars * ThreadVarsAlloc(void)
Allocate a new ThreadVars structure.
Definition: threadvars.c:28
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmThreadsProcessDecodePseudoPackets
TmEcode TmThreadsProcessDecodePseudoPackets(ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
Definition: tm-threads.c:116
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:53
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:46
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:46
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1359
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:111
util-signal.h
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:384
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:78
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:787
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1592
capture-hooks.h
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:2129
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1832
RunmodeIsWorkers
bool RunmodeIsWorkers(void)
Definition: runmodes.c:204
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:57
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:2007
RunmodeIsAutofp
bool RunmodeIsAutofp(void)
Definition: runmodes.c:209
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:61
tv
ThreadVars * tv
Definition: tm-threads.c:2140
strlcat
size_t strlcat(char *, const char *src, size_t siz)
Definition: util-strlcatu.c:45
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:82
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:95
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:208
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:58
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:120
threading_set_stack_size
uint64_t threading_set_stack_size
Definition: runmodes.c:63
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1306
SCTime_t::secs
uint64_t secs
Definition: util-time.h:41
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
Definition: tm-threads.c:2275
SCEnter
#define SCEnter(...)
Definition: util-debug.h:284
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
util-affinity.h
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:79
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:69
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:56
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1677
THV_KILL
#define THV_KILL
Definition: threadvars.h:40
THV_REQ_FLOW_LOOP
#define THV_REQ_FLOW_LOOP
Definition: threadvars.h:47
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:940
FlowQueueNew
FlowQueue * FlowQueueNew(void)
Definition: flow-queue.c:35
SCSpinUnlock
#define SCSpinUnlock
Definition: threads-debug.h:238
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:876
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2241
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2191
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:90
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:262
thread-storage.h
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
ThreadsAffinityType_::nocpu_warned
bool nocpu_warned
Definition: util-affinity.h:91
__attribute__
struct Thread_ __attribute__((aligned(CLS)))
Definition: tm-threads.c:2139
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:124
ThreadVars_::id
int id
Definition: threadvars.h:86
ThreadVars_::type
uint8_t type
Definition: threadvars.h:71
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:325
SCTIME_TO_TIMEVAL
#define SCTIME_TO_TIMEVAL(tv, t)
Definition: util-time.h:97
TmThreadTimeoutLoop
int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
Definition: tm-threads.c:168
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:88
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:87
SCReturn
#define SCReturn
Definition: util-debug.h:286
stream.h
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:377
Thread_::sys_sec_stamp
SCTime_t sys_sec_stamp
Definition: tm-threads.c:2136
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1623
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:139
source-pcap-file-helper.h
Packet_
Definition: decode.h:514
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:129
CaptureHooksOnPseudoPacketCreated
void CaptureHooksOnPseudoPacketCreated(Packet *p)
Definition: capture-hooks.c:60
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:69
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1593
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:79
TmSlot_
Definition: tm-threads.h:53
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:89
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:68
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:80
name
const char * name
Definition: tm-threads.c:2141
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
runmodes.h
StatsSetupPrivate
int StatsSetupPrivate(StatsThreadContext *stats, const char *thread_name)
Definition: counters.c:1313
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1095
AffinityGetNextCPU
uint16_t AffinityGetNextCPU(ThreadVars *tv, ThreadsAffinityType *taf)
Definition: util-affinity.c:1006
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:88
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:89
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
TmModule_
Definition: tm-modules.h:47
StatsThreadInit
void StatsThreadInit(StatsThreadContext *stats)
Definition: counters.c:1333
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:256
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
Definition: tm-threads.c:2320
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:115
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
SCTIME_CMP_LT
#define SCTIME_CMP_LT(a, b)
Definition: util-time.h:105
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:39
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:37
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:660
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:379
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1594
Thread_::type
int type
Definition: tm-threads.c:2131
cnt
uint32_t cnt
Definition: tmqh-packetpool.h:7
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:140
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:89
Packet_::flow
struct Flow_ * flow
Definition: decode.h:562
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1182
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:74
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:54
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:833
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:35
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:38
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:105
TmThreadsWaitForUnpause
bool TmThreadsWaitForUnpause(ThreadVars *tv)
Wait for a thread to become unpaused.
Definition: tm-threads.c:365
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:41
ThreadVars_::break_loop
bool break_loop
Definition: threadvars.h:132
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:100
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:88
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:849
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:241
StatsSyncCounters
void StatsSyncCounters(StatsThreadContext *stats)
Definition: counters.c:477
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:72
sys_sec_stamp
SCTime_t sys_sec_stamp
Definition: tm-threads.c:2147
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:61
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:51
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:77
FatalError
#define FatalError(...)
Definition: util-debug.h:517
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax(void)
Definition: tm-threads.c:2388
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:66
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1156
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:143
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:52
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:216
threadvars.h
util-validate.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:264
SCSpinInit
#define SCSpinInit
Definition: threads-debug.h:239
SCThreadRunInitCallbacks
void SCThreadRunInitCallbacks(ThreadVars *tv)
Definition: thread-callbacks.c:48
ThreadsAffinityType_
Definition: util-affinity.h:72
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:37
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:646
TmSlot_::tm_flags
uint8_t tm_flags
Definition: tm-threads.h:67
Thread_::name
const char * name
Definition: tm-threads.c:2130
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:131
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:50
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:90
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:274
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:74
thread_name_verdict
const char * thread_name_verdict
Definition: runmodes.c:69
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:73
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:317
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:83
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:111
SCSpinlock
#define SCSpinlock
Definition: threads-debug.h:235
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:376
ThreadVarsFree
void ThreadVarsFree(ThreadVars *tv)
Free a ThreadVars structure.
Definition: threadvars.c:42
suricata.h
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:503
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:141
Tmq_
Definition: tm-queues.h:29
Thread_::spin
SCSpinlock spin
Definition: tm-threads.c:2138
TmThreadWaitOnThreadRunning
TmEcode TmThreadWaitOnThreadRunning(void)
Waits for all threads to be in a running state.
Definition: tm-threads.c:1899
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2167
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1984
type
int type
Definition: tm-threads.c:2142
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:128
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:82
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:84
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
FindAffinityByInterface
ThreadsAffinityType * FindAffinityByInterface(ThreadsAffinityType *parent, const char *interface_name)
Definition: util-affinity.c:113
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:108
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:265
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:95
SCTIME_INITIALIZER
#define SCTIME_INITIALIZER
Definition: util-time.h:51
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:250
SCTIME_ADD_SECS
#define SCTIME_ADD_SECS(ts, s)
Definition: util-time.h:64
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:42
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:2128
ThreadVars_::stats
StatsThreadContext stats
Definition: threadvars.h:121
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:288
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:121
StatsThreadCleanup
void StatsThreadCleanup(StatsThreadContext *stats)
Definition: counters.c:1429
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:90
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:63
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:299
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:109
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:91
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:80
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:949
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:359
Tmqh_
Definition: tm-queuehandlers.h:36
SCThreadFreeStorage
void SCThreadFreeStorage(ThreadVars *tv)
Definition: thread-storage.c:45
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:176
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:66
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:350