suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2024 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "thread-callbacks.h"
33 #include "threadvars.h"
34 #include "thread-storage.h"
35 #include "tm-queues.h"
36 #include "tm-queuehandlers.h"
37 #include "tm-threads.h"
38 #include "tmqh-packetpool.h"
39 #include "threads.h"
40 #include "util-affinity.h"
41 #include "util-debug.h"
42 #include "util-privs.h"
43 #include "util-cpu.h"
44 #include "util-optimize.h"
45 #include "util-profiling.h"
46 #include "util-signal.h"
47 #include "queue.h"
48 #include "util-validate.h"
49 
50 #ifdef PROFILE_LOCKING
51 thread_local uint64_t mutex_lock_contention;
52 thread_local uint64_t mutex_lock_wait_ticks;
53 thread_local uint64_t mutex_lock_cnt;
54 
55 thread_local uint64_t spin_lock_contention;
56 thread_local uint64_t spin_lock_wait_ticks;
57 thread_local uint64_t spin_lock_cnt;
58 
59 thread_local uint64_t rww_lock_contention;
60 thread_local uint64_t rww_lock_wait_ticks;
61 thread_local uint64_t rww_lock_cnt;
62 
63 thread_local uint64_t rwr_lock_contention;
64 thread_local uint64_t rwr_lock_wait_ticks;
65 thread_local uint64_t rwr_lock_cnt;
66 #endif
67 
68 #ifdef OS_FREEBSD
69 #include <sched.h>
70 #include <sys/param.h>
71 #include <sys/resource.h>
72 #include <sys/cpuset.h>
73 #include <sys/thr.h>
74 #define cpu_set_t cpuset_t
75 #endif /* OS_FREEBSD */
76 
77 /* prototypes */
78 static int SetCPUAffinity(uint16_t cpu);
79 static void TmThreadDeinitMC(ThreadVars *tv);
80 
81 /* root of the threadvars list */
82 ThreadVars *tv_root[TVT_MAX] = { NULL };
83 
84 /* lock to protect tv_root */
86 
87 /**
88  * \brief Check if a thread flag is set.
89  *
90  * \retval 1 flag is set.
91  * \retval 0 flag is not set.
92  */
93 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
94 {
95  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
96 }
97 
98 /**
99  * \brief Set a thread flag.
100  */
101 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
102 {
103  SC_ATOMIC_OR(tv->flags, flag);
104 }
105 
106 /**
107  * \brief Unset a thread flag.
108  */
109 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
110 {
111  SC_ATOMIC_AND(tv->flags, ~flag);
112 }
113 
115  ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
116 {
117  while (decode_pq->top != NULL) {
118  Packet *extra_p = PacketDequeueNoLock(decode_pq);
119  if (unlikely(extra_p == NULL))
120  continue;
121  DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
122 
123  if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) {
125  }
126  }
128 }
129 
130 /**
131  * \brief Separate run function so we can call it recursively.
132  */
134 {
135  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
136  PACKET_PROFILING_TMM_START(p, s->tm_id);
137  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
138  PACKET_PROFILING_TMM_END(p, s->tm_id);
139  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
140 
141  /* handle error */
142  if (unlikely(r == TM_ECODE_FAILED)) {
143  /* Encountered error. Return packets to packetpool and return */
144  TmThreadsSlotProcessPktFail(tv, NULL);
145  return TM_ECODE_FAILED;
146  }
147  if (s->tm_flags & TM_FLAG_DECODE_TM) {
148  if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) !=
149  TM_ECODE_OK) {
150  return TM_ECODE_FAILED;
151  }
152  }
153  }
154 
155  return TM_ECODE_OK;
156 }
157 
158 /** \internal
159  *
160  * \brief Process flow timeout packets
161  *
162  * Process flow timeout pseudo packets. During shutdown this loop
163  * is run until the flow engine kills the thread and the queue is
164  * empty.
165  */
167 {
168  TmSlot *fw_slot = tv->tm_flowworker;
169  int r = TM_ECODE_OK;
170 
171  if (tv->stream_pq == NULL || fw_slot == NULL) {
172  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
173  return r;
174  }
175 
176  SCLogDebug("flow end loop starting");
177  while (1) {
179  uint32_t len = tv->stream_pq->len;
181  if (len > 0) {
182  while (len--) {
186  if (likely(p)) {
187  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
188  r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
189  if (r == TM_ECODE_FAILED) {
190  break;
191  }
192  }
193  }
194  } else {
196  break;
197  }
198  SleepUsec(1);
199  }
200  }
201  SCLogDebug("flow end loop complete");
203 
204  return r;
205 }
206 
207 static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv)
208 {
209  TmSlot *s = tv->tm_slots;
210 
212 
213  if (tv->thread_setup_flags != 0)
215 
217  PacketPoolInit();
218 
219  for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
220  if (slot->SlotThreadInit != NULL) {
221  void *slot_data = NULL;
222  TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
223  if (r != TM_ECODE_OK) {
224  if (r == TM_ECODE_DONE) {
225  EngineDone();
227  goto error;
228  } else {
230  goto error;
231  }
232  }
233  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
234  }
235 
236  /* if the flowworker module is the first, get the threads input queue */
237  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
238  tv->stream_pq = tv->inq->pq;
239  tv->tm_flowworker = slot;
240  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
242  if (tv->flow_queue == NULL) {
244  goto error;
245  }
246  /* setup a queue */
247  } else if (slot->tm_id == TMM_FLOWWORKER) {
248  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
249  if (tv->stream_pq_local == NULL)
250  FatalError("failed to alloc PacketQueue");
253  tv->tm_flowworker = slot;
254  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
256  if (tv->flow_queue == NULL) {
258  goto error;
259  }
260  }
261  }
262 
264 
266 
267  return true;
268 
269 error:
270  return false;
271 }
272 
274 {
275  TmSlot *s = tv->tm_slots;
276  bool rc = true;
277 
279 
281 
282  /* process all pseudo packets the flow timeout may throw at us */
284 
287 
289 
290  for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
291  if (slot->SlotThreadExitPrintStats != NULL) {
292  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
293  }
294 
295  if (slot->SlotThreadDeinit != NULL) {
296  TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
297  if (r != TM_ECODE_OK) {
299  rc = false;
300  break;
301  }
302  }
303  }
304 
305  tv->stream_pq = NULL;
307  return rc;
308 }
309 
310 static void *TmThreadsSlotPktAcqLoop(void *td)
311 {
312  ThreadVars *tv = (ThreadVars *)td;
313  TmSlot *s = tv->tm_slots;
314  TmEcode r = TM_ECODE_OK;
315 
316  /* check if we are setup properly */
317  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
318  SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
319  " PktAcqLoop=%p, tmqh_in=%p,"
320  " tmqh_out=%p",
321  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
323  pthread_exit(NULL);
324  return NULL;
325  }
326 
327  if (!TmThreadsSlotPktAcqLoopInit(td)) {
328  goto error;
329  }
330 
331  bool run = TmThreadsWaitForUnpause(tv);
332 
333  while (run) {
334  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
335 
336  if (r == TM_ECODE_FAILED) {
338  run = false;
339  }
341  run = false;
342  }
343  if (r == TM_ECODE_DONE) {
344  run = false;
345  }
346  }
348  goto error;
349  }
350 
351  SCLogDebug("%s ending", tv->name);
352  pthread_exit((void *) 0);
353  return NULL;
354 
355 error:
356  pthread_exit(NULL);
357  return NULL;
358 }
359 
360 /**
361  * Also returns if the kill flag is set.
362  */
364 {
367 
368  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
369  SleepUsec(100);
370 
372  return false;
373  }
374 
376  }
377 
378  return true;
379 }
380 
381 static void *TmThreadsLib(void *td)
382 {
383  ThreadVars *tv = (ThreadVars *)td;
384  TmSlot *s = tv->tm_slots;
385 
386  /* check if we are setup properly */
387  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
388  SCLogError("TmSlot or ThreadVars badly setup: s=%p, tmqh_in=%p,"
389  " tmqh_out=%p",
390  s, tv->tmqh_in, tv->tmqh_out);
392  return NULL;
393  }
394 
395  if (!TmThreadsSlotPktAcqLoopInit(tv)) {
396  goto error;
397  }
398 
399  if (!TmThreadsWaitForUnpause(tv)) {
400  goto error;
401  }
402 
403  return NULL;
404 
405 error:
406  tv->stream_pq = NULL;
407  return (void *)-1;
408 }
409 
410 static void *TmThreadsSlotVar(void *td)
411 {
412  ThreadVars *tv = (ThreadVars *)td;
413  TmSlot *s = (TmSlot *)tv->tm_slots;
414  Packet *p = NULL;
415  TmEcode r = TM_ECODE_OK;
416 
418  PacketPoolInit();//Empty();
419 
421 
422  if (tv->thread_setup_flags != 0)
424 
425  /* Drop the capabilities for this thread */
426  SCDropCaps(tv);
427 
428  /* check if we are setup properly */
429  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
431  pthread_exit(NULL);
432  return NULL;
433  }
434 
435  for (; s != NULL; s = s->slot_next) {
436  if (s->SlotThreadInit != NULL) {
437  void *slot_data = NULL;
438  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
439  if (r != TM_ECODE_OK) {
441  goto error;
442  }
443  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
444  }
445 
446  /* special case: we need to access the stream queue
447  * from the flow timeout code */
448 
449  /* if the flowworker module is the first, get the threads input queue */
450  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
451  tv->stream_pq = tv->inq->pq;
452  tv->tm_flowworker = s;
453  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
455  if (tv->flow_queue == NULL) {
457  pthread_exit(NULL);
458  return NULL;
459  }
460  /* setup a queue */
461  } else if (s->tm_id == TMM_FLOWWORKER) {
462  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
463  if (tv->stream_pq_local == NULL)
464  FatalError("failed to alloc PacketQueue");
467  tv->tm_flowworker = s;
468  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
470  if (tv->flow_queue == NULL) {
472  pthread_exit(NULL);
473  return NULL;
474  }
475  }
476  }
477 
479 
480  // Each 'worker' thread uses this func to process/decode the packet read.
481  // Each decode method is different to receive methods in that they do not
482  // enter infinite loops. They use this as the core loop. As a result, at this
483  // point the worker threads can be considered both initialized and running.
485  bool run = TmThreadsWaitForUnpause(tv);
486 
487  s = (TmSlot *)tv->tm_slots;
488 
489  while (run) {
490  /* input a packet */
491  p = tv->tmqh_in(tv);
492 
493  /* if we didn't get a packet see if we need to do some housekeeping */
494  if (unlikely(p == NULL)) {
495  if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty)) {
497  if (p != NULL) {
498  p->flags |= PKT_PSEUDO_STREAM_END;
500  }
501  }
502  }
503 
504  if (p != NULL) {
505  /* run the thread module(s) */
506  r = TmThreadsSlotVarRun(tv, p, s);
507  if (r == TM_ECODE_FAILED) {
510  break;
511  }
512 
513  /* output the packet */
514  tv->tmqh_out(tv, p);
515 
516  /* now handle the stream pq packets */
517  TmThreadsHandleInjectedPackets(tv);
518  }
519 
521  run = false;
522  }
523  }
525  goto error;
526  }
528 
529  pthread_exit(NULL);
530  return NULL;
531 
532 error:
533  tv->stream_pq = NULL;
534  pthread_exit(NULL);
535  return NULL;
536 }
537 
538 static void *TmThreadsManagement(void *td)
539 {
540  ThreadVars *tv = (ThreadVars *)td;
541  TmSlot *s = (TmSlot *)tv->tm_slots;
542  TmEcode r = TM_ECODE_OK;
543 
544  BUG_ON(s == NULL);
545 
547 
548  if (tv->thread_setup_flags != 0)
550 
551  /* Drop the capabilities for this thread */
552  SCDropCaps(tv);
553 
554  SCLogDebug("%s starting", tv->name);
555 
556  if (s->SlotThreadInit != NULL) {
557  void *slot_data = NULL;
558  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
559  if (r != TM_ECODE_OK) {
561  pthread_exit(NULL);
562  return NULL;
563  }
564  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
565  }
566 
568 
570 
571  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
572  /* handle error */
573  if (r == TM_ECODE_FAILED) {
575  }
576 
579  }
580 
583 
584  if (s->SlotThreadExitPrintStats != NULL) {
585  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
586  }
587 
588  if (s->SlotThreadDeinit != NULL) {
589  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
590  if (r != TM_ECODE_OK) {
592  pthread_exit(NULL);
593  return NULL;
594  }
595  }
596 
598  pthread_exit((void *) 0);
599  return NULL;
600 }
601 
602 /**
603  * \brief We set the slot functions.
604  *
605  * \param tv Pointer to the TV to set the slot function for.
606  * \param name Name of the slot variant.
607  * \param fn_p Pointer to a custom slot function. Used only if slot variant
608  * "name" is "custom".
609  *
610  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
611  */
612 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
613 {
614  if (name == NULL) {
615  if (fn_p == NULL) {
616  printf("Both slot name and function pointer can't be NULL inside "
617  "TmThreadSetSlots\n");
618  goto error;
619  } else {
620  name = "custom";
621  }
622  }
623 
624  if (strcmp(name, "varslot") == 0) {
625  tv->tm_func = TmThreadsSlotVar;
626  } else if (strcmp(name, "pktacqloop") == 0) {
627  tv->tm_func = TmThreadsSlotPktAcqLoop;
628  } else if (strcmp(name, "management") == 0) {
629  tv->tm_func = TmThreadsManagement;
630  } else if (strcmp(name, "command") == 0) {
631  tv->tm_func = TmThreadsManagement;
632  } else if (strcmp(name, "lib") == 0) {
633  tv->tm_func = TmThreadsLib;
634  } else if (strcmp(name, "custom") == 0) {
635  if (fn_p == NULL)
636  goto error;
637  tv->tm_func = fn_p;
638  } else {
639  printf("Error: Slot \"%s\" not supported\n", name);
640  goto error;
641  }
642 
643  return TM_ECODE_OK;
644 
645 error:
646  return TM_ECODE_FAILED;
647 }
648 
649 /**
650  * \brief Appends a new entry to the slots.
651  *
652  * \param tv TV the slot is attached to.
653  * \param tm TM to append.
654  * \param data Data to be passed on to the slot init function.
655  *
656  * \note Returns nothing; if allocating the slot fails, the TM is not appended and no error is reported.
657  */
658 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
659 {
660  TmSlot *slot = SCCalloc(1, sizeof(TmSlot));
661  if (unlikely(slot == NULL))
662  return;
663  SC_ATOMIC_INITPTR(slot->slot_data);
664  slot->SlotThreadInit = tm->ThreadInit;
665  slot->slot_initdata = data;
666  if (tm->Func) {
667  slot->SlotFunc = tm->Func;
668  } else if (tm->PktAcqLoop) {
669  slot->PktAcqLoop = tm->PktAcqLoop;
670  if (tm->PktAcqBreakLoop) {
671  tv->break_loop = true;
672  }
673  } else if (tm->Management) {
674  slot->Management = tm->Management;
675  }
677  slot->SlotThreadDeinit = tm->ThreadDeinit;
678  /* we don't have to check for the return value "-1". We wouldn't have
679  * received a TM as arg, if it didn't exist */
680  slot->tm_id = TmModuleGetIDForTM(tm);
681  slot->tm_flags |= tm->flags;
682 
683  tv->tmm_flags |= tm->flags;
684  tv->cap_flags |= tm->cap_flags;
685 
686  if (tv->tm_slots == NULL) {
687  tv->tm_slots = slot;
688  } else {
689  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
690 
691  /* get the last slot */
692  for ( ; a != NULL; a = a->slot_next) {
693  b = a;
694  }
695  /* append the new slot */
696  if (b != NULL) {
697  b->slot_next = slot;
698  }
699  }
700 }
701 
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
/**
 * \brief Apply a CPU set to the calling thread.
 *
 * Uses cpuset_setaffinity() on FreeBSD, thread_policy_set() when
 * OS_DARWIN is set, and sched_setaffinity() on the kernel thread id
 * obtained through SYS_gettid otherwise.
 *
 * \param cs CPU set to apply.
 *
 * \retval 0 the platform call returned 0.
 * \retval -1 the platform call returned non-zero; a warning is printed.
 *            The warning always names sched_setaffinity, whichever call
 *            was actually made.
 */
