suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2024 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "thread-callbacks.h"
33 #include "threadvars.h"
34 #include "thread-storage.h"
35 #include "tm-queues.h"
36 #include "tm-queuehandlers.h"
37 #include "tm-threads.h"
38 #include "tmqh-packetpool.h"
39 #include "threads.h"
40 #include "util-affinity.h"
41 #include "util-debug.h"
42 #include "util-privs.h"
43 #include "util-cpu.h"
44 #include "util-optimize.h"
45 #include "util-profiling.h"
46 #include "util-signal.h"
47 #include "queue.h"
48 #include "util-validate.h"
49 
50 #ifdef PROFILE_LOCKING
/* Lock profiling counters, compiled in only when PROFILE_LOCKING is defined.
 * Every counter is thread_local, so each thread has its own copy. There is
 * one contention / wait_ticks / cnt triple per prefix (mutex, spin, rww, rwr).
 * NOTE(review): rww/rwr presumably stand for rwlock write/read locks -
 * confirm against the lock wrappers that update them. */
51 thread_local uint64_t mutex_lock_contention;
52 thread_local uint64_t mutex_lock_wait_ticks;
53 thread_local uint64_t mutex_lock_cnt;
54 
55 thread_local uint64_t spin_lock_contention;
56 thread_local uint64_t spin_lock_wait_ticks;
57 thread_local uint64_t spin_lock_cnt;
58 
59 thread_local uint64_t rww_lock_contention;
60 thread_local uint64_t rww_lock_wait_ticks;
61 thread_local uint64_t rww_lock_cnt;
62 
63 thread_local uint64_t rwr_lock_contention;
64 thread_local uint64_t rwr_lock_wait_ticks;
65 thread_local uint64_t rwr_lock_cnt;
66 #endif
67 
68 #ifdef OS_FREEBSD
69 #include <sched.h>
70 #include <sys/param.h>
71 #include <sys/resource.h>
72 #include <sys/cpuset.h>
73 #include <sys/thr.h>
74 #define cpu_set_t cpuset_t
75 #endif /* OS_FREEBSD */
76 
77 /* prototypes */
78 static int SetCPUAffinity(uint16_t cpu);
79 static void TmThreadDeinitMC(ThreadVars *tv);
80 
81 /* root of the threadvars list */
/* One list head per thread type: the array has TVT_MAX entries and is indexed
 * with a thread type such as TVT_PPT. Entries start out as NULL. */
82 ThreadVars *tv_root[TVT_MAX] = { NULL };
83 
84 /* lock to protect tv_root */
/* NOTE(review): the declaration this comment refers to is not present in this
 * listing; restore it from the upstream file. */
86 
87 /**
88  * \brief Check if a thread flag is set.
89  *
90  * \retval 1 flag is set.
91  * \retval 0 flag is not set.
92  */
93 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
94 {
95  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
96 }
97 
98 /**
99  * \brief Set a thread flag.
100  */
101 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
102 {
103  SC_ATOMIC_OR(tv->flags, flag);
104 }
105 
106 /**
107  * \brief Unset a thread flag.
108  */
109 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
110 {
111  SC_ATOMIC_AND(tv->flags, ~flag);
112 }
113 
115  ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
116 {
117  while (decode_pq->top != NULL) {
118  Packet *extra_p = PacketDequeueNoLock(decode_pq);
119  if (unlikely(extra_p == NULL))
120  continue;
121  DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
122 
123  if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) {
125  }
126  }
128 }
129 
130 /**
131  * \brief Separate run function so we can call it recursively.
132  */
134 {
135  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
136  PACKET_PROFILING_TMM_START(p, s->tm_id);
137  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
138  PACKET_PROFILING_TMM_END(p, s->tm_id);
139  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
140 
141  /* handle error */
142  if (unlikely(r == TM_ECODE_FAILED)) {
143  /* Encountered error. Return packets to packetpool and return */
144  TmThreadsSlotProcessPktFail(tv, NULL);
145  return TM_ECODE_FAILED;
146  }
147  if (s->tm_flags & TM_FLAG_DECODE_TM) {
148  if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) !=
149  TM_ECODE_OK) {
150  return TM_ECODE_FAILED;
151  }
152  }
153  }
154 
155  return TM_ECODE_OK;
156 }
157 
158 /** \internal
159  *
160  * \brief Process flow timeout packets
161  *
162  * Process flow timeout pseudo packets. During shutdown this loop
163  * is run until the flow engine kills the thread and the queue is
164  * empty.
 *
 * \param tv thread whose stream_pq is read; the function returns at once
 *           when tv->stream_pq or tv->tm_flowworker is NULL.
 * \param s  not referenced in the body visible in this listing.
 *
 * \retval TM_ECODE_OK, or the most recent value returned by
 *         TmThreadsSlotProcessPkt() for the flowworker slot.
 *
 * NOTE(review): this listing is missing statements inside the loop (`p` is
 * used without a visible declaration and the braces do not balance); take the
 * full body from the upstream file before changing it.
165  */
166 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
167 {
168  TmSlot *fw_slot = tv->tm_flowworker;
169  int r = TM_ECODE_OK;
170 
171  if (tv->stream_pq == NULL || fw_slot == NULL) {
172  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
173  return r;
174  }
175 
176  SCLogDebug("flow end loop starting");
177  while (1) {
179  uint32_t len = tv->stream_pq->len;
181  if (len > 0) {
      /* handle at most the number of packets that were queued when len was read */
182  while (len--) {
186  if (likely(p)) {
187  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
188  r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
189  if (r == TM_ECODE_FAILED) {
190  break;
191  }
192  }
193  }
194  } else {
196  break;
197  }
198  SleepUsec(1);
199  }
200  }
201  SCLogDebug("flow end loop complete");
203 
204  return r;
205 }
206 
207 /*
208 
209  pcap/nfq
210 
211  pkt read
212  callback
213  process_pkt
214 
215  pkt read
216  process_pkt
217 
218  slot:
219  setup
220 
221  pkt_ack_loop(tv, slot_data)
222 
223  deinit
224 
225  process_pkt:
226  while(s)
227  run s;
228  queue;
229 
230  */
231 
/**
 * \brief Thread entry point installed for the "pktacqloop" slot variant.
 *
 * As far as this listing shows, the thread initialises its packet pool,
 * checks that the first slot has a PktAcqLoop callback and that both queue
 * handlers are set, runs SlotThreadInit for every slot, remembers the
 * flowworker slot in tv->tm_flowworker, waits until it is unpaused and then
 * calls the first slot's PktAcqLoop until a stop condition is hit
 * (TM_ECODE_FAILED and TM_ECODE_DONE are the ones visible here). Afterwards
 * it runs TmThreadTimeoutLoop() and the per-slot exit-stats and deinit
 * callbacks.
 *
 * \param td ThreadVars of this thread, passed as a void pointer.
 *
 * \return Every path ends in pthread_exit(): (void *)0 on the normal path,
 *         (void *)-1 on the failure paths. The `return NULL` statements
 *         follow a pthread_exit() call.
 *
 * NOTE(review): several statements are missing from this listing (for example
 * the body of the thread_setup_flags check and the flow_queue setup); restore
 * them from the upstream file before editing this function.
 */
232 static void *TmThreadsSlotPktAcqLoop(void *td)
233 {
234  ThreadVars *tv = (ThreadVars *)td;
235  TmSlot *s = tv->tm_slots;
236  TmEcode r = TM_ECODE_OK;
237  TmSlot *slot = NULL;
238 
240 
241  if (tv->thread_setup_flags != 0)
243 
245  PacketPoolInit();
246 
247  /* check if we are setup properly */
248  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
249  SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
250  " PktAcqLoop=%p, tmqh_in=%p,"
251  " tmqh_out=%p",
252  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
254  pthread_exit((void *) -1);
255  return NULL;
256  }
257 
     /* per-slot initialisation; the init result is published in slot_data */
258  for (slot = s; slot != NULL; slot = slot->slot_next) {
259  if (slot->SlotThreadInit != NULL) {
260  void *slot_data = NULL;
261  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
262  if (r != TM_ECODE_OK) {
263  if (r == TM_ECODE_DONE) {
264  EngineDone();
266  goto error;
267  } else {
269  goto error;
270  }
271  }
272  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
273  }
274 
275  /* if the flowworker module is the first, get the threads input queue */
276  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
277  tv->stream_pq = tv->inq->pq;
278  tv->tm_flowworker = slot;
279  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
281  if (tv->flow_queue == NULL) {
283  pthread_exit((void *) -1);
284  return NULL;
285  }
286  /* setup a queue */
287  } else if (slot->tm_id == TMM_FLOWWORKER) {
288  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
289  if (tv->stream_pq_local == NULL)
290  FatalError("failed to alloc PacketQueue");
293  tv->tm_flowworker = slot;
294  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
296  if (tv->flow_queue == NULL) {
298  pthread_exit((void *) -1);
299  return NULL;
300  }
301  }
302  }
303 
305 
307  bool run = TmThreadsWaitForUnpause(tv);
308 
     /* only the first slot's PktAcqLoop is called; it receives the slot itself */
309  while (run) {
310  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
311 
312  if (r == TM_ECODE_FAILED) {
314  run = false;
315  }
317  run = false;
318  }
319  if (r == TM_ECODE_DONE) {
320  run = false;
321  }
322  }
324 
326 
327  /* process all pseudo packets the flow timeout may throw at us */
328  TmThreadTimeoutLoop(tv, s);
329 
332 
334 
     /* per-slot shutdown: optional stats output, then optional deinit */
