suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2024 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "thread-callbacks.h"
33 #include "threadvars.h"
34 #include "thread-storage.h"
35 #include "tm-queues.h"
36 #include "tm-queuehandlers.h"
37 #include "tm-threads.h"
38 #include "tmqh-packetpool.h"
39 #include "threads.h"
40 #include "util-affinity.h"
41 #include "util-debug.h"
42 #include "util-privs.h"
43 #include "util-cpu.h"
44 #include "util-optimize.h"
45 #include "util-profiling.h"
46 #include "util-signal.h"
47 #include "queue.h"
48 #include "util-validate.h"
49 
50 #ifdef PROFILE_LOCKING
51 thread_local uint64_t mutex_lock_contention;
52 thread_local uint64_t mutex_lock_wait_ticks;
53 thread_local uint64_t mutex_lock_cnt;
54 
55 thread_local uint64_t spin_lock_contention;
56 thread_local uint64_t spin_lock_wait_ticks;
57 thread_local uint64_t spin_lock_cnt;
58 
59 thread_local uint64_t rww_lock_contention;
60 thread_local uint64_t rww_lock_wait_ticks;
61 thread_local uint64_t rww_lock_cnt;
62 
63 thread_local uint64_t rwr_lock_contention;
64 thread_local uint64_t rwr_lock_wait_ticks;
65 thread_local uint64_t rwr_lock_cnt;
66 #endif
67 
68 #ifdef OS_FREEBSD
69 #include <sched.h>
70 #include <sys/param.h>
71 #include <sys/resource.h>
72 #include <sys/cpuset.h>
73 #include <sys/thr.h>
74 #define cpu_set_t cpuset_t
75 #endif /* OS_FREEBSD */
76 
77 /* prototypes */
78 static int SetCPUAffinity(uint16_t cpu);
79 static void TmThreadDeinitMC(ThreadVars *tv);
80 
81 /* root of the threadvars list */
82 ThreadVars *tv_root[TVT_MAX] = { NULL };
83 
84 /* lock to protect tv_root */
86 
87 /**
88  * \brief Check if a thread flag is set.
89  *
90  * \retval 1 flag is set.
91  * \retval 0 flag is not set.
92  */
93 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
94 {
95  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
96 }
97 
98 /**
99  * \brief Set a thread flag.
100  */
101 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
102 {
103  SC_ATOMIC_OR(tv->flags, flag);
104 }
105 
106 /**
107  * \brief Unset a thread flag.
108  */
109 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
110 {
111  SC_ATOMIC_AND(tv->flags, ~flag);
112 }
113 
115  ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
116 {
117  while (decode_pq->top != NULL) {
118  Packet *extra_p = PacketDequeueNoLock(decode_pq);
119  if (unlikely(extra_p == NULL))
120  continue;
121  DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
122 
123  if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) {
125  }
126  }
128 }
129 
130 /**
131  * \brief Separate run function so we can call it recursively.
132  */
134 {
135  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
136  PACKET_PROFILING_TMM_START(p, s->tm_id);
137  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
138  PACKET_PROFILING_TMM_END(p, s->tm_id);
139  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
140 
141  /* handle error */
142  if (unlikely(r == TM_ECODE_FAILED)) {
143  /* Encountered error. Return packets to packetpool and return */
144  TmThreadsSlotProcessPktFail(tv, NULL);
145  return TM_ECODE_FAILED;
146  }
147  if (s->tm_flags & TM_FLAG_DECODE_TM) {
148  if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) !=
149  TM_ECODE_OK) {
150  return TM_ECODE_FAILED;
151  }
152  }
153  }
154 
155  return TM_ECODE_OK;
156 }
157 
158 /** \internal
159  *
160  * \brief Process flow timeout packets
161  *
162  * Process flow timeout pseudo packets. During shutdown this loop
163  * is run until the flow engine kills the thread and the queue is
164  * empty.
165  */
167 {
168  TmSlot *fw_slot = tv->tm_flowworker;
169  int r = TM_ECODE_OK;
170 
171  if (tv->stream_pq == NULL || fw_slot == NULL) {
172  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
173  return r;
174  }
175 
176  SCLogDebug("flow end loop starting");
177  while (1) {
179  uint32_t len = tv->stream_pq->len;
181  if (len > 0) {
182  while (len--) {
186  if (likely(p)) {
187  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
188  r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
189  if (r == TM_ECODE_FAILED) {
190  break;
191  }
192  }
193  }
194  } else {
196  break;
197  }
198  SleepUsec(1);
199  }
200  }
201  SCLogDebug("flow end loop complete");
203 
204  return r;
205 }
206 
207 static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv)
208 {
209  TmSlot *s = tv->tm_slots;
210 
212 
213  if (tv->thread_setup_flags != 0)
215 
217  PacketPoolInit();
218 
219  for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
220  if (slot->SlotThreadInit != NULL) {
221  void *slot_data = NULL;
222  TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
223  if (r != TM_ECODE_OK) {
224  if (r == TM_ECODE_DONE) {
225  EngineDone();
227  goto error;
228  } else {
230  goto error;
231  }
232  }
233  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
234  }
235 
236  /* if the flowworker module is the first, get the threads input queue */
237  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
238  tv->stream_pq = tv->inq->pq;
239  tv->tm_flowworker = slot;
240  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
242  if (tv->flow_queue == NULL) {
244  goto error;
245  }
246  /* setup a queue */
247  } else if (slot->tm_id == TMM_FLOWWORKER) {
248  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
249  if (tv->stream_pq_local == NULL)
250  FatalError("failed to alloc PacketQueue");
253  tv->tm_flowworker = slot;
254  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
256  if (tv->flow_queue == NULL) {
258  goto error;
259  }
260  }
261  }
262 
264 
266 
267  return true;
268 
269 error:
270  return false;
271 }
272 
274 {
275  TmSlot *s = tv->tm_slots;
276  bool rc = true;
277 
279 
281 
282  /* process all pseudo packets the flow timeout may throw at us */
284 
287 
289 
290  for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
291  if (slot->SlotThreadExitPrintStats != NULL) {
292  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
293  }
294 
295  if (slot->SlotThreadDeinit != NULL) {
296  TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
297  if (r != TM_ECODE_OK) {
299  rc = false;
300  break;
301  }
302  }
303  }
304 
305  tv->stream_pq = NULL;
307  return rc;
308 }
309 
310 static void *TmThreadsSlotPktAcqLoop(void *td)
311 {
312  ThreadVars *tv = (ThreadVars *)td;
313  TmSlot *s = tv->tm_slots;
314  TmEcode r = TM_ECODE_OK;
315 
316  /* check if we are setup properly */
317  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
318  SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
319  " PktAcqLoop=%p, tmqh_in=%p,"
320  " tmqh_out=%p",
321  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
323  pthread_exit(NULL);
324  return NULL;
325  }
326 
327  if (!TmThreadsSlotPktAcqLoopInit(td)) {
328  goto error;
329  }
330 
331  bool run = TmThreadsWaitForUnpause(tv);
332 
333  while (run) {
334  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
335 
336  if (r == TM_ECODE_FAILED) {
338  run = false;
339  }
341  run = false;
342  }
343  if (r == TM_ECODE_DONE) {
344  run = false;
345  }
346  }
348  goto error;
349  }
350 
351  SCLogDebug("%s ending", tv->name);
352  pthread_exit((void *) 0);
353  return NULL;
354 
355 error:
356  pthread_exit(NULL);
357  return NULL;
358 }
359 
360 /**
361  * Also returns if the kill flag is set.
362  */
364 {
367 
368  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
369  SleepUsec(100);
370 
372  return false;
373  }
374 
376  }
377 
378  return true;
379 }
380 
381 static void *TmThreadsLib(void *td)
382 {
383  ThreadVars *tv = (ThreadVars *)td;
384  TmSlot *s = tv->tm_slots;
385 
386  /* check if we are setup properly */
387  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
388  SCLogError("TmSlot or ThreadVars badly setup: s=%p, tmqh_in=%p,"
389  " tmqh_out=%p",
390  s, tv->tmqh_in, tv->tmqh_out);
392  return NULL;
393  }
394 
395  if (!TmThreadsSlotPktAcqLoopInit(tv)) {
396  goto error;
397  }
398 
399  if (!TmThreadsWaitForUnpause(tv)) {
400  goto error;
401  }
402 
403  return NULL;
404 
405 error:
406  tv->stream_pq = NULL;
407  return (void *)-1;
408 }
409 
410 static void *TmThreadsSlotVar(void *td)
411 {
412  ThreadVars *tv = (ThreadVars *)td;
413  TmSlot *s = (TmSlot *)tv->tm_slots;
414  Packet *p = NULL;
415  TmEcode r = TM_ECODE_OK;
416 
418  PacketPoolInit();//Empty();
419 
421 
422  if (tv->thread_setup_flags != 0)
424 
425  /* Drop the capabilities for this thread */
426  SCDropCaps(tv);
427 
428  /* check if we are setup properly */
429  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
431  pthread_exit(NULL);
432  return NULL;
433  }
434 
435  for (; s != NULL; s = s->slot_next) {
436  if (s->SlotThreadInit != NULL) {
437  void *slot_data = NULL;
438  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
439  if (r != TM_ECODE_OK) {
441  goto error;
442  }
443  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
444  }
445 
446  /* special case: we need to access the stream queue
447  * from the flow timeout code */
448 
449  /* if the flowworker module is the first, get the threads input queue */
450  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
451  tv->stream_pq = tv->inq->pq;
452  tv->tm_flowworker = s;
453  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
455  if (tv->flow_queue == NULL) {
457  pthread_exit(NULL);
458  return NULL;
459  }
460  /* setup a queue */
461  } else if (s->tm_id == TMM_FLOWWORKER) {
462  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
463  if (tv->stream_pq_local == NULL)
464  FatalError("failed to alloc PacketQueue");
467  tv->tm_flowworker = s;
468  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
470  if (tv->flow_queue == NULL) {
472  pthread_exit(NULL);
473  return NULL;
474  }
475  }
476  }
477 
479 
480  // Each 'worker' thread uses this func to process/decode the packet read.
481  // Each decode method is different to receive methods in that they do not
482  // enter infinite loops. They use this as the core loop. As a result, at this
483  // point the worker threads can be considered both initialized and running.
485  bool run = TmThreadsWaitForUnpause(tv);
486 
487  s = (TmSlot *)tv->tm_slots;
488 
489  while (run) {
490  /* input a packet */
491  p = tv->tmqh_in(tv);
492 
493  /* if we didn't get a packet see if we need to do some housekeeping */
494  if (unlikely(p == NULL)) {
495  if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty)) {
497  if (p != NULL) {
498  p->flags |= PKT_PSEUDO_STREAM_END;
500  }
501  }
502  }
503 
504  if (p != NULL) {
505  /* run the thread module(s) */
506  r = TmThreadsSlotVarRun(tv, p, s);
507  if (r == TM_ECODE_FAILED) {
510  break;
511  }
512 
513  /* output the packet */
514  tv->tmqh_out(tv, p);
515 
516  /* now handle the stream pq packets */
517  TmThreadsHandleInjectedPackets(tv);
518  }
519 
521  run = false;
522  }
523  }
525  goto error;
526  }
528 
529  pthread_exit(NULL);
530  return NULL;
531 
532 error:
533  tv->stream_pq = NULL;
534  pthread_exit(NULL);
535  return NULL;
536 }
537 
538 static void *TmThreadsManagement(void *td)
539 {
540  ThreadVars *tv = (ThreadVars *)td;
541  TmSlot *s = (TmSlot *)tv->tm_slots;
542  TmEcode r = TM_ECODE_OK;
543 
544  BUG_ON(s == NULL);
545 
547 
548  if (tv->thread_setup_flags != 0)
550 
551  /* Drop the capabilities for this thread */
552  SCDropCaps(tv);
553 
554  SCLogDebug("%s starting", tv->name);
555 
556  if (s->SlotThreadInit != NULL) {
557  void *slot_data = NULL;
558  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
559  if (r != TM_ECODE_OK) {
561  pthread_exit(NULL);
562  return NULL;
563  }
564  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
565  }
566 
568 
570 
571  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
572  /* handle error */
573  if (r == TM_ECODE_FAILED) {
575  }
576 
579  }
580 
583 
584  if (s->SlotThreadExitPrintStats != NULL) {
585  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
586  }
587 
588  if (s->SlotThreadDeinit != NULL) {
589  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
590  if (r != TM_ECODE_OK) {
592  pthread_exit(NULL);
593  return NULL;
594  }
595  }
596 
598  pthread_exit((void *) 0);
599  return NULL;
600 }
601 
602 /**
603  * \brief We set the slot functions.
604  *
605  * \param tv Pointer to the TV to set the slot function for.
606  * \param name Name of the slot variant.
607  * \param fn_p Pointer to a custom slot function. Used only if slot variant
608  * "name" is "custom".
609  *
610  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
611  */
612 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
613 {
614  if (name == NULL) {
615  if (fn_p == NULL) {
616  printf("Both slot name and function pointer can't be NULL inside "
617  "TmThreadSetSlots\n");
618  goto error;
619  } else {
620  name = "custom";
621  }
622  }
623 
624  if (strcmp(name, "varslot") == 0) {
625  tv->tm_func = TmThreadsSlotVar;
626  } else if (strcmp(name, "pktacqloop") == 0) {
627  tv->tm_func = TmThreadsSlotPktAcqLoop;
628  } else if (strcmp(name, "management") == 0) {
629  tv->tm_func = TmThreadsManagement;
630  } else if (strcmp(name, "command") == 0) {
631  tv->tm_func = TmThreadsManagement;
632  } else if (strcmp(name, "lib") == 0) {
633  tv->tm_func = TmThreadsLib;
634  } else if (strcmp(name, "custom") == 0) {
635  if (fn_p == NULL)
636  goto error;
637  tv->tm_func = fn_p;
638  } else {
639  printf("Error: Slot \"%s\" not supported\n", name);
640  goto error;
641  }
642 
643  return TM_ECODE_OK;
644 
645 error:
646  return TM_ECODE_FAILED;
647 }
648 
649 /**
650  * \brief Appends a new entry to the slots.
651  *
652  * \param tv TV the slot is attached to.
653  * \param tm TM to append.
654  * \param data Data to be passed on to the slot init function.
655  *
656  * \note Returns nothing; when allocating the TmSlot fails the function returns without appending a slot.
657  */
658 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
659 {
660  TmSlot *slot = SCCalloc(1, sizeof(TmSlot));
661  if (unlikely(slot == NULL))
662  return;
663  SC_ATOMIC_INITPTR(slot->slot_data);
664  slot->SlotThreadInit = tm->ThreadInit;
665  slot->slot_initdata = data;
666  if (tm->Func) {
667  slot->SlotFunc = tm->Func;
668  } else if (tm->PktAcqLoop) {
669  slot->PktAcqLoop = tm->PktAcqLoop;
670  if (tm->PktAcqBreakLoop) {
671  tv->break_loop = true;
672  }
673  } else if (tm->Management) {
674  slot->Management = tm->Management;
675  }
677  slot->SlotThreadDeinit = tm->ThreadDeinit;
678  /* we don't have to check for the return value "-1". We wouldn't have
679  * received a TM as arg, if it didn't exist */
680  slot->tm_id = TmModuleGetIDForTM(tm);
681  slot->tm_flags |= tm->flags;
682 
683  tv->tmm_flags |= tm->flags;
684  tv->cap_flags |= tm->cap_flags;
685 
686  if (tv->tm_slots == NULL) {
687  tv->tm_slots = slot;
688  } else {
689  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
690 
691  /* get the last slot */
692  for ( ; a != NULL; a = a->slot_next) {
693  b = a;
694  }
695  /* append the new slot */
696  if (b != NULL) {
697  b->slot_next = slot;
698  }
699  }
700 }
701 
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
/**
 * \brief Apply a cpu set as the affinity of the current thread.
 *
 * Uses cpuset_setaffinity() on FreeBSD, thread_policy_set() on Darwin and
 * sched_setaffinity() on the thread id from SYS_gettid everywhere else.
 *
 * \param cs cpu set to apply.
 *
 * \retval 0 on success; -1 if the platform call reported a failure (a
 *         warning is printed to stdout in that case).
 */
