suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2024 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "thread-callbacks.h"
33 #include "threadvars.h"
34 #include "thread-storage.h"
35 #include "tm-queues.h"
36 #include "tm-queuehandlers.h"
37 #include "tm-threads.h"
38 #include "tmqh-packetpool.h"
39 #include "threads.h"
40 #include "util-affinity.h"
41 #include "util-debug.h"
42 #include "util-privs.h"
43 #include "util-cpu.h"
44 #include "util-optimize.h"
45 #include "util-profiling.h"
46 #include "util-signal.h"
47 #include "queue.h"
48 #include "util-validate.h"
49 
50 #ifdef PROFILE_LOCKING
51 thread_local uint64_t mutex_lock_contention;
52 thread_local uint64_t mutex_lock_wait_ticks;
53 thread_local uint64_t mutex_lock_cnt;
54 
55 thread_local uint64_t spin_lock_contention;
56 thread_local uint64_t spin_lock_wait_ticks;
57 thread_local uint64_t spin_lock_cnt;
58 
59 thread_local uint64_t rww_lock_contention;
60 thread_local uint64_t rww_lock_wait_ticks;
61 thread_local uint64_t rww_lock_cnt;
62 
63 thread_local uint64_t rwr_lock_contention;
64 thread_local uint64_t rwr_lock_wait_ticks;
65 thread_local uint64_t rwr_lock_cnt;
66 #endif
67 
68 #ifdef OS_FREEBSD
69 #include <sched.h>
70 #include <sys/param.h>
71 #include <sys/resource.h>
72 #include <sys/cpuset.h>
73 #include <sys/thr.h>
74 #define cpu_set_t cpuset_t
75 #endif /* OS_FREEBSD */
76 
77 /* prototypes */
78 static int SetCPUAffinity(uint16_t cpu);
79 static void TmThreadDeinitMC(ThreadVars *tv);
80 
81 /* root of the threadvars list */
82 ThreadVars *tv_root[TVT_MAX] = { NULL };
83 
84 /* lock to protect tv_root */
86 
87 /**
88  * \brief Check if a thread flag is set.
89  *
90  * \retval 1 flag is set.
91  * \retval 0 flag is not set.
92  */
93 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
94 {
95  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
96 }
97 
98 /**
99  * \brief Set a thread flag.
100  */
101 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
102 {
103  SC_ATOMIC_OR(tv->flags, flag);
104 }
105 
106 /**
107  * \brief Unset a thread flag.
108  */
109 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
110 {
111  SC_ATOMIC_AND(tv->flags, ~flag);
112 }
113 
115  ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
116 {
117  while (decode_pq->top != NULL) {
118  Packet *extra_p = PacketDequeueNoLock(decode_pq);
119  if (unlikely(extra_p == NULL))
120  continue;
121  DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
122 
123  if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) {
125  }
126  }
128 }
129 
130 /**
131  * \brief Separate run function so we can call it recursively.
132  */
134 {
135  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
136  PACKET_PROFILING_TMM_START(p, s->tm_id);
137  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
138  PACKET_PROFILING_TMM_END(p, s->tm_id);
139  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
140 
141  /* handle error */
142  if (unlikely(r == TM_ECODE_FAILED)) {
143  /* Encountered error. Return packets to packetpool and return */
144  TmThreadsSlotProcessPktFail(tv, NULL);
145  return TM_ECODE_FAILED;
146  }
147  if (s->tm_flags & TM_FLAG_DECODE_TM) {
148  if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) !=
149  TM_ECODE_OK) {
150  return TM_ECODE_FAILED;
151  }
152  }
153  }
154 
155  return TM_ECODE_OK;
156 }
157 
158 /** \internal
159  *
160  * \brief Process flow timeout packets
161  *
162  * Process flow timeout pseudo packets. During shutdown this loop
163  * is run until the flow engine kills the thread and the queue is
164  * empty.
165  */
167 {
168  TmSlot *fw_slot = tv->tm_flowworker;
169  int r = TM_ECODE_OK;
170 
171  if (tv->stream_pq == NULL || fw_slot == NULL) {
172  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
173  return r;
174  }
175 
176  SCLogDebug("flow end loop starting");
177  while (1) {
179  uint32_t len = tv->stream_pq->len;
181  if (len > 0) {
182  while (len--) {
186  if (likely(p)) {
187  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
188  r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
189  if (r == TM_ECODE_FAILED) {
190  break;
191  }
192  }
193  }
194  } else {
196  break;
197  }
198  SleepUsec(1);
199  }
200  }
201  SCLogDebug("flow end loop complete");
203 
204  return r;
205 }
206 
207 static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv)
208 {
209  TmSlot *s = tv->tm_slots;
210 
212 
213  if (tv->thread_setup_flags != 0)
215 
217  PacketPoolInit();
218 
219  for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
220  if (slot->SlotThreadInit != NULL) {
221  void *slot_data = NULL;
222  TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
223  if (r != TM_ECODE_OK) {
224  if (r == TM_ECODE_DONE) {
225  EngineDone();
227  goto error;
228  } else {
230  goto error;
231  }
232  }
233  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
234  }
235 
236  /* if the flowworker module is the first, get the threads input queue */
237  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
238  tv->stream_pq = tv->inq->pq;
239  tv->tm_flowworker = slot;
240  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
242  if (tv->flow_queue == NULL) {
244  goto error;
245  }
246  /* setup a queue */
247  } else if (slot->tm_id == TMM_FLOWWORKER) {
248  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
249  if (tv->stream_pq_local == NULL)
250  FatalError("failed to alloc PacketQueue");
253  tv->tm_flowworker = slot;
254  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
256  if (tv->flow_queue == NULL) {
258  goto error;
259  }
260  }
261  }
262 
264 
266 
267  return true;
268 
269 error:
270  return false;
271 }
272 
274 {
275  TmSlot *s = tv->tm_slots;
276  bool rc = true;
277 
279 
281 
282  /* process all pseudo packets the flow timeout may throw at us */
284 
287 
289 
290  for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
291  if (slot->SlotThreadExitPrintStats != NULL) {
292  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
293  }
294 
295  if (slot->SlotThreadDeinit != NULL) {
296  TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
297  if (r != TM_ECODE_OK) {
299  rc = false;
300  break;
301  }
302  }
303  }
304 
305  tv->stream_pq = NULL;
307  return rc;
308 }
309 
310 static void *TmThreadsSlotPktAcqLoop(void *td)
311 {
312  ThreadVars *tv = (ThreadVars *)td;
313  TmSlot *s = tv->tm_slots;
314  TmEcode r = TM_ECODE_OK;
315 
316  /* check if we are setup properly */
317  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
318  SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
319  " PktAcqLoop=%p, tmqh_in=%p,"
320  " tmqh_out=%p",
321  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
323  pthread_exit(NULL);
324  return NULL;
325  }
326 
327  if (!TmThreadsSlotPktAcqLoopInit(td)) {
328  goto error;
329  }
330 
331  bool run = TmThreadsWaitForUnpause(tv);
332 
333  while (run) {
334  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
335 
336  if (r == TM_ECODE_FAILED) {
338  run = false;
339  }
341  run = false;
342  }
343  if (r == TM_ECODE_DONE) {
344  run = false;
345  }
346  }
348  goto error;
349  }
350 
351  SCLogDebug("%s ending", tv->name);
352  pthread_exit((void *) 0);
353  return NULL;
354 
355 error:
356  pthread_exit(NULL);
357  return NULL;
358 }
359 
360 /**
361  * Also returns if the kill flag is set.
362  */
364 {
367 
368  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
369  SleepUsec(100);
370 
372  return false;
373  }
374 
376  }
377 
378  return true;
379 }
380 
381 static void *TmThreadsLib(void *td)
382 {
383  ThreadVars *tv = (ThreadVars *)td;
384  TmSlot *s = tv->tm_slots;
385 
386  /* check if we are setup properly */
387  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
388  SCLogError("TmSlot or ThreadVars badly setup: s=%p, tmqh_in=%p,"
389  " tmqh_out=%p",
390  s, tv->tmqh_in, tv->tmqh_out);
392  return NULL;
393  }
394 
395  if (!TmThreadsSlotPktAcqLoopInit(tv)) {
396  goto error;
397  }
398 
399  if (!TmThreadsWaitForUnpause(tv)) {
400  goto error;
401  }
402 
403  return NULL;
404 
405 error:
406  tv->stream_pq = NULL;
407  return (void *)-1;
408 }
409 
410 static void *TmThreadsSlotVar(void *td)
411 {
412  ThreadVars *tv = (ThreadVars *)td;
413  TmSlot *s = (TmSlot *)tv->tm_slots;
414  Packet *p = NULL;
415  TmEcode r = TM_ECODE_OK;
416 
418  PacketPoolInit();//Empty();
419 
421 
422  if (tv->thread_setup_flags != 0)
424 
425  /* Drop the capabilities for this thread */
426  SCDropCaps(tv);
427 
428  /* check if we are setup properly */
429  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
431  pthread_exit(NULL);
432  return NULL;
433  }
434 
435  for (; s != NULL; s = s->slot_next) {
436  if (s->SlotThreadInit != NULL) {
437  void *slot_data = NULL;
438  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
439  if (r != TM_ECODE_OK) {
441  goto error;
442  }
443  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
444  }
445 
446  /* special case: we need to access the stream queue
447  * from the flow timeout code */
448 
449  /* if the flowworker module is the first, get the threads input queue */
450  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
451  tv->stream_pq = tv->inq->pq;
452  tv->tm_flowworker = s;
453  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
455  if (tv->flow_queue == NULL) {
457  pthread_exit(NULL);
458  return NULL;
459  }
460  /* setup a queue */
461  } else if (s->tm_id == TMM_FLOWWORKER) {
462  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
463  if (tv->stream_pq_local == NULL)
464  FatalError("failed to alloc PacketQueue");
467  tv->tm_flowworker = s;
468  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
470  if (tv->flow_queue == NULL) {
472  pthread_exit(NULL);
473  return NULL;
474  }
475  }
476  }
477 
479 
480  // Each 'worker' thread uses this func to process/decode the packet read.
481  // Each decode method is different to receive methods in that they do not
482  // enter infinite loops. They use this as the core loop. As a result, at this
483  // point the worker threads can be considered both initialized and running.
485  bool run = TmThreadsWaitForUnpause(tv);
486 
487  s = (TmSlot *)tv->tm_slots;
488 
489  while (run) {
490  /* input a packet */
491  p = tv->tmqh_in(tv);
492 
493  /* if we didn't get a packet see if we need to do some housekeeping */
494  if (unlikely(p == NULL)) {
495  if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty)) {
497  if (p != NULL) {
498  p->flags |= PKT_PSEUDO_STREAM_END;
500  }
501  }
502  }
503 
504  if (p != NULL) {
505  /* run the thread module(s) */
506  r = TmThreadsSlotVarRun(tv, p, s);
507  if (r == TM_ECODE_FAILED) {
510  break;
511  }
512 
513  /* output the packet */
514  tv->tmqh_out(tv, p);
515 
516  /* now handle the stream pq packets */
517  TmThreadsHandleInjectedPackets(tv);
518  }
519 
521  run = false;
522  }
523  }
525  goto error;
526  }
528 
529  pthread_exit(NULL);
530  return NULL;
531 
532 error:
533  tv->stream_pq = NULL;
534  pthread_exit(NULL);
535  return NULL;
536 }
537 
538 static void *TmThreadsManagement(void *td)
539 {
540  ThreadVars *tv = (ThreadVars *)td;
541  TmSlot *s = (TmSlot *)tv->tm_slots;
542  TmEcode r = TM_ECODE_OK;
543 
544  BUG_ON(s == NULL);
545 
547 
548  if (tv->thread_setup_flags != 0)
550 
551  /* Drop the capabilities for this thread */
552  SCDropCaps(tv);
553 
554  SCLogDebug("%s starting", tv->name);
555 
556  if (s->SlotThreadInit != NULL) {
557  void *slot_data = NULL;
558  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
559  if (r != TM_ECODE_OK) {
561  pthread_exit(NULL);
562  return NULL;
563  }
564  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
565  }
566 
568 
570 
571  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
572  /* handle error */
573  if (r == TM_ECODE_FAILED) {
575  }
576 
579  }
580 
583 
584  if (s->SlotThreadExitPrintStats != NULL) {
585  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
586  }
587 
588  if (s->SlotThreadDeinit != NULL) {
589  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
590  if (r != TM_ECODE_OK) {
592  pthread_exit(NULL);
593  return NULL;
594  }
595  }
596 
598  pthread_exit((void *) 0);
599  return NULL;
600 }
601 
602 /**
603  * \brief We set the slot functions.
604  *
605  * \param tv Pointer to the TV to set the slot function for.
606  * \param name Name of the slot variant.
607  * \param fn_p Pointer to a custom slot function. Used only if slot variant
608  * "name" is "custom".
609  *
610  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
611  */
612 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
613 {
614  if (name == NULL) {
615  if (fn_p == NULL) {
616  printf("Both slot name and function pointer can't be NULL inside "
617  "TmThreadSetSlots\n");
618  goto error;
619  } else {
620  name = "custom";
621  }
622  }
623 
624  if (strcmp(name, "varslot") == 0) {
625  tv->tm_func = TmThreadsSlotVar;
626  } else if (strcmp(name, "pktacqloop") == 0) {
627  tv->tm_func = TmThreadsSlotPktAcqLoop;
628  } else if (strcmp(name, "management") == 0) {
629  tv->tm_func = TmThreadsManagement;
630  } else if (strcmp(name, "command") == 0) {
631  tv->tm_func = TmThreadsManagement;
632  } else if (strcmp(name, "lib") == 0) {
633  tv->tm_func = TmThreadsLib;
634  } else if (strcmp(name, "custom") == 0) {
635  if (fn_p == NULL)
636  goto error;
637  tv->tm_func = fn_p;
638  } else {
639  printf("Error: Slot \"%s\" not supported\n", name);
640  goto error;
641  }
642 
643  return TM_ECODE_OK;
644 
645 error:
646  return TM_ECODE_FAILED;
647 }
648 
649 /**
650  * \brief Appends a new entry to the slots.
651  *
652  * \param tv TV the slot is attached to.
653  * \param tm TM to append.
654  * \param data Data to be passed on to the slot init function.
655  *
656  * \retval The allocated TmSlot or NULL if there is an error
657  */
658 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
659 {
660  TmSlot *slot = SCCalloc(1, sizeof(TmSlot));
661  if (unlikely(slot == NULL))
662  return;
663  SC_ATOMIC_INITPTR(slot->slot_data);
664  slot->SlotThreadInit = tm->ThreadInit;
665  slot->slot_initdata = data;
666  if (tm->Func) {
667  slot->SlotFunc = tm->Func;
668  } else if (tm->PktAcqLoop) {
669  slot->PktAcqLoop = tm->PktAcqLoop;
670  if (tm->PktAcqBreakLoop) {
671  tv->break_loop = true;
672  }
673  } else if (tm->Management) {
674  slot->Management = tm->Management;
675  }
677  slot->SlotThreadDeinit = tm->ThreadDeinit;
678  /* we don't have to check for the return value "-1". We wouldn't have
679  * received a TM as arg, if it didn't exist */
680  slot->tm_id = TmModuleGetIDForTM(tm);
681  slot->tm_flags |= tm->flags;
682 
683  tv->tmm_flags |= tm->flags;
684  tv->cap_flags |= tm->cap_flags;
685 
686  if (tv->tm_slots == NULL) {
687  tv->tm_slots = slot;
688  } else {
689  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
690 
691  /* get the last slot */
692  for ( ; a != NULL; a = a->slot_next) {
693  b = a;
694  }
695  /* append the new slot */
696  if (b != NULL) {
697  b->slot_next = slot;
698  }
699  }
700 }
701 
702 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
703 static int SetCPUAffinitySet(cpu_set_t *cs)
704 {
705 #if defined OS_FREEBSD
706  int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
707  SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
708 #elif OS_DARWIN
709  int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
710  (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
711 #else
712  pid_t tid = (pid_t)syscall(SYS_gettid);
713  int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
714 #endif /* OS_FREEBSD */
715 
716  if (r != 0) {
717  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
718  strerror(errno));
719  return -1;
720  }
721 
722  return 0;
723 }
724 #endif
725 
726 
727 /**
728  * \brief Set the thread affinity on the calling thread.
729  *
730  * \param cpuid Id of the core/cpu to setup the affinity.
731  *
732  * \retval 0 If all goes well; -1 if something is wrong.
733  */
734 static int SetCPUAffinity(uint16_t cpuid)
735 {
736 #if defined __OpenBSD__ || defined sun
737  return 0;
738 #else
739  int cpu = (int)cpuid;
740 
741 #if defined OS_WIN32 || defined __CYGWIN__
742  DWORD cs = 1 << cpu;
743 
744  int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
745  if (r != 0) {
746  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
747  strerror(errno));
748  return -1;
749  }
750  SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
751  SCGetThreadIdLong(), cpu);
752 
753  return 0;
754 
755 #else
756  cpu_set_t cs;
757  memset(&cs, 0, sizeof(cs));
758 
759  CPU_ZERO(&cs);
760  CPU_SET(cpu, &cs);
761  return SetCPUAffinitySet(&cs);
762 #endif /* windows */
763 #endif /* not supported */
764 }
765 
766 
767 /**
768  * \brief Set the thread options (thread priority).
769  *
770  * \param tv Pointer to the ThreadVars to setup the thread priority.
771  *
772  * \retval TM_ECODE_OK.
773  */
775 {
777  tv->thread_priority = prio;
778 
779  return TM_ECODE_OK;
780 }
781 
782 /**
783  * \brief Adjusting nice value for threads.
784  */
786 {
787  SCEnter();
788 #ifndef __CYGWIN__
789 #ifdef OS_WIN32
790  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
791  SCLogError("Error setting priority for "
792  "thread %s: %s",
793  tv->name, strerror(errno));
794  } else {
795  SCLogDebug("Priority set to %"PRId32" for thread %s",
797  }
798 #else
799  int ret = nice(tv->thread_priority);
800  if (ret == -1) {
801  SCLogError("Error setting nice value %d "
802  "for thread %s: %s",
803  tv->thread_priority, tv->name, strerror(errno));
804  } else {
805  SCLogDebug("Nice value set to %"PRId32" for thread %s",
807  }
808 #endif /* OS_WIN32 */
809 #endif
810  SCReturn;
811 }
812 
813 
814 /**
815  * \brief Set the thread options (cpu affinity).
816  *
817  * \param tv pointer to the ThreadVars to setup the affinity.
818  * \param cpu cpu on which affinity is set.
819  *
820  * \retval TM_ECODE_OK
821  */
823 {
825  tv->cpu_affinity = cpu;
826 
827  return TM_ECODE_OK;
828 }
829 
830 
832 {
834  return TM_ECODE_OK;
835 
836  if (type > MAX_CPU_SET) {
837  SCLogError("invalid cpu type family");
838  return TM_ECODE_FAILED;
839  }
840 
842  tv->cpu_affinity = type;
843 
844  return TM_ECODE_OK;
845 }
846 
848 {
849  if (type >= MAX_CPU_SET) {
850  SCLogError("invalid cpu type family");
851  return 0;
852  }
853 
855 }
856 
857 /**
858  * \brief Set the thread options (cpu affinitythread).
859  * Priority should be already set by pthread_create.
860  *
861  * \param tv pointer to the ThreadVars of the calling thread.
862  */
864 {
866  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
867  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
869  SetCPUAffinity(tv->cpu_affinity);
870  }
871 
872 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
877  bool use_iface_affinity = RunmodeIsAutofp() && tv->cpu_affinity == RECEIVE_CPU_SET &&
878  FindAffinityByInterface(taf, tv->iface_name) != NULL;
879  use_iface_affinity |= RunmodeIsWorkers() && tv->cpu_affinity == WORKER_CPU_SET &&
880  FindAffinityByInterface(taf, tv->iface_name) != NULL;
881 
882  if (use_iface_affinity) {
884  }
885 
886  if (UtilAffinityGetAffinedCPUNum(taf) == 0) {
887  if (!taf->nocpu_warned) {
888  SCLogWarning("No CPU affinity set for %s", AffinityGetYamlPath(taf));
889  taf->nocpu_warned = true;
890  }
891  }
892 
893  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
894  uint16_t cpu = AffinityGetNextCPU(tv, taf);
895  SetCPUAffinity(cpu);
896  /* If CPU is in a set overwrite the default thread prio */
897  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
899  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
901  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
903  } else {
904  tv->thread_priority = taf->prio;
905  }
906  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
907  "%d, thread id %lu", tv->thread_priority,
908  tv->name, cpu, SCGetThreadIdLong());
909  } else {
910  SetCPUAffinitySet(&taf->cpu_set);
911  tv->thread_priority = taf->prio;
912  SCLogPerf("Setting prio %d for thread \"%s\", "
913  "thread id %lu", tv->thread_priority,
915  }
917  }
918 #endif
919 
920  return TM_ECODE_OK;
921 }
922 
923 /**
924  * \brief Creates and returns the TV instance for a new thread.
925  *
926  * \param name Name of this TV instance
927  * \param inq_name Incoming queue name
928  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
929  * \param outq_name Outgoing queue name
930  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
931  * \param slots String representation for the slot function to be used
932  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
933  * \param mucond Flag to indicate whether to initialize the condition
934  * and the mutex variables for this newly created TV.
935  *
936  * \retval the newly created TV instance, or NULL on error
937  */
938 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
939  const char *outq_name, const char *outqh_name, const char *slots,
940  void * (*fn_p)(void *), int mucond)
941 {
942  ThreadVars *tv = NULL;
943  Tmq *tmq = NULL;
944  Tmqh *tmqh = NULL;
945 
946  SCLogDebug("creating thread \"%s\"...", name);
947 
948  /* XXX create separate function for this: allocate a thread container */
949  tv = SCCalloc(1, sizeof(ThreadVars) + ThreadStorageSize());
950  if (unlikely(tv == NULL))
951  goto error;
952 
953  SC_ATOMIC_INIT(tv->flags);
955 
956  strlcpy(tv->name, name, sizeof(tv->name));
957 
958  /* default state for every newly created thread */
960 
961  /* set the incoming queue */
962  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
963  SCLogDebug("inq_name \"%s\"", inq_name);
964 
965  tmq = TmqGetQueueByName(inq_name);
966  if (tmq == NULL) {
967  tmq = TmqCreateQueue(inq_name);
968  if (tmq == NULL)
969  goto error;
970  }
971  SCLogDebug("tmq %p", tmq);
972 
973  tv->inq = tmq;
974  tv->inq->reader_cnt++;
975  SCLogDebug("tv->inq %p", tv->inq);
976  }
977  if (inqh_name != NULL) {
978  SCLogDebug("inqh_name \"%s\"", inqh_name);
979 
980  int id = TmqhNameToID(inqh_name);
981  if (id <= 0) {
982  goto error;
983  }
984  tmqh = TmqhGetQueueHandlerByName(inqh_name);
985  if (tmqh == NULL)
986  goto error;
987 
988  tv->tmqh_in = tmqh->InHandler;
989  tv->inq_id = (uint8_t)id;
990  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
991  }
992 
993  /* set the outgoing queue */
994  if (outqh_name != NULL) {
995  SCLogDebug("outqh_name \"%s\"", outqh_name);
996 
997  int id = TmqhNameToID(outqh_name);
998  if (id <= 0) {
999  goto error;
1000  }
1001 
1002  tmqh = TmqhGetQueueHandlerByName(outqh_name);
1003  if (tmqh == NULL)
1004  goto error;
1005 
1006  tv->tmqh_out = tmqh->OutHandler;
1007  tv->outq_id = (uint8_t)id;
1008 
1009  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
1010  SCLogDebug("outq_name \"%s\"", outq_name);
1011 
1012  if (tmqh->OutHandlerCtxSetup != NULL) {
1013  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1014  if (tv->outctx == NULL)
1015  goto error;
1016  tv->outq = NULL;
1017  } else {
1018  tmq = TmqGetQueueByName(outq_name);
1019  if (tmq == NULL) {
1020  tmq = TmqCreateQueue(outq_name);
1021  if (tmq == NULL)
1022  goto error;
1023  }
1024  SCLogDebug("tmq %p", tmq);
1025 
1026  tv->outq = tmq;
1027  tv->outctx = NULL;
1028  tv->outq->writer_cnt++;
1029  }
1030  }
1031  }
1032 
1033  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1034  goto error;
1035  }
1036 
1037  if (mucond != 0)
1038  TmThreadInitMC(tv);
1039 
1041 
1042  return tv;
1043 
1044 error:
1045  SCLogError("failed to setup a thread");
1046 
1047  if (tv != NULL)
1048  SCFree(tv);
1049  return NULL;
1050 }
1051 
1052 /**
1053  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1054  * This function doesn't support custom slots, and hence shouldn't be
1055  * supplied \"custom\" as its slot type. All PPT threads are created
1056  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1057  * conditional variables are not used to kill the thread.
1058  *
1059  * \param name Name of this TV instance
1060  * \param inq_name Incoming queue name
1061  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1062  * \param outq_name Outgoing queue name
1063  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1064  * \param slots String representation for the slot function to be used
1065  *
1066  * \retval the newly created TV instance, or NULL on error
1067  */
1068 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1069  const char *inqh_name, const char *outq_name,
1070  const char *outqh_name, const char *slots)
1071 {
1072  ThreadVars *tv = NULL;
1073 
1074  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1075  slots, NULL, 0);
1076 
1077  if (tv != NULL) {
1078  tv->type = TVT_PPT;
1080  }
1081 
1082  return tv;
1083 }
1084 
1085 /**
1086  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1087  * This function supports only custom slot functions and hence a
1088  * function pointer should be sent as an argument.
1089  *
1090  * \param name Name of this TV instance
1091  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1092  * \param mucond Flag to indicate whether to initialize the condition
1093  * and the mutex variables for this newly created TV.
1094  *
1095  * \retval the newly created TV instance, or NULL on error
1096  */
1097 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1098  int mucond)
1099 {
1100  ThreadVars *tv = NULL;
1101 
1102  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1103 
1104  if (tv != NULL) {
1105  tv->type = TVT_MGMT;
1108  }
1109 
1110  return tv;
1111 }
1112 
1113 /**
1114  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1115  * This function supports only custom slot functions and hence a
1116  * function pointer should be sent as an argument.
1117  *
1118  * \param name Name of this TV instance
1119  * \param module Name of TmModule with MANAGEMENT flag set.
1120  * \param mucond Flag to indicate whether to initialize the condition
1121  * and the mutex variables for this newly created TV.
1122  *
1123  * \retval the newly created TV instance, or NULL on error
1124  */
1125 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1126  int mucond)
1127 {
1128  ThreadVars *tv = NULL;
1129 
1130  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1131 
1132  if (tv != NULL) {
1133  tv->type = TVT_MGMT;
1136 
1137  TmModule *m = TmModuleGetByName(module);
1138  if (m) {
1139  TmSlotSetFuncAppend(tv, m, NULL);
1140  }
1141  }
1142 
1143  return tv;
1144 }
1145 
1146 /**
1147  * \brief Creates and returns the TV instance for a Command thread (CMD).
1148  * This function supports only custom slot functions and hence a
1149  * function pointer should be sent as an argument.
1150  *
1151  * \param name Name of this TV instance
1152  * \param module Name of TmModule with COMMAND flag set.
1153  * \param mucond Flag to indicate whether to initialize the condition
1154  * and the mutex variables for this newly created TV.
1155  *
1156  * \retval the newly created TV instance, or NULL on error
1157  */
1158 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1159  int mucond)
1160 {
1161  ThreadVars *tv = NULL;
1162 
1163  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1164 
1165  if (tv != NULL) {
1166  tv->type = TVT_CMD;
1169 
1170  TmModule *m = TmModuleGetByName(module);
1171  if (m) {
1172  TmSlotSetFuncAppend(tv, m, NULL);
1173  }
1174  }
1175 
1176  return tv;
1177 }
1178 
1179 /**
1180  * \brief Appends this TV to tv_root based on its type
1181  *
1182  * \param type holds the type this TV belongs to.
1183  */
1185 {
1187 
1188  if (tv_root[type] == NULL) {
1189  tv_root[type] = tv;
1190  tv->next = NULL;
1191 
1193 
1194  return;
1195  }
1196 
1197  ThreadVars *t = tv_root[type];
1198 
1199  while (t) {
1200  if (t->next == NULL) {
1201  t->next = tv;
1202  tv->next = NULL;
1203  break;
1204  }
1205 
1206  t = t->next;
1207  }
1208 
1210 }
1211 
1212 static bool ThreadStillHasPackets(ThreadVars *tv)
1213 {
1214  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1215  /* we wait till we dry out all the inq packets, before we
1216  * kill this thread. Do note that you should have disabled
1217  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1218  PacketQueue *q = tv->inq->pq;
1219  SCMutexLock(&q->mutex_q);
1220  uint32_t len = q->len;
1221  SCMutexUnlock(&q->mutex_q);
1222  if (len != 0) {
1223  return true;
1224  }
1225  }
1226 
1227  if (tv->stream_pq != NULL) {
1229  uint32_t len = tv->stream_pq->len;
1231 
1232  if (len != 0) {
1233  return true;
1234  }
1235  }
1236  return false;
1237 }
1238 
1239 /**
1240  * \brief Kill a thread.
1241  *
1242  * \param tv A ThreadVars instance corresponding to the thread that has to be
1243  * killed.
1244  *
1245  * \retval r 1 killed successfully
1246  * 0 not yet ready, needs another look
1247  */
1248 static int TmThreadKillThread(ThreadVars *tv)
1249 {
1250  BUG_ON(tv == NULL);
1251 
1252  /* kill only once :) */
1253  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1254  return 1;
1255  }
1256 
1257  /* set the thread flag informing the thread that it needs to be
1258  * terminated */
1261 
1262  /* to be sure, signal more */
1263  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1264  if (tv->inq_id != TMQH_NOT_SET) {
1266  if (qh != NULL && qh->InShutdownHandler != NULL) {
1267  qh->InShutdownHandler(tv);
1268  }
1269  }
1270  if (tv->inq != NULL) {
1271  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1272  SCMutexLock(&tv->inq->pq->mutex_q);
1273  SCCondSignal(&tv->inq->pq->cond_q);
1274  SCMutexUnlock(&tv->inq->pq->mutex_q);
1275  }
1276  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1277  }
1278 
1279  if (tv->ctrl_cond != NULL ) {
1281  pthread_cond_broadcast(tv->ctrl_cond);
1283  }
1284  return 0;
1285  }
1286 
1287  if (tv->outctx != NULL) {
1288  if (tv->outq_id != TMQH_NOT_SET) {
1290  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1291  qh->OutHandlerCtxFree(tv->outctx);
1292  tv->outctx = NULL;
1293  }
1294  }
1295  }
1296 
1297  /* Join the thread and flag as dead, unless the thread ID is 0 as
1298  * its not a thread created by Suricata. */
1299  if (tv->t) {
1300  pthread_join(tv->t, NULL);
1301  SCLogDebug("thread %s stopped", tv->name);
1302  }
1304  return 1;
1305 }
1306 
1307 static bool ThreadBusy(ThreadVars *tv)
1308 {
1309  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
1310  TmModule *tm = TmModuleGetById(s->tm_id);
1311  if (tm && tm->ThreadBusy != NULL) {
1312  if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
1313  return true;
1314  }
1315  }
1316  return false;
1317 }
1318 
1319 /** \internal
1320  *
1321  * \brief make sure that all packet threads are done processing their
1322  * in-flight packets, including 'injected' flow packets.
1323  */
1324 static void TmThreadDrainPacketThreads(void)
1325 {
1326  ThreadVars *tv = NULL;
1327  struct timeval start_ts;
1328  struct timeval cur_ts;
1329  gettimeofday(&start_ts, NULL);
1330 
1331 again:
1332  gettimeofday(&cur_ts, NULL);
1333  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1334  SCLogWarning("unable to get all packet threads "
1335  "to process their packets in time");
1336  return;
1337  }
1338 
1340 
1341  /* all receive threads are part of packet processing threads */
1342  tv = tv_root[TVT_PPT];
1343  while (tv) {
1344  if (ThreadStillHasPackets(tv)) {
1345  /* we wait till we dry out all the inq packets, before we
1346  * kill this thread. Do note that you should have disabled
1347  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1349 
1350  /* sleep outside lock */
1351  SleepMsec(1);
1352  goto again;
1353  }
1354  if (ThreadBusy(tv)) {
1356 
1357  Packet *p = PacketGetFromAlloc();
1358  if (p != NULL) {
1361  PacketQueue *q = tv->stream_pq;
1362  SCMutexLock(&q->mutex_q);
1363  PacketEnqueue(q, p);
1364  SCCondSignal(&q->cond_q);
1365  SCMutexUnlock(&q->mutex_q);
1366  }
1367 
1368  /* don't sleep while holding a lock */
1369  SleepMsec(1);
1370  goto again;
1371  }
1372  tv = tv->next;
1373  }
1374 
1376 }
1377 
1378 /**
1379  * \brief Disable all threads having the specified TMs.
1380  *
1381  * Breaks out of the packet acquisition loop, and bumps
1382  * into the 'flow loop', where it will process packets
1383  * from the flow engine's shutdown handling.
1384  */
1386 {
1387  ThreadVars *tv = NULL;
1388  struct timeval start_ts;
1389  struct timeval cur_ts;
1390  gettimeofday(&start_ts, NULL);
1391 
1392 again:
1393  gettimeofday(&cur_ts, NULL);
1394  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1395  FatalError("Engine unable to disable detect "
1396  "thread - \"%s\". Killing engine",
1397  tv->name);
1398  }
1399 
1401 
1402  /* all receive threads are part of packet processing threads */
1403  tv = tv_root[TVT_PPT];
1404 
1405  /* we do have to keep in mind that TVs are arranged in the order
1406  * right from receive to log. The moment we fail to find a
1407  * receive TM amongst the slots in a tv, it indicates we are done
1408  * with all receive threads */
1409  while (tv) {
1410  int disable = 0;
1411  TmModule *tm = NULL;
1412  /* obtain the slots for this TV */
1413  TmSlot *slots = tv->tm_slots;
1414  while (slots != NULL) {
1415  tm = TmModuleGetById(slots->tm_id);
1416 
1417  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1418  disable = 1;
1419  break;
1420  }
1421 
1422  slots = slots->slot_next;
1423  continue;
1424  }
1425 
1426  if (disable) {
1427  if (ThreadStillHasPackets(tv)) {
1428  /* we wait till we dry out all the inq packets, before we
1429  * kill this thread. Do note that you should have disabled
1430  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1432  /* don't sleep while holding a lock */
1433  SleepMsec(1);
1434  goto again;
1435  }
1436 
1437  if (ThreadBusy(tv)) {
1439 
1440  Packet *p = PacketGetFromAlloc();
1441  if (p != NULL) {
1444  PacketQueue *q = tv->stream_pq;
1445  SCMutexLock(&q->mutex_q);
1446  PacketEnqueue(q, p);
1447  SCCondSignal(&q->cond_q);
1448  SCMutexUnlock(&q->mutex_q);
1449  }
1450 
1451  /* don't sleep while holding a lock */
1452  SleepMsec(1);
1453  goto again;
1454  }
1455 
1456  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1457  if (tm && tm->PktAcqBreakLoop != NULL) {
1458  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1459  }
1461 
1462  if (tv->inq != NULL) {
1463  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1464  SCMutexLock(&tv->inq->pq->mutex_q);
1465  SCCondSignal(&tv->inq->pq->cond_q);
1466  SCMutexUnlock(&tv->inq->pq->mutex_q);
1467  }
1468  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1469  }
1470 
1471  /* wait for it to enter the 'flow loop' stage */
1472  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1474 
1475  SleepMsec(1);
1476  goto again;
1477  }
1478  }
1479 
1480  tv = tv->next;
1481  }
1482 
1484 
1485  /* finally wait for all packet threads to have
1486  * processed all of their 'live' packets so we
1487  * don't process the last live packets together
1488  * with FFR packets */
1489  TmThreadDrainPacketThreads();
1490 }
1491 
1492 #ifdef DEBUG_VALIDATION
1493 static void TmThreadDumpThreads(void);
1494 #endif
1495 
1496 static void TmThreadDebugValidateNoMorePackets(void)
1497 {
1498 #ifdef DEBUG_VALIDATION
1500  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1501  if (ThreadStillHasPackets(tv)) {
1503  TmThreadDumpThreads();
1505  }
1506  }
1508 #endif
1509 }
1510 
1511 /** \internal
1512  * \brief check if a thread has any of the modules indicated by TM_FLAG_*
1513  * \param tv thread
1514  * \param flags TM_FLAG_*'s
1515  * \retval bool true if at least on of the flags is present */
1516 static inline bool CheckModuleFlags(const ThreadVars *tv, const uint8_t flags)
1517 {
1518  return (tv->tmm_flags & flags) != 0;
1519 }
1520 
1521 /**
1522  * \brief Disable all packet threads
1523  * \param set flag to set
1524  * \param check flag to check
1525  * \param module_flags bitflags of TmModule's to apply the `set` flag to.
1526  *
1527  * Support 2 stages in shutting down the packet threads:
1528  * 1. set THV_REQ_FLOW_LOOP and wait for THV_FLOW_LOOP
1529  * 2. set THV_KILL and wait for THV_RUNNING_DONE
1530  *
1531  * During step 1 the main loop is exited, and the flow loop logic is entered.
1532  * During step 2, the flow loop logic is done and the thread closes.
1533  *
1534  * `module_flags` limits which threads are disabled
1535  */
1537  const uint16_t set, const uint16_t check, const uint8_t module_flags)
1538 {
1539  struct timeval start_ts;
1540  struct timeval cur_ts;
1541 
1542  /* first drain all packet threads of their packets */
1543  TmThreadDrainPacketThreads();
1544 
1545  /* since all the threads possibly able to produce more packets
1546  * are now gone or inactive, we should see no packets anywhere
1547  * anymore. */
1548  TmThreadDebugValidateNoMorePackets();
1549 
1550  gettimeofday(&start_ts, NULL);
1551 again:
1552  gettimeofday(&cur_ts, NULL);
1553  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1554  FatalError("Engine unable to disable packet "
1555  "threads. Killing engine");
1556  }
1557 
1558  /* loop through the packet threads and kill them */
1560  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1561  /* only set flow worker threads to THV_REQ_FLOW_LOOP */
1562  if (!CheckModuleFlags(tv, module_flags)) {
1563  SCLogDebug("%s does not have any of the modules %02x, skip", tv->name, module_flags);
1564  continue;
1565  }
1566  TmThreadsSetFlag(tv, set);
1567 
1568  /* separate worker threads (autofp) will still wait at their
1569  * input queues. So nudge them here so they will observe the
1570  * THV_KILL flag. */
1571  if (tv->inq != NULL) {
1572  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1573  SCMutexLock(&tv->inq->pq->mutex_q);
1574  SCCondSignal(&tv->inq->pq->cond_q);
1575  SCMutexUnlock(&tv->inq->pq->mutex_q);
1576  }
1577  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1578  }
1579 
1580  /* wait for it to reach the expected state */
1581  if (!TmThreadsCheckFlag(tv, check)) {
1583  SCLogDebug("%s did not reach state %u, again", tv->name, check);
1584 
1585  SleepMsec(1);
1586  goto again;
1587  }
1588  }
1590 }
1591 
1592 #define MIN_WAIT_TIME 100
1593 #define MAX_WAIT_TIME 999999
1595 {
1596  ThreadVars *tv = NULL;
1597  unsigned int sleep_usec = MIN_WAIT_TIME;
1598 
1599  BUG_ON((family < 0) || (family >= TVT_MAX));
1600 
1601 again:
1603  tv = tv_root[family];
1604 
1605  while (tv) {
1606  int r = TmThreadKillThread(tv);
1607  if (r == 0) {
1609  SleepUsec(sleep_usec);
1610  sleep_usec *= 2; /* slowly back off */
1611  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1612  goto again;
1613  }
1614  sleep_usec = MIN_WAIT_TIME; /* reset */
1615 
1616  tv = tv->next;
1617  }
1619 }
1620 #undef MIN_WAIT_TIME
1621 #undef MAX_WAIT_TIME
1622 
1624 {
1625  int i = 0;
1626 
1627  for (i = 0; i < TVT_MAX; i++) {
1629  }
1630 }
1631 
1632 static void TmThreadFree(ThreadVars *tv)
1633 {
1634  TmSlot *s;
1635  TmSlot *ps;
1636  if (tv == NULL)
1637  return;
1638 
1639  SCLogDebug("Freeing thread '%s'.", tv->name);
1640 
1642 
1643  if (tv->flow_queue) {
1644  BUG_ON(tv->flow_queue->qlen != 0);
1645  SCFree(tv->flow_queue);
1646  }
1647 
1649 
1650  TmThreadDeinitMC(tv);
1651 
1652  if (tv->printable_name) {
1654  }
1655 
1656  if (tv->iface_name) {
1657  SCFree(tv->iface_name);
1658  }
1659 
1660  if (tv->stream_pq_local) {
1664  }
1665 
1666  s = (TmSlot *)tv->tm_slots;
1667  while (s) {
1668  ps = s;
1669  s = s->slot_next;
1670  SCFree(ps);
1671  }
1672 
1674  SCFree(tv);
1675 }
1676 
1678 {
1679  ThreadVars *tv = NULL;
1680  ThreadVars *ptv = NULL;
1681 
1682  if ((family < 0) || (family >= TVT_MAX))
1683  return;
1684 
1686  tv = tv_root[family];
1687 
1688  while (tv) {
1689  ptv = tv;
1690  tv = tv->next;
1691  TmThreadFree(ptv);
1692  }
1693  tv_root[family] = NULL;
1695 }
1696 
1697 /**
1698  * \brief Spawns a thread associated with the ThreadVars instance tv
1699  *
1700  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1701  */
1703 {
1704  pthread_attr_t attr;
1705  if (tv->tm_func == NULL) {
1706  FatalError("No thread function set");
1707  }
1708 
1709  /* Initialize and set thread detached attribute */
1710  pthread_attr_init(&attr);
1711 
1712  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1713 
1714  /* Adjust thread stack size if configured */
1716  SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
1717  if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
1718  FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
1720  }
1721  }
1722 
1723  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1724  if (rc) {
1725  FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc,
1726  strerror(errno));
1727  }
1728 
1729 #if DEBUG && HAVE_PTHREAD_GETATTR_NP
1731  if (pthread_getattr_np(tv->t, &attr) == 0) {
1732  size_t stack_size;
1733  void *stack_addr;
1734  pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1735  SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1736  } else {
1737  SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1738  "pthread_getattr_np() is %" PRId32,
1739  rc);
1740  }
1741  }
1742 #endif
1743 
1745 
1746  TmThreadAppend(tv, tv->type);
1747  return TM_ECODE_OK;
1748 }
1749 
1750 /**
1751  * \brief Spawns a "fake" lib thread associated with the ThreadVars instance tv
1752  *
1753  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1754  */
1756 {
1757  if (tv->tm_func == NULL) {
1758  printf("ERROR: no thread function set\n");
1759  return TM_ECODE_FAILED;
1760  }
1761 
1762  if (tv->tm_func((void *)tv) == (void *)-1) {
1763  return TM_ECODE_FAILED;
1764  }
1765 
1767 
1768  return TM_ECODE_OK;
1769 }
1770 
1771 /**
1772  * \brief Initializes the mutex and condition variables for this TV
1773  *
1774  * It can be used by a thread to control a wait loop that can also be
1775  * influenced by other threads.
1776  *
1777  * \param tv Pointer to a TV instance
1778  */
1780 {
1781  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1782  FatalError("Fatal error encountered in TmThreadInitMC. "
1783  "Exiting...");
1784  }
1785 
1786  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1787  printf("Error initializing the tv->m mutex\n");
1788  exit(EXIT_FAILURE);
1789  }
1790 
1791  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1792  FatalError("Fatal error encountered in TmThreadInitMC. "
1793  "Exiting...");
1794  }
1795 
1796  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1797  FatalError("Error initializing the tv->cond condition "
1798  "variable");
1799  }
1800 }
1801 
1802 static void TmThreadDeinitMC(ThreadVars *tv)
1803 {
1804  if (tv->ctrl_mutex) {
1806  SCFree(tv->ctrl_mutex);
1807  }
1808  if (tv->ctrl_cond) {
1810  SCFree(tv->ctrl_cond);
1811  }
1812 }
1813 
1814 /**
1815  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1816  * the kill flag has been set or not on the thread.
1817  *
1818  * \param tv Pointer to the TV instance.
1819  */
1821 {
1822  while (!TmThreadsCheckFlag(tv, flags)) {
1823  SleepUsec(100);
1824  }
1825 }
1826 
1827 /**
1828  * \brief Unpauses a thread
1829  *
1830  * \param tv Pointer to a TV instance that has to be unpaused
1831  */
1833 {
1835 }
1836 
1837 static TmEcode WaitOnThreadsRunningByType(const int t)
1838 {
1839  struct timeval start_ts;
1840  struct timeval cur_ts;
1841  uint32_t thread_cnt = 0;
1842 
1843  /* on retries, this will init to the last thread that started up already */
1844  ThreadVars *tv_start = tv_root[t];
1846  for (ThreadVars *tv = tv_start; tv != NULL; tv = tv->next) {
1847  thread_cnt++;
1848  }
1850 
1851  /* give threads a second each to start up, plus a margin of a minute. */
1852  uint32_t time_budget = 60 + thread_cnt;
1853 
1854  gettimeofday(&start_ts, NULL);
1855 again:
1857  ThreadVars *tv = tv_start;
1858  while (tv != NULL) {
1861 
1862  SCLogError("thread \"%s\" failed to "
1863  "start: flags %04x",
1864  tv->name, SC_ATOMIC_GET(tv->flags));
1865  return TM_ECODE_FAILED;
1866  }
1867 
1870 
1871  /* 60 seconds provided for the thread to transition from
1872  * THV_INIT_DONE to THV_RUNNING */
1873  gettimeofday(&cur_ts, NULL);
1874  if (((uint32_t)cur_ts.tv_sec - (uint32_t)start_ts.tv_sec) > time_budget) {
1875  SCLogError("thread \"%s\" failed to "
1876  "start in time: flags %04x. Total threads: %u. Time budget %us",
1877  tv->name, SC_ATOMIC_GET(tv->flags), thread_cnt, time_budget);
1878  return TM_ECODE_FAILED;
1879  }
1880 
1881  /* sleep a little to give the thread some
1882  * time to start running */
1883  SleepUsec(100);
1884  goto again;
1885  }
1886  tv_start = tv;
1887 
1888  tv = tv->next;
1889  }
1891  return TM_ECODE_OK;
1892 }
1893 
1894 /**
1895  * \brief Waits for all threads to be in a running state
1896  *
1897  * \retval TM_ECODE_OK if all are running or error if a thread failed
1898  */
1900 {
1901  uint16_t RX_num = 0;
1902  uint16_t W_num = 0;
1903  uint16_t FM_num = 0;
1904  uint16_t FR_num = 0;
1905  uint16_t TX_num = 0;
1906 
1907  for (int i = 0; i < TVT_MAX; i++) {
1908  if (WaitOnThreadsRunningByType(i) != TM_ECODE_OK)
1909  return TM_ECODE_FAILED;
1910  }
1911 
1913  for (int i = 0; i < TVT_MAX; i++) {
1914  for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
1915  if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1916  RX_num++;
1917  else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1918  W_num++;
1919  else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1920  TX_num++;
1921  else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1922  FM_num++;
1923  else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1924  FR_num++;
1925  }
1926  }
1928 
1929  /* Construct a welcome string displaying
1930  * initialized thread types and counts */
1931  uint16_t app_len = 32;
1932  uint16_t buf_len = 256;
1933 
1934  char append_str[app_len];
1935  char thread_counts[buf_len];
1936 
1937  strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1938  if (RX_num > 0) {
1939  snprintf(append_str, app_len, "RX: %u ", RX_num);
1940  strlcat(thread_counts, append_str, buf_len);
1941  }
1942  if (W_num > 0) {
1943  snprintf(append_str, app_len, "W: %u ", W_num);
1944  strlcat(thread_counts, append_str, buf_len);
1945  }
1946  if (TX_num > 0) {
1947  snprintf(append_str, app_len, "TX: %u ", TX_num);
1948  strlcat(thread_counts, append_str, buf_len);
1949  }
1950  if (FM_num > 0) {
1951  snprintf(append_str, app_len, "FM: %u ", FM_num);
1952  strlcat(thread_counts, append_str, buf_len);
1953  }
1954  if (FR_num > 0) {
1955  snprintf(append_str, app_len, "FR: %u ", FR_num);
1956  strlcat(thread_counts, append_str, buf_len);
1957  }
1958  snprintf(append_str, app_len, " Engine started.");
1959  strlcat(thread_counts, append_str, buf_len);
1960  SCLogNotice("%s", thread_counts);
1961 
1962  return TM_ECODE_OK;
1963 }
1964 
1965 /**
1966  * \brief Unpauses all threads present in tv_root
1967  */
1969 {
1971  for (int i = 0; i < TVT_MAX; i++) {
1972  ThreadVars *tv = tv_root[i];
1973  while (tv != NULL) {
1975  tv = tv->next;
1976  }
1977  }
1979 }
1980 
1981 /**
1982  * \brief Used to check the thread for certain conditions of failure.
1983  */
1985 {
1987  for (int i = 0; i < TVT_MAX; i++) {
1988  ThreadVars *tv = tv_root[i];
1989  while (tv) {
1991  FatalError("thread %s failed", tv->name);
1992  }
1993  tv = tv->next;
1994  }
1995  }
1997 }
1998 
1999 /**
2000  * \brief Used to check if all threads have finished their initialization. On
2001  * finding an un-initialized thread, it waits till that thread completes
2002  * its initialization, before proceeding to the next thread.
2003  *
2004  * \retval TM_ECODE_OK all initialized properly
2005  * \retval TM_ECODE_FAILED failure
2006  */
2008 {
2009  struct timeval start_ts;
2010  struct timeval cur_ts;
2011  gettimeofday(&start_ts, NULL);
2012 
2013 again:
2015  for (int i = 0; i < TVT_MAX; i++) {
2016  ThreadVars *tv = tv_root[i];
2017  while (tv != NULL) {
2020 
2021  SCLogError("thread \"%s\" failed to "
2022  "initialize: flags %04x",
2023  tv->name, SC_ATOMIC_GET(tv->flags));
2024  return TM_ECODE_FAILED;
2025  }
2026 
2027  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
2029 
2030  gettimeofday(&cur_ts, NULL);
2031  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
2032  SCLogError("thread \"%s\" failed to "
2033  "initialize in time: flags %04x",
2034  tv->name, SC_ATOMIC_GET(tv->flags));
2035  return TM_ECODE_FAILED;
2036  }
2037 
2038  /* sleep a little to give the thread some
2039  * time to finish initialization */
2040  SleepUsec(100);
2041  goto again;
2042  }
2043 
2046  SCLogError("thread \"%s\" failed to "
2047  "initialize.",
2048  tv->name);
2049  return TM_ECODE_FAILED;
2050  }
2053  SCLogError("thread \"%s\" closed on "
2054  "initialization.",
2055  tv->name);
2056  return TM_ECODE_FAILED;
2057  }
2058 
2059  tv = tv->next;
2060  }
2061  }
2063 
2064  return TM_ECODE_OK;
2065 }
2066 
2067 /**
2068  * \brief returns a count of all the threads that match the flag
2069  */
2071 {
2072  uint32_t cnt = 0;
2074  for (int i = 0; i < TVT_MAX; i++) {
2075  ThreadVars *tv = tv_root[i];
2076  while (tv != NULL) {
2077  if ((tv->tmm_flags & flags) == flags)
2078  cnt++;
2079 
2080  tv = tv->next;
2081  }
2082  }
2084  return cnt;
2085 }
2086 
2087 #ifdef DEBUG_VALIDATION
2088 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2089 {
2090  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2092  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2093  tv, s, s->tm_id, m->name);
2094  }
2095 }
2096 
2097 static void TmThreadDumpThreads(void)
2098 {
2100  for (int i = 0; i < TVT_MAX; i++) {
2101  ThreadVars *tv = tv_root[i];
2102  while (tv != NULL) {
2103  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2104  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2105  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2106  if (tv->inq && tv->stream_pq == tv->inq->pq) {
2107  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2108  } else if (tv->stream_pq_local != NULL) {
2109  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2110  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2111  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2112  }
2113  }
2114  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2115  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2116  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2117  }
2118  TmThreadDoDumpSlots(tv);
2119  tv = tv->next;
2120  }
2121  }
2124 }
2125 #endif
2126 
2127 /* Aligned to CLS to avoid false sharing between atomic ops. */
2128 typedef struct Thread_ {
2129  ThreadVars *tv; /**< threadvars structure */
2130  const char *name;
2131  int type;
2132  int in_use; /**< bool to indicate this is in use */
2133 
2134  SC_ATOMIC_DECLARE(SCTime_t, pktts); /**< current packet time of this thread
2135  * (offline mode) */
2136  SCTime_t sys_sec_stamp; /**< timestamp in real system
2137  * time when the pktts was last updated. */
2139 } __attribute__((aligned(CLS))) Thread;
2141 typedef struct Threads_ {
2142  Thread *threads;
2143  size_t threads_size;
2144  int threads_cnt;
2146 
2147 static bool thread_store_sealed = false;
2148 static Threads thread_store = { NULL, 0, 0 };
2149 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2150 
2152 {
2153  SCMutexLock(&thread_store_lock);
2154  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2155  thread_store_sealed = true;
2156  SCMutexUnlock(&thread_store_lock);
2157 }
2158 
2160 {
2161  SCMutexLock(&thread_store_lock);
2162  DEBUG_VALIDATE_BUG_ON(!thread_store_sealed);
2163  thread_store_sealed = false;
2164  SCMutexUnlock(&thread_store_lock);
2165 }
2166 
2168 {
2169  SCMutexLock(&thread_store_lock);
2170  for (size_t s = 0; s < thread_store.threads_size; s++) {
2171  Thread *t = &thread_store.threads[s];
2172  if (t == NULL || t->in_use == 0)
2173  continue;
2174 
2175  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2176  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2177  if (t->tv) {
2178  ThreadVars *tv = t->tv;
2179  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2180  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2181  tv, tv->type, tv->name, tv->tmm_flags, flags);
2182  }
2183  }
2184  SCMutexUnlock(&thread_store_lock);
2185 }
2186 
2187 #define STEP 32
2188 /**
2189  * \retval id thread id, or 0 if not found
2190  */
2192 {
2193  SCMutexLock(&thread_store_lock);
2194  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2195  if (thread_store.threads == NULL) {
2196  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2197  BUG_ON(thread_store.threads == NULL);
2198  thread_store.threads_size = STEP;
2199  }
2200 
2201  size_t s;
2202  for (s = 0; s < thread_store.threads_size; s++) {
2203  if (thread_store.threads[s].in_use == 0) {
2204  Thread *t = &thread_store.threads[s];
2205  SCSpinInit(&t->spin, 0);
2206  SCSpinLock(&t->spin);
2207  t->name = tv->name;
2208  t->type = type;
2209  t->tv = tv;
2210  t->in_use = 1;
2211  SCSpinUnlock(&t->spin);
2212 
2213  SCMutexUnlock(&thread_store_lock);
2214  return (int)(s+1);
2215  }
2216  }
2217 
2218  /* if we get here the array is completely filled */
2219  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2220  BUG_ON(newmem == NULL);
2221  thread_store.threads = newmem;
2222  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2223 
2224  Thread *t = &thread_store.threads[thread_store.threads_size];
2225  SCSpinInit(&t->spin, 0);
2226  SCSpinLock(&t->spin);
2227  t->name = tv->name;
2228  t->type = type;
2229  t->tv = tv;
2230  t->in_use = 1;
2231  SCSpinUnlock(&t->spin);
2232 
2233  s = thread_store.threads_size;
2234  thread_store.threads_size += STEP;
2235 
2236  SCMutexUnlock(&thread_store_lock);
2237  return (int)(s+1);
2238 }
2239 #undef STEP
2240 
2241 void TmThreadsUnregisterThread(const int id)
2242 {
2243  SCMutexLock(&thread_store_lock);
2244  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2245  if (id <= 0 || id > (int)thread_store.threads_size) {
2246  SCMutexUnlock(&thread_store_lock);
2247  return;
2248  }
2249 
2250  /* id is one higher than index */
2251  int idx = id - 1;
2252 
2253  /* reset thread_id, which serves as clearing the record */
2254  thread_store.threads[idx].in_use = 0;
2255 
2256  /* check if we have at least one registered thread left */
2257  size_t s;
2258  for (s = 0; s < thread_store.threads_size; s++) {
2259  Thread *t = &thread_store.threads[s];
2260  if (t->in_use == 1) {
2261  goto end;
2262  }
2263  }
2264 
2265  /* if we get here no threads are registered */
2266  SCFree(thread_store.threads);
2267  thread_store.threads = NULL;
2268  thread_store.threads_size = 0;
2269  thread_store.threads_cnt = 0;
2270 
2271 end:
2272  SCMutexUnlock(&thread_store_lock);
2273 }
2274 
2275 void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
2276 {
2277  SCTime_t now = SCTimeGetTime();
2278  int idx = id - 1;
2279  Thread *t = &thread_store.threads[idx];
2280  SCSpinLock(&t->spin);
2281  SC_ATOMIC_SET(t->pktts, ts);
2282 
2283 #ifdef DEBUG
2284  if (t->sys_sec_stamp.secs != 0) {
2285  SCTime_t tmpts = SCTIME_ADD_SECS(t->sys_sec_stamp, 3);
2286  if (SCTIME_CMP_LT(tmpts, now)) {
2287  SCLogDebug("%s: thread slept for %u secs", t->name, (uint32_t)(now.secs - tmpts.secs));
2288  }
2289  }
2290 #endif
2291 
2292  t->sys_sec_stamp = now;
2293  SCSpinUnlock(&t->spin);
2294 }
2295 
2297 {
2298  static SCTime_t nullts = SCTIME_INITIALIZER;
2299  bool ready = true;
2300  for (size_t s = 0; s < thread_store.threads_size; s++) {
2301  Thread *t = &thread_store.threads[s];
2302  if (!t->in_use) {
2303  break;
2304  }
2305  SCSpinLock(&t->spin);
2306  if (t->type != TVT_PPT) {
2307  SCSpinUnlock(&t->spin);
2308  continue;
2309  }
2310  if (SCTIME_CMP_EQ(t->sys_sec_stamp, nullts)) {
2311  ready = false;
2312  SCSpinUnlock(&t->spin);
2313  break;
2314  }
2315  SCSpinUnlock(&t->spin);
2316  }
2317  return ready;
2318 }
2319 
2321 {
2322  SCTime_t now = SCTimeGetTime();
2323  for (size_t s = 0; s < thread_store.threads_size; s++) {
2324  Thread *t = &thread_store.threads[s];
2325  if (!t->in_use) {
2326  break;
2327  }
2328  SCSpinLock(&t->spin);
2329  if (t->type != TVT_PPT) {
2330  SCSpinUnlock(&t->spin);
2331  continue;
2332  }
2333  SC_ATOMIC_SET(t->pktts, ts);
2334  t->sys_sec_stamp = now;
2335  SCSpinUnlock(&t->spin);
2336  }
2337 }
2338 
2340 {
2341  DEBUG_VALIDATE_BUG_ON(idx == 0);
2342  const int i = idx - 1;
2343  Thread *t = &thread_store.threads[i];
2344  return SC_ATOMIC_GET(t->pktts);
2345 }
2346 
2347 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2348 {
2349  struct timeval local = { 0 };
2350  static SCTime_t nullts = SCTIME_INITIALIZER;
2351  bool set = false;
2352  SCTime_t now = SCTimeGetTime();
2353 
2354  for (size_t s = 0; s < thread_store.threads_size; s++) {
2355  Thread *t = &thread_store.threads[s];
2356  if (t->in_use == 0) {
2357  break;
2358  }
2359  SCSpinLock(&t->spin);
2360  /* only packet threads set timestamps based on packets */
2361  if (t->type != TVT_PPT) {
2362  SCSpinUnlock(&t->spin);
2363  continue;
2364  }
2365  SCTime_t pktts = SC_ATOMIC_GET(t->pktts);
2366  if (SCTIME_CMP_NEQ(pktts, nullts)) {
2367  SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 5);
2368  /* ignore sleeping threads */
2369  if (SCTIME_CMP_LT(sys_sec_stamp, now)) {
2370  SCSpinUnlock(&t->spin);
2371  continue;
2372  }
2373  if (!set) {
2374  SCTIME_TO_TIMEVAL(&local, pktts);
2375  set = true;
2376  } else {
2377  if (SCTIME_CMP_LT(pktts, SCTIME_FROM_TIMEVAL(&local))) {
2378  SCTIME_TO_TIMEVAL(&local, pktts);
2379  }
2380  }
2381  }
2382  SCSpinUnlock(&t->spin);
2383  }
2384  *ts = local;
2385  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2386 }
2387 
2389 {
2390  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2391  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2392  /* always create at least one thread */
2393  if (thread_max == 0)
2394  thread_max = ncpus * threading_detect_ratio;
2395  if (thread_max < 1)
2396  thread_max = 1;
2397  if (thread_max > 1024) {
2398  SCLogWarning("limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2399  thread_max = 1024;
2400  }
2401  return (uint16_t)thread_max;
2402 }
2403 
2404 /** \brief inject a flow into a threads flow queue
2405  */
2406 void TmThreadsInjectFlowById(Flow *f, const int id)
2407 {
2408  if (id > 0 && id <= (int)thread_store.threads_size) {
2409  int idx = id - 1;
2410  Thread *t = &thread_store.threads[idx];
2411  ThreadVars *tv = t->tv;
2412  if (tv != NULL && tv->flow_queue != NULL) {
2413  FlowEnqueue(tv->flow_queue, f);
2414 
2415  /* wake up listening thread(s) if necessary */
2416  if (tv->inq != NULL) {
2417  SCMutexLock(&tv->inq->pq->mutex_q);
2418  SCCondSignal(&tv->inq->pq->cond_q);
2419  SCMutexUnlock(&tv->inq->pq->mutex_q);
2420  } else if (tv->break_loop) {
2421  TmThreadsCaptureBreakLoop(tv);
2422  }
2423  return;
2424  }
2425  }
2426  BUG_ON(1);
2427 }
thread_name_workers
const char * thread_name_workers
Definition: runmodes.c:68
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:822
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:77
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:81
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:70
SCTmThreadsSlotPacketLoopFinish
bool SCTmThreadsSlotPacketLoopFinish(ThreadVars *tv)
Definition: tm-threads.c:273
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:131
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2296
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1779
tm-threads.h
TmThreadsSealThreads
void TmThreadsSealThreads(void)
Definition: tm-threads.c:2151
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:85
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
TmThreadLibSpawn
TmEcode TmThreadLibSpawn(ThreadVars *tv)
Spawns a "fake" lib thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1755
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:133
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1702
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1125
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(const uint16_t set, const uint16_t check, const uint8_t module_flags)
Disable all packet threads.
Definition: tm-threads.c:1536
ThreadVars_::iface_name
char * iface_name
Definition: threadvars.h:135
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
threading_set_cpu_affinity
bool threading_set_cpu_affinity
Definition: runmodes.c:62
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:863
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:66
SCTIME_CMP_NEQ
#define SCTIME_CMP_NEQ(a, b)
Definition: util-time.h:107
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:70
Thread_::SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(SCTime_t, pktts)
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:90
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:314
TmThreadContinueThreads
void TmThreadContinueThreads(void)
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1968
CLS
#define CLS
Definition: suricata-common.h:69
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1068
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:380
AffinityGetYamlPath
char * AffinityGetYamlPath(ThreadsAffinityType *taf)
Get the YAML path for the given affinity type. The path is built using the parent name (if available)...
Definition: util-affinity.c:429
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:88
CaptureStatsSetup
void CaptureStatsSetup(ThreadVars *tv)
Definition: decode.c:1046
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:104
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:48
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:282
PKT_SRC_SHUTDOWN_FLUSH
@ PKT_SRC_SHUTDOWN_FLUSH
Definition: decode.h:64
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:101
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1820
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:175
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:91
thread-callbacks.h
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:88
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:56
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:45
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:83
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:62
Packet_::flags
uint32_t flags
Definition: decode.h:544
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:80
threads.h
UtilAffinityGetAffinedCPUNum
uint16_t UtilAffinityGetAffinedCPUNum(ThreadsAffinityType *taf)
Return the total number of CPUs in a given affinity.
Definition: util-affinity.c:1043
Threads
Threads
Definition: tm-threads.c:2145
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:348
TmThreadsUnsealThreads
void TmThreadsUnsealThreads(void)
Definition: tm-threads.c:2159
ThreadVars_::t
pthread_t t
Definition: threadvars.h:59
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:305
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:71
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2070
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:103
thread_name_autofp
const char * thread_name_autofp
Definition: runmodes.c:66
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:116
MIN
#define MIN(x, y)
Definition: suricata-common.h:408
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:82
SCSpinLock
#define SCSpinLock
Definition: threads-debug.h:236
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:73
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1385
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:388
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:89
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:60
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:122
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:774
SCTIME_CMP_EQ
#define SCTIME_CMP_EQ(a, b)
Definition: util-time.h:108
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:89
m
SCMutex m
Definition: flow-hash.h:6
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:44
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:61
TmModule_::ThreadBusy
bool(* ThreadBusy)(ThreadVars *tv, void *thread_data)
Definition: tm-modules.h:67
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:142
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:305
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:82
rww_lock_contention
thread_local uint64_t rww_lock_contention
PacketQueueNoLock_
simple fifo queue for packets
Definition: packet-queue.h:34
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:173
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2187
Thread_::in_use
int in_use
Definition: tm-threads.c:2132
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:38
TmThreadsGetThreadTime
SCTime_t TmThreadsGetThreadTime(const int idx)
Definition: tm-threads.c:2339
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:58
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:235
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:81
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a threads flow queue
Definition: tm-threads.c:2406
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:80
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2347
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmThreadsProcessDecodePseudoPackets
TmEcode TmThreadsProcessDecodePseudoPackets(ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
Definition: tm-threads.c:114
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:53
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:46
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:46
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1327
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:109
util-signal.h
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:384
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:78
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:785
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1592
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:2129
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1832
RunmodeIsWorkers
bool RunmodeIsWorkers(void)
Definition: runmodes.c:204
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:57
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:2007
RunmodeIsAutofp
bool RunmodeIsAutofp(void)
Definition: runmodes.c:209
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:61
tv
ThreadVars * tv
Definition: tm-threads.c:2140
strlcat
size_t strlcat(char *, const char *src, size_t siz)
Definition: util-strlcatu.c:45
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:58
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:82
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:95
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:208
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:58
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:120
threading_set_stack_size
uint64_t threading_set_stack_size
Definition: runmodes.c:63
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1270
SCTime_t::secs
uint64_t secs
Definition: util-time.h:41
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
Definition: tm-threads.c:2275
SCEnter
#define SCEnter(...)
Definition: util-debug.h:284
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
util-affinity.h
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:79
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:69
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:56
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1677
THV_KILL
#define THV_KILL
Definition: threadvars.h:40
THV_REQ_FLOW_LOOP
#define THV_REQ_FLOW_LOOP
Definition: threadvars.h:47
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:938
FlowQueueNew
FlowQueue * FlowQueueNew(void)
Definition: flow-queue.c:35
SCSpinUnlock
#define SCSpinUnlock
Definition: threads-debug.h:238
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:867
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2241
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2191
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:262
thread-storage.h
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
ThreadsAffinityType_::nocpu_warned
bool nocpu_warned
Definition: util-affinity.h:91
__attribute__
struct Thread_ __attribute__((aligned(CLS)))
Definition: tm-threads.c:2139
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:124
ThreadVars_::id
int id
Definition: threadvars.h:86
ThreadVars_::type
uint8_t type
Definition: threadvars.h:71
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:317
SCTIME_TO_TIMEVAL
#define SCTIME_TO_TIMEVAL(tv, t)
Definition: util-time.h:97
TmThreadTimeoutLoop
int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
Definition: tm-threads.c:166
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:88
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:85
SCReturn
#define SCReturn
Definition: util-debug.h:286
stream.h
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:377
Thread_::sys_sec_stamp
SCTime_t sys_sec_stamp
Definition: tm-threads.c:2136
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1623
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:139
Packet_
Definition: decode.h:501
RECEIVE_CPU_SET
@ RECEIVE_CPU_SET
Definition: util-affinity.h:57
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:129
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:69
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1593
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:79
TmSlot_
Definition: tm-threads.h:53
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:89
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:68
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:80
name
const char * name
Definition: tm-threads.c:2141
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
runmodes.h
StatsSetupPrivate
int StatsSetupPrivate(StatsThreadContext *stats, const char *thread_name)
Definition: counters.c:1229
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1097
AffinityGetNextCPU
uint16_t AffinityGetNextCPU(ThreadVars *tv, ThreadsAffinityType *taf)
Definition: util-affinity.c:1006
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
TmModule_
Definition: tm-modules.h:47
StatsThreadInit
void StatsThreadInit(StatsThreadContext *stats)
Definition: counters.c:1249
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:256
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
Definition: tm-threads.c:2320
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:115
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
SCTIME_CMP_LT
#define SCTIME_CMP_LT(a, b)
Definition: util-time.h:105
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:39
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:37
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:658
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:379
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1594
Thread_::type
int type
Definition: tm-threads.c:2131
cnt
uint32_t cnt
Definition: tmqh-packetpool.h:7
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:140
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:89
Packet_::flow
struct Flow_ * flow
Definition: decode.h:546
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1184
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:74
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:54
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:831
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:35
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:38
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:105
TmThreadsWaitForUnpause
bool TmThreadsWaitForUnpause(ThreadVars *tv)
Wait for a thread to become unpaused.
Definition: tm-threads.c:363
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:41
ThreadVars_::break_loop
bool break_loop
Definition: threadvars.h:132
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:100
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:847
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:241
StatsSyncCounters
void StatsSyncCounters(StatsThreadContext *stats)
Definition: counters.c:478
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:72
sys_sec_stamp
SCTime_t sys_sec_stamp
Definition: tm-threads.c:2147
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:51
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:77
FatalError
#define FatalError(...)
Definition: util-debug.h:517
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax(void)
Definition: tm-threads.c:2388
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:66
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1158
ThreadStorageSize
unsigned int ThreadStorageSize(void)
Definition: thread-storage.c:25
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:143
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:52
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:216
threadvars.h
util-validate.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:262
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:89
SCSpinInit
#define SCSpinInit
Definition: threads-debug.h:239
SCThreadRunInitCallbacks
void SCThreadRunInitCallbacks(ThreadVars *tv)
Definition: thread-callbacks.c:48
ThreadsAffinityType_
Definition: util-affinity.h:72
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:37
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:635
TmSlot_::tm_flags
uint8_t tm_flags
Definition: tm-threads.h:67
Thread_::name
const char * name
Definition: tm-threads.c:2130
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:131
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:50
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:90
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:274
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:90
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:74
thread_name_verdict
const char * thread_name_verdict
Definition: runmodes.c:69
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:73
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:317
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:83
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:111
SCSpinlock
#define SCSpinlock
Definition: threads-debug.h:235
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:376
suricata.h
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:485
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:141
Tmq_
Definition: tm-queues.h:29
Thread_::spin
SCSpinlock spin
Definition: tm-threads.c:2138
TmThreadWaitOnThreadRunning
TmEcode TmThreadWaitOnThreadRunning(void)
Waits for all threads to be in a running state.
Definition: tm-threads.c:1899
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2167
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1984
type
int type
Definition: tm-threads.c:2142
ThreadFreeStorage
void ThreadFreeStorage(ThreadVars *tv)
Definition: thread-storage.c:50
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:128
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:82
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:84
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
FindAffinityByInterface
ThreadsAffinityType * FindAffinityByInterface(ThreadsAffinityType *parent, const char *interface_name)
Definition: util-affinity.c:113
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:108
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:265
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:93
SCTIME_INITIALIZER
#define SCTIME_INITIALIZER
Definition: util-time.h:51
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:250
SCTIME_ADD_SECS
#define SCTIME_ADD_SECS(ts, s)
Definition: util-time.h:64
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:42
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:2128
ThreadVars_::stats
StatsThreadContext stats
Definition: threadvars.h:121
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:288
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:121
StatsThreadCleanup
void StatsThreadCleanup(StatsThreadContext *stats)
Definition: counters.c:1345
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:63
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:297
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:102
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:80
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:952
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:359
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:176
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:350