335  for (slot = s; slot != NULL; slot = slot->slot_next) {
336  if (slot->SlotThreadExitPrintStats != NULL) {
337  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
338  }
339 
340  if (slot->SlotThreadDeinit != NULL) {
341  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
342  if (r != TM_ECODE_OK) {
344  goto error;
345  }
346  }
347  }
348 
349  tv->stream_pq = NULL;
350  SCLogDebug("%s ending", tv->name);
352  pthread_exit((void *) 0);
353  return NULL;
354 
355 error:
356  tv->stream_pq = NULL;
357  pthread_exit((void *) -1);
358  return NULL;
359 }
360 
361 /**
362  * Also returns if the kill flag is set.
363  */
365 {
368 
369  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
370  SleepUsec(100);
371 
373  return false;
374  }
375 
377  }
378 
379  return true;
380 }
381 
/**
 * \brief Thread entry point installed for the "varslot" slot variant.
 *
 * As far as this listing shows, the thread initialises its packet pool, drops
 * capabilities, checks that a slot list and both queue handlers exist, runs
 * SlotThreadInit for every slot and remembers the flowworker slot in
 * tv->tm_flowworker. After being unpaused it repeatedly takes a packet from
 * tv->tmqh_in, runs it through the slot chain with TmThreadsSlotVarRun(),
 * hands it to tv->tmqh_out and then calls TmThreadsHandleInjectedPackets().
 * On shutdown the per-slot exit-stats and deinit callbacks are called.
 *
 * \param td ThreadVars of this thread, passed as a void pointer.
 *
 * \return Every path ends in pthread_exit(): (void *)0 on the normal path,
 *         (void *)-1 on the failure paths. The `return NULL` statements
 *         follow a pthread_exit() call.
 *
 * NOTE(review): several statements are missing from this listing (for example
 * the body of the thread_setup_flags check, the handling of the flow queue
 * when no packet was read, and the loop's stop condition); restore them from
 * the upstream file before editing this function.
 */
382 static void *TmThreadsSlotVar(void *td)
383 {
384  ThreadVars *tv = (ThreadVars *)td;
385  TmSlot *s = (TmSlot *)tv->tm_slots;
386  Packet *p = NULL;
387  TmEcode r = TM_ECODE_OK;
388 
390  PacketPoolInit();//Empty();
391 
393 
394  if (tv->thread_setup_flags != 0)
396 
397  /* Drop the capabilities for this thread */
398  SCDropCaps(tv);
399 
400  /* check if we are setup properly */
401  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
403  pthread_exit((void *) -1);
404  return NULL;
405  }
406 
     /* s is used as the cursor here and reset to the list head further down */
407  for (; s != NULL; s = s->slot_next) {
408  if (s->SlotThreadInit != NULL) {
409  void *slot_data = NULL;
410  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
411  if (r != TM_ECODE_OK) {
413  goto error;
414  }
415  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
416  }
417 
418  /* special case: we need to access the stream queue
419  * from the flow timeout code */
420 
421  /* if the flowworker module is the first, get the threads input queue */
422  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
423  tv->stream_pq = tv->inq->pq;
424  tv->tm_flowworker = s;
425  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
427  if (tv->flow_queue == NULL) {
429  pthread_exit((void *) -1);
430  return NULL;
431  }
432  /* setup a queue */
433  } else if (s->tm_id == TMM_FLOWWORKER) {
434  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
435  if (tv->stream_pq_local == NULL)
436  FatalError("failed to alloc PacketQueue");
439  tv->tm_flowworker = s;
440  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
442  if (tv->flow_queue == NULL) {
444  pthread_exit((void *) -1);
445  return NULL;
446  }
447  }
448  }
449 
451 
452  // Each 'worker' thread uses this func to process/decode the packet read.
453  // Each decode method is different to receive methods in that they do not
454  // enter infinite loops. They use this as the core loop. As a result, at this
455  // point the worker threads can be considered both initialized and running.
457  bool run = TmThreadsWaitForUnpause(tv);
458 
459  s = (TmSlot *)tv->tm_slots;
460 
461  while (run) {
462  /* input a packet */
463  p = tv->tmqh_in(tv);
464 
465  /* if we didn't get a packet see if we need to do some housekeeping */
466  if (unlikely(p == NULL)) {
467  if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
469  if (p != NULL) {
472  }
473  }
474  }
475 
476  if (p != NULL) {
477  /* run the thread module(s) */
478  r = TmThreadsSlotVarRun(tv, p, s);
479  if (r == TM_ECODE_FAILED) {
482  break;
483  }
484 
485  /* output the packet */
486  tv->tmqh_out(tv, p);
487 
488  /* now handle the stream pq packets */
489  TmThreadsHandleInjectedPackets(tv);
490  }
491 
493  run = false;
494  }
495  } /* while (run) */
497 
500 
502 
503  s = (TmSlot *)tv->tm_slots;
504 
     /* per-slot shutdown: optional stats output, then optional deinit */
505  for ( ; s != NULL; s = s->slot_next) {
506  if (s->SlotThreadExitPrintStats != NULL) {
507  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
508  }
509 
510  if (s->SlotThreadDeinit != NULL) {
511  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
512  if (r != TM_ECODE_OK) {
514  goto error;
515  }
516  }
517  }
518 
519  SCLogDebug("%s ending", tv->name);
520  tv->stream_pq = NULL;
522  pthread_exit((void *) 0);
523  return NULL;
524 
525 error:
526  tv->stream_pq = NULL;
527  pthread_exit((void *) -1);
528  return NULL;
529 }
530 
/**
 * \brief Thread entry point installed for the "management" and "command"
 *        slot variants.
 *
 * Only the first slot of the thread is used: its optional SlotThreadInit is
 * run, then its Management callback, then the optional exit-stats and deinit
 * callbacks. A missing slot list trips BUG_ON().
 *
 * \param td ThreadVars of this thread, passed as a void pointer.
 *
 * \return Every path ends in pthread_exit(): (void *)0 on the normal path,
 *         (void *)-1 when SlotThreadInit or SlotThreadDeinit does not return
 *         TM_ECODE_OK. The `return NULL` statements follow a pthread_exit()
 *         call.
 *
 * NOTE(review): several statements are missing from this listing (for example
 * the body of the thread_setup_flags check and the handling of a failed
 * Management call); restore them from the upstream file before editing.
 */
531 static void *TmThreadsManagement(void *td)
532 {
533  ThreadVars *tv = (ThreadVars *)td;
534  TmSlot *s = (TmSlot *)tv->tm_slots;
535  TmEcode r = TM_ECODE_OK;
536 
537  BUG_ON(s == NULL);
538 
540 
541  if (tv->thread_setup_flags != 0)
543 
544  /* Drop the capabilities for this thread */
545  SCDropCaps(tv);
546 
547  SCLogDebug("%s starting", tv->name);
548 
549  if (s->SlotThreadInit != NULL) {
550  void *slot_data = NULL;
551  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
552  if (r != TM_ECODE_OK) {
554  pthread_exit((void *) -1);
555  return NULL;
556  }
557  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
558  }
559 
561 
563 
564  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
565  /* handle error */
566  if (r == TM_ECODE_FAILED) {
568  }
569 
572  }
573 
576 
577  if (s->SlotThreadExitPrintStats != NULL) {
578  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
579  }
580 
581  if (s->SlotThreadDeinit != NULL) {
582  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
583  if (r != TM_ECODE_OK) {
585  pthread_exit((void *) -1);
586  return NULL;
587  }
588  }
589 
591  pthread_exit((void *) 0);
592  return NULL;
593 }
594 
595 /**
596  * \brief We set the slot functions.
597  *
598  * \param tv Pointer to the TV to set the slot function for.
599  * \param name Name of the slot variant.
600  * \param fn_p Pointer to a custom slot function. Used only if slot variant
601  * "name" is "custom".
602  *
603  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
604  */
605 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
606 {
607  if (name == NULL) {
608  if (fn_p == NULL) {
609  printf("Both slot name and function pointer can't be NULL inside "
610  "TmThreadSetSlots\n");
611  goto error;
612  } else {
613  name = "custom";
614  }
615  }
616 
617  if (strcmp(name, "varslot") == 0) {
618  tv->tm_func = TmThreadsSlotVar;
619  } else if (strcmp(name, "pktacqloop") == 0) {
620  tv->tm_func = TmThreadsSlotPktAcqLoop;
621  } else if (strcmp(name, "management") == 0) {
622  tv->tm_func = TmThreadsManagement;
623  } else if (strcmp(name, "command") == 0) {
624  tv->tm_func = TmThreadsManagement;
625  } else if (strcmp(name, "custom") == 0) {
626  if (fn_p == NULL)
627  goto error;
628  tv->tm_func = fn_p;
629  } else {
630  printf("Error: Slot \"%s\" not supported\n", name);
631  goto error;
632  }
633 
634  return TM_ECODE_OK;
635 
636 error:
637  return TM_ECODE_FAILED;
638 }
639 
640 /**
641  * \brief Appends a new entry to the slots.
642  *
643  * \param tv TV the slot is attached to.
644  * \param tm TM to append.
645  * \param data Data to be passed on to the slot init function.
646  *
647  * The function returns nothing. If the slot cannot be allocated it returns
 * early without touching \a tv.
 *
 * NOTE(review): at least one statement between the callback selection and
 * the SlotThreadDeinit assignment is missing from this listing; restore it
 * from the upstream file before editing.
648  */
649 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
650 {
651  TmSlot *slot = SCCalloc(1, sizeof(TmSlot));
652  if (unlikely(slot == NULL))
653  return;
654  SC_ATOMIC_INITPTR(slot->slot_data);
655  slot->SlotThreadInit = tm->ThreadInit;
656  slot->slot_initdata = data;
     /* exactly one of Func / PktAcqLoop / Management is copied, in that order
      * of preference */
657  if (tm->Func) {
658  slot->SlotFunc = tm->Func;
659  } else if (tm->PktAcqLoop) {
660  slot->PktAcqLoop = tm->PktAcqLoop;
661  if (tm->PktAcqBreakLoop) {
662  tv->break_loop = true;
663  }
664  } else if (tm->Management) {
665  slot->Management = tm->Management;
666  }
668  slot->SlotThreadDeinit = tm->ThreadDeinit;
669  /* we don't have to check for the return value "-1". We wouldn't have
670  * received a TM as arg, if it didn't exist */
671  slot->tm_id = TmModuleGetIDForTM(tm);
672  slot->tm_flags |= tm->flags;
673 
     /* the thread accumulates the module and capability flags of all its slots */
674  tv->tmm_flags |= tm->flags;
675  tv->cap_flags |= tm->cap_flags;
676 
677  if (tv->tm_slots == NULL) {
678  tv->tm_slots = slot;
679  } else {
680  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
681 
682  /* get the last slot */
683  for ( ; a != NULL; a = a->slot_next) {
684  b = a;
685  }
686  /* append the new slot */
687  if (b != NULL) {
688  b->slot_next = slot;
689  }
690  }
691 }
692 
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
/**
 * \brief Apply a CPU set as the affinity of the calling thread.
 *
 * The platform call depends on the build: cpuset_setaffinity() on FreeBSD,
 * thread_policy_set() on Darwin and sched_setaffinity() everywhere else.
 *
 * \param cs CPU set to apply.
 *
 * \retval 0  the platform call returned 0.
 * \retval -1 the platform call returned non-zero; a warning was printed.
 */