static int SetCPUAffinitySet(cpu_set_t *cs)
{
    int r;

#if defined OS_FREEBSD
    r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
            SCGetThreadIdLong(), sizeof(cpu_set_t), cs);
#elif OS_DARWIN
    r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
            (void *)cs, THREAD_AFFINITY_POLICY_COUNT);
#else
    const pid_t self_tid = (pid_t)syscall(SYS_gettid);
    r = sched_setaffinity(self_tid, sizeof(cpu_set_t), cs);
#endif /* OS_FREEBSD */

    if (r == 0) {
        return 0;
    }

    printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
            strerror(errno));
    return -1;
}
#endif
725 
726 
/**
 * \brief Set the thread affinity on the calling thread.
 *
 * On OpenBSD and Solaris nothing is done and 0 is returned. On Windows
 * and Cygwin a single-bit mask is passed to SetThreadAffinityMask(). On
 * all other platforms a cpu set holding only \a cpuid is built and handed
 * to SetCPUAffinitySet().
 *
 * A \a cpuid that cannot be represented (bit index beyond the Windows
 * affinity mask, or >= CPU_SETSIZE where that macro exists) is rejected
 * up front instead of being shifted / indexed out of range.
 *
 * \param cpuid Id of the core/cpu to setup the affinity.
 *
 * \retval 0 If all goes well; -1 if something is wrong.
 */
static int SetCPUAffinity(uint16_t cpuid)
{
#if defined __OpenBSD__ || defined sun
    /* nothing is done on these platforms */
    return 0;
#else
    int cpu = (int)cpuid;

#if defined OS_WIN32 || defined __CYGWIN__
    /* The mask holds one bit per cpu. Shifting by the mask width or more is
     * undefined behaviour, so refuse ids that have no bit in the mask. */
    if (cpu >= (int)(sizeof(DWORD_PTR) * 8)) {
        printf("Warning: cpu %d does not fit in the thread affinity mask\n", cpu);
        return -1;
    }
    /* build the mask in the width SetThreadAffinityMask() takes */
    DWORD_PTR cs = (DWORD_PTR)1 << cpu;

    int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
    if (r != 0) {
        printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
                strerror(errno));
        return -1;
    }
    SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
            SCGetThreadIdLong(), cpu);

    return 0;