static int SetCPUAffinitySet(cpu_set_t *cs)
{
    int rc;

#if defined OS_FREEBSD
    rc = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
            SCGetThreadIdLong(), sizeof(cpu_set_t), cs);
#elif OS_DARWIN
    rc = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
            (void *)cs, THREAD_AFFINITY_POLICY_COUNT);
#else
    pid_t self_tid = (pid_t)syscall(SYS_gettid);
    rc = sched_setaffinity(self_tid, sizeof(cpu_set_t), cs);
#endif /* OS_FREEBSD */

    if (rc == 0) {
        return 0;
    }

    printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", rc,
            strerror(errno));
    return -1;
}
#endif
725 
726 
/**
 * \brief Set the thread affinity on the calling thread.
 *
 * On OpenBSD and Solaris (`sun`) nothing is done and success is reported.
 * On Windows/Cygwin the affinity is applied with SetThreadAffinityMask();
 * elsewhere a single-CPU set is built and handed to SetCPUAffinitySet().
 *
 * \param cpuid Id of the core/cpu to setup the affinity.
 *
 * \retval 0 If all goes well; -1 if something is wrong.
 */
static int SetCPUAffinity(uint16_t cpuid)
{
#if defined __OpenBSD__ || defined sun
    return 0;
#else
    int cpu = (int)cpuid;

#if defined OS_WIN32 || defined __CYGWIN__
    /* The affinity mask is a DWORD_PTR with one bit per logical processor.
     * Shifting by the mask width or more is undefined, so reject such ids. */
    if (cpu >= (int)(sizeof(DWORD_PTR) * 8)) {
        printf("Warning: cpu %" PRId32 " does not fit in the thread affinity mask\n", cpu);
        return -1;
    }
    /* shift in the mask's own width; an int shift breaks for cpu >= 31 */
    DWORD_PTR cs = (DWORD_PTR)1 << cpu;

    /* SetThreadAffinityMask() returns 0 on failure and reports the reason
     * through GetLastError(), not errno */
    if (SetThreadAffinityMask(GetCurrentThread(), cs) == 0) {
        printf("Warning: SetThreadAffinityMask failed (%lu)\n",
                (unsigned long)GetLastError());
        return -1;
    }
    SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
            SCGetThreadIdLong(), cpu);

    return 0;

#else
    cpu_set_t cs;
    memset(&cs, 0, sizeof(cs));

    CPU_ZERO(&cs);
    CPU_SET(cpu, &cs);
    return SetCPUAffinitySet(&cs);