static int SetCPUAffinitySet(cpu_set_t *cs)
{
    int rc;

#if defined OS_FREEBSD
    rc = cpuset_setaffinity(
            CPU_LEVEL_WHICH, CPU_WHICH_TID, SCGetThreadIdLong(), sizeof(cpu_set_t), cs);
#elif OS_DARWIN
    rc = thread_policy_set(
            mach_thread_self(), THREAD_AFFINITY_POLICY, (void *)cs, THREAD_AFFINITY_POLICY_COUNT);
#else
    /* sched_setaffinity() is given the kernel thread id of the caller */
    const pid_t self_tid = syscall(SYS_gettid);
    rc = sched_setaffinity(self_tid, sizeof(cpu_set_t), cs);
#endif /* OS_FREEBSD */

    if (rc == 0) {
        return 0;
    }

    printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", rc, strerror(errno));
    return -1;
}
#endif
716 
717 
718 /**
719  * \brief Set the thread affinity on the calling thread.
720  *
721  * \param cpuid Id of the core/cpu to setup the affinity.
722  *
723  * \retval 0 If all goes well; -1 if something is wrong.
724  */
725 static int SetCPUAffinity(uint16_t cpuid)
726 {
727 #if defined __OpenBSD__ || defined sun
728  return 0;
729 #else
730  int cpu = (int)cpuid;
731 
732 #if defined OS_WIN32 || defined __CYGWIN__
733  DWORD cs = 1 << cpu;
734 
735  int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
736  if (r != 0) {
737  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
738  strerror(errno));
739  return -1;
740  }
741  SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
742  SCGetThreadIdLong(), cpu);
743 
744  return 0;
745 
746 #else
747  cpu_set_t cs;
748  memset(&cs, 0, sizeof(cs));
749 
750  CPU_ZERO(&cs);
751  CPU_SET(cpu, &cs);
752  return SetCPUAffinitySet(&cs);
753 #endif /* windows */
754 #endif /* not supported */
755 }
756 
757 
758 /**
759  * \brief Set the thread options (thread priority).
760  *
761  * \param tv Pointer to the ThreadVars to setup the thread priority.
762  *
763  * \retval TM_ECODE_OK.
764  */
766 {
768  tv->thread_priority = prio;
769 
770  return TM_ECODE_OK;
771 }
772 
773 /**
774  * \brief Adjusting nice value for threads.
775  */
777 {
778  SCEnter();
779 #ifndef __CYGWIN__
780 #ifdef OS_WIN32
781  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
782  SCLogError("Error setting priority for "
783  "thread %s: %s",
784  tv->name, strerror(errno));
785  } else {
786  SCLogDebug("Priority set to %"PRId32" for thread %s",
788  }
789 #else
790  int ret = nice(tv->thread_priority);
791  if (ret == -1) {
792  SCLogError("Error setting nice value %d "
793  "for thread %s: %s",
794  tv->thread_priority, tv->name, strerror(errno));
795  } else {
796  SCLogDebug("Nice value set to %"PRId32" for thread %s",
798  }
799 #endif /* OS_WIN32 */
800 #endif
801  SCReturn;
802 }
803 
804 
805 /**
806  * \brief Set the thread options (cpu affinity).
807  *
808  * \param tv pointer to the ThreadVars to setup the affinity.
809  * \param cpu cpu on which affinity is set.
810  *
811  * \retval TM_ECODE_OK
812  */
814 {
816  tv->cpu_affinity = cpu;
817 
818  return TM_ECODE_OK;
819 }
820 
821 
823 {
825  return TM_ECODE_OK;
826 
827  if (type > MAX_CPU_SET) {
828  SCLogError("invalid cpu type family");
829  return TM_ECODE_FAILED;
830  }
831 
833  tv->cpu_affinity = type;
834 
835  return TM_ECODE_OK;
836 }
837 
839 {
840  if (type >= MAX_CPU_SET) {
841  SCLogError("invalid cpu type family");
842  return 0;
843  }
844 
846 }
847 
848 /**
849  * \brief Set the thread options (cpu affinity).
850  * Priority should be already set by pthread_create.
851  *
852  * \param tv pointer to the ThreadVars of the calling thread.
853  */
855 {
857  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
858  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
860  SetCPUAffinity(tv->cpu_affinity);
861  }
862 
863 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
868  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
869  uint16_t cpu = AffinityGetNextCPU(taf);
870  SetCPUAffinity(cpu);
871  /* If CPU is in a set overwrite the default thread prio */
872  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
874  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
876  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
878  } else {
879  tv->thread_priority = taf->prio;
880  }
881  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
882  "%d, thread id %lu", tv->thread_priority,
883  tv->name, cpu, SCGetThreadIdLong());
884  } else {
885  SetCPUAffinitySet(&taf->cpu_set);
886  tv->thread_priority = taf->prio;
887  SCLogPerf("Setting prio %d for thread \"%s\", "
888  "thread id %lu", tv->thread_priority,
890  }
892  }
893 #endif
894 
895  return TM_ECODE_OK;
896 }
897 
898 /**
899  * \brief Creates and returns the TV instance for a new thread.
900  *
901  * \param name Name of this TV instance
902  * \param inq_name Incoming queue name
903  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
904  * \param outq_name Outgoing queue name
905  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
906  * \param slots String representation for the slot function to be used
907  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
908  * \param mucond Flag to indicate whether to initialize the condition
909  * and the mutex variables for this newly created TV.
910  *
911  * \retval the newly created TV instance, or NULL on error
912  */
913 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
914  const char *outq_name, const char *outqh_name, const char *slots,
915  void * (*fn_p)(void *), int mucond)
916 {
917  ThreadVars *tv = NULL;
918  Tmq *tmq = NULL;
919  Tmqh *tmqh = NULL;
920 
921  SCLogDebug("creating thread \"%s\"...", name);
922 
923  /* XXX create separate function for this: allocate a thread container */
     /* one allocation holds the ThreadVars plus ThreadStorageSize() extra bytes */
924  tv = SCCalloc(1, sizeof(ThreadVars) + ThreadStorageSize());
925  if (unlikely(tv == NULL))
926  goto error;
927 
928  SC_ATOMIC_INIT(tv->flags);
929  SCMutexInit(&tv->perf_public_ctx.m, NULL);
930 
931  strlcpy(tv->name, name, sizeof(tv->name));
932 
933  /* default state for every newly created thread */
     /* NOTE(review): the statements that set this default state are missing
      * from this listing; restore them from the upstream file. */
936 
937  /* set the incoming queue */
     /* the name "packetpool" (or NULL) means no input queue is looked up */
938  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
939  SCLogDebug("inq_name \"%s\"", inq_name);
940 
941  tmq = TmqGetQueueByName(inq_name);
942  if (tmq == NULL) {
943  tmq = TmqCreateQueue(inq_name);
944  if (tmq == NULL)
945  goto error;
946  }
947  SCLogDebug("tmq %p", tmq);
948 
949  tv->inq = tmq;
950  tv->inq->reader_cnt++;
951  SCLogDebug("tv->inq %p", tv->inq);
952  }
953  if (inqh_name != NULL) {
954  SCLogDebug("inqh_name \"%s\"", inqh_name);
955 
956  int id = TmqhNameToID(inqh_name);
957  if (id <= 0) {
958  goto error;
959  }
960  tmqh = TmqhGetQueueHandlerByName(inqh_name);
961  if (tmqh == NULL)
962  goto error;
963 
964  tv->tmqh_in = tmqh->InHandler;
965  tv->inq_id = (uint8_t)id;
966  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
967  }
968 
969  /* set the outgoing queue */
970  if (outqh_name != NULL) {
971  SCLogDebug("outqh_name \"%s\"", outqh_name);
972 
973  int id = TmqhNameToID(outqh_name);
974  if (id <= 0) {
975  goto error;
976  }
977 
978  tmqh = TmqhGetQueueHandlerByName(outqh_name);
979  if (tmqh == NULL)
980  goto error;
981 
982  tv->tmqh_out = tmqh->OutHandler;
983  tv->outq_id = (uint8_t)id;
984 
985  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
986  SCLogDebug("outq_name \"%s\"", outq_name);
987 
      /* a handler with its own context setup gets outctx and no outq;
       * otherwise the named queue is looked up or created */
988  if (tmqh->OutHandlerCtxSetup != NULL) {
989  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
990  if (tv->outctx == NULL)
991  goto error;
992  tv->outq = NULL;
993  } else {
994  tmq = TmqGetQueueByName(outq_name);
995  if (tmq == NULL) {
996  tmq = TmqCreateQueue(outq_name);
997  if (tmq == NULL)
998  goto error;
999  }
1000  SCLogDebug("tmq %p", tmq);
1001 
1002  tv->outq = tmq;
1003  tv->outctx = NULL;
1004  tv->outq->writer_cnt++;
1005  }
1006  }
1007  }
1008 
1009  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1010  goto error;
1011  }
1012 
1013  if (mucond != 0)
1014  TmThreadInitMC(tv);
1015 
1017 
1018  return tv;
1019 
1020 error:
1021  SCLogError("failed to setup a thread");
1022 
      /* NOTE(review): only tv itself is released here; queue reader/writer
       * counts bumped above and a created outctx are left as they are in the
       * code visible in this listing - confirm whether that is intended. */