#else
#ifdef CPU_SETSIZE
    /* CPU_SET() on an index outside the set is not portable: some
     * implementations ignore it, others write out of bounds. */
    if (cpu >= CPU_SETSIZE) {
        printf("Warning: cpu %d is outside the supported cpu set size\n", cpu);
        return -1;
    }
#endif
    cpu_set_t cs;
    memset(&cs, 0, sizeof(cs));

    CPU_ZERO(&cs);
    CPU_SET(cpu, &cs);
    return SetCPUAffinitySet(&cs);
#endif /* windows */
#endif /* not supported */
}
765 
766 
767 /**
768  * \brief Set the thread options (thread priority).
769  *
770  * \param tv Pointer to the ThreadVars to setup the thread priority.
771  *
772  * \retval TM_ECODE_OK.
773  */
775 {
777  tv->thread_priority = prio;
778 
779  return TM_ECODE_OK;
780 }
781 
782 /**
783  * \brief Adjusting nice value for threads.
784  */
786 {
787  SCEnter();
788 #ifndef __CYGWIN__
789 #ifdef OS_WIN32
790  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
791  SCLogError("Error setting priority for "
792  "thread %s: %s",
793  tv->name, strerror(errno));
794  } else {
795  SCLogDebug("Priority set to %"PRId32" for thread %s",
797  }
798 #else
799  int ret = nice(tv->thread_priority);
800  if (ret == -1) {
801  SCLogError("Error setting nice value %d "
802  "for thread %s: %s",
803  tv->thread_priority, tv->name, strerror(errno));
804  } else {
805  SCLogDebug("Nice value set to %"PRId32" for thread %s",
807  }
808 #endif /* OS_WIN32 */
809 #endif
810  SCReturn;
811 }
812 
813 
814 /**
815  * \brief Set the thread options (cpu affinity).
816  *
817  * \param tv pointer to the ThreadVars to setup the affinity.
818  * \param cpu cpu on which affinity is set.
819  *
820  * \retval TM_ECODE_OK
821  */
823 {
825  tv->cpu_affinity = cpu;
826 
827  return TM_ECODE_OK;
828 }
829 
830 
832 {
834  return TM_ECODE_OK;
835 
836  if (type > MAX_CPU_SET) {
837  SCLogError("invalid cpu type family");
838  return TM_ECODE_FAILED;
839  }
840 
842  tv->cpu_affinity = type;
843 
844  return TM_ECODE_OK;
845 }
846 
848 {
849  if (type >= MAX_CPU_SET) {
850  SCLogError("invalid cpu type family");
851  return 0;
852  }
853 
855 }
856 
857 /**
858  * \brief Set the thread options (cpu affinity) for the calling thread.
859  * Priority should be already set by pthread_create.
860  *
861  * \param tv pointer to the ThreadVars of the calling thread.
862  */
864 {
866  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
867  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
869  SetCPUAffinity(tv->cpu_affinity);
870  }
871 
872 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
877  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
878  uint16_t cpu = AffinityGetNextCPU(taf);
879  SetCPUAffinity(cpu);
880  /* If CPU is in a set overwrite the default thread prio */
881  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
883  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
885  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
887  } else {
888  tv->thread_priority = taf->prio;
889  }
890  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
891  "%d, thread id %lu", tv->thread_priority,
892  tv->name, cpu, SCGetThreadIdLong());
893  } else {
894  SetCPUAffinitySet(&taf->cpu_set);
895  tv->thread_priority = taf->prio;
896  SCLogPerf("Setting prio %d for thread \"%s\", "
897  "thread id %lu", tv->thread_priority,
899  }
901  }
902 #endif
903 
904  return TM_ECODE_OK;
905 }
906 
907 /**
908  * \brief Creates and returns the TV instance for a new thread.
909  *
910  * \param name Name of this TV instance
911  * \param inq_name Incoming queue name
912  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
913  * \param outq_name Outgoing queue name
914  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
915  * \param slots String representation for the slot function to be used
916  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
917  * \param mucond Flag to indicate whether to initialize the condition
918  * and the mutex variables for this newly created TV.
919  *
920  * \retval the newly created TV instance, or NULL on error
921  */
922 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
923  const char *outq_name, const char *outqh_name, const char *slots,
924  void * (*fn_p)(void *), int mucond)
925 {
926  ThreadVars *tv = NULL;
927  Tmq *tmq = NULL;
928  Tmqh *tmqh = NULL;
929 
930  SCLogDebug("creating thread \"%s\"...", name);
931 
932  /* XXX create separate function for this: allocate a thread container */
933  tv = SCCalloc(1, sizeof(ThreadVars) + ThreadStorageSize());
934  if (unlikely(tv == NULL))
935  goto error;
936 
937  SC_ATOMIC_INIT(tv->flags);
938  SCMutexInit(&tv->perf_public_ctx.m, NULL);
939 
940  strlcpy(tv->name, name, sizeof(tv->name));
941 
942  /* default state for every newly created thread */
944 
945  /* set the incoming queue */
946  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
947  SCLogDebug("inq_name \"%s\"", inq_name);
948 
949  tmq = TmqGetQueueByName(inq_name);
950  if (tmq == NULL) {
951  tmq = TmqCreateQueue(inq_name);
952  if (tmq == NULL)
953  goto error;
954  }
955  SCLogDebug("tmq %p", tmq);
956 
957  tv->inq = tmq;
958  tv->inq->reader_cnt++;
959  SCLogDebug("tv->inq %p", tv->inq);
960  }
961  if (inqh_name != NULL) {
962  SCLogDebug("inqh_name \"%s\"", inqh_name);
963 
964  int id = TmqhNameToID(inqh_name);
965  if (id <= 0) {
966  goto error;
967  }
968  tmqh = TmqhGetQueueHandlerByName(inqh_name);
969  if (tmqh == NULL)
970  goto error;
971 
972  tv->tmqh_in = tmqh->InHandler;
973  tv->inq_id = (uint8_t)id;
974  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
975  }
976 
977  /* set the outgoing queue */
978  if (outqh_name != NULL) {
979  SCLogDebug("outqh_name \"%s\"", outqh_name);
980 
981  int id = TmqhNameToID(outqh_name);
982  if (id <= 0) {
983  goto error;
984  }
985 
986  tmqh = TmqhGetQueueHandlerByName(outqh_name);
987  if (tmqh == NULL)
988  goto error;
989 
990  tv->tmqh_out = tmqh->OutHandler;
991  tv->outq_id = (uint8_t)id;
992 
993  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
994  SCLogDebug("outq_name \"%s\"", outq_name);
995 
996  if (tmqh->OutHandlerCtxSetup != NULL) {
997  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
998  if (tv->outctx == NULL)
999  goto error;
1000  tv->outq = NULL;
1001  } else {
1002  tmq = TmqGetQueueByName(outq_name);
1003  if (tmq == NULL) {
1004  tmq = TmqCreateQueue(outq_name);
1005  if (tmq == NULL)
1006  goto error;
1007  }
1008  SCLogDebug("tmq %p", tmq);
1009 
1010  tv->outq = tmq;
1011  tv->outctx = NULL;
1012  tv->outq->writer_cnt++;
1013  }
1014  }
1015  }
1016 
1017  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1018  goto error;
1019  }
1020 
1021  if (mucond != 0)
1022  TmThreadInitMC(tv);
1023 
1025 
1026  return tv;
1027 
1028 error:
1029  SCLogError("failed to setup a thread");
1030 
1031  if (tv != NULL)
1032  SCFree(tv);
1033  return NULL;
1034 }
1035 
1036 /**
1037  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1038  * This function doesn't support custom slots, and hence shouldn't be
1039  * supplied \"custom\" as its slot type. All PPT threads are created
1040  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1041  * conditional variables are not used to kill the thread.
1042  *
1043  * \param name Name of this TV instance
1044  * \param inq_name Incoming queue name
1045  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1046  * \param outq_name Outgoing queue name
1047  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1048  * \param slots String representation for the slot function to be used
1049  *
1050  * \retval the newly created TV instance, or NULL on error
1051  */
1052 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1053  const char *inqh_name, const char *outq_name,
1054  const char *outqh_name, const char *slots)
1055 {
1056  ThreadVars *tv = NULL;
1057 
1058  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1059  slots, NULL, 0);
1060 
1061  if (tv != NULL) {
1062  tv->type = TVT_PPT;
1064  }
1065 
1066  return tv;
1067 }
1068 
1069 /**
1070  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1071  * This function supports only custom slot functions and hence a
1072  * function pointer should be sent as an argument.
1073  *
1074  * \param name Name of this TV instance
1075  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1076  * \param mucond Flag to indicate whether to initialize the condition
1077  * and the mutex variables for this newly created TV.
1078  *
1079  * \retval the newly created TV instance, or NULL on error
1080  */
1081 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1082  int mucond)
1083 {
1084  ThreadVars *tv = NULL;
1085 
1086  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1087 
1088  if (tv != NULL) {
1089  tv->type = TVT_MGMT;
1092  }
1093 
1094  return tv;
1095 }
1096 
1097 /**
1098  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1099  * The thread is created with the \"management\" slot function, and the
1100  * TmModule named by \a module is appended as its slot when it is found.
1101  *
1102  * \param name Name of this TV instance
1103  * \param module Name of TmModule with MANAGEMENT flag set.
1104  * \param mucond Flag to indicate whether to initialize the condition
1105  * and the mutex variables for this newly created TV.
1106  *
1107  * \retval the newly created TV instance, or NULL on error
1108  */
1109 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1110  int mucond)
1111 {
1112  ThreadVars *tv = NULL;
1113 
1114  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1115 
1116  if (tv != NULL) {
1117  tv->type = TVT_MGMT;
1120 
1121  TmModule *m = TmModuleGetByName(module);
1122  if (m) {
1123  TmSlotSetFuncAppend(tv, m, NULL);
1124  }
1125  }
1126 
1127  return tv;
1128 }
1129 
1130 /**
1131  * \brief Creates and returns the TV instance for a Command thread (CMD).
1132  * The thread is created with the \"command\" slot function, and the
1133  * TmModule named by \a module is appended as its slot when it is found.
1134  *
1135  * \param name Name of this TV instance
1136  * \param module Name of TmModule with COMMAND flag set.
1137  * \param mucond Flag to indicate whether to initialize the condition
1138  * and the mutex variables for this newly created TV.
1139  *
1140  * \retval the newly created TV instance, or NULL on error
1141  */
1142 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1143  int mucond)
1144 {
1145  ThreadVars *tv = NULL;
1146 
1147  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1148 
1149  if (tv != NULL) {
1150  tv->type = TVT_CMD;
1153 
1154  TmModule *m = TmModuleGetByName(module);
1155  if (m) {
1156  TmSlotSetFuncAppend(tv, m, NULL);
1157  }
1158  }
1159 
1160  return tv;
1161 }
1162 
1163 /**
1164  * \brief Appends this TV to tv_root based on its type
1165  *
1166  * \param type holds the type this TV belongs to.
1167  */
1169 {
1171 
1172  if (tv_root[type] == NULL) {
1173  tv_root[type] = tv;
1174  tv->next = NULL;
1175 
1177 
1178  return;
1179  }
1180 
1181  ThreadVars *t = tv_root[type];
1182 
1183  while (t) {
1184  if (t->next == NULL) {
1185  t->next = tv;
1186  tv->next = NULL;
1187  break;
1188  }
1189 
1190  t = t->next;
1191  }
1192 
1194 }
1195 
1196 static bool ThreadStillHasPackets(ThreadVars *tv)
1197 {
1198  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1199  /* we wait till we dry out all the inq packets, before we
1200  * kill this thread. Do note that you should have disabled
1201  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1202  PacketQueue *q = tv->inq->pq;
1203  SCMutexLock(&q->mutex_q);
1204  uint32_t len = q->len;
1205  SCMutexUnlock(&q->mutex_q);
1206  if (len != 0) {
1207  return true;
1208  }
1209  }
1210 
1211  if (tv->stream_pq != NULL) {
1213  uint32_t len = tv->stream_pq->len;
1215 
1216  if (len != 0) {
1217  return true;
1218  }
1219  }
1220  return false;
1221 }
1222 
1223 /**
1224  * \brief Kill a thread.
1225  *
1226  * \param tv A ThreadVars instance corresponding to the thread that has to be
1227  * killed.
1228  *
1229  * \retval r 1 killed successfully
1230  * 0 not yet ready, needs another look
1231  */
1232 static int TmThreadKillThread(ThreadVars *tv)
1233 {
1234  BUG_ON(tv == NULL);
1235 
1236  /* kill only once :) */
1237  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1238  return 1;
1239  }
1240 
1241  /* set the thread flag informing the thread that it needs to be
1242  * terminated */
1245 
1246  /* to be sure, signal more */
1247  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1248  if (tv->inq_id != TMQH_NOT_SET) {
1250  if (qh != NULL && qh->InShutdownHandler != NULL) {
1251  qh->InShutdownHandler(tv);
1252  }
1253  }
1254  if (tv->inq != NULL) {
1255  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1256  SCMutexLock(&tv->inq->pq->mutex_q);
1257  SCCondSignal(&tv->inq->pq->cond_q);
1258  SCMutexUnlock(&tv->inq->pq->mutex_q);
1259  }
1260  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1261  }
1262 
1263  if (tv->ctrl_cond != NULL ) {
1265  pthread_cond_broadcast(tv->ctrl_cond);
1267  }
1268  return 0;
1269  }
1270 
1271  if (tv->outctx != NULL) {
1272  if (tv->outq_id != TMQH_NOT_SET) {
1274  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1275  qh->OutHandlerCtxFree(tv->outctx);
1276  tv->outctx = NULL;
1277  }
1278  }
1279  }
1280 
1281  /* Join the thread and flag as dead, unless the thread ID is 0 as
1282  * its not a thread created by Suricata. */
1283  if (tv->t) {
1284  pthread_join(tv->t, NULL);
1285  SCLogDebug("thread %s stopped", tv->name);
1286  }
1288  return 1;
1289 }
1290 
1291 static bool ThreadBusy(ThreadVars *tv)
1292 {
1293  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
1294  TmModule *tm = TmModuleGetById(s->tm_id);
1295  if (tm && tm->ThreadBusy != NULL) {
1296  if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
1297  return true;
1298  }
1299  }
1300  return false;
1301 }
1302 
1303 /** \internal
1304  *
1305  * \brief make sure that all packet threads are done processing their
1306  * in-flight packets, including 'injected' flow packets.
1307  */
1308 static void TmThreadDrainPacketThreads(void)
1309 {
1310  ThreadVars *tv = NULL;
1311  struct timeval start_ts;
1312  struct timeval cur_ts;
1313  gettimeofday(&start_ts, NULL);
1314 
1315 again:
1316  gettimeofday(&cur_ts, NULL);
1317  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1318  SCLogWarning("unable to get all packet threads "
1319  "to process their packets in time");
1320  return;
1321  }
1322 
1324 
1325  /* all receive threads are part of packet processing threads */
1326  tv = tv_root[TVT_PPT];
1327  while (tv) {
1328  if (ThreadStillHasPackets(tv)) {
1329  /* we wait till we dry out all the inq packets, before we
1330  * kill this thread. Do note that you should have disabled
1331  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1333 
1334  /* sleep outside lock */
1335  SleepMsec(1);
1336  goto again;
1337  }
1338  if (ThreadBusy(tv)) {
1340 
1341  Packet *p = PacketGetFromAlloc();
1342  if (p != NULL) {
1345  PacketQueue *q = tv->stream_pq;
1346  SCMutexLock(&q->mutex_q);
1347  PacketEnqueue(q, p);
1348  SCCondSignal(&q->cond_q);
1349  SCMutexUnlock(&q->mutex_q);
1350  }
1351 
1352  /* don't sleep while holding a lock */
1353  SleepMsec(1);
1354  goto again;
1355  }
1356  tv = tv->next;
1357  }
1358 
1360 }
1361 
1362 /**
1363  * \brief Disable all threads having the specified TMs.
1364  *
1365  * Breaks out of the packet acquisition loop, and bumps
1366  * into the 'flow loop', where it will process packets
1367  * from the flow engine's shutdown handling.
1368  */
1370 {
1371  ThreadVars *tv = NULL;
1372  struct timeval start_ts;
1373  struct timeval cur_ts;
1374  gettimeofday(&start_ts, NULL);
1375 
1376 again:
1377  gettimeofday(&cur_ts, NULL);
1378  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1379  FatalError("Engine unable to disable detect "
1380  "thread - \"%s\". Killing engine",
1381  tv->name);
1382  }
1383 
1385 
1386  /* all receive threads are part of packet processing threads */
1387  tv = tv_root[TVT_PPT];
1388 
1389  /* we do have to keep in mind that TVs are arranged in the order
1390  * right from receive to log. The moment we fail to find a
1391  * receive TM amongst the slots in a tv, it indicates we are done
1392  * with all receive threads */
1393  while (tv) {
1394  int disable = 0;
1395  TmModule *tm = NULL;
1396  /* obtain the slots for this TV */
1397  TmSlot *slots = tv->tm_slots;
1398  while (slots != NULL) {
1399  tm = TmModuleGetById(slots->tm_id);
1400 
1401  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1402  disable = 1;
1403  break;
1404  }
1405 
1406  slots = slots->slot_next;
1407  continue;
1408  }
1409 
1410  if (disable) {
1411  if (ThreadStillHasPackets(tv)) {
1412  /* we wait till we dry out all the inq packets, before we
1413  * kill this thread. Do note that you should have disabled
1414  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1416  /* don't sleep while holding a lock */
1417  SleepMsec(1);
1418  goto again;
1419  }
1420 
1421  if (ThreadBusy(tv)) {
1423 
1424  Packet *p = PacketGetFromAlloc();
1425  if (p != NULL) {
1428  PacketQueue *q = tv->stream_pq;
1429  SCMutexLock(&q->mutex_q);
1430  PacketEnqueue(q, p);
1431  SCCondSignal(&q->cond_q);
1432  SCMutexUnlock(&q->mutex_q);
1433  }
1434 
1435  /* don't sleep while holding a lock */
1436  SleepMsec(1);
1437  goto again;
1438  }
1439 
1440  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1441  if (tm && tm->PktAcqBreakLoop != NULL) {
1442  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1443  }
1445 
1446  if (tv->inq != NULL) {
1447  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1448  SCMutexLock(&tv->inq->pq->mutex_q);
1449  SCCondSignal(&tv->inq->pq->cond_q);
1450  SCMutexUnlock(&tv->inq->pq->mutex_q);
1451  }
1452  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1453  }
1454 
1455  /* wait for it to enter the 'flow loop' stage */
1456  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1458 
1459  SleepMsec(1);
1460  goto again;
1461  }
1462  }
1463 
1464  tv = tv->next;
1465  }
1466 
1468 
1469  /* finally wait for all packet threads to have
1470  * processed all of their 'live' packets so we
1471  * don't process the last live packets together
1472  * with FFR packets */
1473  TmThreadDrainPacketThreads();
1474 }
1475 
1476 #ifdef DEBUG_VALIDATION
1477 static void TmThreadDumpThreads(void);
1478 #endif
1479 
1480 static void TmThreadDebugValidateNoMorePackets(void)
1481 {
1482 #ifdef DEBUG_VALIDATION
1484  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1485  if (ThreadStillHasPackets(tv)) {
1487  TmThreadDumpThreads();
1488  abort();
1489  }
1490  }
1492 #endif
1493 }
1494 
1495 /**
1496  * \brief Disable all packet threads
1497  * \param set flag to set
1498  * \param check flag to check
1499  *
1500  * Support 2 stages in shutting down the packet threads:
1501  * 1. set THV_REQ_FLOW_LOOP and wait for THV_FLOW_LOOP
1502  * 2. set THV_KILL and wait for THV_RUNNING_DONE
1503  *
1504  * During step 1 the main loop is exited, and the flow loop logic is entered.
1505  * During step 2, the flow loop logic is done and the thread closes.
1506  */
1507 void TmThreadDisablePacketThreads(const uint16_t set, const uint16_t check)
1508 {
1509  struct timeval start_ts;
1510  struct timeval cur_ts;
1511 
1512  /* first drain all packet threads of their packets */
1513  TmThreadDrainPacketThreads();
1514 
1515  /* since all the threads possibly able to produce more packets
1516  * are now gone or inactive, we should see no packets anywhere
1517  * anymore. */
1518  TmThreadDebugValidateNoMorePackets();
1519 
1520  gettimeofday(&start_ts, NULL);
1521 again:
1522  gettimeofday(&cur_ts, NULL);
1523  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1524  FatalError("Engine unable to disable packet "
1525  "threads. Killing engine");
1526  }
1527 
1528  /* loop through the packet threads and kill them */
1530  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1531  TmThreadsSetFlag(tv, set);
1532 
1533  /* separate worker threads (autofp) will still wait at their
1534  * input queues. So nudge them here so they will observe the
1535  * THV_KILL flag. */
1536  if (tv->inq != NULL) {
1537  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1538  SCMutexLock(&tv->inq->pq->mutex_q);
1539  SCCondSignal(&tv->inq->pq->cond_q);
1540  SCMutexUnlock(&tv->inq->pq->mutex_q);
1541  }
1542  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1543  }
1544 
1545  /* wait for it to reach the expected state */
1546  while (!TmThreadsCheckFlag(tv, check)) {
1548 
1549  SleepMsec(1);
1550  goto again;
1551  }
1552  }
1554 }
1555 
1556 #define MIN_WAIT_TIME 100
1557 #define MAX_WAIT_TIME 999999
1559 {
1560  ThreadVars *tv = NULL;
1561  unsigned int sleep_usec = MIN_WAIT_TIME;
1562 
1563  BUG_ON((family < 0) || (family >= TVT_MAX));
1564 
1565 again:
1567  tv = tv_root[family];
1568 
1569  while (tv) {
1570  int r = TmThreadKillThread(tv);
1571  if (r == 0) {
1573  SleepUsec(sleep_usec);
1574  sleep_usec *= 2; /* slowly back off */
1575  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1576  goto again;
1577  }
1578  sleep_usec = MIN_WAIT_TIME; /* reset */
1579 
1580  tv = tv->next;
1581  }
1583 }
1584 #undef MIN_WAIT_TIME
1585 #undef MAX_WAIT_TIME
1586 
1588 {
1589  int i = 0;
1590 
1591  for (i = 0; i < TVT_MAX; i++) {
1593  }
1594 }
1595 
1596 static void TmThreadFree(ThreadVars *tv)
1597 {
1598  TmSlot *s;
1599  TmSlot *ps;
1600  if (tv == NULL)
1601  return;
1602 
1603  SCLogDebug("Freeing thread '%s'.", tv->name);
1604 
1606 
1607  if (tv->flow_queue) {
1608  BUG_ON(tv->flow_queue->qlen != 0);
1609  SCFree(tv->flow_queue);
1610  }
1611 
1613 
1614  TmThreadDeinitMC(tv);
1615 
1616  if (tv->thread_group_name) {
1618  }
1619 
1620  if (tv->printable_name) {
1622  }
1623 
1624  if (tv->stream_pq_local) {
1628  }
1629 
1630  s = (TmSlot *)tv->tm_slots;
1631  while (s) {
1632  ps = s;
1633  s = s->slot_next;
1634  SCFree(ps);
1635  }
1636 
1638  SCFree(tv);
1639 }
1640 
1642 {
1643  char *thread_group_name = NULL;
1644 
1645  if (name == NULL)
1646  return;
1647 
1648  if (tv == NULL)
1649  return;
1650 
1651  thread_group_name = SCStrdup(name);
1652  if (unlikely(thread_group_name == NULL)) {
1653  SCLogError("error allocating memory");
1654  return;
1655  }
1656  tv->thread_group_name = thread_group_name;
1657 }
1658 
1660 {
1661  ThreadVars *tv = NULL;
1662  ThreadVars *ptv = NULL;
1663 
1664  if ((family < 0) || (family >= TVT_MAX))
1665  return;
1666 
1668  tv = tv_root[family];
1669 
1670  while (tv) {
1671  ptv = tv;
1672  tv = tv->next;
1673  TmThreadFree(ptv);
1674  }
1675  tv_root[family] = NULL;
1677 }
1678 
1679 /**
1680  * \brief Spawns a thread associated with the ThreadVars instance tv
1681  *
1682  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1683  */
1685 {
1686  pthread_attr_t attr;
1687  if (tv->tm_func == NULL) {
1688  FatalError("No thread function set");
1689  }
1690 
1691  /* Initialize the thread attributes and create the thread as joinable */
1692  pthread_attr_init(&attr);
1693 
1694  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1695 
1696  /* Adjust thread stack size if configured */
1698  SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
1699  if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
1700  FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
1702  }
1703  }
1704 
1705  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1706  if (rc) {
1707  FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc,
1708  strerror(errno));
1709  }
1710 
1711 #if DEBUG && HAVE_PTHREAD_GETATTR_NP
1713  if (pthread_getattr_np(tv->t, &attr) == 0) {
1714  size_t stack_size;
1715  void *stack_addr;
1716  pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1717  SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1718  } else {
1719  SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1720  "pthread_getattr_np() is %" PRId32,
1721  rc);
1722  }
1723  }
1724 #endif
1725 
1727 
1728  TmThreadAppend(tv, tv->type);
1729  return TM_ECODE_OK;
1730 }
1731 
1732 /**
1733  * \brief Spawns a "fake" lib thread associated with the ThreadVars instance tv
1734  *
1735  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1736  */
1738 {
1739  if (tv->tm_func == NULL) {
1740  printf("ERROR: no thread function set\n");
1741  return TM_ECODE_FAILED;
1742  }
1743 
1744  if (tv->tm_func((void *)tv) == (void *)-1) {
1745  return TM_ECODE_FAILED;
1746  }
1747 
1749 
1750  return TM_ECODE_OK;
1751 }
1752 
1753 /**
1754  * \brief Initializes the mutex and condition variables for this TV
1755  *
1756  * It can be used by a thread to control a wait loop that can also be
1757  * influenced by other threads.
1758  *
1759  * \param tv Pointer to a TV instance
1760  */
1762 {
1763  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1764  FatalError("Fatal error encountered in TmThreadInitMC. "
1765  "Exiting...");
1766  }
1767 
1768  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1769  printf("Error initializing the tv->m mutex\n");
1770  exit(EXIT_FAILURE);
1771  }
1772 
1773  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1774  FatalError("Fatal error encountered in TmThreadInitMC. "
1775  "Exiting...");
1776  }
1777 
1778  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1779  FatalError("Error initializing the tv->cond condition "
1780  "variable");
1781  }
1782 }
1783 
1784 static void TmThreadDeinitMC(ThreadVars *tv)
1785 {
1786  if (tv->ctrl_mutex) {
1788  SCFree(tv->ctrl_mutex);
1789  }
1790  if (tv->ctrl_cond) {
1792  SCFree(tv->ctrl_cond);
1793  }
1794 }
1795 
1796 /**
1797  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1798  * the kill flag has been set or not on the thread.
1799  *
1800  * \param tv Pointer to the TV instance.
1801  */
1803 {
1804  while (!TmThreadsCheckFlag(tv, flags)) {
1805  SleepUsec(100);
1806  }
1807 }
1808 
1809 /**
1810  * \brief Unpauses a thread
1811  *
1812  * \param tv Pointer to a TV instance that has to be unpaused
1813  */
1815 {
1817 }
1818 
1819 static TmEcode WaitOnThreadsRunningByType(const int t)
1820 {
1821  struct timeval start_ts;
1822  struct timeval cur_ts;
1823  uint32_t thread_cnt = 0;
1824 
1825  /* on retries, this will init to the last thread that started up already */
1826  ThreadVars *tv_start = tv_root[t];
1828  for (ThreadVars *tv = tv_start; tv != NULL; tv = tv->next) {
1829  thread_cnt++;
1830  }
1832 
1833  /* give threads a second each to start up, plus a margin of a minute. */
1834  uint32_t time_budget = 60 + thread_cnt;
1835 
1836  gettimeofday(&start_ts, NULL);
1837 again:
1839  ThreadVars *tv = tv_start;
1840  while (tv != NULL) {
1843 
1844  SCLogError("thread \"%s\" failed to "
1845  "start: flags %04x",
1846  tv->name, SC_ATOMIC_GET(tv->flags));
1847  return TM_ECODE_FAILED;
1848  }
1849 
1852 
1853  /* the time budget (60 seconds plus one second per thread) is provided
1854  * for the thread to transition from THV_INIT_DONE to THV_RUNNING */
1855  gettimeofday(&cur_ts, NULL);
1856  if (((uint32_t)cur_ts.tv_sec - (uint32_t)start_ts.tv_sec) > time_budget) {
1857  SCLogError("thread \"%s\" failed to "
1858  "start in time: flags %04x. Total threads: %u. Time budget %us",
1859  tv->name, SC_ATOMIC_GET(tv->flags), thread_cnt, time_budget);
1860  return TM_ECODE_FAILED;
1861  }
1862 
1863  /* sleep a little to give the thread some
1864  * time to start running */
1865  SleepUsec(100);
1866  goto again;
1867  }
1868  tv_start = tv;
1869 
1870  tv = tv->next;
1871  }
1873  return TM_ECODE_OK;
1874 }
1875 
1876 /**
1877  * \brief Waits for all threads to be in a running state
1878  *
1879  * \retval TM_ECODE_OK if all are running or error if a thread failed
1880  */
1882 {
1883  uint16_t RX_num = 0;
1884  uint16_t W_num = 0;
1885  uint16_t FM_num = 0;
1886  uint16_t FR_num = 0;
1887  uint16_t TX_num = 0;
1888 
1889  for (int i = 0; i < TVT_MAX; i++) {
1890  if (WaitOnThreadsRunningByType(i) != TM_ECODE_OK)
1891  return TM_ECODE_FAILED;
1892  }
1893 
1895  for (int i = 0; i < TVT_MAX; i++) {
1896  for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
1897  if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1898  RX_num++;
1899  else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1900  W_num++;
1901  else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1902  TX_num++;
1903  else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1904  FM_num++;
1905  else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1906  FR_num++;
1907  }
1908  }
1910 
1911  /* Construct a welcome string displaying
1912  * initialized thread types and counts */
1913  uint16_t app_len = 32;
1914  uint16_t buf_len = 256;
1915 
1916  char append_str[app_len];
1917  char thread_counts[buf_len];
1918 
1919  strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1920  if (RX_num > 0) {
1921  snprintf(append_str, app_len, "RX: %u ", RX_num);
1922  strlcat(thread_counts, append_str, buf_len);
1923  }
1924  if (W_num > 0) {
1925  snprintf(append_str, app_len, "W: %u ", W_num);
1926  strlcat(thread_counts, append_str, buf_len);
1927  }
1928  if (TX_num > 0) {
1929  snprintf(append_str, app_len, "TX: %u ", TX_num);
1930  strlcat(thread_counts, append_str, buf_len);
1931  }
1932  if (FM_num > 0) {
1933  snprintf(append_str, app_len, "FM: %u ", FM_num);
1934  strlcat(thread_counts, append_str, buf_len);
1935  }
1936  if (FR_num > 0) {
1937  snprintf(append_str, app_len, "FR: %u ", FR_num);
1938  strlcat(thread_counts, append_str, buf_len);
1939  }
1940  snprintf(append_str, app_len, " Engine started.");
1941  strlcat(thread_counts, append_str, buf_len);
1942  SCLogNotice("%s", thread_counts);
1943 
1944  return TM_ECODE_OK;
1945 }
1946 
1947 /**
1948  * \brief Unpauses all threads present in tv_root
1949  */
1951 {
1953  for (int i = 0; i < TVT_MAX; i++) {
1954  ThreadVars *tv = tv_root[i];
1955  while (tv != NULL) {
1957  tv = tv->next;
1958  }
1959  }
1961 }
1962 
1963 /**
1964  * \brief Used to check the thread for certain conditions of failure.
1965  */
1967 {
1969  for (int i = 0; i < TVT_MAX; i++) {
1970  ThreadVars *tv = tv_root[i];
1971  while (tv) {
1973  FatalError("thread %s failed", tv->name);
1974  }
1975  tv = tv->next;
1976  }
1977  }
1979 }
1980 
1981 /**
1982  * \brief Used to check if all threads have finished their initialization. On
1983  * finding an un-initialized thread, it waits till that thread completes
1984  * its initialization, before proceeding to the next thread.
1985  *
1986  * \retval TM_ECODE_OK all initialized properly
1987  * \retval TM_ECODE_FAILED failure
1988  */
1990 {
1991  struct timeval start_ts;
1992  struct timeval cur_ts;
1993  gettimeofday(&start_ts, NULL);
1994 
1995 again:
1997  for (int i = 0; i < TVT_MAX; i++) {
1998  ThreadVars *tv = tv_root[i];
1999  while (tv != NULL) {
2002 
2003  SCLogError("thread \"%s\" failed to "
2004  "initialize: flags %04x",
2005  tv->name, SC_ATOMIC_GET(tv->flags));
2006  return TM_ECODE_FAILED;
2007  }
2008 
2009  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
2011 
2012  gettimeofday(&cur_ts, NULL);
2013  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
2014  SCLogError("thread \"%s\" failed to "
2015  "initialize in time: flags %04x",
2016  tv->name, SC_ATOMIC_GET(tv->flags));
2017  return TM_ECODE_FAILED;
2018  }
2019 
2020  /* sleep a little to give the thread some
2021  * time to finish initialization */
2022  SleepUsec(100);
2023  goto again;
2024  }
2025 
2028  SCLogError("thread \"%s\" failed to "
2029  "initialize.",
2030  tv->name);
2031  return TM_ECODE_FAILED;
2032  }
2035  SCLogError("thread \"%s\" closed on "
2036  "initialization.",
2037  tv->name);
2038  return TM_ECODE_FAILED;
2039  }
2040 
2041  tv = tv->next;
2042  }
2043  }
2045 
2046  return TM_ECODE_OK;
2047 }
2048 
2049 /**
2050  * \brief returns a count of all the threads that match the flag
2051  */
2053 {
2054  uint32_t cnt = 0;
2056  for (int i = 0; i < TVT_MAX; i++) {
2057  ThreadVars *tv = tv_root[i];
2058  while (tv != NULL) {
2059  if ((tv->tmm_flags & flags) == flags)
2060  cnt++;
2061 
2062  tv = tv->next;
2063  }
2064  }
2066  return cnt;
2067 }
2068 
2069 #ifdef DEBUG_VALIDATION
2070 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2071 {
2072  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2074  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2075  tv, s, s->tm_id, m->name);
2076  }
2077 }
2078 
2079 static void TmThreadDumpThreads(void)
2080 {
2082  for (int i = 0; i < TVT_MAX; i++) {
2083  ThreadVars *tv = tv_root[i];
2084  while (tv != NULL) {
2085  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2086  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2087  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2088  if (tv->inq && tv->stream_pq == tv->inq->pq) {
2089  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2090  } else if (tv->stream_pq_local != NULL) {
2091  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2092  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2093  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2094  }
2095  }
2096  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2097  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2098  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2099  }
2100  TmThreadDoDumpSlots(tv);
2101  tv = tv->next;
2102  }
2103  }
2106 }
2107 #endif
2108 
2109 /* Aligned to CLS to avoid false sharing between atomic ops. */
2110 typedef struct Thread_ {
2111  ThreadVars *tv; /**< threadvars structure */
2112  const char *name;
2113  int type;
2114  int in_use; /**< bool to indicate this is in use */
2115 
2116  SC_ATOMIC_DECLARE(SCTime_t, pktts); /**< current packet time of this thread
2117  * (offline mode) */
2118  SCTime_t sys_sec_stamp; /**< timestamp in real system
2119  * time when the pktts was last updated. */
2121 } __attribute__((aligned(CLS))) Thread;
2123 typedef struct Threads_ {
2124  Thread *threads;
2125  size_t threads_size;
2126  int threads_cnt;
2128 
2129 static bool thread_store_sealed = false;
2130 static Threads thread_store = { NULL, 0, 0 };
2131 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2132 
2134 {
2135  SCMutexLock(&thread_store_lock);
2136  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2137  thread_store_sealed = true;
2138  SCMutexUnlock(&thread_store_lock);
2139 }
2140 
2142 {
2143  SCMutexLock(&thread_store_lock);
2144  DEBUG_VALIDATE_BUG_ON(!thread_store_sealed);
2145  thread_store_sealed = false;
2146  SCMutexUnlock(&thread_store_lock);
2147 }
2148 
2150 {
2151  SCMutexLock(&thread_store_lock);
2152  for (size_t s = 0; s < thread_store.threads_size; s++) {
2153  Thread *t = &thread_store.threads[s];
2154  if (t == NULL || t->in_use == 0)
2155  continue;
2156 
2157  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2158  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2159  if (t->tv) {
2160  ThreadVars *tv = t->tv;
2161  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2162  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2163  tv, tv->type, tv->name, tv->tmm_flags, flags);
2164  }
2165  }
2166  SCMutexUnlock(&thread_store_lock);
2167 }
2168 
2169 #define STEP 32
2170 /**
2171  * \retval id thread id, or 0 if not found
2172  */
2174 {
2175  SCMutexLock(&thread_store_lock);
2176  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2177  if (thread_store.threads == NULL) {
2178  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2179  BUG_ON(thread_store.threads == NULL);
2180  thread_store.threads_size = STEP;
2181  }
2182 
2183  size_t s;
2184  for (s = 0; s < thread_store.threads_size; s++) {
2185  if (thread_store.threads[s].in_use == 0) {
2186  Thread *t = &thread_store.threads[s];
2187  SCSpinInit(&t->spin, 0);
2188  SCSpinLock(&t->spin);
2189  t->name = tv->name;
2190  t->type = type;
2191  t->tv = tv;
2192  t->in_use = 1;
2193  SCSpinUnlock(&t->spin);
2194 
2195  SCMutexUnlock(&thread_store_lock);
2196  return (int)(s+1);
2197  }
2198  }
2199 
2200  /* if we get here the array is completely filled */
2201  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2202  BUG_ON(newmem == NULL);
2203  thread_store.threads = newmem;
2204  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2205 
2206  Thread *t = &thread_store.threads[thread_store.threads_size];
2207  SCSpinInit(&t->spin, 0);
2208  SCSpinLock(&t->spin);
2209  t->name = tv->name;
2210  t->type = type;
2211  t->tv = tv;
2212  t->in_use = 1;
2213  SCSpinUnlock(&t->spin);
2214 
2215  s = thread_store.threads_size;
2216  thread_store.threads_size += STEP;
2217 
2218  SCMutexUnlock(&thread_store_lock);
2219  return (int)(s+1);
2220 }
2221 #undef STEP
2222 
2223 void TmThreadsUnregisterThread(const int id)
2224 {
2225  SCMutexLock(&thread_store_lock);
2226  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2227  if (id <= 0 || id > (int)thread_store.threads_size) {
2228  SCMutexUnlock(&thread_store_lock);
2229  return;
2230  }
2231 
2232  /* id is one higher than index */
2233  int idx = id - 1;
2234 
2235  /* reset thread_id, which serves as clearing the record */
2236  thread_store.threads[idx].in_use = 0;
2237 
2238  /* check if we have at least one registered thread left */
2239  size_t s;
2240  for (s = 0; s < thread_store.threads_size; s++) {
2241  Thread *t = &thread_store.threads[s];
2242  if (t->in_use == 1) {
2243  goto end;
2244  }
2245  }
2246 
2247  /* if we get here no threads are registered */
2248  SCFree(thread_store.threads);
2249  thread_store.threads = NULL;
2250  thread_store.threads_size = 0;
2251  thread_store.threads_cnt = 0;
2252 
2253 end:
2254  SCMutexUnlock(&thread_store_lock);
2255 }
2256 
2257 void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
2258 {
2259  SCTime_t now = SCTimeGetTime();
2260  int idx = id - 1;
2261  Thread *t = &thread_store.threads[idx];
2262  SCSpinLock(&t->spin);
2263  SC_ATOMIC_SET(t->pktts, ts);
2264 
2265 #ifdef DEBUG
2266  if (t->sys_sec_stamp.secs != 0) {
2267  SCTime_t tmpts = SCTIME_ADD_SECS(t->sys_sec_stamp, 3);
2268  if (SCTIME_CMP_LT(tmpts, now)) {
2269  SCLogDebug("%s: thread slept for %u secs", t->name, (uint32_t)(now.secs - tmpts.secs));
2270  }
2271  }
2272 #endif
2273 
2274  t->sys_sec_stamp = now;
2275  SCSpinUnlock(&t->spin);
2276 }
2277 
2279 {
2280  static SCTime_t nullts = SCTIME_INITIALIZER;
2281  bool ready = true;
2282  for (size_t s = 0; s < thread_store.threads_size; s++) {
2283  Thread *t = &thread_store.threads[s];
2284  if (!t->in_use) {
2285  break;
2286  }
2287  SCSpinLock(&t->spin);
2288  if (t->type != TVT_PPT) {
2289  SCSpinUnlock(&t->spin);
2290  continue;
2291  }
2292  if (SCTIME_CMP_EQ(t->sys_sec_stamp, nullts)) {
2293  ready = false;
2294  SCSpinUnlock(&t->spin);
2295  break;
2296  }
2297  SCSpinUnlock(&t->spin);
2298  }
2299  return ready;
2300 }
2301 
2303 {
2304  SCTime_t now = SCTimeGetTime();
2305  for (size_t s = 0; s < thread_store.threads_size; s++) {
2306  Thread *t = &thread_store.threads[s];
2307  if (!t->in_use) {
2308  break;
2309  }
2310  SCSpinLock(&t->spin);
2311  if (t->type != TVT_PPT) {
2312  SCSpinUnlock(&t->spin);
2313  continue;
2314  }
2315  SC_ATOMIC_SET(t->pktts, ts);
2316  t->sys_sec_stamp = now;
2317  SCSpinUnlock(&t->spin);
2318  }
2319 }
2320 
2322 {
2323  BUG_ON(idx == 0);
2324  const int i = idx - 1;
2325  Thread *t = &thread_store.threads[i];
2326  return SC_ATOMIC_GET(t->pktts);
2327 }
2328 
2329 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2330 {
2331  struct timeval local = { 0 };
2332  static SCTime_t nullts = SCTIME_INITIALIZER;
2333  bool set = false;
2334  SCTime_t now = SCTimeGetTime();
2335 
2336  for (size_t s = 0; s < thread_store.threads_size; s++) {
2337  Thread *t = &thread_store.threads[s];
2338  if (t->in_use == 0) {
2339  break;
2340  }
2341  SCSpinLock(&t->spin);
2342  /* only packet threads set timestamps based on packets */
2343  if (t->type != TVT_PPT) {
2344  SCSpinUnlock(&t->spin);
2345  continue;
2346  }
2347  SCTime_t pktts = SC_ATOMIC_GET(t->pktts);
2348  if (SCTIME_CMP_NEQ(pktts, nullts)) {
2349  SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 5);
2350  /* ignore sleeping threads */
2351  if (SCTIME_CMP_LT(sys_sec_stamp, now)) {
2352  SCSpinUnlock(&t->spin);
2353  continue;
2354  }
2355  if (!set) {
2356  SCTIME_TO_TIMEVAL(&local, pktts);
2357  set = true;
2358  } else {
2359  if (SCTIME_CMP_LT(pktts, SCTIME_FROM_TIMEVAL(&local))) {
2360  SCTIME_TO_TIMEVAL(&local, pktts);
2361  }
2362  }
2363  }
2364  SCSpinUnlock(&t->spin);
2365  }
2366  *ts = local;
2367  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2368 }
2369 
2371 {
2372  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2373  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2374  /* always create at least one thread */
2375  if (thread_max == 0)
2376  thread_max = ncpus * threading_detect_ratio;
2377  if (thread_max < 1)
2378  thread_max = 1;
2379  if (thread_max > 1024) {
2380  SCLogWarning("limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2381  thread_max = 1024;
2382  }
2383  return (uint16_t)thread_max;
2384 }
2385 
2386 /** \brief inject a flow into a threads flow queue
2387  */
2388 void TmThreadsInjectFlowById(Flow *f, const int id)
2389 {
2390  if (id > 0 && id <= (int)thread_store.threads_size) {
2391  int idx = id - 1;
2392  Thread *t = &thread_store.threads[idx];
2393  ThreadVars *tv = t->tv;
2394  if (tv != NULL && tv->flow_queue != NULL) {
2395  FlowEnqueue(tv->flow_queue, f);
2396 
2397  /* wake up listening thread(s) if necessary */
2398  if (tv->inq != NULL) {
2399  SCMutexLock(&tv->inq->pq->mutex_q);
2400  SCCondSignal(&tv->inq->pq->cond_q);
2401  SCMutexUnlock(&tv->inq->pq->mutex_q);
2402  } else if (tv->break_loop) {
2403  TmThreadsCaptureBreakLoop(tv);
2404  }
2405  return;
2406  }
2407  }
2408  BUG_ON(1);
2409 }
thread_name_workers
const char * thread_name_workers
Definition: runmodes.c:68
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:822
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:73
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:76
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:70
SCTmThreadsSlotPacketLoopFinish
bool SCTmThreadsSlotPacketLoopFinish(ThreadVars *tv)
Definition: tm-threads.c:273
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:135
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2278
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1761
tm-threads.h
TmThreadsSealThreads
void TmThreadsSealThreads(void)
Definition: tm-threads.c:2133
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:70
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
TmThreadLibSpawn
TmEcode TmThreadLibSpawn(ThreadVars *tv)
Spawns a "fake" lib thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1737
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:133
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1684
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1109
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
threading_set_cpu_affinity
bool threading_set_cpu_affinity
Definition: runmodes.c:62
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:863
SCTIME_CMP_NEQ
#define SCTIME_CMP_NEQ(a, b)
Definition: util-time.h:107
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:70
Thread_::SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(SCTime_t, pktts)
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:314
TmThreadContinueThreads
void TmThreadContinueThreads(void)
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1950
CLS
#define CLS
Definition: suricata-common.h:69
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1052
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:55
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:379
TmThreadSetGroupName
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1641
CaptureStatsSetup
void CaptureStatsSetup(ThreadVars *tv)
Definition: decode.c:997
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:105
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:48
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
PKT_SRC_SHUTDOWN_FLUSH
@ PKT_SRC_SHUTDOWN_FLUSH
Definition: decode.h:64
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:101
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:88
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1802
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:175
thread-callbacks.h
PacketQueue_
Simple FIFO queue for packets with mutex and cond. Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:56
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:45
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:83
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:62
Packet_::flags
uint32_t flags
Definition: decode.h:535
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:75
threads.h
Threads
Threads
Definition: tm-threads.c:2127
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:356
TmThreadsUnsealThreads
void TmThreadsUnsealThreads(void)
Definition: tm-threads.c:2141
ThreadVars_::t
pthread_t t
Definition: threadvars.h:59
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:303
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:71
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:61
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2052
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:104
thread_name_autofp
const char * thread_name_autofp
Definition: runmodes.c:66
StatsSetupPrivate
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1205
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:53
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:117
MIN
#define MIN(x, y)
Definition: suricata-common.h:408
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:82
SCSpinLock
#define SCSpinLock
Definition: threads-debug.h:235
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:74
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1369
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:387
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:121
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:774
SCTIME_CMP_EQ
#define SCTIME_CMP_EQ(a, b)
Definition: util-time.h:108
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:89
m
SCMutex m
Definition: flow-hash.h:6
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:44
TmModule_::ThreadBusy
bool(* ThreadBusy)(ThreadVars *tv, void *thread_data)
Definition: tm-modules.h:63
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:143
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:314
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:82
rww_lock_contention
thread_local uint64_t rww_lock_contention
PacketQueueNoLock_
simple fifo queue for packets
Definition: packet-queue.h:34
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:173
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2169
Thread_::in_use
int in_use
Definition: tm-threads.c:2114
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:38
TmThreadsGetThreadTime
SCTime_t TmThreadsGetThreadTime(const int idx)
Definition: tm-threads.c:2321
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:89
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:244
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:81
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a threads flow queue
Definition: tm-threads.c:2388
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:81
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2329
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmThreadsProcessDecodePseudoPackets
TmEcode TmThreadsProcessDecodePseudoPackets(ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
Definition: tm-threads.c:114
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:46
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:46
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1309
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:109
util-signal.h
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:79
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:785
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1556
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:2111
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1814
AffinityGetNextCPU
uint16_t AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
Definition: util-affinity.c:279
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:90
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:57
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:1989
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
tv
ThreadVars * tv
Definition: tm-threads.c:2122
strlcat
size_t strlcat(char *, const char *src, size_t siz)
Definition: util-strlcatu.c:45
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:77
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:96
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:208
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:58
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
threading_set_stack_size
uint64_t threading_set_stack_size
Definition: runmodes.c:63
ThreadVars_::perf_public_ctx
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:128
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1252
SCTime_t::secs
uint64_t secs
Definition: util-time.h:41
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
Definition: tm-threads.c:2257
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
util-affinity.h
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:79
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:65
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1659
THV_KILL
#define THV_KILL
Definition: threadvars.h:40
THV_REQ_FLOW_LOOP
#define THV_REQ_FLOW_LOOP
Definition: threadvars.h:47
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:922
FlowQueueNew
FlowQueue * FlowQueueNew(void)
Definition: flow-queue.c:35
SCSpinUnlock
#define SCSpinUnlock
Definition: threads-debug.h:237
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:826
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2223
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2173
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
thread-storage.h
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
__attribute__
struct Thread_ __attribute__((aligned(CLS)))
Definition: tm-threads.c:2121
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:125
ThreadVars_::id
int id
Definition: threadvars.h:87
ThreadVars_::type
uint8_t type
Definition: threadvars.h:72
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:317
SCTIME_TO_TIMEVAL
#define SCTIME_TO_TIMEVAL(tv, t)
Definition: util-time.h:97
TmThreadTimeoutLoop
int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
Definition: tm-threads.c:166
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:88
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:85
SCReturn
#define SCReturn
Definition: util-debug.h:273
stream.h
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:376
Thread_::sys_sec_stamp
SCTime_t sys_sec_stamp
Definition: tm-threads.c:2118
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1587
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:139
Packet_
Definition: decode.h:492
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:133
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:91
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:56
ThreadVars_::thread_group_name
char * thread_group_name
Definition: threadvars.h:67
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:69
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1557
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:74
TmSlot_
Definition: tm-threads.h:53
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:67
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:69
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:80
name
const char * name
Definition: tm-threads.c:2123
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:90
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
runmodes.h
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread (MGMT). This function supports only custom...
Definition: tm-threads.c:1081
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(const uint16_t set, const uint16_t check)
Disable all packet threads.
Definition: tm-threads.c:1507
TmModule_
Definition: tm-modules.h:43
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:255
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
Definition: tm-threads.c:2302
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:116
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
SCTIME_CMP_LT
#define SCTIME_CMP_LT(a, b)
Definition: util-time.h:105
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:39
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:37
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:658
StatsPublicThreadContext_::m
SCMutex m
Definition: counters.h:75
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:378
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1558
Thread_::type
int type
Definition: tm-threads.c:2113
cnt
uint32_t cnt
Definition: tmqh-packetpool.h:7
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:90
Packet_::flow
struct Flow_ * flow
Definition: decode.h:537
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1168
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:75
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:54
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:831
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:35
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:34
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:106
TmThreadsWaitForUnpause
bool TmThreadsWaitForUnpause(ThreadVars *tv)
Wait for a thread to become unpaused.
Definition: tm-threads.c:363
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:41
ThreadVars_::break_loop
bool break_loop
Definition: threadvars.h:136
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:101
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:847
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:72
sys_sec_stamp
SCTime_t sys_sec_stamp
Definition: tm-threads.c:2129
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:77
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
FatalError
#define FatalError(...)
Definition: util-debug.h:502
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax(void)
Definition: tm-threads.c:2370
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:66
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1142
StatsSyncCounters
void StatsSyncCounters(ThreadVars *tv)
Definition: counters.c:445
ThreadStorageSize
unsigned int ThreadStorageSize(void)
Definition: thread-storage.c:25
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:144
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:216
threadvars.h
util-validate.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:232
SCSpinInit
#define SCSpinInit
Definition: threads-debug.h:238
SCThreadRunInitCallbacks
void SCThreadRunInitCallbacks(ThreadVars *tv)
Definition: thread-callbacks.c:48
ThreadsAffinityType_
Definition: util-affinity.h:65
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:37
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:626
TmSlot_::tm_flags
uint8_t tm_flags
Definition: tm-threads.h:67
Thread_::name
const char * name
Definition: tm-threads.c:2112
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:131
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:50
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:91
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:74
thread_name_verdict
const char * thread_name_verdict
Definition: runmodes.c:69
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:73
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:317
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:84
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:112
SCSpinlock
#define SCSpinlock
Definition: threads-debug.h:234
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:88
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:89
suricata.h
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:463
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:142
Tmq_
Definition: tm-queues.h:29
Thread_::spin
SCSpinlock spin
Definition: tm-threads.c:2120
TmThreadWaitOnThreadRunning
TmEcode TmThreadWaitOnThreadRunning(void)
Waits for all threads to be in a running state.
Definition: tm-threads.c:1881
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2149
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1966
type
int type
Definition: tm-threads.c:2124
ThreadFreeStorage
void ThreadFreeStorage(ThreadVars *tv)
Definition: thread-storage.c:50
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:132
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:83
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:69
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:108
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:274
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:93
SCTIME_INITIALIZER
#define SCTIME_INITIALIZER
Definition: util-time.h:51
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
SCTIME_ADD_SECS
#define SCTIME_ADD_SECS(ts, s)
Definition: util-time.h:64
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:42
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:2110
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:275
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:120
StatsThreadCleanup
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1304
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:63
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:267
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:102
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:76
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:939
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:359
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:171
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:350