#endif /* windows */
#endif /* not supported */
}
765 
766 
767 /**
768  * \brief Set the thread options (thread priority).
769  *
770  * \param tv Pointer to the ThreadVars to setup the thread priority.
771  *
772  * \retval TM_ECODE_OK.
773  */
775 {
777  tv->thread_priority = prio;
778 
779  return TM_ECODE_OK;
780 }
781 
782 /**
783  * \brief Adjusting nice value for threads.
784  */
786 {
787  SCEnter();
788 #ifndef __CYGWIN__
789 #ifdef OS_WIN32
790  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
791  SCLogError("Error setting priority for "
792  "thread %s: %s",
793  tv->name, strerror(errno));
794  } else {
795  SCLogDebug("Priority set to %"PRId32" for thread %s",
797  }
798 #else
799  int ret = nice(tv->thread_priority);
800  if (ret == -1) {
801  SCLogError("Error setting nice value %d "
802  "for thread %s: %s",
803  tv->thread_priority, tv->name, strerror(errno));
804  } else {
805  SCLogDebug("Nice value set to %"PRId32" for thread %s",
807  }
808 #endif /* OS_WIN32 */
809 #endif
810  SCReturn;
811 }
812 
813 
814 /**
815  * \brief Set the thread options (cpu affinity).
816  *
817  * \param tv pointer to the ThreadVars to setup the affinity.
818  * \param cpu cpu on which affinity is set.
819  *
820  * \retval TM_ECODE_OK
821  */
823 {
825  tv->cpu_affinity = cpu;
826 
827  return TM_ECODE_OK;
828 }
829 
830 
832 {
834  return TM_ECODE_OK;
835 
836  if (type > MAX_CPU_SET) {
837  SCLogError("invalid cpu type family");
838  return TM_ECODE_FAILED;
839  }
840 
842  tv->cpu_affinity = type;
843 
844  return TM_ECODE_OK;
845 }
846 
848 {
849  if (type >= MAX_CPU_SET) {
850  SCLogError("invalid cpu type family");
851  return 0;
852  }
853 
855 }
856 
857 /**
858  * \brief Set the thread options (cpu affinity) of the calling thread.
859  * Priority should be already set by pthread_create.
860  *
861  * \param tv pointer to the ThreadVars of the calling thread.
862  */
864 {
866  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
867  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
869  SetCPUAffinity(tv->cpu_affinity);
870  }
871 
872 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
877  bool use_iface_affinity = RunmodeIsAutofp() && tv->cpu_affinity == RECEIVE_CPU_SET &&
878  FindAffinityByInterface(taf, tv->iface_name) != NULL;
879  use_iface_affinity |= RunmodeIsWorkers() && tv->cpu_affinity == WORKER_CPU_SET &&
880  FindAffinityByInterface(taf, tv->iface_name) != NULL;
881 
882  if (use_iface_affinity) {
884  }
885 
886  if (UtilAffinityGetAffinedCPUNum(taf) == 0) {
887  if (!taf->nocpu_warned) {
888  SCLogWarning("No CPU affinity set for %s", AffinityGetYamlPath(taf));
889  taf->nocpu_warned = true;
890  }
891  }
892 
893  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
894  uint16_t cpu = AffinityGetNextCPU(tv, taf);
895  SetCPUAffinity(cpu);
896  /* If CPU is in a set overwrite the default thread prio */
897  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
899  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
901  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
903  } else {
904  tv->thread_priority = taf->prio;
905  }
906  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
907  "%d, thread id %lu", tv->thread_priority,
908  tv->name, cpu, SCGetThreadIdLong());
909  } else {
910  SetCPUAffinitySet(&taf->cpu_set);
911  tv->thread_priority = taf->prio;
912  SCLogPerf("Setting prio %d for thread \"%s\", "
913  "thread id %lu", tv->thread_priority,
915  }
917  }
918 #endif
919 
920  return TM_ECODE_OK;
921 }
922 
923 /**
924  * \brief Creates and returns the TV instance for a new thread.
925  *
926  * \param name Name of this TV instance
927  * \param inq_name Incoming queue name
928  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
929  * \param outq_name Outgoing queue name
930  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
931  * \param slots String representation for the slot function to be used
932  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
933  * \param mucond Flag to indicate whether to initialize the condition
934  * and the mutex variables for this newly created TV.
935  *
936  * \retval the newly created TV instance, or NULL on error
937  */
938 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
939  const char *outq_name, const char *outqh_name, const char *slots,
940  void * (*fn_p)(void *), int mucond)
941 {
942  ThreadVars *tv = NULL;
943  Tmq *tmq = NULL;
944  Tmqh *tmqh = NULL;
945 
946  SCLogDebug("creating thread \"%s\"...", name);
947 
948  /* XXX create separate function for this: allocate a thread container */
949  tv = SCCalloc(1, sizeof(ThreadVars) + ThreadStorageSize());
950  if (unlikely(tv == NULL))
951  goto error;
952 
953  SC_ATOMIC_INIT(tv->flags);
954  SCMutexInit(&tv->perf_public_ctx.m, NULL);
955 
956  strlcpy(tv->name, name, sizeof(tv->name));
957 
958  /* default state for every newly created thread */
960 
961  /* set the incoming queue */
962  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
963  SCLogDebug("inq_name \"%s\"", inq_name);
964 
965  tmq = TmqGetQueueByName(inq_name);
966  if (tmq == NULL) {
967  tmq = TmqCreateQueue(inq_name);
968  if (tmq == NULL)
969  goto error;
970  }
971  SCLogDebug("tmq %p", tmq);
972 
973  tv->inq = tmq;
974  tv->inq->reader_cnt++;
975  SCLogDebug("tv->inq %p", tv->inq);
976  }
977  if (inqh_name != NULL) {
978  SCLogDebug("inqh_name \"%s\"", inqh_name);
979 
980  int id = TmqhNameToID(inqh_name);
981  if (id <= 0) {
982  goto error;
983  }
984  tmqh = TmqhGetQueueHandlerByName(inqh_name);
985  if (tmqh == NULL)
986  goto error;
987 
988  tv->tmqh_in = tmqh->InHandler;
989  tv->inq_id = (uint8_t)id;
990  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
991  }
992 
993  /* set the outgoing queue */
994  if (outqh_name != NULL) {
995  SCLogDebug("outqh_name \"%s\"", outqh_name);
996 
997  int id = TmqhNameToID(outqh_name);
998  if (id <= 0) {
999  goto error;
1000  }
1001 
1002  tmqh = TmqhGetQueueHandlerByName(outqh_name);
1003  if (tmqh == NULL)
1004  goto error;
1005 
1006  tv->tmqh_out = tmqh->OutHandler;
1007  tv->outq_id = (uint8_t)id;
1008 
1009  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
1010  SCLogDebug("outq_name \"%s\"", outq_name);
1011 
1012  if (tmqh->OutHandlerCtxSetup != NULL) {
1013  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1014  if (tv->outctx == NULL)
1015  goto error;
1016  tv->outq = NULL;
1017  } else {
1018  tmq = TmqGetQueueByName(outq_name);
1019  if (tmq == NULL) {
1020  tmq = TmqCreateQueue(outq_name);
1021  if (tmq == NULL)
1022  goto error;
1023  }
1024  SCLogDebug("tmq %p", tmq);
1025 
1026  tv->outq = tmq;
1027  tv->outctx = NULL;
1028  tv->outq->writer_cnt++;
1029  }
1030  }
1031  }
1032 
1033  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1034  goto error;
1035  }
1036 
1037  if (mucond != 0)
1038  TmThreadInitMC(tv);
1039 
1041 
1042  return tv;
1043 
1044 error:
1045  SCLogError("failed to setup a thread");
1046 
1047  if (tv != NULL)
1048  SCFree(tv);
1049  return NULL;
1050 }
1051 
1052 /**
1053  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1054  * This function doesn't support custom slots, and hence shouldn't be
1055  * supplied \"custom\" as its slot type. All PPT threads are created
1056  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1057  * conditional variables are not used to kill the thread.
1058  *
1059  * \param name Name of this TV instance
1060  * \param inq_name Incoming queue name
1061  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1062  * \param outq_name Outgoing queue name
1063  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1064  * \param slots String representation for the slot function to be used
1065  *
1066  * \retval the newly created TV instance, or NULL on error
1067  */
1068 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1069  const char *inqh_name, const char *outq_name,
1070  const char *outqh_name, const char *slots)
1071 {
1072  ThreadVars *tv = NULL;
1073 
1074  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1075  slots, NULL, 0);
1076 
1077  if (tv != NULL) {
1078  tv->type = TVT_PPT;
1080  }
1081 
1082  return tv;
1083 }
1084 
1085 /**
1086  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1087  * This function supports only custom slot functions and hence a
1088  * function pointer should be sent as an argument.
1089  *
1090  * \param name Name of this TV instance
1091  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1092  * \param mucond Flag to indicate whether to initialize the condition
1093  * and the mutex variables for this newly created TV.
1094  *
1095  * \retval the newly created TV instance, or NULL on error
1096  */
1097 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1098  int mucond)
1099 {
1100  ThreadVars *tv = NULL;
1101 
1102  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1103 
1104  if (tv != NULL) {
1105  tv->type = TVT_MGMT;
1108  }
1109 
1110  return tv;
1111 }
1112 
1113 /**
1114  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1115  * The thread uses the \"management\" slot function; the TmModule named
1116  * by \a module is looked up and, if found, appended as its slot.
1117  *
1118  * \param name Name of this TV instance
1119  * \param module Name of TmModule with MANAGEMENT flag set.
1120  * \param mucond Flag to indicate whether to initialize the condition
1121  * and the mutex variables for this newly created TV.
1122  *
1123  * \retval the newly created TV instance, or NULL on error
1124  */
1125 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1126  int mucond)
1127 {
1128  ThreadVars *tv = NULL;
1129 
1130  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1131 
1132  if (tv != NULL) {
1133  tv->type = TVT_MGMT;
1136 
1137  TmModule *m = TmModuleGetByName(module);
1138  if (m) {
1139  TmSlotSetFuncAppend(tv, m, NULL);
1140  }
1141  }
1142 
1143  return tv;
1144 }
1145 
1146 /**
1147  * \brief Creates and returns the TV instance for a Command thread (CMD).
1148  * The thread uses the \"command\" slot function; the TmModule named
1149  * by \a module is looked up and, if found, appended as its slot.
1150  *
1151  * \param name Name of this TV instance
1152  * \param module Name of TmModule with COMMAND flag set.
1153  * \param mucond Flag to indicate whether to initialize the condition
1154  * and the mutex variables for this newly created TV.
1155  *
1156  * \retval the newly created TV instance, or NULL on error
1157  */
1158 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1159  int mucond)
1160 {
1161  ThreadVars *tv = NULL;
1162 
1163  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1164 
1165  if (tv != NULL) {
1166  tv->type = TVT_CMD;
1169 
1170  TmModule *m = TmModuleGetByName(module);
1171  if (m) {
1172  TmSlotSetFuncAppend(tv, m, NULL);
1173  }
1174  }
1175 
1176  return tv;
1177 }
1178 
1179 /**
1180  * \brief Appends this TV to tv_root based on its type
1181  *
1182  * \param type holds the type this TV belongs to.
1183  */
1185 {
1187 
1188  if (tv_root[type] == NULL) {
1189  tv_root[type] = tv;
1190  tv->next = NULL;
1191 
1193 
1194  return;
1195  }
1196 
1197  ThreadVars *t = tv_root[type];
1198 
1199  while (t) {
1200  if (t->next == NULL) {
1201  t->next = tv;
1202  tv->next = NULL;
1203  break;
1204  }
1205 
1206  t = t->next;
1207  }
1208 
1210 }
1211 
1212 static bool ThreadStillHasPackets(ThreadVars *tv)
1213 {
1214  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1215  /* we wait till we dry out all the inq packets, before we
1216  * kill this thread. Do note that you should have disabled
1217  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1218  PacketQueue *q = tv->inq->pq;
1219  SCMutexLock(&q->mutex_q);
1220  uint32_t len = q->len;
1221  SCMutexUnlock(&q->mutex_q);
1222  if (len != 0) {
1223  return true;
1224  }
1225  }
1226 
1227  if (tv->stream_pq != NULL) {
1229  uint32_t len = tv->stream_pq->len;
1231 
1232  if (len != 0) {
1233  return true;
1234  }
1235  }
1236  return false;
1237 }
1238 
1239 /**
1240  * \brief Kill a thread.
1241  *
1242  * \param tv A ThreadVars instance corresponding to the thread that has to be
1243  * killed.
1244  *
1245  * \retval r 1 killed successfully
1246  * 0 not yet ready, needs another look
1247  */
1248 static int TmThreadKillThread(ThreadVars *tv)
1249 {
1250  BUG_ON(tv == NULL);
1251 
1252  /* kill only once :) */
1253  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1254  return 1;
1255  }
1256 
1257  /* set the thread flag informing the thread that it needs to be
1258  * terminated */
1261 
1262  /* to be sure, signal more */
1263  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1264  if (tv->inq_id != TMQH_NOT_SET) {
1266  if (qh != NULL && qh->InShutdownHandler != NULL) {
1267  qh->InShutdownHandler(tv);
1268  }
1269  }
1270  if (tv->inq != NULL) {
1271  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1272  SCMutexLock(&tv->inq->pq->mutex_q);
1273  SCCondSignal(&tv->inq->pq->cond_q);
1274  SCMutexUnlock(&tv->inq->pq->mutex_q);
1275  }
1276  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1277  }
1278 
1279  if (tv->ctrl_cond != NULL ) {
1281  pthread_cond_broadcast(tv->ctrl_cond);
1283  }
1284  return 0;
1285  }
1286 
1287  if (tv->outctx != NULL) {
1288  if (tv->outq_id != TMQH_NOT_SET) {
1290  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1291  qh->OutHandlerCtxFree(tv->outctx);
1292  tv->outctx = NULL;
1293  }
1294  }
1295  }
1296 
1297  /* Join the thread and flag as dead, unless the thread ID is 0 as
1298  * it's not a thread created by Suricata. */
1299  if (tv->t) {
1300  pthread_join(tv->t, NULL);
1301  SCLogDebug("thread %s stopped", tv->name);
1302  }
1304  return 1;
1305 }
1306 
1307 static bool ThreadBusy(ThreadVars *tv)
1308 {
1309  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
1310  TmModule *tm = TmModuleGetById(s->tm_id);
1311  if (tm && tm->ThreadBusy != NULL) {
1312  if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
1313  return true;
1314  }
1315  }
1316  return false;
1317 }
1318 
1319 /** \internal
1320  *
1321  * \brief make sure that all packet threads are done processing their
1322  * in-flight packets, including 'injected' flow packets.
1323  */
1324 static void TmThreadDrainPacketThreads(void)
1325 {
1326  ThreadVars *tv = NULL;
1327  struct timeval start_ts;
1328  struct timeval cur_ts;
1329  gettimeofday(&start_ts, NULL);
1330 
1331 again:
1332  gettimeofday(&cur_ts, NULL);
1333  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1334  SCLogWarning("unable to get all packet threads "
1335  "to process their packets in time");
1336  return;
1337  }
1338 
1340 
1341  /* all receive threads are part of packet processing threads */
1342  tv = tv_root[TVT_PPT];
1343  while (tv) {
1344  if (ThreadStillHasPackets(tv)) {
1345  /* we wait till we dry out all the inq packets, before we
1346  * kill this thread. Do note that you should have disabled
1347  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1349 
1350  /* sleep outside lock */
1351  SleepMsec(1);
1352  goto again;
1353  }
1354  if (ThreadBusy(tv)) {
1356 
1357  Packet *p = PacketGetFromAlloc();
1358  if (p != NULL) {
1361  PacketQueue *q = tv->stream_pq;
1362  SCMutexLock(&q->mutex_q);
1363  PacketEnqueue(q, p);
1364  SCCondSignal(&q->cond_q);
1365  SCMutexUnlock(&q->mutex_q);
1366  }
1367 
1368  /* don't sleep while holding a lock */
1369  SleepMsec(1);
1370  goto again;
1371  }
1372  tv = tv->next;
1373  }
1374 
1376 }
1377 
1378 /**
1379  * \brief Disable all threads having the specified TMs.
1380  *
1381  * Breaks out of the packet acquisition loop, and bumps
1382  * into the 'flow loop', where it will process packets
1383  * from the flow engine's shutdown handling.
1384  */
1386 {
1387  ThreadVars *tv = NULL;
1388  struct timeval start_ts;
1389  struct timeval cur_ts;
1390  gettimeofday(&start_ts, NULL);
1391 
1392 again:
1393  gettimeofday(&cur_ts, NULL);
1394  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1395  FatalError("Engine unable to disable detect "
1396  "thread - \"%s\". Killing engine",
1397  tv->name);
1398  }
1399 
1401 
1402  /* all receive threads are part of packet processing threads */
1403  tv = tv_root[TVT_PPT];
1404 
1405  /* we do have to keep in mind that TVs are arranged in the order
1406  * right from receive to log. The moment we fail to find a
1407  * receive TM amongst the slots in a tv, it indicates we are done
1408  * with all receive threads */
1409  while (tv) {
1410  int disable = 0;
1411  TmModule *tm = NULL;
1412  /* obtain the slots for this TV */
1413  TmSlot *slots = tv->tm_slots;
1414  while (slots != NULL) {
1415  tm = TmModuleGetById(slots->tm_id);
1416 
1417  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1418  disable = 1;
1419  break;
1420  }
1421 
1422  slots = slots->slot_next;
1423  continue;
1424  }
1425 
1426  if (disable) {
1427  if (ThreadStillHasPackets(tv)) {
1428  /* we wait till we dry out all the inq packets, before we
1429  * kill this thread. Do note that you should have disabled
1430  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1432  /* don't sleep while holding a lock */
1433  SleepMsec(1);
1434  goto again;
1435  }
1436 
1437  if (ThreadBusy(tv)) {
1439 
1440  Packet *p = PacketGetFromAlloc();
1441  if (p != NULL) {
1444  PacketQueue *q = tv->stream_pq;
1445  SCMutexLock(&q->mutex_q);
1446  PacketEnqueue(q, p);
1447  SCCondSignal(&q->cond_q);
1448  SCMutexUnlock(&q->mutex_q);
1449  }
1450 
1451  /* don't sleep while holding a lock */
1452  SleepMsec(1);
1453  goto again;
1454  }
1455 
1456  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1457  if (tm && tm->PktAcqBreakLoop != NULL) {
1458  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1459  }
1461 
1462  if (tv->inq != NULL) {
1463  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1464  SCMutexLock(&tv->inq->pq->mutex_q);
1465  SCCondSignal(&tv->inq->pq->cond_q);
1466  SCMutexUnlock(&tv->inq->pq->mutex_q);
1467  }
1468  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1469  }
1470 
1471  /* wait for it to enter the 'flow loop' stage */
1472  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1474 
1475  SleepMsec(1);
1476  goto again;
1477  }
1478  }
1479 
1480  tv = tv->next;
1481  }
1482 
1484 
1485  /* finally wait for all packet threads to have
1486  * processed all of their 'live' packets so we
1487  * don't process the last live packets together
1488  * with FFR packets */
1489  TmThreadDrainPacketThreads();
1490 }
1491 
1492 #ifdef DEBUG_VALIDATION
1493 static void TmThreadDumpThreads(void);
1494 #endif
1495 
1496 static void TmThreadDebugValidateNoMorePackets(void)
1497 {
1498 #ifdef DEBUG_VALIDATION
1500  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1501  if (ThreadStillHasPackets(tv)) {
1503  TmThreadDumpThreads();
1504  abort();
1505  }
1506  }
1508 #endif
1509 }
1510 
1511 /** \internal
1512  * \brief check if a thread has any of the modules indicated by TM_FLAG_*
1513  * \param tv thread
1514  * \param flags TM_FLAG_*'s
1515  * \retval bool true if at least on of the flags is present */
1516 static inline bool CheckModuleFlags(const ThreadVars *tv, const uint8_t flags)
1517 {
1518  return (tv->tmm_flags & flags) != 0;
1519 }
1520 
1521 /**
1522  * \brief Disable all packet threads
1523  * \param set flag to set
1524  * \param check flag to check
1525  * \param module_flags bitflags of TmModule's to apply the `set` flag to.
1526  *
1527  * Support 2 stages in shutting down the packet threads:
1528  * 1. set THV_REQ_FLOW_LOOP and wait for THV_FLOW_LOOP
1529  * 2. set THV_KILL and wait for THV_RUNNING_DONE
1530  *
1531  * During step 1 the main loop is exited, and the flow loop logic is entered.
1532  * During step 2, the flow loop logic is done and the thread closes.
1533  *
1534  * `module_flags` limits which threads are disabled
1535  */
1537  const uint16_t set, const uint16_t check, const uint8_t module_flags)
1538 {
1539  struct timeval start_ts;
1540  struct timeval cur_ts;
1541 
1542  /* first drain all packet threads of their packets */
1543  TmThreadDrainPacketThreads();
1544 
1545  /* since all the threads possibly able to produce more packets
1546  * are now gone or inactive, we should see no packets anywhere
1547  * anymore. */
1548  TmThreadDebugValidateNoMorePackets();
1549 
1550  gettimeofday(&start_ts, NULL);
1551 again:
1552  gettimeofday(&cur_ts, NULL);
1553  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1554  FatalError("Engine unable to disable packet "
1555  "threads. Killing engine");
1556  }
1557 
1558  /* loop through the packet threads and kill them */
1560  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1561  /* only set flow worker threads to THV_REQ_FLOW_LOOP */
1562  if (!CheckModuleFlags(tv, module_flags)) {
1563  SCLogDebug("%s does not have any of the modules %02x, skip", tv->name, module_flags);
1564  continue;
1565  }
1566  TmThreadsSetFlag(tv, set);
1567 
1568  /* separate worker threads (autofp) will still wait at their
1569  * input queues. So nudge them here so they will observe the
1570  * THV_KILL flag. */
1571  if (tv->inq != NULL) {
1572  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1573  SCMutexLock(&tv->inq->pq->mutex_q);
1574  SCCondSignal(&tv->inq->pq->cond_q);
1575  SCMutexUnlock(&tv->inq->pq->mutex_q);
1576  }
1577  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1578  }
1579 
1580  /* wait for it to reach the expected state */
1581  if (!TmThreadsCheckFlag(tv, check)) {
1583  SCLogDebug("%s did not reach state %u, again", tv->name, check);
1584 
1585  SleepMsec(1);
1586  goto again;
1587  }
1588  }
1590 }
1591 
1592 #define MIN_WAIT_TIME 100
1593 #define MAX_WAIT_TIME 999999
1595 {
1596  ThreadVars *tv = NULL;
1597  unsigned int sleep_usec = MIN_WAIT_TIME;
1598 
1599  BUG_ON((family < 0) || (family >= TVT_MAX));
1600 
1601 again:
1603  tv = tv_root[family];
1604 
1605  while (tv) {
1606  int r = TmThreadKillThread(tv);
1607  if (r == 0) {
1609  SleepUsec(sleep_usec);
1610  sleep_usec *= 2; /* slowly back off */
1611  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1612  goto again;
1613  }
1614  sleep_usec = MIN_WAIT_TIME; /* reset */
1615 
1616  tv = tv->next;
1617  }
1619 }
1620 #undef MIN_WAIT_TIME
1621 #undef MAX_WAIT_TIME
1622 
1624 {
1625  int i = 0;
1626 
1627  for (i = 0; i < TVT_MAX; i++) {
1629  }
1630 }
1631 
1632 static void TmThreadFree(ThreadVars *tv)
1633 {
1634  TmSlot *s;
1635  TmSlot *ps;
1636  if (tv == NULL)
1637  return;
1638 
1639  SCLogDebug("Freeing thread '%s'.", tv->name);
1640 
1642 
1643  if (tv->flow_queue) {
1644  BUG_ON(tv->flow_queue->qlen != 0);
1645  SCFree(tv->flow_queue);
1646  }
1647 
1649 
1650  TmThreadDeinitMC(tv);
1651 
1652  if (tv->thread_group_name) {
1654  }
1655 
1656  if (tv->printable_name) {
1658  }
1659 
1660  if (tv->iface_name) {
1661  SCFree(tv->iface_name);
1662  }
1663 
1664  if (tv->stream_pq_local) {
1668  }
1669 
1670  s = (TmSlot *)tv->tm_slots;
1671  while (s) {
1672  ps = s;
1673  s = s->slot_next;
1674  SCFree(ps);
1675  }
1676 
1678  SCFree(tv);
1679 }
1680 
1682 {
1683  char *thread_group_name = NULL;
1684 
1685  if (name == NULL)
1686  return;
1687 
1688  if (tv == NULL)
1689  return;
1690 
1691  thread_group_name = SCStrdup(name);
1692  if (unlikely(thread_group_name == NULL)) {
1693  SCLogError("error allocating memory");
1694  return;
1695  }
1696  tv->thread_group_name = thread_group_name;
1697 }
1698 
1700 {
1701  ThreadVars *tv = NULL;
1702  ThreadVars *ptv = NULL;
1703 
1704  if ((family < 0) || (family >= TVT_MAX))
1705  return;
1706 
1708  tv = tv_root[family];
1709 
1710  while (tv) {
1711  ptv = tv;
1712  tv = tv->next;
1713  TmThreadFree(ptv);
1714  }
1715  tv_root[family] = NULL;
1717 }
1718 
1719 /**
1720  * \brief Spawns a thread associated with the ThreadVars instance tv
1721  *
1722  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1723  */
1725 {
1726  pthread_attr_t attr;
1727  if (tv->tm_func == NULL) {
1728  FatalError("No thread function set");
1729  }
1730 
1731  /* Initialize the thread attributes and create the thread joinable */
1732  pthread_attr_init(&attr);
1733 
1734  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1735 
1736  /* Adjust thread stack size if configured */
1738  SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
1739  if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
1740  FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
1742  }
1743  }
1744 
1745  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1746  if (rc) {
1747  FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc,
1748  strerror(errno));
1749  }
1750 
1751 #if DEBUG && HAVE_PTHREAD_GETATTR_NP
1753  if (pthread_getattr_np(tv->t, &attr) == 0) {
1754  size_t stack_size;
1755  void *stack_addr;
1756  pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1757  SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1758  } else {
1759  SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1760  "pthread_getattr_np() is %" PRId32,
1761  rc);
1762  }
1763  }
1764 #endif
1765 
1767 
1768  TmThreadAppend(tv, tv->type);
1769  return TM_ECODE_OK;
1770 }
1771 
1772 /**
1773  * \brief Spawns a "fake" lib thread associated with the ThreadVars instance tv
1774  *
1775  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1776  */
1778 {
1779  if (tv->tm_func == NULL) {
1780  printf("ERROR: no thread function set\n");
1781  return TM_ECODE_FAILED;
1782  }
1783 
1784  if (tv->tm_func((void *)tv) == (void *)-1) {
1785  return TM_ECODE_FAILED;
1786  }
1787 
1789 
1790  return TM_ECODE_OK;
1791 }
1792 
1793 /**
1794  * \brief Initializes the mutex and condition variables for this TV
1795  *
1796  * It can be used by a thread to control a wait loop that can also be
1797  * influenced by other threads.
1798  *
1799  * \param tv Pointer to a TV instance
1800  */
1802 {
1803  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1804  FatalError("Fatal error encountered in TmThreadInitMC. "
1805  "Exiting...");
1806  }
1807 
1808  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1809  printf("Error initializing the tv->m mutex\n");
1810  exit(EXIT_FAILURE);
1811  }
1812 
1813  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1814  FatalError("Fatal error encountered in TmThreadInitMC. "
1815  "Exiting...");
1816  }
1817 
1818  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1819  FatalError("Error initializing the tv->cond condition "
1820  "variable");
1821  }
1822 }
1823 
1824 static void TmThreadDeinitMC(ThreadVars *tv)
1825 {
1826  if (tv->ctrl_mutex) {
1828  SCFree(tv->ctrl_mutex);
1829  }
1830  if (tv->ctrl_cond) {
1832  SCFree(tv->ctrl_cond);
1833  }
1834 }
1835 
1836 /**
1837  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1838  * the kill flag has been set or not on the thread.
1839  *
1840  * \param tv Pointer to the TV instance.
1841  */
1843 {
1844  while (!TmThreadsCheckFlag(tv, flags)) {
1845  SleepUsec(100);
1846  }
1847 }
1848 
1849 /**
1850  * \brief Unpauses a thread
1851  *
1852  * \param tv Pointer to a TV instance that has to be unpaused
1853  */
1855 {
1857 }
1858 
1859 static TmEcode WaitOnThreadsRunningByType(const int t)
1860 {
1861  struct timeval start_ts;
1862  struct timeval cur_ts;
1863  uint32_t thread_cnt = 0;
1864 
1865  /* on retries, this will init to the last thread that started up already */
1866  ThreadVars *tv_start = tv_root[t];
1868  for (ThreadVars *tv = tv_start; tv != NULL; tv = tv->next) {
1869  thread_cnt++;
1870  }
1872 
1873  /* give threads a second each to start up, plus a margin of a minute. */
1874  uint32_t time_budget = 60 + thread_cnt;
1875 
1876  gettimeofday(&start_ts, NULL);
1877 again:
1879  ThreadVars *tv = tv_start;
1880  while (tv != NULL) {
1883 
1884  SCLogError("thread \"%s\" failed to "
1885  "start: flags %04x",
1886  tv->name, SC_ATOMIC_GET(tv->flags));
1887  return TM_ECODE_FAILED;
1888  }
1889 
1892 
1893  /* time_budget seconds (60 plus one per thread) are provided for the
1894  * threads to transition from THV_INIT_DONE to THV_RUNNING */
1895  gettimeofday(&cur_ts, NULL);
1896  if (((uint32_t)cur_ts.tv_sec - (uint32_t)start_ts.tv_sec) > time_budget) {
1897  SCLogError("thread \"%s\" failed to "
1898  "start in time: flags %04x. Total threads: %u. Time budget %us",
1899  tv->name, SC_ATOMIC_GET(tv->flags), thread_cnt, time_budget);
1900  return TM_ECODE_FAILED;
1901  }
1902 
1903  /* sleep a little to give the thread some
1904  * time to start running */
1905  SleepUsec(100);
1906  goto again;
1907  }
1908  tv_start = tv;
1909 
1910  tv = tv->next;
1911  }
1913  return TM_ECODE_OK;
1914 }
1915 
1916 /**
1917  * \brief Waits for all threads to be in a running state
1918  *
1919  * \retval TM_ECODE_OK if all are running or error if a thread failed
1920  */
1922 {
1923  uint16_t RX_num = 0;
1924  uint16_t W_num = 0;
1925  uint16_t FM_num = 0;
1926  uint16_t FR_num = 0;
1927  uint16_t TX_num = 0;
1928 
1929  for (int i = 0; i < TVT_MAX; i++) {
1930  if (WaitOnThreadsRunningByType(i) != TM_ECODE_OK)
1931  return TM_ECODE_FAILED;
1932  }
1933 
1935  for (int i = 0; i < TVT_MAX; i++) {
1936  for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
1937  if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1938  RX_num++;
1939  else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1940  W_num++;
1941  else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1942  TX_num++;
1943  else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1944  FM_num++;
1945  else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1946  FR_num++;
1947  }
1948  }
1950 
1951  /* Construct a welcome string displaying
1952  * initialized thread types and counts */
1953  uint16_t app_len = 32;
1954  uint16_t buf_len = 256;
1955 
1956  char append_str[app_len];
1957  char thread_counts[buf_len];
1958 
1959  strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1960  if (RX_num > 0) {
1961  snprintf(append_str, app_len, "RX: %u ", RX_num);
1962  strlcat(thread_counts, append_str, buf_len);
1963  }
1964  if (W_num > 0) {
1965  snprintf(append_str, app_len, "W: %u ", W_num);
1966  strlcat(thread_counts, append_str, buf_len);
1967  }
1968  if (TX_num > 0) {
1969  snprintf(append_str, app_len, "TX: %u ", TX_num);
1970  strlcat(thread_counts, append_str, buf_len);
1971  }
1972  if (FM_num > 0) {
1973  snprintf(append_str, app_len, "FM: %u ", FM_num);
1974  strlcat(thread_counts, append_str, buf_len);
1975  }
1976  if (FR_num > 0) {
1977  snprintf(append_str, app_len, "FR: %u ", FR_num);
1978  strlcat(thread_counts, append_str, buf_len);
1979  }
1980  snprintf(append_str, app_len, " Engine started.");
1981  strlcat(thread_counts, append_str, buf_len);
1982  SCLogNotice("%s", thread_counts);
1983 
1984  return TM_ECODE_OK;
1985 }
1986 
1987 /**
1988  * \brief Unpauses all threads present in tv_root
1989  */
1991 {
1993  for (int i = 0; i < TVT_MAX; i++) {
1994  ThreadVars *tv = tv_root[i];
1995  while (tv != NULL) {
1997  tv = tv->next;
1998  }
1999  }
2001 }
2002 
2003 /**
2004  * \brief Used to check the thread for certain conditions of failure.
2005  */
2007 {
2009  for (int i = 0; i < TVT_MAX; i++) {
2010  ThreadVars *tv = tv_root[i];
2011  while (tv) {
2013  FatalError("thread %s failed", tv->name);
2014  }
2015  tv = tv->next;
2016  }
2017  }
2019 }
2020 
2021 /**
2022  * \brief Used to check if all threads have finished their initialization. On
2023  * finding an un-initialized thread, it waits till that thread completes
2024  * its initialization, before proceeding to the next thread.
2025  *
2026  * \retval TM_ECODE_OK all initialized properly
2027  * \retval TM_ECODE_FAILED failure
2028  */
2030 {
2031  struct timeval start_ts;
2032  struct timeval cur_ts;
2033  gettimeofday(&start_ts, NULL);
2034 
2035 again:
2037  for (int i = 0; i < TVT_MAX; i++) {
2038  ThreadVars *tv = tv_root[i];
2039  while (tv != NULL) {
2042 
2043  SCLogError("thread \"%s\" failed to "
2044  "initialize: flags %04x",
2045  tv->name, SC_ATOMIC_GET(tv->flags));
2046  return TM_ECODE_FAILED;
2047  }
2048 
2049  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
2051 
2052  gettimeofday(&cur_ts, NULL);
2053  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
2054  SCLogError("thread \"%s\" failed to "
2055  "initialize in time: flags %04x",
2056  tv->name, SC_ATOMIC_GET(tv->flags));
2057  return TM_ECODE_FAILED;
2058  }
2059 
2060  /* sleep a little to give the thread some
2061  * time to finish initialization */
2062  SleepUsec(100);
2063  goto again;
2064  }
2065 
2068  SCLogError("thread \"%s\" failed to "
2069  "initialize.",
2070  tv->name);
2071  return TM_ECODE_FAILED;
2072  }
2075  SCLogError("thread \"%s\" closed on "
2076  "initialization.",
2077  tv->name);
2078  return TM_ECODE_FAILED;
2079  }
2080 
2081  tv = tv->next;
2082  }
2083  }
2085 
2086  return TM_ECODE_OK;
2087 }
2088 
2089 /**
2090  * \brief returns a count of all the threads that match the flag
2091  */
2093 {
2094  uint32_t cnt = 0;
2096  for (int i = 0; i < TVT_MAX; i++) {
2097  ThreadVars *tv = tv_root[i];
2098  while (tv != NULL) {
2099  if ((tv->tmm_flags & flags) == flags)
2100  cnt++;
2101 
2102  tv = tv->next;
2103  }
2104  }
2106  return cnt;
2107 }
2108 
2109 #ifdef DEBUG_VALIDATION
2110 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2111 {
2112  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2114  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2115  tv, s, s->tm_id, m->name);
2116  }
2117 }
2118 
2119 static void TmThreadDumpThreads(void)
2120 {
2122  for (int i = 0; i < TVT_MAX; i++) {
2123  ThreadVars *tv = tv_root[i];
2124  while (tv != NULL) {
2125  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2126  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2127  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2128  if (tv->inq && tv->stream_pq == tv->inq->pq) {
2129  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2130  } else if (tv->stream_pq_local != NULL) {
2131  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2132  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2133  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2134  }
2135  }
2136  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2137  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2138  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2139  }
2140  TmThreadDoDumpSlots(tv);
2141  tv = tv->next;
2142  }
2143  }
2146 }
2147 #endif
2148 
2149 /* Aligned to CLS to avoid false sharing between atomic ops. */
2150 typedef struct Thread_ {
2151  ThreadVars *tv; /**< threadvars structure */
2152  const char *name;
2153  int type;
2154  int in_use; /**< bool to indicate this is in use */
2155 
2156  SC_ATOMIC_DECLARE(SCTime_t, pktts); /**< current packet time of this thread
2157  * (offline mode) */
2158  SCTime_t sys_sec_stamp; /**< timestamp in real system
2159  * time when the pktts was last updated. */
2161 } __attribute__((aligned(CLS))) Thread;
2163 typedef struct Threads_ {
2164  Thread *threads;
2165  size_t threads_size;
2166  int threads_cnt;
2168 
2169 static bool thread_store_sealed = false;
2170 static Threads thread_store = { NULL, 0, 0 };
2171 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2172 
2174 {
2175  SCMutexLock(&thread_store_lock);
2176  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2177  thread_store_sealed = true;
2178  SCMutexUnlock(&thread_store_lock);
2179 }
2180 
2182 {
2183  SCMutexLock(&thread_store_lock);
2184  DEBUG_VALIDATE_BUG_ON(!thread_store_sealed);
2185  thread_store_sealed = false;
2186  SCMutexUnlock(&thread_store_lock);
2187 }
2188 
2190 {
2191  SCMutexLock(&thread_store_lock);
2192  for (size_t s = 0; s < thread_store.threads_size; s++) {
2193  Thread *t = &thread_store.threads[s];
2194  if (t == NULL || t->in_use == 0)
2195  continue;
2196 
2197  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2198  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2199  if (t->tv) {
2200  ThreadVars *tv = t->tv;
2201  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2202  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2203  tv, tv->type, tv->name, tv->tmm_flags, flags);
2204  }
2205  }
2206  SCMutexUnlock(&thread_store_lock);
2207 }
2208 
2209 #define STEP 32
2210 /**
2211  * \retval id thread id (slot index + 1, so always > 0)
2212  */
2214 {
2215  SCMutexLock(&thread_store_lock);
2216  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2217  if (thread_store.threads == NULL) {
2218  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2219  BUG_ON(thread_store.threads == NULL);
2220  thread_store.threads_size = STEP;
2221  }
2222 
2223  size_t s;
2224  for (s = 0; s < thread_store.threads_size; s++) {
2225  if (thread_store.threads[s].in_use == 0) {
2226  Thread *t = &thread_store.threads[s];
2227  SCSpinInit(&t->spin, 0);
2228  SCSpinLock(&t->spin);
2229  t->name = tv->name;
2230  t->type = type;
2231  t->tv = tv;
2232  t->in_use = 1;
2233  SCSpinUnlock(&t->spin);
2234 
2235  SCMutexUnlock(&thread_store_lock);
2236  return (int)(s+1);
2237  }
2238  }
2239 
2240  /* if we get here the array is completely filled */
2241  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2242  BUG_ON(newmem == NULL);
2243  thread_store.threads = newmem;
2244  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2245 
2246  Thread *t = &thread_store.threads[thread_store.threads_size];
2247  SCSpinInit(&t->spin, 0);
2248  SCSpinLock(&t->spin);
2249  t->name = tv->name;
2250  t->type = type;
2251  t->tv = tv;
2252  t->in_use = 1;
2253  SCSpinUnlock(&t->spin);
2254 
2255  s = thread_store.threads_size;
2256  thread_store.threads_size += STEP;
2257 
2258  SCMutexUnlock(&thread_store_lock);
2259  return (int)(s+1);
2260 }
2261 #undef STEP
2262 
2263 void TmThreadsUnregisterThread(const int id)
2264 {
2265  SCMutexLock(&thread_store_lock);
2266  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2267  if (id <= 0 || id > (int)thread_store.threads_size) {
2268  SCMutexUnlock(&thread_store_lock);
2269  return;
2270  }
2271 
2272  /* id is one higher than index */
2273  int idx = id - 1;
2274 
2275  /* reset thread_id, which serves as clearing the record */
2276  thread_store.threads[idx].in_use = 0;
2277 
2278  /* check if we have at least one registered thread left */
2279  size_t s;
2280  for (s = 0; s < thread_store.threads_size; s++) {
2281  Thread *t = &thread_store.threads[s];
2282  if (t->in_use == 1) {
2283  goto end;
2284  }
2285  }
2286 
2287  /* if we get here no threads are registered */
2288  SCFree(thread_store.threads);
2289  thread_store.threads = NULL;
2290  thread_store.threads_size = 0;
2291  thread_store.threads_cnt = 0;
2292 
2293 end:
2294  SCMutexUnlock(&thread_store_lock);
2295 }
2296 
2297 void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
2298 {
2299  SCTime_t now = SCTimeGetTime();
2300  int idx = id - 1;
2301  Thread *t = &thread_store.threads[idx];
2302  SCSpinLock(&t->spin);
2303  SC_ATOMIC_SET(t->pktts, ts);
2304 
2305 #ifdef DEBUG
2306  if (t->sys_sec_stamp.secs != 0) {
2307  SCTime_t tmpts = SCTIME_ADD_SECS(t->sys_sec_stamp, 3);
2308  if (SCTIME_CMP_LT(tmpts, now)) {
2309  SCLogDebug("%s: thread slept for %u secs", t->name, (uint32_t)(now.secs - tmpts.secs));
2310  }
2311  }
2312 #endif
2313 
2314  t->sys_sec_stamp = now;
2315  SCSpinUnlock(&t->spin);
2316 }
2317 
2319 {
2320  static SCTime_t nullts = SCTIME_INITIALIZER;
2321  bool ready = true;
2322  for (size_t s = 0; s < thread_store.threads_size; s++) {
2323  Thread *t = &thread_store.threads[s];
2324  if (!t->in_use) {
2325  break;
2326  }
2327  SCSpinLock(&t->spin);
2328  if (t->type != TVT_PPT) {
2329  SCSpinUnlock(&t->spin);
2330  continue;
2331  }
2332  if (SCTIME_CMP_EQ(t->sys_sec_stamp, nullts)) {
2333  ready = false;
2334  SCSpinUnlock(&t->spin);
2335  break;
2336  }
2337  SCSpinUnlock(&t->spin);
2338  }
2339  return ready;
2340 }
2341 
2343 {
2344  SCTime_t now = SCTimeGetTime();
2345  for (size_t s = 0; s < thread_store.threads_size; s++) {
2346  Thread *t = &thread_store.threads[s];
2347  if (!t->in_use) {
2348  break;
2349  }
2350  SCSpinLock(&t->spin);
2351  if (t->type != TVT_PPT) {
2352  SCSpinUnlock(&t->spin);
2353  continue;
2354  }
2355  SC_ATOMIC_SET(t->pktts, ts);
2356  t->sys_sec_stamp = now;
2357  SCSpinUnlock(&t->spin);
2358  }
2359 }
2360 
2362 {
2363  BUG_ON(idx == 0);
2364  const int i = idx - 1;
2365  Thread *t = &thread_store.threads[i];
2366  return SC_ATOMIC_GET(t->pktts);
2367 }
2368 
2369 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2370 {
2371  struct timeval local = { 0 };
2372  static SCTime_t nullts = SCTIME_INITIALIZER;
2373  bool set = false;
2374  SCTime_t now = SCTimeGetTime();
2375 
2376  for (size_t s = 0; s < thread_store.threads_size; s++) {
2377  Thread *t = &thread_store.threads[s];
2378  if (t->in_use == 0) {
2379  break;
2380  }
2381  SCSpinLock(&t->spin);
2382  /* only packet threads set timestamps based on packets */
2383  if (t->type != TVT_PPT) {
2384  SCSpinUnlock(&t->spin);
2385  continue;
2386  }
2387  SCTime_t pktts = SC_ATOMIC_GET(t->pktts);
2388  if (SCTIME_CMP_NEQ(pktts, nullts)) {
2389  SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 5);
2390  /* ignore sleeping threads */
2391  if (SCTIME_CMP_LT(sys_sec_stamp, now)) {
2392  SCSpinUnlock(&t->spin);
2393  continue;
2394  }
2395  if (!set) {
2396  SCTIME_TO_TIMEVAL(&local, pktts);
2397  set = true;
2398  } else {
2399  if (SCTIME_CMP_LT(pktts, SCTIME_FROM_TIMEVAL(&local))) {
2400  SCTIME_TO_TIMEVAL(&local, pktts);
2401  }
2402  }
2403  }
2404  SCSpinUnlock(&t->spin);
2405  }
2406  *ts = local;
2407  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2408 }
2409 
2411 {
2412  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2413  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2414  /* always create at least one thread */
2415  if (thread_max == 0)
2416  thread_max = ncpus * threading_detect_ratio;
2417  if (thread_max < 1)
2418  thread_max = 1;
2419  if (thread_max > 1024) {
2420  SCLogWarning("limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2421  thread_max = 1024;
2422  }
2423  return (uint16_t)thread_max;
2424 }
2425 
2426 /** \brief inject a flow into a threads flow queue
2427  */
2428 void TmThreadsInjectFlowById(Flow *f, const int id)
2429 {
2430  if (id > 0 && id <= (int)thread_store.threads_size) {
2431  int idx = id - 1;
2432  Thread *t = &thread_store.threads[idx];
2433  ThreadVars *tv = t->tv;
2434  if (tv != NULL && tv->flow_queue != NULL) {
2435  FlowEnqueue(tv->flow_queue, f);
2436 
2437  /* wake up listening thread(s) if necessary */
2438  if (tv->inq != NULL) {
2439  SCMutexLock(&tv->inq->pq->mutex_q);
2440  SCCondSignal(&tv->inq->pq->cond_q);
2441  SCMutexUnlock(&tv->inq->pq->mutex_q);
2442  } else if (tv->break_loop) {
2443  TmThreadsCaptureBreakLoop(tv);
2444  }
2445  return;
2446  }
2447  }
2448  BUG_ON(1);
2449 }
thread_name_workers
const char * thread_name_workers
Definition: runmodes.c:68
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:822
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:77
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:81
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:70
SCTmThreadsSlotPacketLoopFinish
bool SCTmThreadsSlotPacketLoopFinish(ThreadVars *tv)
Definition: tm-threads.c:273
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:135
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2318
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1801
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:58
tm-threads.h
TmThreadsSealThreads
void TmThreadsSealThreads(void)
Definition: tm-threads.c:2173
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:85
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:60
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
TmThreadLibSpawn
TmEcode TmThreadLibSpawn(ThreadVars *tv)
Spawns a "fake" lib thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1777
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:133
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1724
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1125
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(const uint16_t set, const uint16_t check, const uint8_t module_flags)
Disable all packet threads.
Definition: tm-threads.c:1536
ThreadVars_::iface_name
char * iface_name
Definition: threadvars.h:139
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
threading_set_cpu_affinity
bool threading_set_cpu_affinity
Definition: runmodes.c:62
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinity). Priority should already be set by pthread_create.
Definition: tm-threads.c:863
SCTIME_CMP_NEQ
#define SCTIME_CMP_NEQ(a, b)
Definition: util-time.h:107
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:70
Thread_::SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(SCTime_t, pktts)
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:314
TmThreadContinueThreads
void TmThreadContinueThreads(void)
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1990
CLS
#define CLS
Definition: suricata-common.h:69
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1068
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:379
AffinityGetYamlPath
char * AffinityGetYamlPath(ThreadsAffinityType *taf)
Get the YAML path for the given affinity type. The path is built using the parent name (if available)...
Definition: util-affinity.c:429
TmThreadSetGroupName
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1681
CaptureStatsSetup
void CaptureStatsSetup(ThreadVars *tv)
Definition: decode.c:1032
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:105
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:48
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:275
PKT_SRC_SHUTDOWN_FLUSH
@ PKT_SRC_SHUTDOWN_FLUSH
Definition: decode.h:64
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:101
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1842
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:175
thread-callbacks.h
PacketQueue_
Simple FIFO queue for packets with mutex and cond. Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:56
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:45
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:83
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:62
Packet_::flags
uint32_t flags
Definition: decode.h:544
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:80
threads.h
UtilAffinityGetAffinedCPUNum
uint16_t UtilAffinityGetAffinedCPUNum(ThreadsAffinityType *taf)
Return the total number of CPUs in a given affinity.
Definition: util-affinity.c:1043
Threads
Threads
Definition: tm-threads.c:2167
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:356
TmThreadsUnsealThreads
void TmThreadsUnsealThreads(void)
Definition: tm-threads.c:2181
ThreadVars_::t
pthread_t t
Definition: threadvars.h:59
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:304
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:71
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2092
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:104
thread_name_autofp
const char * thread_name_autofp
Definition: runmodes.c:66
StatsSetupPrivate
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1205
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:88
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:117
MIN
#define MIN(x, y)
Definition: suricata-common.h:408
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:82
SCSpinLock
#define SCSpinLock
Definition: threads-debug.h:235
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:74
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1385
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:387
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:121
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:774
SCTIME_CMP_EQ
#define SCTIME_CMP_EQ(a, b)
Definition: util-time.h:108
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:89
m
SCMutex m
Definition: flow-hash.h:6
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:44
TmModule_::ThreadBusy
bool(* ThreadBusy)(ThreadVars *tv, void *thread_data)
Definition: tm-modules.h:67
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:88
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:146
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:314
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:82
rww_lock_contention
thread_local uint64_t rww_lock_contention
PacketQueueNoLock_
simple fifo queue for packets
Definition: packet-queue.h:34
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:173
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2209
Thread_::in_use
int in_use
Definition: tm-threads.c:2154
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:38
TmThreadsGetThreadTime
SCTime_t TmThreadsGetThreadTime(const int idx)
Definition: tm-threads.c:2361
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:58
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:244
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:81
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a threads flow queue
Definition: tm-threads.c:2428
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:81
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2369
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmThreadsProcessDecodePseudoPackets
TmEcode TmThreadsProcessDecodePseudoPackets(ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
Definition: tm-threads.c:114
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:53
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:46
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:46
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1323
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:109
util-signal.h
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:79
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:785
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1592
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:2151
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1854
RunmodeIsWorkers
bool RunmodeIsWorkers(void)
Definition: runmodes.c:204
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:57
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:2029
RunmodeIsAutofp
bool RunmodeIsAutofp(void)
Definition: runmodes.c:209
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:61
tv
ThreadVars * tv
Definition: tm-threads.c:2162
strlcat
size_t strlcat(char *, const char *src, size_t siz)
Definition: util-strlcatu.c:45
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:82
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:96
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:208
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:58
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
threading_set_stack_size
uint64_t threading_set_stack_size
Definition: runmodes.c:63
ThreadVars_::perf_public_ctx
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:128
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1266
SCTime_t::secs
uint64_t secs
Definition: util-time.h:41
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
Definition: tm-threads.c:2297
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:90
SCEnter
#define SCEnter(...)
Definition: util-debug.h:277
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
util-affinity.h
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:79
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:69
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:56
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1699
THV_KILL
#define THV_KILL
Definition: threadvars.h:40
THV_REQ_FLOW_LOOP
#define THV_REQ_FLOW_LOOP
Definition: threadvars.h:47
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:938
FlowQueueNew
FlowQueue * FlowQueueNew(void)
Definition: flow-queue.c:35
SCSpinUnlock
#define SCSpinUnlock
Definition: threads-debug.h:237
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:853
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2263
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2213
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:255
thread-storage.h
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
ThreadsAffinityType_::nocpu_warned
bool nocpu_warned
Definition: util-affinity.h:91
__attribute__
struct Thread_ __attribute__((aligned(CLS)))
Definition: tm-threads.c:2161
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:89
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:125
ThreadVars_::id
int id
Definition: threadvars.h:87
ThreadVars_::type
uint8_t type
Definition: threadvars.h:72
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:90
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:317
SCTIME_TO_TIMEVAL
#define SCTIME_TO_TIMEVAL(tv, t)
Definition: util-time.h:97
TmThreadTimeoutLoop
int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
Definition: tm-threads.c:166
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:88
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:85
SCReturn
#define SCReturn
Definition: util-debug.h:279
stream.h
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:376
Thread_::sys_sec_stamp
SCTime_t sys_sec_stamp
Definition: tm-threads.c:2158
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1623
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:139
Packet_
Definition: decode.h:501
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:133
ThreadVars_::thread_group_name
char * thread_group_name
Definition: threadvars.h:67
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:69
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1593
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:79
TmSlot_
Definition: tm-threads.h:53
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:89
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:69
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:80
name
const char * name
Definition: tm-threads.c:2163
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:89
runmodes.h
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread (MGMT). This function supports only custom...
Definition: tm-threads.c:1097
AffinityGetNextCPU
uint16_t AffinityGetNextCPU(ThreadVars *tv, ThreadsAffinityType *taf)
Definition: util-affinity.c:1006
RECEIVE_CPU_SET
@ RECEIVE_CPU_SET
Definition: util-affinity.h:57
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
TmModule_
Definition: tm-modules.h:47
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:255
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
Definition: tm-threads.c:2342
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:116
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
SCTIME_CMP_LT
#define SCTIME_CMP_LT(a, b)
Definition: util-time.h:105
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:39
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:37
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:658
StatsPublicThreadContext_::m
SCMutex m
Definition: counters.h:75
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:378
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1594
Thread_::type
int type
Definition: tm-threads.c:2153
cnt
uint32_t cnt
Definition: tmqh-packetpool.h:7
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:90
Packet_::flow
struct Flow_ * flow
Definition: decode.h:546
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1184
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:75
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:54
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:831
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:35
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:38
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:106
TmThreadsWaitForUnpause
bool TmThreadsWaitForUnpause(ThreadVars *tv)
Wait for a thread to become unpaused.
Definition: tm-threads.c:363
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:41
ThreadVars_::break_loop
bool break_loop
Definition: threadvars.h:136
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:101
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:847
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:234
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:72
sys_sec_stamp
SCTime_t sys_sec_stamp
Definition: tm-threads.c:2169
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:51
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:77
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
FatalError
#define FatalError(...)
Definition: util-debug.h:510
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax(void)
Definition: tm-threads.c:2410
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:66
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1158
StatsSyncCounters
void StatsSyncCounters(ThreadVars *tv)
Definition: counters.c:445
ThreadStorageSize
unsigned int ThreadStorageSize(void)
Definition: thread-storage.c:25
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:147
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:52
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:216
threadvars.h
util-validate.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:258
SCSpinInit
#define SCSpinInit
Definition: threads-debug.h:238
SCThreadRunInitCallbacks
void SCThreadRunInitCallbacks(ThreadVars *tv)
Definition: thread-callbacks.c:48
ThreadsAffinityType_
Definition: util-affinity.h:72
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:37
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:635
TmSlot_::tm_flags
uint8_t tm_flags
Definition: tm-threads.h:67
Thread_::name
const char * name
Definition: tm-threads.c:2152
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:131
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:50
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:91
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:267
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:74
thread_name_verdict
const char * thread_name_verdict
Definition: runmodes.c:69
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:73
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:317
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:84
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:61
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:112
SCSpinlock
#define SCSpinlock
Definition: threads-debug.h:234
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
suricata.h
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:91
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:476
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:145
Tmq_
Definition: tm-queues.h:29
Thread_::spin
SCSpinlock spin
Definition: tm-threads.c:2160
TmThreadWaitOnThreadRunning
TmEcode TmThreadWaitOnThreadRunning(void)
Waits for all threads to be in a running state.
Definition: tm-threads.c:1921
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2189
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:2006
type
int type
Definition: tm-threads.c:2164
ThreadFreeStorage
void ThreadFreeStorage(ThreadVars *tv)
Definition: thread-storage.c:50
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:132
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:83
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:84
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
FindAffinityByInterface
ThreadsAffinityType * FindAffinityByInterface(ThreadsAffinityType *parent, const char *interface_name)
Definition: util-affinity.c:113
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:108
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:274
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:93
SCTIME_INITIALIZER
#define SCTIME_INITIALIZER
Definition: util-time.h:51
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:243
SCTIME_ADD_SECS
#define SCTIME_ADD_SECS(ts, s)
Definition: util-time.h:64
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:42
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:2150
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:281
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:120
StatsThreadCleanup
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1304
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:66
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:63
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:293
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:102
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:80
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:949
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:359
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:172
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:350