1023  if (tv != NULL)
1024  SCFree(tv);
1025  return NULL;
1026 }
1027 
1028 /**
1029  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1030  * This function doesn't support custom slots, and hence shouldn't be
1031  * supplied \"custom\" as its slot type. All PPT threads are created
1032  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1033  * conditional variables are not used to kill the thread.
1034  *
1035  * \param name Name of this TV instance
1036  * \param inq_name Incoming queue name
1037  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1038  * \param outq_name Outgoing queue name
1039  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1040  * \param slots String representation for the slot function to be used
1041  *
1042  * \retval the newly created TV instance, or NULL on error
1043  */
1044 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1045  const char *inqh_name, const char *outq_name,
1046  const char *outqh_name, const char *slots)
1047 {
1048  ThreadVars *tv = NULL;
1049 
      /* no custom function and mucond 0 are passed on */
1050  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1051  slots, NULL, 0);
1052 
1053  if (tv != NULL) {
1054  tv->type = TVT_PPT;
      /* NOTE(review): a statement following the type assignment is missing
       * from this listing; restore it from the upstream file. */
1056  }
1057 
1058  return tv;
1059 }
1060 
1061 /**
1062  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1063  * This function supports only custom slot functions and hence a
1064  * function pointer should be sent as an argument.
1065  *
1066  * \param name Name of this TV instance
1067  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1068  * \param mucond Flag to indicate whether to initialize the condition
1069  * and the mutex variables for this newly created TV.
1070  *
1071  * \retval the newly created TV instance, or NULL on error
1072  */
1073 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1074  int mucond)
1075 {
1076  ThreadVars *tv = NULL;
1077 
      /* no queues or queue handlers; fn_p becomes the "custom" entry function */
1078  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1079 
1080  if (tv != NULL) {
1081  tv->type = TVT_MGMT;
      /* NOTE(review): statements following the type assignment are missing
       * from this listing; restore them from the upstream file. */
1084  }
1085 
1086  return tv;
1087 }
1088 
1089 /**
1090  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1091  * The thread uses the "management" slot variant; the module found
1092  * under \a module is appended as its slot.
1093  *
1094  * \param name Name of this TV instance
1095  * \param module Name of TmModule with MANAGEMENT flag set.
1096  * \param mucond Flag to indicate whether to initialize the condition
1097  * and the mutex variables for this newly created TV.
1098  *
1099  * \retval the newly created TV instance, or NULL on error
 *
 * \note When no module with that name is found the TV is still returned,
 *       without any slot appended.
1100  */
1101 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1102  int mucond)
1103 {
1104  ThreadVars *tv = NULL;
1105 
1106  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1107 
1108  if (tv != NULL) {
1109  tv->type = TVT_MGMT;
      /* NOTE(review): statements following the type assignment are missing
       * from this listing; restore them from the upstream file. */
1112 
1113  TmModule *m = TmModuleGetByName(module);
1114  if (m) {
1115  TmSlotSetFuncAppend(tv, m, NULL);
1116  }
1117  }
1118 
1119  return tv;
1120 }
1121 
1122 /**
1123  * \brief Creates and returns the TV instance for a Command thread (CMD).
1124  * The thread uses the "command" slot variant; the module found
1125  * under \a module is appended as its slot.
1126  *
1127  * \param name Name of this TV instance
1128  * \param module Name of TmModule with COMMAND flag set.
1129  * \param mucond Flag to indicate whether to initialize the condition
1130  * and the mutex variables for this newly created TV.
1131  *
1132  * \retval the newly created TV instance, or NULL on error
 *
 * \note When no module with that name is found the TV is still returned,
 *       without any slot appended.
1133  */
1134 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1135  int mucond)
1136 {
1137  ThreadVars *tv = NULL;
1138 
1139  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1140 
1141  if (tv != NULL) {
1142  tv->type = TVT_CMD;
      /* NOTE(review): statements following the type assignment are missing
       * from this listing; restore them from the upstream file. */
1145 
1146  TmModule *m = TmModuleGetByName(module);
1147  if (m) {
1148  TmSlotSetFuncAppend(tv, m, NULL);
1149  }
1150  }
1151 
1152  return tv;
1153 }
1154 
1155 /**
1156  * \brief Appends this TV to tv_root based on its type
1157  *
1158  * \param type holds the type this TV belongs to.
1159  */
1161 {
1163 
1164  if (tv_root[type] == NULL) {
1165  tv_root[type] = tv;
1166  tv->next = NULL;
1167 
1169 
1170  return;
1171  }
1172 
1173  ThreadVars *t = tv_root[type];
1174 
1175  while (t) {
1176  if (t->next == NULL) {
1177  t->next = tv;
1178  tv->next = NULL;
1179  break;
1180  }
1181 
1182  t = t->next;
1183  }
1184 
1186 }
1187 
/**
 * \brief Check whether a thread still has packets queued for it.
 *
 * Looks at the thread's input queue (unless that queue is flagged as a packet
 * pool) and at its stream_pq.
 *
 * \param tv Thread to inspect.
 *
 * \retval true  one of the inspected queues has a non-zero length.
 * \retval false both are empty or not set.
 *
 * NOTE(review): statements around the stream_pq length read are missing from
 * this listing; restore them from the upstream file before editing.
 */
1188 static bool ThreadStillHasPackets(ThreadVars *tv)
1189 {
1190  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1191  /* we wait till we dry out all the inq packets, before we
1192  * kill this thread. Do note that you should have disabled
1193  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1194  PacketQueue *q = tv->inq->pq;
      /* the length is copied while holding the queue mutex */
1195  SCMutexLock(&q->mutex_q);
1196  uint32_t len = q->len;
1197  SCMutexUnlock(&q->mutex_q);
1198  if (len != 0) {
1199  return true;
1200  }
1201  }
1202 
1203  if (tv->stream_pq != NULL) {
1205  uint32_t len = tv->stream_pq->len;
1207 
1208  if (len != 0) {
1209  return true;
1210  }
1211  }
1212  return false;
1213 }
1214 
1215 /**
1216  * \brief Kill a thread.
1217  *
1218  * \param tv A ThreadVars instance corresponding to the thread that has to be
1219  * killed.
1220  *
1221  * \retval r 1 killed successfully
1222  * 0 not yet ready, needs another look
1223  */
1224 static int TmThreadKillThread(ThreadVars *tv)
1225 {
1226  BUG_ON(tv == NULL);
1227 
1228  /* kill only once :) */
1229  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1230  return 1;
1231  }
1232 
1233  /* set the thread flag informing the thread that it needs to be
1234  * terminated */
      /* NOTE(review): the statements that set the flag are missing from this
       * listing, as are the declarations of `qh` used below; restore them
       * from the upstream file before editing. */
1237 
1238  /* to be sure, signal more */
      /* while THV_CLOSED is not set: wake up possible waiters and report
       * "not yet ready" (0) so the caller looks again */
1239  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1240  if (tv->inq_id != TMQH_NOT_SET) {
1242  if (qh != NULL && qh->InShutdownHandler != NULL) {
1243  qh->InShutdownHandler(tv);
1244  }
1245  }
1246  if (tv->inq != NULL) {
       /* one signal per registered reader and writer of the input queue */
1247  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1248  SCMutexLock(&tv->inq->pq->mutex_q);
1249  SCCondSignal(&tv->inq->pq->cond_q);
1250  SCMutexUnlock(&tv->inq->pq->mutex_q);
1251  }
1252  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1253  }
1254 
1255  if (tv->ctrl_cond != NULL ) {
1257  pthread_cond_broadcast(tv->ctrl_cond);
1259  }
1260  return 0;
1261  }
1262 
      /* THV_CLOSED is set: free the output handler context, if there is one */
1263  if (tv->outctx != NULL) {
1264  if (tv->outq_id != TMQH_NOT_SET) {
1266  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1267  qh->OutHandlerCtxFree(tv->outctx);
1268  tv->outctx = NULL;
1269  }
1270  }
1271  }
1272 
1273  /* join it and flag it as dead */
1274  pthread_join(tv->t, NULL);
1275  SCLogDebug("thread %s stopped", tv->name);
1277  return 1;
1278 }
1279 
1280 static bool ThreadBusy(ThreadVars *tv)
1281 {
1282  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
1283  TmModule *tm = TmModuleGetById(s->tm_id);
1284  if (tm && tm->ThreadBusy != NULL) {
1285  if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
1286  return true;
1287  }
1288  }
1289  return false;
1290 }
1291 
1292 /** \internal
1293  *
1294  * \brief make sure that all packet threads are done processing their
1295  * in-flight packets, including 'injected' flow packets.
1296  */
1297 static void TmThreadDrainPacketThreads(void)
1298 {
1299  ThreadVars *tv = NULL;
1300  struct timeval start_ts;
1301  struct timeval cur_ts;
1302  gettimeofday(&start_ts, NULL);
1303 
      /* every retry restarts the walk over the TVT_PPT list from its head */
1304 again:
1305  gettimeofday(&cur_ts, NULL);
      /* give up with a warning once the tv_sec difference exceeds 60 */
1306  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1307  SCLogWarning("unable to get all packet threads "
1308  "to process their packets in time");
1309  return;
1310  }
1311 
      /* NOTE(review): statements are missing from this listing at several
       * points of this function (the existing comments mention a lock that
       * is not visible here); restore them from the upstream file before
       * editing. */
1313 
1314  /* all receive threads are part of packet processing threads */
1315  tv = tv_root[TVT_PPT];
1316  while (tv) {
1317  if (ThreadStillHasPackets(tv)) {
1318  /* we wait till we dry out all the inq packets, before we
1319  * kill this thread. Do note that you should have disabled
1320  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1322 
1323  /* sleep outside lock */
1324  SleepMsec(1);
1325  goto again;
1326  }
1327  if (ThreadBusy(tv)) {
1329 
       /* a freshly allocated packet is queued on the thread's stream_pq and
        * the queue's condition is signalled */
1330  Packet *p = PacketGetFromAlloc();
1331  if (p != NULL) {
1334  PacketQueue *q = tv->stream_pq;
1335  SCMutexLock(&q->mutex_q);
1336  PacketEnqueue(q, p);
1337  SCCondSignal(&q->cond_q);
1338  SCMutexUnlock(&q->mutex_q);
1339  }
1340 
1341  /* don't sleep while holding a lock */
1342  SleepMsec(1);
1343  goto again;
1344  }
1345  tv = tv->next;
1346  }
1347 
1349 }
1350 
1351 /**
1352  * \brief Disable all threads having the specified TMs.
1353  *
1354  * Breaks out of the packet acquisition loop, and bumps
1355  * into the 'flow loop', where it will process packets
1356  * from the flow engine's shutdown handling.
1357  */
1359 {
1360  ThreadVars *tv = NULL;
1361  struct timeval start_ts;
1362  struct timeval cur_ts;
1363  gettimeofday(&start_ts, NULL);
1364 
1365 again:
1366  gettimeofday(&cur_ts, NULL);
1367  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1368  FatalError("Engine unable to disable detect "
1369  "thread - \"%s\". Killing engine",
1370  tv->name);
1371  }
1372 
1374 
1375  /* all receive threads are part of packet processing threads */
1376  tv = tv_root[TVT_PPT];
1377 
1378  /* we do have to keep in mind that TVs are arranged in the order
1379  * right from receive to log. The moment we fail to find a
1380  * receive TM amongst the slots in a tv, it indicates we are done
1381  * with all receive threads */
1382  while (tv) {
1383  int disable = 0;
1384  TmModule *tm = NULL;
1385  /* obtain the slots for this TV */
1386  TmSlot *slots = tv->tm_slots;
1387  while (slots != NULL) {
1388  tm = TmModuleGetById(slots->tm_id);
1389 
1390  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1391  disable = 1;
1392  break;
1393  }
1394 
1395  slots = slots->slot_next;
1396  continue;
1397  }
1398 
1399  if (disable) {
1400  if (ThreadStillHasPackets(tv)) {
1401  /* we wait till we dry out all the inq packets, before we
1402  * kill this thread. Do note that you should have disabled
1403  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1405  /* don't sleep while holding a lock */
1406  SleepMsec(1);
1407  goto again;
1408  }
1409 
1410  if (ThreadBusy(tv)) {
1412 
1413  Packet *p = PacketGetFromAlloc();
1414  if (p != NULL) {
1417  PacketQueue *q = tv->stream_pq;
1418  SCMutexLock(&q->mutex_q);
1419  PacketEnqueue(q, p);
1420  SCCondSignal(&q->cond_q);
1421  SCMutexUnlock(&q->mutex_q);
1422  }
1423 
1424  /* don't sleep while holding a lock */
1425  SleepMsec(1);
1426  goto again;
1427  }
1428 
1429  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1430  if (tm && tm->PktAcqBreakLoop != NULL) {
1431  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1432  }
1434 
1435  if (tv->inq != NULL) {
1436  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1437  SCMutexLock(&tv->inq->pq->mutex_q);
1438  SCCondSignal(&tv->inq->pq->cond_q);
1439  SCMutexUnlock(&tv->inq->pq->mutex_q);
1440  }
1441  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1442  }
1443 
1444  /* wait for it to enter the 'flow loop' stage */
1445  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1447 
1448  SleepMsec(1);
1449  goto again;
1450  }
1451  }
1452 
1453  tv = tv->next;
1454  }
1455 
1457 
1458  /* finally wait for all packet threads to have
1459  * processed all of their 'live' packets so we
1460  * don't process the last live packets together
1461  * with FFR packets */
1462  TmThreadDrainPacketThreads();
1463 }
1464 
1465 #ifdef DEBUG_VALIDATION
1466 static void TmThreadDumpThreads(void);
1467 #endif
1468 
/** \internal
 *
 * \brief Debug check that no packet thread has packets left.
 *
 * Only active when built with DEBUG_VALIDATION; otherwise the function
 * body is empty. Walks tv_root[TVT_PPT] and, on the first thread for
 * which ThreadStillHasPackets() is true, dumps all threads and calls
 * abort().
 *
 * NOTE(review): the embedded line numbers of this listing skip lines
 * 1472, 1475 and 1480, so some statements are not visible here.
 */
1469 static void TmThreadDebugValidateNoMorePackets(void)
1470 {
1471 #ifdef DEBUG_VALIDATION
1473  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1474  if (ThreadStillHasPackets(tv)) {
1476  TmThreadDumpThreads();
1477  abort();
1478  }
1479  }
1481 #endif
1482 }
1483 
1484 /**
1485  * \brief Disable all packet threads
1486  */
1488 {
1489  struct timeval start_ts;
1490  struct timeval cur_ts;
1491 
1492  /* first drain all packet threads of their packets */
1493  TmThreadDrainPacketThreads();
1494 
1495  /* since all the threads possibly able to produce more packets
1496  * are now gone or inactive, we should see no packets anywhere
1497  * anymore. */
1498  TmThreadDebugValidateNoMorePackets();
1499 
1500  gettimeofday(&start_ts, NULL);
1501 again:
1502  gettimeofday(&cur_ts, NULL);
1503  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1504  FatalError("Engine unable to disable packet "
1505  "threads. Killing engine");
1506  }
1507 
1508  /* loop through the packet threads and kill them */
1510  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1512 
1513  /* separate worker threads (autofp) will still wait at their
1514  * input queues. So nudge them here so they will observe the
1515  * THV_KILL flag. */
1516  if (tv->inq != NULL) {
1517  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1518  SCMutexLock(&tv->inq->pq->mutex_q);
1519  SCCondSignal(&tv->inq->pq->cond_q);
1520  SCMutexUnlock(&tv->inq->pq->mutex_q);
1521  }
1522  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1523  }
1524 
1527 
1528  SleepMsec(1);
1529  goto again;
1530  }
1531  }
1533 }
1534 
1535 #define MIN_WAIT_TIME 100
1536 #define MAX_WAIT_TIME 999999
1538 {
1539  ThreadVars *tv = NULL;
1540  unsigned int sleep_usec = MIN_WAIT_TIME;
1541 
1542  BUG_ON((family < 0) || (family >= TVT_MAX));
1543 
1544 again:
1546  tv = tv_root[family];
1547 
1548  while (tv) {
1549  int r = TmThreadKillThread(tv);
1550  if (r == 0) {
1552  SleepUsec(sleep_usec);
1553  sleep_usec *= 2; /* slowly back off */
1554  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1555  goto again;
1556  }
1557  sleep_usec = MIN_WAIT_TIME; /* reset */
1558 
1559  tv = tv->next;
1560  }
1562 }
1563 #undef MIN_WAIT_TIME
1564 #undef MAX_WAIT_TIME
1565 
1567 {
1568  int i = 0;
1569 
1570  for (i = 0; i < TVT_MAX; i++) {
1572  }
1573 }
1574 
/** \internal
 *
 * \brief Release a ThreadVars instance and the resources it owns.
 *
 * Visible here: the flow queue is freed (it must be empty, enforced by
 * BUG_ON), the control mutex/condition are released through
 * TmThreadDeinitMC(), the slot list is freed node by node and finally
 * the ThreadVars itself is freed.
 *
 * NOTE(review): the embedded line numbers of this listing skip several
 * lines (1584, 1591, 1596, 1600, 1604-1606, 1616). The bodies of the
 * thread_group_name, printable_name and stream_pq_local branches are
 * therefore not visible -- presumably they free those members; confirm
 * against the full source.
 *
 * \param tv thread vars to free; NULL is accepted and ignored
 */
1575 static void TmThreadFree(ThreadVars *tv)
1576 {
1577  TmSlot *s;
1578  TmSlot *ps;
1579  if (tv == NULL)
1580  return;
1581 
1582  SCLogDebug("Freeing thread '%s'.", tv->name);
1583 
1585 
1586  if (tv->flow_queue) {
 /* freeing a queue that still holds flows is treated as a bug */
1587  BUG_ON(tv->flow_queue->qlen != 0);
1588  SCFree(tv->flow_queue);
1589  }
1590 
1592 
1593  TmThreadDeinitMC(tv);
1594 
1595  if (tv->thread_group_name) {
1597  }
1598 
1599  if (tv->printable_name) {
1601  }
1602 
1603  if (tv->stream_pq_local) {
1607  }
1608 
 /* free the slot list; fetch the next pointer before freeing a node */
1609  s = (TmSlot *)tv->tm_slots;
1610  while (s) {
1611  ps = s;
1612  s = s->slot_next;
1613  SCFree(ps);
1614  }
1615 
1617  SCFree(tv);
1618 }
1619 
1621 {
1622  char *thread_group_name = NULL;
1623 
1624  if (name == NULL)
1625  return;
1626 
1627  if (tv == NULL)
1628  return;
1629 
1630  thread_group_name = SCStrdup(name);
1631  if (unlikely(thread_group_name == NULL)) {
1632  SCLogError("error allocating memory");
1633  return;
1634  }
1635  tv->thread_group_name = thread_group_name;
1636 }
1637 
1639 {
1640  ThreadVars *tv = NULL;
1641  ThreadVars *ptv = NULL;
1642 
1643  if ((family < 0) || (family >= TVT_MAX))
1644  return;
1645 
1647  tv = tv_root[family];
1648 
1649  while (tv) {
1650  ptv = tv;
1651  tv = tv->next;
1652  TmThreadFree(ptv);
1653  }
1654  tv_root[family] = NULL;
1656 }
1657 
1658 /**
1659  * \brief Spawns a thread associated with the ThreadVars instance tv
1660  *
1661  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1662  */
1664 {
1665  pthread_attr_t attr;
1666  if (tv->tm_func == NULL) {
1667  FatalError("No thread function set");
1668  }
1669 
1670  /* Initialize and set thread detached attribute */
1671  pthread_attr_init(&attr);
1672 
1673  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1674 
1675  /* Adjust thread stack size if configured */
1677  SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
1678  if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
1679  FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
1681  }
1682  }
1683 
1684  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1685  if (rc) {
1686  FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc,
1687  strerror(errno));
1688  }
1689 
1690 #if DEBUG && HAVE_PTHREAD_GETATTR_NP
1692  if (pthread_getattr_np(tv->t, &attr) == 0) {
1693  size_t stack_size;
1694  void *stack_addr;
1695  pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1696  SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1697  } else {
1698  SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1699  "pthread_getattr_np() is %" PRId32,
1700  rc);
1701  }
1702  }
1703 #endif
1704 
1706 
1707  TmThreadAppend(tv, tv->type);
1708  return TM_ECODE_OK;
1709 }
1710 
1711 /**
1712  * \brief Initializes the mutex and condition variables for this TV
1713  *
1714  * It can be used by a thread to control a wait loop that can also be
1715  * influenced by other threads.
1716  *
1717  * \param tv Pointer to a TV instance
1718  */
1720 {
1721  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1722  FatalError("Fatal error encountered in TmThreadInitMC. "
1723  "Exiting...");
1724  }
1725 
1726  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1727  printf("Error initializing the tv->m mutex\n");
1728  exit(EXIT_FAILURE);
1729  }
1730 
1731  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1732  FatalError("Fatal error encountered in TmThreadInitMC. "
1733  "Exiting...");
1734  }
1735 
1736  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1737  FatalError("Error initializing the tv->cond condition "
1738  "variable");
1739  }
1740 }
1741 
/** \internal
 *
 * \brief Release the control mutex and control condition of a TV.
 *
 * Each of tv->ctrl_mutex and tv->ctrl_cond is freed only if it is set.
 * The pointers are not reset to NULL here.
 *
 * NOTE(review): the embedded line numbers of this listing skip lines
 * 1745 and 1749 -- presumably the mutex/condition are destroyed there
 * before their memory is freed; confirm against the full source.
 *
 * \param tv Pointer to a TV instance
 */
1742 static void TmThreadDeinitMC(ThreadVars *tv)
1743 {
1744  if (tv->ctrl_mutex) {
1746  SCFree(tv->ctrl_mutex);
1747  }
1748  if (tv->ctrl_cond) {
1750  SCFree(tv->ctrl_cond);
1751  }
1752 }
1753 
1754 /**
1755  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1756  * the kill flag has been set or not on the thread.
1757  *
1758  * \param tv Pointer to the TV instance.
1759  */
1761 {
1762  while (!TmThreadsCheckFlag(tv, flags)) {
1763  SleepUsec(100);
1764  }
1765 }
1766 
1767 /**
1768  * \brief Unpauses a thread
1769  *
1770  * \param tv Pointer to a TV instance that has to be unpaused
1771  */
1773 {
1775 }
1776 
/** \internal
 *
 * \brief Wait for the threads in tv_root[t] to start running.
 *
 * The threads in the list are counted first; the time budget is
 * 60 seconds plus one second per thread. The list is then polled with
 * 100 usec sleeps. tv_start is advanced past threads that passed the
 * checks, so a retry resumes from the last such thread instead of the
 * list head.
 *
 * NOTE(review): the embedded line numbers of this listing skip several
 * lines (e.g. 1785, 1789, 1796, 1799-1800, 1808-1809, 1830). The
 * conditions guarding the two error paths are among them, so the exact
 * flags tested are not visible here -- confirm against the full source.
 *
 * \param t index into tv_root selecting the list of threads to wait for
 *
 * \retval TM_ECODE_OK the whole list was walked without hitting an error path
 * \retval TM_ECODE_FAILED a thread failed to start, or the time budget
 *                         was exceeded
 */
1777 static TmEcode WaitOnThreadsRunningByType(const int t)
1778 {
1779  struct timeval start_ts;
1780  struct timeval cur_ts;
1781  uint32_t thread_cnt = 0;
1782 
1783  /* on retries, this will init to the last thread that started up already */
1784  ThreadVars *tv_start = tv_root[t];
1786  for (ThreadVars *tv = tv_start; tv != NULL; tv = tv->next) {
1787  thread_cnt++;
1788  }
1790 
1791  /* give threads a second each to start up, plus a margin of a minute. */
1792  uint32_t time_budget = 60 + thread_cnt;
1793 
1794  gettimeofday(&start_ts, NULL);
1795 again:
1797  ThreadVars *tv = tv_start;
1798  while (tv != NULL) {
1801 
1802  SCLogError("thread \"%s\" failed to "
1803  "start: flags %04x",
1804  tv->name, SC_ATOMIC_GET(tv->flags));
1805  return TM_ECODE_FAILED;
1806  }
1807 
1810 
1811  /* 60 seconds provided for the thread to transition from
1812  * THV_INIT_DONE to THV_RUNNING */
 /* note: the limit actually compared against is time_budget,
  * i.e. 60 seconds plus one second per thread */
1813  gettimeofday(&cur_ts, NULL);
1814  if (((uint32_t)cur_ts.tv_sec - (uint32_t)start_ts.tv_sec) > time_budget) {
1815  SCLogError("thread \"%s\" failed to "
1816  "start in time: flags %04x. Total threads: %u. Time budget %us",
1817  tv->name, SC_ATOMIC_GET(tv->flags), thread_cnt, time_budget);
1818  return TM_ECODE_FAILED;
1819  }
1820 
1821  /* sleep a little to give the thread some
1822  * time to start running */
1823  SleepUsec(100);
1824  goto again;
1825  }
 /* this thread passed the checks: remember it as the resume point */
1826  tv_start = tv;
1827 
1828  tv = tv->next;
1829  }
1831  return TM_ECODE_OK;
1832 }
1833 
1834 /**
1835  * \brief Waits for all threads to be in a running state
1836  *
1837  * \retval TM_ECODE_OK if all are running or error if a thread failed
1838  */
1840 {
1841  uint16_t RX_num = 0;
1842  uint16_t W_num = 0;
1843  uint16_t FM_num = 0;
1844  uint16_t FR_num = 0;
1845  uint16_t TX_num = 0;
1846 
1847  for (int i = 0; i < TVT_MAX; i++) {
1848  if (WaitOnThreadsRunningByType(i) != TM_ECODE_OK)
1849  return TM_ECODE_FAILED;
1850  }
1851 
1853  for (int i = 0; i < TVT_MAX; i++) {
1854  for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
1855  if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1856  RX_num++;
1857  else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1858  W_num++;
1859  else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1860  TX_num++;
1861  else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1862  FM_num++;
1863  else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1864  FR_num++;
1865  }
1866  }
1868 
1869  /* Construct a welcome string displaying
1870  * initialized thread types and counts */
1871  uint16_t app_len = 32;
1872  uint16_t buf_len = 256;
1873 
1874  char append_str[app_len];
1875  char thread_counts[buf_len];
1876 
1877  strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1878  if (RX_num > 0) {
1879  snprintf(append_str, app_len, "RX: %u ", RX_num);
1880  strlcat(thread_counts, append_str, buf_len);
1881  }
1882  if (W_num > 0) {
1883  snprintf(append_str, app_len, "W: %u ", W_num);
1884  strlcat(thread_counts, append_str, buf_len);
1885  }
1886  if (TX_num > 0) {
1887  snprintf(append_str, app_len, "TX: %u ", TX_num);
1888  strlcat(thread_counts, append_str, buf_len);
1889  }
1890  if (FM_num > 0) {
1891  snprintf(append_str, app_len, "FM: %u ", FM_num);
1892  strlcat(thread_counts, append_str, buf_len);
1893  }
1894  if (FR_num > 0) {
1895  snprintf(append_str, app_len, "FR: %u ", FR_num);
1896  strlcat(thread_counts, append_str, buf_len);
1897  }
1898  snprintf(append_str, app_len, " Engine started.");
1899  strlcat(thread_counts, append_str, buf_len);
1900  SCLogNotice("%s", thread_counts);
1901 
1902  return TM_ECODE_OK;
1903 }
1904 
1905 /**
1906  * \brief Unpauses all threads present in tv_root
1907  */
1909 {
1911  for (int i = 0; i < TVT_MAX; i++) {
1912  ThreadVars *tv = tv_root[i];
1913  while (tv != NULL) {
1915  tv = tv->next;
1916  }
1917  }
1919 }
1920 
1921 /**
1922  * \brief Used to check the thread for certain conditions of failure.
1923  */
1925 {
1927  for (int i = 0; i < TVT_MAX; i++) {
1928  ThreadVars *tv = tv_root[i];
1929  while (tv) {
1931  FatalError("thread %s failed", tv->name);
1932  }
1933  tv = tv->next;
1934  }
1935  }
1937 }
1938 
1939 /**
1940  * \brief Used to check if all threads have finished their initialization. On
1941  * finding an un-initialized thread, it waits till that thread completes
1942  * its initialization, before proceeding to the next thread.
1943  *
1944  * \retval TM_ECODE_OK all initialized properly
1945  * \retval TM_ECODE_FAILED failure
1946  */
1948 {
1949  struct timeval start_ts;
1950  struct timeval cur_ts;
1951  gettimeofday(&start_ts, NULL);
1952 
1953 again:
1955  for (int i = 0; i < TVT_MAX; i++) {
1956  ThreadVars *tv = tv_root[i];
1957  while (tv != NULL) {
1960 
1961  SCLogError("thread \"%s\" failed to "
1962  "initialize: flags %04x",
1963  tv->name, SC_ATOMIC_GET(tv->flags));
1964  return TM_ECODE_FAILED;
1965  }
1966 
1967  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
1969 
1970  gettimeofday(&cur_ts, NULL);
1971  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
1972  SCLogError("thread \"%s\" failed to "
1973  "initialize in time: flags %04x",
1974  tv->name, SC_ATOMIC_GET(tv->flags));
1975  return TM_ECODE_FAILED;
1976  }
1977 
1978  /* sleep a little to give the thread some
1979  * time to finish initialization */
1980  SleepUsec(100);
1981  goto again;
1982  }
1983 
1986  SCLogError("thread \"%s\" failed to "
1987  "initialize.",
1988  tv->name);
1989  return TM_ECODE_FAILED;
1990  }
1993  SCLogError("thread \"%s\" closed on "
1994  "initialization.",
1995  tv->name);
1996  return TM_ECODE_FAILED;
1997  }
1998 
1999  tv = tv->next;
2000  }
2001  }
2003 
2004  return TM_ECODE_OK;
2005 }
2006 
2007 /**
2008  * \brief returns a count of all the threads that match the flag
2009  */
2011 {
2012  uint32_t cnt = 0;
2014  for (int i = 0; i < TVT_MAX; i++) {
2015  ThreadVars *tv = tv_root[i];
2016  while (tv != NULL) {
2017  if ((tv->tmm_flags & flags) == flags)
2018  cnt++;
2019 
2020  tv = tv->next;
2021  }
2022  }
2024  return cnt;
2025 }
2026 
2027 #ifdef DEBUG_VALIDATION
/** \internal
 *
 * \brief Log one notice line per slot of a thread (DEBUG_VALIDATION only).
 *
 * Each line shows the thread pointer, the slot pointer, the slot's
 * tm_id and the name of the module `m`.
 *
 * NOTE(review): the embedded line numbers of this listing skip line
 * 2031, which is where `m` must be declared -- presumably via a module
 * lookup on s->tm_id; confirm against the full source.
 *
 * \param tv thread whose slot chain is dumped
 */
2028 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2029 {
2030  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2032  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2033  tv, s, s->tm_id, m->name);
2034  }
2035 }
2036 
/** \internal
 *
 * \brief Log the state of every thread in tv_root (DEBUG_VALIDATION only).
 *
 * For each thread of each type this logs its type, name, tmm_flags,
 * flags and stream_pq pointer. It then reports either that stream_pq is
 * the thread's input queue, or the packets sitting in stream_pq_local,
 * followed by the packets in decode_pq and the thread's slots.
 *
 * NOTE(review): the embedded line numbers of this listing skip lines
 * 2039 and 2062-2063, so some statements are not visible here.
 */
2037 static void TmThreadDumpThreads(void)
2038 {
2040  for (int i = 0; i < TVT_MAX; i++) {
2041  ThreadVars *tv = tv_root[i];
2042  while (tv != NULL) {
2043  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2044  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2045  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
 /* stream_pq either aliases the input queue or is thread-local */
2046  if (tv->inq && tv->stream_pq == tv->inq->pq) {
2047  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2048  } else if (tv->stream_pq_local != NULL) {
2049  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2050  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2051  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2052  }
2053  }
2054  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2055  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2056  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2057  }
2058  TmThreadDoDumpSlots(tv);
2059  tv = tv->next;
2060  }
2061  }
2064 }
2065 #endif
2066 
/* One entry of the thread store: links a registered thread id (slot
 * index + 1) to its ThreadVars and keeps the thread's packet time.
 * NOTE(review): the embedded line numbers of this listing skip line
 * 2078; the `spin` member used by the functions operating on this
 * struct is presumably declared there -- confirm against the full
 * source. */
2067 /* Aligned to CLS to avoid false sharing between atomic ops. */
2068 typedef struct Thread_ {
2069  ThreadVars *tv; /**< threadvars structure */
2070  const char *name; /**< set to tv->name at registration, not a copy */
2071  int type; /**< thread type given at registration; the timestamp
 * helpers only consider entries with TVT_PPT */
2072  int in_use; /**< bool to indicate this is in use */
2073 
2074  SC_ATOMIC_DECLARE(SCTime_t, pktts); /**< current packet time of this thread
2075  * (offline mode) */
2076  SCTime_t sys_sec_stamp; /**< timestamp in real system
2077  * time when the pktts was last updated. */
2079 } __attribute__((aligned(CLS))) Thread;
2081 typedef struct Threads_ {
2082  Thread *threads;
2083  size_t threads_size;
2084  int threads_cnt;
2086 
2087 static bool thread_store_sealed = false;
2088 static Threads thread_store = { NULL, 0, 0 };
2089 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2090 
2092 {
2093  SCMutexLock(&thread_store_lock);
2094  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2095  thread_store_sealed = true;
2096  SCMutexUnlock(&thread_store_lock);
2097 }
2098 
2100 {
2101  SCMutexLock(&thread_store_lock);
2102  DEBUG_VALIDATE_BUG_ON(!thread_store_sealed);
2103  thread_store_sealed = false;
2104  SCMutexUnlock(&thread_store_lock);
2105 }
2106 
2108 {
2109  SCMutexLock(&thread_store_lock);
2110  for (size_t s = 0; s < thread_store.threads_size; s++) {
2111  Thread *t = &thread_store.threads[s];
2112  if (t == NULL || t->in_use == 0)
2113  continue;
2114 
2115  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2116  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2117  if (t->tv) {
2118  ThreadVars *tv = t->tv;
2119  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2120  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2121  tv, tv->type, tv->name, tv->tmm_flags, flags);
2122  }
2123  }
2124  SCMutexUnlock(&thread_store_lock);
2125 }
2126 
2127 #define STEP 32
2128 /**
2129  * \retval id thread id, or 0 if not found
2130  */
2132 {
2133  SCMutexLock(&thread_store_lock);
2134  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2135  if (thread_store.threads == NULL) {
2136  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2137  BUG_ON(thread_store.threads == NULL);
2138  thread_store.threads_size = STEP;
2139  }
2140 
2141  size_t s;
2142  for (s = 0; s < thread_store.threads_size; s++) {
2143  if (thread_store.threads[s].in_use == 0) {
2144  Thread *t = &thread_store.threads[s];
2145  SCSpinInit(&t->spin, 0);
2146  SCSpinLock(&t->spin);
2147  t->name = tv->name;
2148  t->type = type;
2149  t->tv = tv;
2150  t->in_use = 1;
2151  SCSpinUnlock(&t->spin);
2152 
2153  SCMutexUnlock(&thread_store_lock);
2154  return (int)(s+1);
2155  }
2156  }
2157 
2158  /* if we get here the array is completely filled */
2159  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2160  BUG_ON(newmem == NULL);
2161  thread_store.threads = newmem;
2162  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2163 
2164  Thread *t = &thread_store.threads[thread_store.threads_size];
2165  SCSpinInit(&t->spin, 0);
2166  SCSpinLock(&t->spin);
2167  t->name = tv->name;
2168  t->type = type;
2169  t->tv = tv;
2170  t->in_use = 1;
2171  SCSpinUnlock(&t->spin);
2172 
2173  s = thread_store.threads_size;
2174  thread_store.threads_size += STEP;
2175 
2176  SCMutexUnlock(&thread_store_lock);
2177  return (int)(s+1);
2178 }
2179 #undef STEP
2180 
2181 void TmThreadsUnregisterThread(const int id)
2182 {
2183  SCMutexLock(&thread_store_lock);
2184  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2185  if (id <= 0 || id > (int)thread_store.threads_size) {
2186  SCMutexUnlock(&thread_store_lock);
2187  return;
2188  }
2189 
2190  /* id is one higher than index */
2191  int idx = id - 1;
2192 
2193  /* reset thread_id, which serves as clearing the record */
2194  thread_store.threads[idx].in_use = 0;
2195 
2196  /* check if we have at least one registered thread left */
2197  size_t s;
2198  for (s = 0; s < thread_store.threads_size; s++) {
2199  Thread *t = &thread_store.threads[s];
2200  if (t->in_use == 1) {
2201  goto end;
2202  }
2203  }
2204 
2205  /* if we get here no threads are registered */
2206  SCFree(thread_store.threads);
2207  thread_store.threads = NULL;
2208  thread_store.threads_size = 0;
2209  thread_store.threads_cnt = 0;
2210 
2211 end:
2212  SCMutexUnlock(&thread_store_lock);
2213 }
2214 
2215 void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
2216 {
2217  SCTime_t now = SCTimeGetTime();
2218  int idx = id - 1;
2219  Thread *t = &thread_store.threads[idx];
2220  SCSpinLock(&t->spin);
2221  SC_ATOMIC_SET(t->pktts, ts);
2222 
2223 #ifdef DEBUG
2224  if (t->sys_sec_stamp.secs != 0) {
2225  SCTime_t tmpts = SCTIME_ADD_SECS(t->sys_sec_stamp, 3);
2226  if (SCTIME_CMP_LT(tmpts, now)) {
2227  SCLogDebug("%s: thread slept for %u secs", t->name, (uint32_t)(now.secs - tmpts.secs));
2228  }
2229  }
2230 #endif
2231 
2232  t->sys_sec_stamp = now;
2233  SCSpinUnlock(&t->spin);
2234 }
2235 
2237 {
2238  static SCTime_t nullts = SCTIME_INITIALIZER;
2239  bool ready = true;
2240  for (size_t s = 0; s < thread_store.threads_size; s++) {
2241  Thread *t = &thread_store.threads[s];
2242  if (!t->in_use) {
2243  break;
2244  }
2245  SCSpinLock(&t->spin);
2246  if (t->type != TVT_PPT) {
2247  SCSpinUnlock(&t->spin);
2248  continue;
2249  }
2250  if (SCTIME_CMP_EQ(t->sys_sec_stamp, nullts)) {
2251  ready = false;
2252  SCSpinUnlock(&t->spin);
2253  break;
2254  }
2255  SCSpinUnlock(&t->spin);
2256  }
2257  return ready;
2258 }
2259 
2261 {
2262  SCTime_t now = SCTimeGetTime();
2263  for (size_t s = 0; s < thread_store.threads_size; s++) {
2264  Thread *t = &thread_store.threads[s];
2265  if (!t->in_use) {
2266  break;
2267  }
2268  SCSpinLock(&t->spin);
2269  if (t->type != TVT_PPT) {
2270  SCSpinUnlock(&t->spin);
2271  continue;
2272  }
2273  SC_ATOMIC_SET(t->pktts, ts);
2274  t->sys_sec_stamp = now;
2275  SCSpinUnlock(&t->spin);
2276  }
2277 }
2278 
2280 {
2281  BUG_ON(idx == 0);
2282  const int i = idx - 1;
2283  Thread *t = &thread_store.threads[i];
2284  return SC_ATOMIC_GET(t->pktts);
2285 }
2286 
2287 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2288 {
2289  struct timeval local = { 0 };
2290  static SCTime_t nullts = SCTIME_INITIALIZER;
2291  bool set = false;
2292  SCTime_t now = SCTimeGetTime();
2293 
2294  for (size_t s = 0; s < thread_store.threads_size; s++) {
2295  Thread *t = &thread_store.threads[s];
2296  if (t->in_use == 0) {
2297  break;
2298  }
2299  SCSpinLock(&t->spin);
2300  /* only packet threads set timestamps based on packets */
2301  if (t->type != TVT_PPT) {
2302  SCSpinUnlock(&t->spin);
2303  continue;
2304  }
2305  SCTime_t pktts = SC_ATOMIC_GET(t->pktts);
2306  if (SCTIME_CMP_NEQ(pktts, nullts)) {
2307  SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 5);
2308  /* ignore sleeping threads */
2309  if (SCTIME_CMP_LT(sys_sec_stamp, now)) {
2310  SCSpinUnlock(&t->spin);
2311  continue;
2312  }
2313  if (!set) {
2314  SCTIME_TO_TIMEVAL(&local, pktts);
2315  set = true;
2316  } else {
2317  if (SCTIME_CMP_LT(pktts, SCTIME_FROM_TIMEVAL(&local))) {
2318  SCTIME_TO_TIMEVAL(&local, pktts);
2319  }
2320  }
2321  }
2322  SCSpinUnlock(&t->spin);
2323  }
2324  *ts = local;
2325  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2326 }
2327 
2329 {
2330  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2331  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2332  /* always create at least one thread */
2333  if (thread_max == 0)
2334  thread_max = ncpus * threading_detect_ratio;
2335  if (thread_max < 1)
2336  thread_max = 1;
2337  if (thread_max > 1024) {
2338  SCLogWarning("limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2339  thread_max = 1024;
2340  }
2341  return (uint16_t)thread_max;
2342 }
2343 
2344 /** \brief inject a flow into a threads flow queue
2345  */
2346 void TmThreadsInjectFlowById(Flow *f, const int id)
2347 {
2348  BUG_ON(id <= 0 || id > (int)thread_store.threads_size);
2349 
2350  int idx = id - 1;
2351 
2352  Thread *t = &thread_store.threads[idx];
2353  ThreadVars *tv = t->tv;
2354 
2355  BUG_ON(tv == NULL || tv->flow_queue == NULL);
2356 
2357  FlowEnqueue(tv->flow_queue, f);
2358 
2359  /* wake up listening thread(s) if necessary */
2360  if (tv->inq != NULL) {
2361  SCMutexLock(&tv->inq->pq->mutex_q);
2362  SCCondSignal(&tv->inq->pq->cond_q);
2363  SCMutexUnlock(&tv->inq->pq->mutex_q);
2364  } else if (tv->break_loop) {
2365  TmThreadsCaptureBreakLoop(tv);
2366  }
2367 }
thread_name_workers
const char * thread_name_workers
Definition: runmodes.c:66
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:813
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:73
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:76
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:70
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:135
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2236
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1719
tm-threads.h
TmThreadsSealThreads
void TmThreadsSealThreads(void)
Definition: tm-threads.c:2091
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:70
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
THV_USE
#define THV_USE
Definition: threadvars.h:36
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:133
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1663
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:61
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1101
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
threading_set_cpu_affinity
bool threading_set_cpu_affinity
Definition: runmodes.c:60
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:854
SCTIME_CMP_NEQ
#define SCTIME_CMP_NEQ(a, b)
Definition: util-time.h:107
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:68
Thread_::SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(SCTime_t, pktts)
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:314
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:55
TmThreadContinueThreads
void TmThreadContinueThreads(void)
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1908
CLS
#define CLS
Definition: suricata-common.h:56
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1044
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:56
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:379
TmThreadSetGroupName
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1620
CaptureStatsSetup
void CaptureStatsSetup(ThreadVars *tv)
Definition: decode.c:988
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:105
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:48
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
PKT_SRC_SHUTDOWN_FLUSH
@ PKT_SRC_SHUTDOWN_FLUSH
Definition: decode.h:63
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:101
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1760
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:175
thread-callbacks.h
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:56
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:45
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:82
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:61
Packet_::flags
uint32_t flags
Definition: decode.h:513
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:75
threads.h
Threads
Threads
Definition: tm-threads.c:2085
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:357
TmThreadsUnsealThreads
void TmThreadsUnsealThreads(void)
Definition: tm-threads.c:2099
ThreadVars_::t
pthread_t t
Definition: threadvars.h:59
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:303
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:69
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2010
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:104
thread_name_autofp
const char * thread_name_autofp
Definition: runmodes.c:64
StatsSetupPrivate
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1204
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:117
MIN
#define MIN(x, y)
Definition: suricata-common.h:391
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:82
SCSpinLock
#define SCSpinLock
Definition: threads-debug.h:235
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:74
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1358
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:387
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:121
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:765
SCTIME_CMP_EQ
#define SCTIME_CMP_EQ(a, b)
Definition: util-time.h:108
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:89
m
SCMutex m
Definition: flow-hash.h:6
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:44
TmModule_::ThreadBusy
bool(* ThreadBusy)(ThreadVars *tv, void *thread_data)
Definition: tm-modules.h:63
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:143
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:314
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:81
rww_lock_contention
thread_local uint64_t rww_lock_contention
PacketQueueNoLock_
simple fifo queue for packets
Definition: packet-queue.h:34
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:173
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2127
Thread_::in_use
int in_use
Definition: tm-threads.c:2072
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:38
TmThreadsGetThreadTime
SCTime_t TmThreadsGetThreadTime(const int idx)
Definition: tm-threads.c:2279
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(void)
Disable all packet threads.
Definition: tm-threads.c:1487
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:244
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:80
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a threads flow queue
Definition: tm-threads.c:2346
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:81
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2287
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:88
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmThreadsProcessDecodePseudoPackets
TmEcode TmThreadsProcessDecodePseudoPackets(ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
Definition: tm-threads.c:114
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:46
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:46
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1324
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:109
util-signal.h
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:79
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:776
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1535
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:89
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:2069
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1772
AffinityGetNextCPU
uint16_t AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
Definition: util-affinity.c:282
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:57
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:1947
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
tv
ThreadVars * tv
Definition: tm-threads.c:2080
strlcat
size_t strlcat(char *, const char *src, size_t siz)
Definition: util-strlcatu.c:45
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:77
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:96
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:208
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:58
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
threading_set_stack_size
uint64_t threading_set_stack_size
Definition: runmodes.c:61
ThreadVars_::perf_public_ctx
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:128
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1270
SCTime_t::secs
uint64_t secs
Definition: util-time.h:41
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
Definition: tm-threads.c:2215
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
util-affinity.h
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:79
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:65
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1638
THV_KILL
#define THV_KILL
Definition: threadvars.h:40
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:913
FlowQueueNew
FlowQueue * FlowQueueNew(void)
Definition: flow-queue.c:35
SCSpinUnlock
#define SCSpinUnlock
Definition: threads-debug.h:237
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:825
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2181
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2131
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
thread-storage.h
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:90
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:88
__attribute__
struct Thread_ __attribute__((aligned(CLS)))
Definition: tm-threads.c:2079
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:125
ThreadVars_::id
int id
Definition: threadvars.h:87
ThreadVars_::type
uint8_t type
Definition: threadvars.h:72
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:300
SCTIME_TO_TIMEVAL
#define SCTIME_TO_TIMEVAL(tv, t)
Definition: util-time.h:97
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:88
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:85
SCReturn
#define SCReturn
Definition: util-debug.h:273
stream.h
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:376
Thread_::sys_sec_stamp
SCTime_t sys_sec_stamp
Definition: tm-threads.c:2076
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1566
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:139
Packet_
Definition: decode.h:476
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:133
ThreadVars_::thread_group_name
char * thread_group_name
Definition: threadvars.h:67
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:69
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1536
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:74
TmSlot_
Definition: tm-threads.h:53
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:67
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:69
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:79
name
const char * name
Definition: tm-threads.c:2081
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:89
runmodes.h
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1073
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:90
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
TmModule_
Definition: tm-modules.h:43
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:255
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
Definition: tm-threads.c:2260
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:116
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
SCTIME_CMP_LT
#define SCTIME_CMP_LT(a, b)
Definition: util-time.h:105
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:39
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:37
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:649
StatsPublicThreadContext_::m
SCMutex m
Definition: counters.h:75
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:378
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1537
Thread_::type
int type
Definition: tm-threads.c:2071
cnt
uint32_t cnt
Definition: tmqh-packetpool.h:7
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:90
Packet_::flow
struct Flow_ * flow
Definition: decode.h:515
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1160
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:75
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:54
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:822
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:35
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:34
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:106
TmThreadsWaitForUnpause
bool TmThreadsWaitForUnpause(ThreadVars *tv)
Wait for a thread to become unpaused.
Definition: tm-threads.c:364
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:41
ThreadVars_::break_loop
bool break_loop
Definition: threadvars.h:136
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:101
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:838
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:72
sys_sec_stamp
SCTime_t sys_sec_stamp
Definition: tm-threads.c:2087
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:77
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
FatalError
#define FatalError(...)
Definition: util-debug.h:502
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax(void)
Definition: tm-threads.c:2328
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:66
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1134
StatsSyncCounters
void StatsSyncCounters(ThreadVars *tv)
Definition: counters.c:444
ThreadStorageSize
unsigned int ThreadStorageSize(void)
Definition: thread-storage.c:25
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:144
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:216
threadvars.h
util-validate.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:232
SCSpinInit
#define SCSpinInit
Definition: threads-debug.h:238
SCThreadRunInitCallbacks
void SCThreadRunInitCallbacks(ThreadVars *tv)
Definition: thread-callbacks.c:48
ThreadsAffinityType_
Definition: util-affinity.h:65
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:37
THV_KILL_PKTACQ
#define THV_KILL_PKTACQ
Definition: threadvars.h:47
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:604
TmSlot_::tm_flags
uint8_t tm_flags
Definition: tm-threads.h:67
Thread_::name
const char * name
Definition: tm-threads.c:2070
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:131
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:50
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:91
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:74
thread_name_verdict
const char * thread_name_verdict
Definition: runmodes.c:67
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:73
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:317
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:84
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:112
SCSpinlock
#define SCSpinlock
Definition: threads-debug.h:234
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
suricata.h
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:87
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:456
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:142
Tmq_
Definition: tm-queues.h:29
Thread_::spin
SCSpinlock spin
Definition: tm-threads.c:2078
TmThreadWaitOnThreadRunning
TmEcode TmThreadWaitOnThreadRunning(void)
Waits for all threads to be in a running state.
Definition: tm-threads.c:1839
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2107
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1924
type
int type
Definition: tm-threads.c:2082
ThreadFreeStorage
void ThreadFreeStorage(ThreadVars *tv)
Definition: thread-storage.c:50
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:132
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:83
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:53
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:69
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:108
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:274
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:93
SCTIME_INITIALIZER
#define SCTIME_INITIALIZER
Definition: util-time.h:51
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
SCTIME_ADD_SECS
#define SCTIME_ADD_SECS(ts, s)
Definition: util-time.h:64
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:42
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:2068
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:275
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:120
StatsThreadCleanup
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1303
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:63
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:267
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:102
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:76
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:928
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:359
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:169
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:350