suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2024 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "thread-callbacks.h"
33 #include "threadvars.h"
34 #include "thread-storage.h"
35 #include "tm-queues.h"
36 #include "tm-queuehandlers.h"
37 #include "tm-threads.h"
38 #include "tmqh-packetpool.h"
39 #include "threads.h"
40 #include "util-affinity.h"
41 #include "util-debug.h"
42 #include "util-privs.h"
43 #include "util-cpu.h"
44 #include "util-optimize.h"
45 #include "util-profiling.h"
46 #include "util-signal.h"
47 #include "queue.h"
48 #include "util-validate.h"
49 
50 #ifdef PROFILE_LOCKING
51 thread_local uint64_t mutex_lock_contention;
52 thread_local uint64_t mutex_lock_wait_ticks;
53 thread_local uint64_t mutex_lock_cnt;
54 
55 thread_local uint64_t spin_lock_contention;
56 thread_local uint64_t spin_lock_wait_ticks;
57 thread_local uint64_t spin_lock_cnt;
58 
59 thread_local uint64_t rww_lock_contention;
60 thread_local uint64_t rww_lock_wait_ticks;
61 thread_local uint64_t rww_lock_cnt;
62 
63 thread_local uint64_t rwr_lock_contention;
64 thread_local uint64_t rwr_lock_wait_ticks;
65 thread_local uint64_t rwr_lock_cnt;
66 #endif
67 
68 #ifdef OS_FREEBSD
69 #include <sched.h>
70 #include <sys/param.h>
71 #include <sys/resource.h>
72 #include <sys/cpuset.h>
73 #include <sys/thr.h>
74 #define cpu_set_t cpuset_t
75 #endif /* OS_FREEBSD */
76 
77 /* prototypes */
78 static int SetCPUAffinity(uint16_t cpu);
79 static void TmThreadDeinitMC(ThreadVars *tv);
80 
81 /* root of the threadvars list */
82 ThreadVars *tv_root[TVT_MAX] = { NULL };
83 
84 /* lock to protect tv_root */
86 
87 /**
88  * \brief Check if a thread flag is set.
89  *
90  * \retval 1 flag is set.
91  * \retval 0 flag is not set.
92  */
93 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
94 {
95  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
96 }
97 
98 /**
99  * \brief Set a thread flag.
100  */
101 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
102 {
103  SC_ATOMIC_OR(tv->flags, flag);
104 }
105 
106 /**
107  * \brief Unset a thread flag.
108  */
109 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
110 {
111  SC_ATOMIC_AND(tv->flags, ~flag);
112 }
113 
115  ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
116 {
117  while (decode_pq->top != NULL) {
118  Packet *extra_p = PacketDequeueNoLock(decode_pq);
119  if (unlikely(extra_p == NULL))
120  continue;
121  DEBUG_VALIDATE_BUG_ON(extra_p->flow != NULL);
122 
123  if (TmThreadsSlotProcessPkt(tv, slot, extra_p) != TM_ECODE_OK) {
125  }
126  }
128 }
129 
130 /**
131  * \brief Separate run function so we can call it recursively.
132  */
134 {
135  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
136  PACKET_PROFILING_TMM_START(p, s->tm_id);
137  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
138  PACKET_PROFILING_TMM_END(p, s->tm_id);
139  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
140 
141  /* handle error */
142  if (unlikely(r == TM_ECODE_FAILED)) {
143  /* Encountered error. Return packets to packetpool and return */
144  TmThreadsSlotProcessPktFail(tv, NULL);
145  return TM_ECODE_FAILED;
146  }
147  if (s->tm_flags & TM_FLAG_DECODE_TM) {
148  if (TmThreadsProcessDecodePseudoPackets(tv, &tv->decode_pq, s->slot_next) !=
149  TM_ECODE_OK) {
150  return TM_ECODE_FAILED;
151  }
152  }
153  }
154 
155  return TM_ECODE_OK;
156 }
157 
158 /** \internal
159  *
160  * \brief Process flow timeout packets
161  *
162  * Process flow timeout pseudo packets. During shutdown this loop
163  * is run until the flow engine kills the thread and the queue is
164  * empty.
165  */
167 {
168  TmSlot *fw_slot = tv->tm_flowworker;
169  int r = TM_ECODE_OK;
170 
171  if (tv->stream_pq == NULL || fw_slot == NULL) {
172  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
173  return r;
174  }
175 
176  SCLogDebug("flow end loop starting");
177  while (1) {
179  uint32_t len = tv->stream_pq->len;
181  if (len > 0) {
182  while (len--) {
186  if (likely(p)) {
187  DEBUG_VALIDATE_BUG_ON(p->flow != NULL);
188  r = TmThreadsSlotProcessPkt(tv, fw_slot, p);
189  if (r == TM_ECODE_FAILED) {
190  break;
191  }
192  }
193  }
194  } else {
196  break;
197  }
198  SleepUsec(1);
199  }
200  }
201  SCLogDebug("flow end loop complete");
203 
204  return r;
205 }
206 
207 static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv)
208 {
209  TmSlot *s = tv->tm_slots;
210 
212 
213  if (tv->thread_setup_flags != 0)
215 
217  PacketPoolInit();
218 
219  for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
220  if (slot->SlotThreadInit != NULL) {
221  void *slot_data = NULL;
222  TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
223  if (r != TM_ECODE_OK) {
224  if (r == TM_ECODE_DONE) {
225  EngineDone();
227  goto error;
228  } else {
230  goto error;
231  }
232  }
233  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
234  }
235 
236  /* if the flowworker module is the first, get the threads input queue */
237  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
238  tv->stream_pq = tv->inq->pq;
239  tv->tm_flowworker = slot;
240  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
242  if (tv->flow_queue == NULL) {
244  goto error;
245  }
246  /* setup a queue */
247  } else if (slot->tm_id == TMM_FLOWWORKER) {
248  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
249  if (tv->stream_pq_local == NULL)
250  FatalError("failed to alloc PacketQueue");
253  tv->tm_flowworker = slot;
254  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
256  if (tv->flow_queue == NULL) {
258  goto error;
259  }
260  }
261  }
262 
264 
266 
267  return true;
268 
269 error:
270  return false;
271 }
272 
274 {
275  TmSlot *s = tv->tm_slots;
276  bool rc = true;
277 
279 
281 
282  /* process all pseudo packets the flow timeout may throw at us */
284 
287 
289 
290  for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
291  if (slot->SlotThreadExitPrintStats != NULL) {
292  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
293  }
294 
295  if (slot->SlotThreadDeinit != NULL) {
296  TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
297  if (r != TM_ECODE_OK) {
299  rc = false;
300  break;
301  }
302  }
303  }
304 
305  tv->stream_pq = NULL;
307  return rc;
308 }
309 
310 static void *TmThreadsSlotPktAcqLoop(void *td)
311 {
312  ThreadVars *tv = (ThreadVars *)td;
313  TmSlot *s = tv->tm_slots;
314  TmEcode r = TM_ECODE_OK;
315 
316  /* check if we are setup properly */
317  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
318  SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
319  " PktAcqLoop=%p, tmqh_in=%p,"
320  " tmqh_out=%p",
321  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
323  pthread_exit(NULL);
324  return NULL;
325  }
326 
327  if (!TmThreadsSlotPktAcqLoopInit(td)) {
328  goto error;
329  }
330 
331  bool run = TmThreadsWaitForUnpause(tv);
332 
333  while (run) {
334  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
335 
336  if (r == TM_ECODE_FAILED) {
338  run = false;
339  }
341  run = false;
342  }
343  if (r == TM_ECODE_DONE) {
344  run = false;
345  }
346  }
348  goto error;
349  }
350 
351  SCLogDebug("%s ending", tv->name);
352  pthread_exit((void *) 0);
353  return NULL;
354 
355 error:
356  pthread_exit(NULL);
357  return NULL;
358 }
359 
360 /**
361  * Also returns if the kill flag is set.
362  */
364 {
367 
368  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
369  SleepUsec(100);
370 
372  return false;
373  }
374 
376  }
377 
378  return true;
379 }
380 
381 static void *TmThreadsLib(void *td)
382 {
383  ThreadVars *tv = (ThreadVars *)td;
384  TmSlot *s = tv->tm_slots;
385 
386  /* check if we are setup properly */
387  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
388  SCLogError("TmSlot or ThreadVars badly setup: s=%p, tmqh_in=%p,"
389  " tmqh_out=%p",
390  s, tv->tmqh_in, tv->tmqh_out);
392  return NULL;
393  }
394 
395  if (!TmThreadsSlotPktAcqLoopInit(tv)) {
396  goto error;
397  }
398 
399  if (!TmThreadsWaitForUnpause(tv)) {
400  goto error;
401  }
402 
403  return NULL;
404 
405 error:
406  tv->stream_pq = NULL;
407  return (void *)-1;
408 }
409 
410 static void *TmThreadsSlotVar(void *td)
411 {
412  ThreadVars *tv = (ThreadVars *)td;
413  TmSlot *s = (TmSlot *)tv->tm_slots;
414  Packet *p = NULL;
415  TmEcode r = TM_ECODE_OK;
416 
418  PacketPoolInit();//Empty();
419 
421 
422  if (tv->thread_setup_flags != 0)
424 
425  /* Drop the capabilities for this thread */
426  SCDropCaps(tv);
427 
428  /* check if we are setup properly */
429  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
431  pthread_exit(NULL);
432  return NULL;
433  }
434 
435  for (; s != NULL; s = s->slot_next) {
436  if (s->SlotThreadInit != NULL) {
437  void *slot_data = NULL;
438  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
439  if (r != TM_ECODE_OK) {
441  goto error;
442  }
443  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
444  }
445 
446  /* special case: we need to access the stream queue
447  * from the flow timeout code */
448 
449  /* if the flowworker module is the first, get the threads input queue */
450  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
451  tv->stream_pq = tv->inq->pq;
452  tv->tm_flowworker = s;
453  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
455  if (tv->flow_queue == NULL) {
457  pthread_exit(NULL);
458  return NULL;
459  }
460  /* setup a queue */
461  } else if (s->tm_id == TMM_FLOWWORKER) {
462  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
463  if (tv->stream_pq_local == NULL)
464  FatalError("failed to alloc PacketQueue");
467  tv->tm_flowworker = s;
468  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
470  if (tv->flow_queue == NULL) {
472  pthread_exit(NULL);
473  return NULL;
474  }
475  }
476  }
477 
479 
480  // Each 'worker' thread uses this func to process/decode the packet read.
481  // Each decode method is different to receive methods in that they do not
482  // enter infinite loops. They use this as the core loop. As a result, at this
483  // point the worker threads can be considered both initialized and running.
485  bool run = TmThreadsWaitForUnpause(tv);
486 
487  s = (TmSlot *)tv->tm_slots;
488 
489  while (run) {
490  /* input a packet */
491  p = tv->tmqh_in(tv);
492 
493  /* if we didn't get a packet see if we need to do some housekeeping */
494  if (unlikely(p == NULL)) {
495  if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty)) {
497  if (p != NULL) {
498  p->flags |= PKT_PSEUDO_STREAM_END;
500  }
501  }
502  }
503 
504  if (p != NULL) {
505  /* run the thread module(s) */
506  r = TmThreadsSlotVarRun(tv, p, s);
507  if (r == TM_ECODE_FAILED) {
510  break;
511  }
512 
513  /* output the packet */
514  tv->tmqh_out(tv, p);
515 
516  /* now handle the stream pq packets */
517  TmThreadsHandleInjectedPackets(tv);
518  }
519 
521  run = false;
522  }
523  } /* while (run) */
525 
528 
530 
531  s = (TmSlot *)tv->tm_slots;
532 
533  for ( ; s != NULL; s = s->slot_next) {
534  if (s->SlotThreadExitPrintStats != NULL) {
535  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
536  }
537 
538  if (s->SlotThreadDeinit != NULL) {
539  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
540  if (r != TM_ECODE_OK) {
542  goto error;
543  }
544  }
545  }
546 
547  SCLogDebug("%s ending", tv->name);
548  tv->stream_pq = NULL;
550  pthread_exit((void *) 0);
551  return NULL;
552 
553 error:
554  tv->stream_pq = NULL;
555  pthread_exit(NULL);
556  return NULL;
557 }
558 
559 static void *TmThreadsManagement(void *td)
560 {
561  ThreadVars *tv = (ThreadVars *)td;
562  TmSlot *s = (TmSlot *)tv->tm_slots;
563  TmEcode r = TM_ECODE_OK;
564 
565  BUG_ON(s == NULL);
566 
568 
569  if (tv->thread_setup_flags != 0)
571 
572  /* Drop the capabilities for this thread */
573  SCDropCaps(tv);
574 
575  SCLogDebug("%s starting", tv->name);
576 
577  if (s->SlotThreadInit != NULL) {
578  void *slot_data = NULL;
579  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
580  if (r != TM_ECODE_OK) {
582  pthread_exit(NULL);
583  return NULL;
584  }
585  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
586  }
587 
589 
591 
592  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
593  /* handle error */
594  if (r == TM_ECODE_FAILED) {
596  }
597 
600  }
601 
604 
605  if (s->SlotThreadExitPrintStats != NULL) {
606  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
607  }
608 
609  if (s->SlotThreadDeinit != NULL) {
610  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
611  if (r != TM_ECODE_OK) {
613  pthread_exit(NULL);
614  return NULL;
615  }
616  }
617 
619  pthread_exit((void *) 0);
620  return NULL;
621 }
622 
623 /**
624  * \brief We set the slot functions.
625  *
626  * \param tv Pointer to the TV to set the slot function for.
627  * \param name Name of the slot variant.
628  * \param fn_p Pointer to a custom slot function. Used only if slot variant
629  * "name" is "custom".
630  *
631  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
632  */
633 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
634 {
635  if (name == NULL) {
636  if (fn_p == NULL) {
637  printf("Both slot name and function pointer can't be NULL inside "
638  "TmThreadSetSlots\n");
639  goto error;
640  } else {
641  name = "custom";
642  }
643  }
644 
645  if (strcmp(name, "varslot") == 0) {
646  tv->tm_func = TmThreadsSlotVar;
647  } else if (strcmp(name, "pktacqloop") == 0) {
648  tv->tm_func = TmThreadsSlotPktAcqLoop;
649  } else if (strcmp(name, "management") == 0) {
650  tv->tm_func = TmThreadsManagement;
651  } else if (strcmp(name, "command") == 0) {
652  tv->tm_func = TmThreadsManagement;
653  } else if (strcmp(name, "lib") == 0) {
654  tv->tm_func = TmThreadsLib;
655  } else if (strcmp(name, "custom") == 0) {
656  if (fn_p == NULL)
657  goto error;
658  tv->tm_func = fn_p;
659  } else {
660  printf("Error: Slot \"%s\" not supported\n", name);
661  goto error;
662  }
663 
664  return TM_ECODE_OK;
665 
666 error:
667  return TM_ECODE_FAILED;
668 }
669 
670 /**
671  * \brief Appends a new entry to the slots.
672  *
673  * \param tv TV the slot is attached to.
674  * \param tm TM to append.
675  * \param data Data to be passed on to the slot init function.
676  *
 * \note Nothing is returned; if the slot allocation fails the module is
 *       silently not appended.
678  */
679 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
680 {
681  TmSlot *slot = SCCalloc(1, sizeof(TmSlot));
682  if (unlikely(slot == NULL))
683  return;
684  SC_ATOMIC_INITPTR(slot->slot_data);
685  slot->SlotThreadInit = tm->ThreadInit;
686  slot->slot_initdata = data;
687  if (tm->Func) {
688  slot->SlotFunc = tm->Func;
689  } else if (tm->PktAcqLoop) {
690  slot->PktAcqLoop = tm->PktAcqLoop;
691  if (tm->PktAcqBreakLoop) {
692  tv->break_loop = true;
693  }
694  } else if (tm->Management) {
695  slot->Management = tm->Management;
696  }
698  slot->SlotThreadDeinit = tm->ThreadDeinit;
699  /* we don't have to check for the return value "-1". We wouldn't have
700  * received a TM as arg, if it didn't exist */
701  slot->tm_id = TmModuleGetIDForTM(tm);
702  slot->tm_flags |= tm->flags;
703 
704  tv->tmm_flags |= tm->flags;
705  tv->cap_flags |= tm->cap_flags;
706 
707  if (tv->tm_slots == NULL) {
708  tv->tm_slots = slot;
709  } else {
710  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
711 
712  /* get the last slot */
713  for ( ; a != NULL; a = a->slot_next) {
714  b = a;
715  }
716  /* append the new slot */
717  if (b != NULL) {
718  b->slot_next = slot;
719  }
720  }
721 }
722 
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
/**
 * \brief Apply a CPU set to the calling thread using the platform's
 *        affinity call.
 *
 * \param cs CPU set to apply.
 *
 * \retval 0 on success; -1 if the platform call returned non-zero (a
 *         warning is printed to stdout in that case).
 */
static int SetCPUAffinitySet(cpu_set_t *cs)
{
    int rc;

#if defined OS_FREEBSD
    rc = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
            SCGetThreadIdLong(), sizeof(cpu_set_t), cs);
#elif OS_DARWIN
    rc = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
            (void *)cs, THREAD_AFFINITY_POLICY_COUNT);
#else
    const pid_t self_tid = (pid_t)syscall(SYS_gettid);
    rc = sched_setaffinity(self_tid, sizeof(cpu_set_t), cs);
#endif /* OS_FREEBSD */

    if (rc == 0) {
        return 0;
    }

    printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", rc,
            strerror(errno));
    return -1;
}
#endif
746 
747 
/**
 * \brief Set the thread affinity on the calling thread.
 *
 * \param cpuid Id of the core/cpu to setup the affinity.
 *
 * \retval 0 If all goes well; -1 if something is wrong, including a cpu id
 *         that cannot be represented in the platform's affinity mask/set.
 */
static int SetCPUAffinity(uint16_t cpuid)
{
#if defined __OpenBSD__ || defined sun
    /* nothing is done on these platforms; report success */
    return 0;
#else
    int cpu = (int)cpuid;

#if defined OS_WIN32 || defined __CYGWIN__
    /* The affinity mask is a DWORD_PTR. Shifting by its width or more is
     * undefined behavior, so reject cpu ids that do not fit. */
    if (cpu >= (int)(sizeof(DWORD_PTR) * 8)) {
        printf("Warning: cpu %d does not fit in the thread affinity mask\n", cpu);
        return -1;
    }
    DWORD_PTR cs = (DWORD_PTR)1 << cpu;

    /* SetThreadAffinityMask returns 0 on failure */
    int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
    if (r != 0) {
        printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
               strerror(errno));
        return -1;
    }
    SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
               SCGetThreadIdLong(), cpu);

    return 0;

#else
    cpu_set_t cs;

#ifdef CPU_SETSIZE
    /* CPU_SET must not be given an index outside the set */
    if (cpu >= CPU_SETSIZE) {
        printf("Warning: cpu %d is outside the supported cpu set size\n", cpu);
        return -1;
    }
#endif
    memset(&cs, 0, sizeof(cs));

    CPU_ZERO(&cs);
    CPU_SET(cpu, &cs);
    return SetCPUAffinitySet(&cs);
#endif /* windows */
#endif /* not supported */
}
786 
787 
788 /**
789  * \brief Set the thread options (thread priority).
790  *
791  * \param tv Pointer to the ThreadVars to setup the thread priority.
792  *
793  * \retval TM_ECODE_OK.
794  */
796 {
798  tv->thread_priority = prio;
799 
800  return TM_ECODE_OK;
801 }
802 
803 /**
804  * \brief Adjusting nice value for threads.
805  */
807 {
808  SCEnter();
809 #ifndef __CYGWIN__
810 #ifdef OS_WIN32
811  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
812  SCLogError("Error setting priority for "
813  "thread %s: %s",
814  tv->name, strerror(errno));
815  } else {
816  SCLogDebug("Priority set to %"PRId32" for thread %s",
818  }
819 #else
820  int ret = nice(tv->thread_priority);
821  if (ret == -1) {
822  SCLogError("Error setting nice value %d "
823  "for thread %s: %s",
824  tv->thread_priority, tv->name, strerror(errno));
825  } else {
826  SCLogDebug("Nice value set to %"PRId32" for thread %s",
828  }
829 #endif /* OS_WIN32 */
830 #endif
831  SCReturn;
832 }
833 
834 
835 /**
836  * \brief Set the thread options (cpu affinity).
837  *
838  * \param tv pointer to the ThreadVars to setup the affinity.
839  * \param cpu cpu on which affinity is set.
840  *
841  * \retval TM_ECODE_OK
842  */
844 {
846  tv->cpu_affinity = cpu;
847 
848  return TM_ECODE_OK;
849 }
850 
851 
853 {
855  return TM_ECODE_OK;
856 
857  if (type > MAX_CPU_SET) {
858  SCLogError("invalid cpu type family");
859  return TM_ECODE_FAILED;
860  }
861 
863  tv->cpu_affinity = type;
864 
865  return TM_ECODE_OK;
866 }
867 
869 {
870  if (type >= MAX_CPU_SET) {
871  SCLogError("invalid cpu type family");
872  return 0;
873  }
874 
876 }
877 
878 /**
 * \brief Set the thread options (cpu affinity).
880  * Priority should be already set by pthread_create.
881  *
882  * \param tv pointer to the ThreadVars of the calling thread.
883  */
885 {
887  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
888  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
890  SetCPUAffinity(tv->cpu_affinity);
891  }
892 
893 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
898  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
899  uint16_t cpu = AffinityGetNextCPU(taf);
900  SetCPUAffinity(cpu);
901  /* If CPU is in a set overwrite the default thread prio */
902  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
904  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
906  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
908  } else {
909  tv->thread_priority = taf->prio;
910  }
911  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
912  "%d, thread id %lu", tv->thread_priority,
913  tv->name, cpu, SCGetThreadIdLong());
914  } else {
915  SetCPUAffinitySet(&taf->cpu_set);
916  tv->thread_priority = taf->prio;
917  SCLogPerf("Setting prio %d for thread \"%s\", "
918  "thread id %lu", tv->thread_priority,
920  }
922  }
923 #endif
924 
925  return TM_ECODE_OK;
926 }
927 
928 /**
929  * \brief Creates and returns the TV instance for a new thread.
930  *
931  * \param name Name of this TV instance
932  * \param inq_name Incoming queue name
933  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
934  * \param outq_name Outgoing queue name
935  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
936  * \param slots String representation for the slot function to be used
937  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
938  * \param mucond Flag to indicate whether to initialize the condition
939  * and the mutex variables for this newly created TV.
940  *
941  * \retval the newly created TV instance, or NULL on error
942  */
943 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
944  const char *outq_name, const char *outqh_name, const char *slots,
945  void * (*fn_p)(void *), int mucond)
946 {
947  ThreadVars *tv = NULL;
948  Tmq *tmq = NULL;
949  Tmqh *tmqh = NULL;
950 
951  SCLogDebug("creating thread \"%s\"...", name);
952 
953  /* XXX create separate function for this: allocate a thread container */
954  tv = SCCalloc(1, sizeof(ThreadVars) + ThreadStorageSize());
955  if (unlikely(tv == NULL))
956  goto error;
957 
958  SC_ATOMIC_INIT(tv->flags);
959  SCMutexInit(&tv->perf_public_ctx.m, NULL);
960 
961  strlcpy(tv->name, name, sizeof(tv->name));
962 
963  /* default state for every newly created thread */
966 
967  /* set the incoming queue */
968  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
969  SCLogDebug("inq_name \"%s\"", inq_name);
970 
971  tmq = TmqGetQueueByName(inq_name);
972  if (tmq == NULL) {
973  tmq = TmqCreateQueue(inq_name);
974  if (tmq == NULL)
975  goto error;
976  }
977  SCLogDebug("tmq %p", tmq);
978 
979  tv->inq = tmq;
980  tv->inq->reader_cnt++;
981  SCLogDebug("tv->inq %p", tv->inq);
982  }
983  if (inqh_name != NULL) {
984  SCLogDebug("inqh_name \"%s\"", inqh_name);
985 
986  int id = TmqhNameToID(inqh_name);
987  if (id <= 0) {
988  goto error;
989  }
990  tmqh = TmqhGetQueueHandlerByName(inqh_name);
991  if (tmqh == NULL)
992  goto error;
993 
994  tv->tmqh_in = tmqh->InHandler;
995  tv->inq_id = (uint8_t)id;
996  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
997  }
998 
999  /* set the outgoing queue */
1000  if (outqh_name != NULL) {
1001  SCLogDebug("outqh_name \"%s\"", outqh_name);
1002 
1003  int id = TmqhNameToID(outqh_name);
1004  if (id <= 0) {
1005  goto error;
1006  }
1007 
1008  tmqh = TmqhGetQueueHandlerByName(outqh_name);
1009  if (tmqh == NULL)
1010  goto error;
1011 
1012  tv->tmqh_out = tmqh->OutHandler;
1013  tv->outq_id = (uint8_t)id;
1014 
1015  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
1016  SCLogDebug("outq_name \"%s\"", outq_name);
1017 
1018  if (tmqh->OutHandlerCtxSetup != NULL) {
1019  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1020  if (tv->outctx == NULL)
1021  goto error;
1022  tv->outq = NULL;
1023  } else {
1024  tmq = TmqGetQueueByName(outq_name);
1025  if (tmq == NULL) {
1026  tmq = TmqCreateQueue(outq_name);
1027  if (tmq == NULL)
1028  goto error;
1029  }
1030  SCLogDebug("tmq %p", tmq);
1031 
1032  tv->outq = tmq;
1033  tv->outctx = NULL;
1034  tv->outq->writer_cnt++;
1035  }
1036  }
1037  }
1038 
1039  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1040  goto error;
1041  }
1042 
1043  if (mucond != 0)
1044  TmThreadInitMC(tv);
1045 
1047 
1048  return tv;
1049 
1050 error:
1051  SCLogError("failed to setup a thread");
1052 
1053  if (tv != NULL)
1054  SCFree(tv);
1055  return NULL;
1056 }
1057 
1058 /**
1059  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1060  * This function doesn't support custom slots, and hence shouldn't be
1061  * supplied \"custom\" as its slot type. All PPT threads are created
1062  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1063  * conditional variables are not used to kill the thread.
1064  *
1065  * \param name Name of this TV instance
1066  * \param inq_name Incoming queue name
1067  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1068  * \param outq_name Outgoing queue name
1069  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1070  * \param slots String representation for the slot function to be used
1071  *
1072  * \retval the newly created TV instance, or NULL on error
1073  */
1074 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1075  const char *inqh_name, const char *outq_name,
1076  const char *outqh_name, const char *slots)
1077 {
1078  ThreadVars *tv = NULL;
1079 
1080  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1081  slots, NULL, 0);
1082 
1083  if (tv != NULL) {
1084  tv->type = TVT_PPT;
1086  }
1087 
1088  return tv;
1089 }
1090 
1091 /**
1092  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1093  * This function supports only custom slot functions and hence a
1094  * function pointer should be sent as an argument.
1095  *
1096  * \param name Name of this TV instance
1097  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1098  * \param mucond Flag to indicate whether to initialize the condition
1099  * and the mutex variables for this newly created TV.
1100  *
1101  * \retval the newly created TV instance, or NULL on error
1102  */
1103 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1104  int mucond)
1105 {
1106  ThreadVars *tv = NULL;
1107 
1108  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1109 
1110  if (tv != NULL) {
1111  tv->type = TVT_MGMT;
1114  }
1115 
1116  return tv;
1117 }
1118 
1119 /**
 * \brief Creates and returns the TV instance for a Management thread (MGMT).
 * The thread uses the "management" slot function; the TmModule found
 * by name, if any, is appended as its slot.
1123  *
1124  * \param name Name of this TV instance
1125  * \param module Name of TmModule with MANAGEMENT flag set.
1126  * \param mucond Flag to indicate whether to initialize the condition
1127  * and the mutex variables for this newly created TV.
1128  *
1129  * \retval the newly created TV instance, or NULL on error
1130  */
1131 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1132  int mucond)
1133 {
1134  ThreadVars *tv = NULL;
1135 
1136  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1137 
1138  if (tv != NULL) {
1139  tv->type = TVT_MGMT;
1142 
1143  TmModule *m = TmModuleGetByName(module);
1144  if (m) {
1145  TmSlotSetFuncAppend(tv, m, NULL);
1146  }
1147  }
1148 
1149  return tv;
1150 }
1151 
1152 /**
 * \brief Creates and returns the TV instance for a Command thread (CMD).
 * The thread uses the "command" slot function; the TmModule found
 * by name, if any, is appended as its slot.
1156  *
1157  * \param name Name of this TV instance
1158  * \param module Name of TmModule with COMMAND flag set.
1159  * \param mucond Flag to indicate whether to initialize the condition
1160  * and the mutex variables for this newly created TV.
1161  *
1162  * \retval the newly created TV instance, or NULL on error
1163  */
1164 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1165  int mucond)
1166 {
1167  ThreadVars *tv = NULL;
1168 
1169  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1170 
1171  if (tv != NULL) {
1172  tv->type = TVT_CMD;
1175 
1176  TmModule *m = TmModuleGetByName(module);
1177  if (m) {
1178  TmSlotSetFuncAppend(tv, m, NULL);
1179  }
1180  }
1181 
1182  return tv;
1183 }
1184 
1185 /**
1186  * \brief Appends this TV to tv_root based on its type
1187  *
1188  * \param type holds the type this TV belongs to.
1189  */
1191 {
1193 
1194  if (tv_root[type] == NULL) {
1195  tv_root[type] = tv;
1196  tv->next = NULL;
1197 
1199 
1200  return;
1201  }
1202 
1203  ThreadVars *t = tv_root[type];
1204 
1205  while (t) {
1206  if (t->next == NULL) {
1207  t->next = tv;
1208  tv->next = NULL;
1209  break;
1210  }
1211 
1212  t = t->next;
1213  }
1214 
1216 }
1217 
1218 static bool ThreadStillHasPackets(ThreadVars *tv)
1219 {
1220  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1221  /* we wait till we dry out all the inq packets, before we
1222  * kill this thread. Do note that you should have disabled
1223  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1224  PacketQueue *q = tv->inq->pq;
1225  SCMutexLock(&q->mutex_q);
1226  uint32_t len = q->len;
1227  SCMutexUnlock(&q->mutex_q);
1228  if (len != 0) {
1229  return true;
1230  }
1231  }
1232 
1233  if (tv->stream_pq != NULL) {
1235  uint32_t len = tv->stream_pq->len;
1237 
1238  if (len != 0) {
1239  return true;
1240  }
1241  }
1242  return false;
1243 }
1244 
1245 /**
1246  * \brief Kill a thread.
1247  *
1248  * \param tv A ThreadVars instance corresponding to the thread that has to be
1249  * killed.
1250  *
1251  * \retval r 1 killed successfully
1252  * 0 not yet ready, needs another look
1253  */
1254 static int TmThreadKillThread(ThreadVars *tv)
1255 {
1256  BUG_ON(tv == NULL);
1257 
1258  /* kill only once :) */
1259  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1260  return 1;
1261  }
1262 
1263  /* set the thread flag informing the thread that it needs to be
1264  * terminated */
1267 
1268  /* to be sure, signal more */
1269  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1270  if (tv->inq_id != TMQH_NOT_SET) {
1272  if (qh != NULL && qh->InShutdownHandler != NULL) {
1273  qh->InShutdownHandler(tv);
1274  }
1275  }
1276  if (tv->inq != NULL) {
1277  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1278  SCMutexLock(&tv->inq->pq->mutex_q);
1279  SCCondSignal(&tv->inq->pq->cond_q);
1280  SCMutexUnlock(&tv->inq->pq->mutex_q);
1281  }
1282  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1283  }
1284 
1285  if (tv->ctrl_cond != NULL ) {
1287  pthread_cond_broadcast(tv->ctrl_cond);
1289  }
1290  return 0;
1291  }
1292 
1293  if (tv->outctx != NULL) {
1294  if (tv->outq_id != TMQH_NOT_SET) {
1296  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1297  qh->OutHandlerCtxFree(tv->outctx);
1298  tv->outctx = NULL;
1299  }
1300  }
1301  }
1302 
1303  /* Join the thread and flag as dead, unless the thread ID is 0 as
1304  * its not a thread created by Suricata. */
1305  if (tv->t) {
1306  pthread_join(tv->t, NULL);
1307  SCLogDebug("thread %s stopped", tv->name);
1308  }
1310  return 1;
1311 }
1312 
1313 static bool ThreadBusy(ThreadVars *tv)
1314 {
1315  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
1316  TmModule *tm = TmModuleGetById(s->tm_id);
1317  if (tm && tm->ThreadBusy != NULL) {
1318  if (tm->ThreadBusy(tv, SC_ATOMIC_GET(s->slot_data)))
1319  return true;
1320  }
1321  }
1322  return false;
1323 }
1324 
1325 /** \internal
1326  *
1327  * \brief make sure that all packet threads are done processing their
1328  * in-flight packets, including 'injected' flow packets.
1329  */
1330 static void TmThreadDrainPacketThreads(void)
1331 {
1332  ThreadVars *tv = NULL;
1333  struct timeval start_ts;
1334  struct timeval cur_ts;
1335  gettimeofday(&start_ts, NULL);
1336 
1337 again:
1338  gettimeofday(&cur_ts, NULL);
1339  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1340  SCLogWarning("unable to get all packet threads "
1341  "to process their packets in time");
1342  return;
1343  }
1344 
1346 
1347  /* all receive threads are part of packet processing threads */
1348  tv = tv_root[TVT_PPT];
1349  while (tv) {
1350  if (ThreadStillHasPackets(tv)) {
1351  /* we wait till we dry out all the inq packets, before we
1352  * kill this thread. Do note that you should have disabled
1353  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1355 
1356  /* sleep outside lock */
1357  SleepMsec(1);
1358  goto again;
1359  }
1360  if (ThreadBusy(tv)) {
1362 
1363  Packet *p = PacketGetFromAlloc();
1364  if (p != NULL) {
1367  PacketQueue *q = tv->stream_pq;
1368  SCMutexLock(&q->mutex_q);
1369  PacketEnqueue(q, p);
1370  SCCondSignal(&q->cond_q);
1371  SCMutexUnlock(&q->mutex_q);
1372  }
1373 
1374  /* don't sleep while holding a lock */
1375  SleepMsec(1);
1376  goto again;
1377  }
1378  tv = tv->next;
1379  }
1380 
1382 }
1383 
1384 /**
1385  * \brief Disable all threads having the specified TMs.
1386  *
1387  * Breaks out of the packet acquisition loop, and bumps
1388  * into the 'flow loop', where it will process packets
1389  * from the flow engine's shutdown handling.
1390  */
1392 {
1393  ThreadVars *tv = NULL;
1394  struct timeval start_ts;
1395  struct timeval cur_ts;
1396  gettimeofday(&start_ts, NULL);
1397 
1398 again:
1399  gettimeofday(&cur_ts, NULL);
1400  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1401  FatalError("Engine unable to disable detect "
1402  "thread - \"%s\". Killing engine",
1403  tv->name);
1404  }
1405 
1407 
1408  /* all receive threads are part of packet processing threads */
1409  tv = tv_root[TVT_PPT];
1410 
1411  /* we do have to keep in mind that TVs are arranged in the order
1412  * right from receive to log. The moment we fail to find a
1413  * receive TM amongst the slots in a tv, it indicates we are done
1414  * with all receive threads */
1415  while (tv) {
1416  int disable = 0;
1417  TmModule *tm = NULL;
1418  /* obtain the slots for this TV */
1419  TmSlot *slots = tv->tm_slots;
1420  while (slots != NULL) {
1421  tm = TmModuleGetById(slots->tm_id);
1422 
1423  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1424  disable = 1;
1425  break;
1426  }
1427 
1428  slots = slots->slot_next;
1429  continue;
1430  }
1431 
1432  if (disable) {
1433  if (ThreadStillHasPackets(tv)) {
1434  /* we wait till we dry out all the inq packets, before we
1435  * kill this thread. Do note that you should have disabled
1436  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1438  /* don't sleep while holding a lock */
1439  SleepMsec(1);
1440  goto again;
1441  }
1442 
1443  if (ThreadBusy(tv)) {
1445 
1446  Packet *p = PacketGetFromAlloc();
1447  if (p != NULL) {
1450  PacketQueue *q = tv->stream_pq;
1451  SCMutexLock(&q->mutex_q);
1452  PacketEnqueue(q, p);
1453  SCCondSignal(&q->cond_q);
1454  SCMutexUnlock(&q->mutex_q);
1455  }
1456 
1457  /* don't sleep while holding a lock */
1458  SleepMsec(1);
1459  goto again;
1460  }
1461 
1462  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1463  if (tm && tm->PktAcqBreakLoop != NULL) {
1464  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1465  }
1467 
1468  if (tv->inq != NULL) {
1469  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1470  SCMutexLock(&tv->inq->pq->mutex_q);
1471  SCCondSignal(&tv->inq->pq->cond_q);
1472  SCMutexUnlock(&tv->inq->pq->mutex_q);
1473  }
1474  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1475  }
1476 
1477  /* wait for it to enter the 'flow loop' stage */
1478  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1480 
1481  SleepMsec(1);
1482  goto again;
1483  }
1484  }
1485 
1486  tv = tv->next;
1487  }
1488 
1490 
1491  /* finally wait for all packet threads to have
1492  * processed all of their 'live' packets so we
1493  * don't process the last live packets together
1494  * with FFR packets */
1495  TmThreadDrainPacketThreads();
1496 }
1497 
1498 #ifdef DEBUG_VALIDATION
1499 static void TmThreadDumpThreads(void);
1500 #endif
1501 
1502 static void TmThreadDebugValidateNoMorePackets(void)
1503 {
1504 #ifdef DEBUG_VALIDATION
1506  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1507  if (ThreadStillHasPackets(tv)) {
1509  TmThreadDumpThreads();
1510  abort();
1511  }
1512  }
1514 #endif
1515 }
1516 
1517 /**
1518  * \brief Disable all packet threads
1519  */
1521 {
1522  struct timeval start_ts;
1523  struct timeval cur_ts;
1524 
1525  /* first drain all packet threads of their packets */
1526  TmThreadDrainPacketThreads();
1527 
1528  /* since all the threads possibly able to produce more packets
1529  * are now gone or inactive, we should see no packets anywhere
1530  * anymore. */
1531  TmThreadDebugValidateNoMorePackets();
1532 
1533  gettimeofday(&start_ts, NULL);
1534 again:
1535  gettimeofday(&cur_ts, NULL);
1536  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1537  FatalError("Engine unable to disable packet "
1538  "threads. Killing engine");
1539  }
1540 
1541  /* loop through the packet threads and kill them */
1543  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1545 
1546  /* separate worker threads (autofp) will still wait at their
1547  * input queues. So nudge them here so they will observe the
1548  * THV_KILL flag. */
1549  if (tv->inq != NULL) {
1550  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1551  SCMutexLock(&tv->inq->pq->mutex_q);
1552  SCCondSignal(&tv->inq->pq->cond_q);
1553  SCMutexUnlock(&tv->inq->pq->mutex_q);
1554  }
1555  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1556  }
1557 
1560 
1561  SleepMsec(1);
1562  goto again;
1563  }
1564  }
1566 }
1567 
1568 #define MIN_WAIT_TIME 100
1569 #define MAX_WAIT_TIME 999999
1571 {
1572  ThreadVars *tv = NULL;
1573  unsigned int sleep_usec = MIN_WAIT_TIME;
1574 
1575  BUG_ON((family < 0) || (family >= TVT_MAX));
1576 
1577 again:
1579  tv = tv_root[family];
1580 
1581  while (tv) {
1582  int r = TmThreadKillThread(tv);
1583  if (r == 0) {
1585  SleepUsec(sleep_usec);
1586  sleep_usec *= 2; /* slowly back off */
1587  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1588  goto again;
1589  }
1590  sleep_usec = MIN_WAIT_TIME; /* reset */
1591 
1592  tv = tv->next;
1593  }
1595 }
1596 #undef MIN_WAIT_TIME
1597 #undef MAX_WAIT_TIME
1598 
1600 {
1601  int i = 0;
1602 
1603  for (i = 0; i < TVT_MAX; i++) {
1605  }
1606 }
1607 
1608 static void TmThreadFree(ThreadVars *tv)
1609 {
1610  TmSlot *s;
1611  TmSlot *ps;
1612  if (tv == NULL)
1613  return;
1614 
1615  SCLogDebug("Freeing thread '%s'.", tv->name);
1616 
1618 
1619  if (tv->flow_queue) {
1620  BUG_ON(tv->flow_queue->qlen != 0);
1621  SCFree(tv->flow_queue);
1622  }
1623 
1625 
1626  TmThreadDeinitMC(tv);
1627 
1628  if (tv->thread_group_name) {
1630  }
1631 
1632  if (tv->printable_name) {
1634  }
1635 
1636  if (tv->stream_pq_local) {
1640  }
1641 
1642  s = (TmSlot *)tv->tm_slots;
1643  while (s) {
1644  ps = s;
1645  s = s->slot_next;
1646  SCFree(ps);
1647  }
1648 
1650  SCFree(tv);
1651 }
1652 
1654 {
1655  char *thread_group_name = NULL;
1656 
1657  if (name == NULL)
1658  return;
1659 
1660  if (tv == NULL)
1661  return;
1662 
1663  thread_group_name = SCStrdup(name);
1664  if (unlikely(thread_group_name == NULL)) {
1665  SCLogError("error allocating memory");
1666  return;
1667  }
1668  tv->thread_group_name = thread_group_name;
1669 }
1670 
1672 {
1673  ThreadVars *tv = NULL;
1674  ThreadVars *ptv = NULL;
1675 
1676  if ((family < 0) || (family >= TVT_MAX))
1677  return;
1678 
1680  tv = tv_root[family];
1681 
1682  while (tv) {
1683  ptv = tv;
1684  tv = tv->next;
1685  TmThreadFree(ptv);
1686  }
1687  tv_root[family] = NULL;
1689 }
1690 
1691 /**
1692  * \brief Spawns a thread associated with the ThreadVars instance tv
1693  *
1694  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1695  */
1697 {
1698  pthread_attr_t attr;
1699  if (tv->tm_func == NULL) {
1700  FatalError("No thread function set");
1701  }
1702 
1703  /* Initialize and set thread detached attribute */
1704  pthread_attr_init(&attr);
1705 
1706  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1707 
1708  /* Adjust thread stack size if configured */
1710  SCLogDebug("Setting per-thread stack size to %" PRIu64, threading_set_stack_size);
1711  if (pthread_attr_setstacksize(&attr, (size_t)threading_set_stack_size)) {
1712  FatalError("Unable to increase stack size to %" PRIu64 " in thread attributes",
1714  }
1715  }
1716 
1717  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1718  if (rc) {
1719  FatalError("Unable to create thread %s with pthread_create(): retval %d: %s", tv->name, rc,
1720  strerror(errno));
1721  }
1722 
1723 #if DEBUG && HAVE_PTHREAD_GETATTR_NP
1725  if (pthread_getattr_np(tv->t, &attr) == 0) {
1726  size_t stack_size;
1727  void *stack_addr;
1728  pthread_attr_getstack(&attr, &stack_addr, &stack_size);
1729  SCLogDebug("stack: %p; size %" PRIu64, stack_addr, (uintmax_t)stack_size);
1730  } else {
1731  SCLogDebug("Unable to retrieve current stack-size for display; return code from "
1732  "pthread_getattr_np() is %" PRId32,
1733  rc);
1734  }
1735  }
1736 #endif
1737 
1739 
1740  TmThreadAppend(tv, tv->type);
1741  return TM_ECODE_OK;
1742 }
1743 
1744 /**
1745  * \brief Spawns a "fake" lib thread associated with the ThreadVars instance tv
1746  *
1747  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1748  */
1750 {
1751  if (tv->tm_func == NULL) {
1752  printf("ERROR: no thread function set\n");
1753  return TM_ECODE_FAILED;
1754  }
1755 
1756  if (tv->tm_func((void *)tv) == (void *)-1) {
1757  return TM_ECODE_FAILED;
1758  }
1759 
1761 
1762  return TM_ECODE_OK;
1763 }
1764 
1765 /**
1766  * \brief Initializes the mutex and condition variables for this TV
1767  *
1768  * It can be used by a thread to control a wait loop that can also be
1769  * influenced by other threads.
1770  *
1771  * \param tv Pointer to a TV instance
1772  */
1774 {
1775  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1776  FatalError("Fatal error encountered in TmThreadInitMC. "
1777  "Exiting...");
1778  }
1779 
1780  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1781  printf("Error initializing the tv->m mutex\n");
1782  exit(EXIT_FAILURE);
1783  }
1784 
1785  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1786  FatalError("Fatal error encountered in TmThreadInitMC. "
1787  "Exiting...");
1788  }
1789 
1790  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1791  FatalError("Error initializing the tv->cond condition "
1792  "variable");
1793  }
1794 }
1795 
1796 static void TmThreadDeinitMC(ThreadVars *tv)
1797 {
1798  if (tv->ctrl_mutex) {
1800  SCFree(tv->ctrl_mutex);
1801  }
1802  if (tv->ctrl_cond) {
1804  SCFree(tv->ctrl_cond);
1805  }
1806 }
1807 
1808 /**
1809  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1810  * the kill flag has been set or not on the thread.
1811  *
1812  * \param tv Pointer to the TV instance.
1813  */
1815 {
1816  while (!TmThreadsCheckFlag(tv, flags)) {
1817  SleepUsec(100);
1818  }
1819 }
1820 
1821 /**
1822  * \brief Unpauses a thread
1823  *
1824  * \param tv Pointer to a TV instance that has to be unpaused
1825  */
1827 {
1829 }
1830 
1831 static TmEcode WaitOnThreadsRunningByType(const int t)
1832 {
1833  struct timeval start_ts;
1834  struct timeval cur_ts;
1835  uint32_t thread_cnt = 0;
1836 
1837  /* on retries, this will init to the last thread that started up already */
1838  ThreadVars *tv_start = tv_root[t];
1840  for (ThreadVars *tv = tv_start; tv != NULL; tv = tv->next) {
1841  thread_cnt++;
1842  }
1844 
1845  /* give threads a second each to start up, plus a margin of a minute. */
1846  uint32_t time_budget = 60 + thread_cnt;
1847 
1848  gettimeofday(&start_ts, NULL);
1849 again:
1851  ThreadVars *tv = tv_start;
1852  while (tv != NULL) {
1855 
1856  SCLogError("thread \"%s\" failed to "
1857  "start: flags %04x",
1858  tv->name, SC_ATOMIC_GET(tv->flags));
1859  return TM_ECODE_FAILED;
1860  }
1861 
1864 
1865  /* 60 seconds provided for the thread to transition from
1866  * THV_INIT_DONE to THV_RUNNING */
1867  gettimeofday(&cur_ts, NULL);
1868  if (((uint32_t)cur_ts.tv_sec - (uint32_t)start_ts.tv_sec) > time_budget) {
1869  SCLogError("thread \"%s\" failed to "
1870  "start in time: flags %04x. Total threads: %u. Time budget %us",
1871  tv->name, SC_ATOMIC_GET(tv->flags), thread_cnt, time_budget);
1872  return TM_ECODE_FAILED;
1873  }
1874 
1875  /* sleep a little to give the thread some
1876  * time to start running */
1877  SleepUsec(100);
1878  goto again;
1879  }
1880  tv_start = tv;
1881 
1882  tv = tv->next;
1883  }
1885  return TM_ECODE_OK;
1886 }
1887 
1888 /**
1889  * \brief Waits for all threads to be in a running state
1890  *
1891  * \retval TM_ECODE_OK if all are running or error if a thread failed
1892  */
1894 {
1895  uint16_t RX_num = 0;
1896  uint16_t W_num = 0;
1897  uint16_t FM_num = 0;
1898  uint16_t FR_num = 0;
1899  uint16_t TX_num = 0;
1900 
1901  for (int i = 0; i < TVT_MAX; i++) {
1902  if (WaitOnThreadsRunningByType(i) != TM_ECODE_OK)
1903  return TM_ECODE_FAILED;
1904  }
1905 
1907  for (int i = 0; i < TVT_MAX; i++) {
1908  for (ThreadVars *tv = tv_root[i]; tv != NULL; tv = tv->next) {
1909  if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1910  RX_num++;
1911  else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1912  W_num++;
1913  else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1914  TX_num++;
1915  else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1916  FM_num++;
1917  else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1918  FR_num++;
1919  }
1920  }
1922 
1923  /* Construct a welcome string displaying
1924  * initialized thread types and counts */
1925  uint16_t app_len = 32;
1926  uint16_t buf_len = 256;
1927 
1928  char append_str[app_len];
1929  char thread_counts[buf_len];
1930 
1931  strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1932  if (RX_num > 0) {
1933  snprintf(append_str, app_len, "RX: %u ", RX_num);
1934  strlcat(thread_counts, append_str, buf_len);
1935  }
1936  if (W_num > 0) {
1937  snprintf(append_str, app_len, "W: %u ", W_num);
1938  strlcat(thread_counts, append_str, buf_len);
1939  }
1940  if (TX_num > 0) {
1941  snprintf(append_str, app_len, "TX: %u ", TX_num);
1942  strlcat(thread_counts, append_str, buf_len);
1943  }
1944  if (FM_num > 0) {
1945  snprintf(append_str, app_len, "FM: %u ", FM_num);
1946  strlcat(thread_counts, append_str, buf_len);
1947  }
1948  if (FR_num > 0) {
1949  snprintf(append_str, app_len, "FR: %u ", FR_num);
1950  strlcat(thread_counts, append_str, buf_len);
1951  }
1952  snprintf(append_str, app_len, " Engine started.");
1953  strlcat(thread_counts, append_str, buf_len);
1954  SCLogNotice("%s", thread_counts);
1955 
1956  return TM_ECODE_OK;
1957 }
1958 
1959 /**
1960  * \brief Unpauses all threads present in tv_root
1961  */
1963 {
1965  for (int i = 0; i < TVT_MAX; i++) {
1966  ThreadVars *tv = tv_root[i];
1967  while (tv != NULL) {
1969  tv = tv->next;
1970  }
1971  }
1973 }
1974 
1975 /**
1976  * \brief Used to check the thread for certain conditions of failure.
1977  */
1979 {
1981  for (int i = 0; i < TVT_MAX; i++) {
1982  ThreadVars *tv = tv_root[i];
1983  while (tv) {
1985  FatalError("thread %s failed", tv->name);
1986  }
1987  tv = tv->next;
1988  }
1989  }
1991 }
1992 
1993 /**
1994  * \brief Used to check if all threads have finished their initialization. On
1995  * finding an un-initialized thread, it waits till that thread completes
1996  * its initialization, before proceeding to the next thread.
1997  *
1998  * \retval TM_ECODE_OK all initialized properly
1999  * \retval TM_ECODE_FAILED failure
2000  */
2002 {
2003  struct timeval start_ts;
2004  struct timeval cur_ts;
2005  gettimeofday(&start_ts, NULL);
2006 
2007 again:
2009  for (int i = 0; i < TVT_MAX; i++) {
2010  ThreadVars *tv = tv_root[i];
2011  while (tv != NULL) {
2014 
2015  SCLogError("thread \"%s\" failed to "
2016  "initialize: flags %04x",
2017  tv->name, SC_ATOMIC_GET(tv->flags));
2018  return TM_ECODE_FAILED;
2019  }
2020 
2021  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
2023 
2024  gettimeofday(&cur_ts, NULL);
2025  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
2026  SCLogError("thread \"%s\" failed to "
2027  "initialize in time: flags %04x",
2028  tv->name, SC_ATOMIC_GET(tv->flags));
2029  return TM_ECODE_FAILED;
2030  }
2031 
2032  /* sleep a little to give the thread some
2033  * time to finish initialization */
2034  SleepUsec(100);
2035  goto again;
2036  }
2037 
2040  SCLogError("thread \"%s\" failed to "
2041  "initialize.",
2042  tv->name);
2043  return TM_ECODE_FAILED;
2044  }
2047  SCLogError("thread \"%s\" closed on "
2048  "initialization.",
2049  tv->name);
2050  return TM_ECODE_FAILED;
2051  }
2052 
2053  tv = tv->next;
2054  }
2055  }
2057 
2058  return TM_ECODE_OK;
2059 }
2060 
2061 /**
2062  * \brief returns a count of all the threads that match the flag
2063  */
2065 {
2066  uint32_t cnt = 0;
2068  for (int i = 0; i < TVT_MAX; i++) {
2069  ThreadVars *tv = tv_root[i];
2070  while (tv != NULL) {
2071  if ((tv->tmm_flags & flags) == flags)
2072  cnt++;
2073 
2074  tv = tv->next;
2075  }
2076  }
2078  return cnt;
2079 }
2080 
2081 #ifdef DEBUG_VALIDATION
2082 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2083 {
2084  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2086  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2087  tv, s, s->tm_id, m->name);
2088  }
2089 }
2090 
2091 static void TmThreadDumpThreads(void)
2092 {
2094  for (int i = 0; i < TVT_MAX; i++) {
2095  ThreadVars *tv = tv_root[i];
2096  while (tv != NULL) {
2097  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2098  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2099  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2100  if (tv->inq && tv->stream_pq == tv->inq->pq) {
2101  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2102  } else if (tv->stream_pq_local != NULL) {
2103  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2104  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2105  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2106  }
2107  }
2108  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2109  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2110  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2111  }
2112  TmThreadDoDumpSlots(tv);
2113  tv = tv->next;
2114  }
2115  }
2118 }
2119 #endif
2120 
2121 /* Aligned to CLS to avoid false sharing between atomic ops. */
2122 typedef struct Thread_ {
2123  ThreadVars *tv; /**< threadvars structure */
2124  const char *name;
2125  int type;
2126  int in_use; /**< bool to indicate this is in use */
2127 
2128  SC_ATOMIC_DECLARE(SCTime_t, pktts); /**< current packet time of this thread
2129  * (offline mode) */
2130  SCTime_t sys_sec_stamp; /**< timestamp in real system
2131  * time when the pktts was last updated. */
2133 } __attribute__((aligned(CLS))) Thread;
2135 typedef struct Threads_ {
2136  Thread *threads;
2137  size_t threads_size;
2138  int threads_cnt;
2140 
2141 static bool thread_store_sealed = false; /* toggled by the seal/unseal helpers; register/unregister debug-validate that it is false */
2142 static Threads thread_store = { NULL, 0, 0 }; /* slot array, allocated slot count, thread count; starts out empty */
2143 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER; /* taken by seal/unseal, list, register and unregister */
2144 
2146 {
2147  SCMutexLock(&thread_store_lock);
2148  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2149  thread_store_sealed = true;
2150  SCMutexUnlock(&thread_store_lock);
2151 }
2152 
2154 {
2155  SCMutexLock(&thread_store_lock);
2156  DEBUG_VALIDATE_BUG_ON(!thread_store_sealed);
2157  thread_store_sealed = false;
2158  SCMutexUnlock(&thread_store_lock);
2159 }
2160 
2162 {
2163  SCMutexLock(&thread_store_lock);
2164  for (size_t s = 0; s < thread_store.threads_size; s++) {
2165  Thread *t = &thread_store.threads[s];
2166  if (t == NULL || t->in_use == 0)
2167  continue;
2168 
2169  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2170  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2171  if (t->tv) {
2172  ThreadVars *tv = t->tv;
2173  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2174  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2175  tv, tv->type, tv->name, tv->tmm_flags, flags);
2176  }
2177  }
2178  SCMutexUnlock(&thread_store_lock);
2179 }
2180 
2181 #define STEP 32
2182 /**
2183  * \retval id thread id, or 0 if not found
2184  */
2186 {
2187  SCMutexLock(&thread_store_lock);
2188  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2189  if (thread_store.threads == NULL) {
2190  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2191  BUG_ON(thread_store.threads == NULL);
2192  thread_store.threads_size = STEP;
2193  }
2194 
2195  size_t s;
2196  for (s = 0; s < thread_store.threads_size; s++) {
2197  if (thread_store.threads[s].in_use == 0) {
2198  Thread *t = &thread_store.threads[s];
2199  SCSpinInit(&t->spin, 0);
2200  SCSpinLock(&t->spin);
2201  t->name = tv->name;
2202  t->type = type;
2203  t->tv = tv;
2204  t->in_use = 1;
2205  SCSpinUnlock(&t->spin);
2206 
2207  SCMutexUnlock(&thread_store_lock);
2208  return (int)(s+1);
2209  }
2210  }
2211 
2212  /* if we get here the array is completely filled */
2213  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2214  BUG_ON(newmem == NULL);
2215  thread_store.threads = newmem;
2216  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2217 
2218  Thread *t = &thread_store.threads[thread_store.threads_size];
2219  SCSpinInit(&t->spin, 0);
2220  SCSpinLock(&t->spin);
2221  t->name = tv->name;
2222  t->type = type;
2223  t->tv = tv;
2224  t->in_use = 1;
2225  SCSpinUnlock(&t->spin);
2226 
2227  s = thread_store.threads_size;
2228  thread_store.threads_size += STEP;
2229 
2230  SCMutexUnlock(&thread_store_lock);
2231  return (int)(s+1);
2232 }
2233 #undef STEP
2234 
2235 void TmThreadsUnregisterThread(const int id)
2236 {
2237  SCMutexLock(&thread_store_lock);
2238  DEBUG_VALIDATE_BUG_ON(thread_store_sealed);
2239  if (id <= 0 || id > (int)thread_store.threads_size) {
2240  SCMutexUnlock(&thread_store_lock);
2241  return;
2242  }
2243 
2244  /* id is one higher than index */
2245  int idx = id - 1;
2246 
2247  /* reset thread_id, which serves as clearing the record */
2248  thread_store.threads[idx].in_use = 0;
2249 
2250  /* check if we have at least one registered thread left */
2251  size_t s;
2252  for (s = 0; s < thread_store.threads_size; s++) {
2253  Thread *t = &thread_store.threads[s];
2254  if (t->in_use == 1) {
2255  goto end;
2256  }
2257  }
2258 
2259  /* if we get here no threads are registered */
2260  SCFree(thread_store.threads);
2261  thread_store.threads = NULL;
2262  thread_store.threads_size = 0;
2263  thread_store.threads_cnt = 0;
2264 
2265 end:
2266  SCMutexUnlock(&thread_store_lock);
2267 }
2268 
2269 void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
2270 {
2271  SCTime_t now = SCTimeGetTime();
2272  int idx = id - 1;
2273  Thread *t = &thread_store.threads[idx];
2274  SCSpinLock(&t->spin);
2275  SC_ATOMIC_SET(t->pktts, ts);
2276 
2277 #ifdef DEBUG
2278  if (t->sys_sec_stamp.secs != 0) {
2279  SCTime_t tmpts = SCTIME_ADD_SECS(t->sys_sec_stamp, 3);
2280  if (SCTIME_CMP_LT(tmpts, now)) {
2281  SCLogDebug("%s: thread slept for %u secs", t->name, (uint32_t)(now.secs - tmpts.secs));
2282  }
2283  }
2284 #endif
2285 
2286  t->sys_sec_stamp = now;
2287  SCSpinUnlock(&t->spin);
2288 }
2289 
2291 {
2292  static SCTime_t nullts = SCTIME_INITIALIZER;
2293  bool ready = true;
2294  for (size_t s = 0; s < thread_store.threads_size; s++) {
2295  Thread *t = &thread_store.threads[s];
2296  if (!t->in_use) {
2297  break;
2298  }
2299  SCSpinLock(&t->spin);
2300  if (t->type != TVT_PPT) {
2301  SCSpinUnlock(&t->spin);
2302  continue;
2303  }
2304  if (SCTIME_CMP_EQ(t->sys_sec_stamp, nullts)) {
2305  ready = false;
2306  SCSpinUnlock(&t->spin);
2307  break;
2308  }
2309  SCSpinUnlock(&t->spin);
2310  }
2311  return ready;
2312 }
2313 
2315 {
2316  SCTime_t now = SCTimeGetTime();
2317  for (size_t s = 0; s < thread_store.threads_size; s++) {
2318  Thread *t = &thread_store.threads[s];
2319  if (!t->in_use) {
2320  break;
2321  }
2322  SCSpinLock(&t->spin);
2323  if (t->type != TVT_PPT) {
2324  SCSpinUnlock(&t->spin);
2325  continue;
2326  }
2327  SC_ATOMIC_SET(t->pktts, ts);
2328  t->sys_sec_stamp = now;
2329  SCSpinUnlock(&t->spin);
2330  }
2331 }
2332 
2334 {
2335  BUG_ON(idx == 0);
2336  const int i = idx - 1;
2337  Thread *t = &thread_store.threads[i];
2338  return SC_ATOMIC_GET(t->pktts);
2339 }
2340 
2341 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2342 {
2343  struct timeval local = { 0 };
2344  static SCTime_t nullts = SCTIME_INITIALIZER;
2345  bool set = false;
2346  SCTime_t now = SCTimeGetTime();
2347 
2348  for (size_t s = 0; s < thread_store.threads_size; s++) {
2349  Thread *t = &thread_store.threads[s];
2350  if (t->in_use == 0) {
2351  break;
2352  }
2353  SCSpinLock(&t->spin);
2354  /* only packet threads set timestamps based on packets */
2355  if (t->type != TVT_PPT) {
2356  SCSpinUnlock(&t->spin);
2357  continue;
2358  }
2359  SCTime_t pktts = SC_ATOMIC_GET(t->pktts);
2360  if (SCTIME_CMP_NEQ(pktts, nullts)) {
2361  SCTime_t sys_sec_stamp = SCTIME_ADD_SECS(t->sys_sec_stamp, 5);
2362  /* ignore sleeping threads */
2363  if (SCTIME_CMP_LT(sys_sec_stamp, now)) {
2364  SCSpinUnlock(&t->spin);
2365  continue;
2366  }
2367  if (!set) {
2368  SCTIME_TO_TIMEVAL(&local, pktts);
2369  set = true;
2370  } else {
2371  if (SCTIME_CMP_LT(pktts, SCTIME_FROM_TIMEVAL(&local))) {
2372  SCTIME_TO_TIMEVAL(&local, pktts);
2373  }
2374  }
2375  }
2376  SCSpinUnlock(&t->spin);
2377  }
2378  *ts = local;
2379  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2380 }
2381 
2383 {
2384  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2385  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2386  /* always create at least one thread */
2387  if (thread_max == 0)
2388  thread_max = ncpus * threading_detect_ratio;
2389  if (thread_max < 1)
2390  thread_max = 1;
2391  if (thread_max > 1024) {
2392  SCLogWarning("limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2393  thread_max = 1024;
2394  }
2395  return (uint16_t)thread_max;
2396 }
2397 
2398 /** \brief inject a flow into a threads flow queue
2399  */
2400 void TmThreadsInjectFlowById(Flow *f, const int id)
2401 {
2402  if (id > 0 && id <= (int)thread_store.threads_size) {
2403  int idx = id - 1;
2404  Thread *t = &thread_store.threads[idx];
2405  ThreadVars *tv = t->tv;
2406  if (tv != NULL && tv->flow_queue != NULL) {
2407  FlowEnqueue(tv->flow_queue, f);
2408 
2409  /* wake up listening thread(s) if necessary */
2410  if (tv->inq != NULL) {
2411  SCMutexLock(&tv->inq->pq->mutex_q);
2412  SCCondSignal(&tv->inq->pq->cond_q);
2413  SCMutexUnlock(&tv->inq->pq->mutex_q);
2414  } else if (tv->break_loop) {
2415  TmThreadsCaptureBreakLoop(tv);
2416  }
2417  return;
2418  }
2419  }
2420  BUG_ON(1);
2421 }
thread_name_workers
const char * thread_name_workers
Definition: runmodes.c:68
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:843
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:73
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:76
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:70
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:135
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2290
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1773
tm-threads.h
TmThreadsSealThreads
void TmThreadsSealThreads(void)
Definition: tm-threads.c:2145
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:70
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:55
TmThreadLibSpawn
TmEcode TmThreadLibSpawn(ThreadVars *tv)
Spawns a "fake" lib thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1749
THV_USE
#define THV_USE
Definition: threadvars.h:36
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:133
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1696
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1131
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
threading_set_cpu_affinity
bool threading_set_cpu_affinity
Definition: runmodes.c:62
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:884
SCTIME_CMP_NEQ
#define SCTIME_CMP_NEQ(a, b)
Definition: util-time.h:107
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:70
Thread_::SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(SCTime_t, pktts)
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:314
TmThreadContinueThreads
void TmThreadContinueThreads(void)
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1962
CLS
#define CLS
Definition: suricata-common.h:61
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1074
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:55
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:379
TmThreadSetGroupName
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1653
CaptureStatsSetup
void CaptureStatsSetup(ThreadVars *tv)
Definition: decode.c:997
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:105
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:48
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
PKT_SRC_SHUTDOWN_FLUSH
@ PKT_SRC_SHUTDOWN_FLUSH
Definition: decode.h:64
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:101
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:88
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits until the specified flag(s) is (are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1814
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:175
thread-callbacks.h
PacketQueue_
Simple FIFO queue for packets with mutex and cond. Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:49
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:56
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:45
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:83
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:62
Packet_::flags
uint32_t flags
Definition: decode.h:527
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:75
threads.h
Threads
Threads
Definition: tm-threads.c:2139
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:356
TmThreadsUnsealThreads
void TmThreadsUnsealThreads(void)
Definition: tm-threads.c:2153
ThreadVars_::t
pthread_t t
Definition: threadvars.h:59
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:303
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:71
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:61
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2064
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:104
thread_name_autofp
const char * thread_name_autofp
Definition: runmodes.c:66
StatsSetupPrivate
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1205
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:53
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:117
MIN
#define MIN(x, y)
Definition: suricata-common.h:400
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:82
SCSpinLock
#define SCSpinLock
Definition: threads-debug.h:235
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:74
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1391
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:387
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:121
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:795
SCTIME_CMP_EQ
#define SCTIME_CMP_EQ(a, b)
Definition: util-time.h:108
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:89
m
SCMutex m
Definition: flow-hash.h:6
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:44
TmModule_::ThreadBusy
bool(* ThreadBusy)(ThreadVars *tv, void *thread_data)
Definition: tm-modules.h:63
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:143
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:314
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:82
rww_lock_contention
thread_local uint64_t rww_lock_contention
PacketQueueNoLock_
simple fifo queue for packets
Definition: packet-queue.h:34
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:173
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2181
Thread_::in_use
int in_use
Definition: tm-threads.c:2126
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:38
TmThreadsGetThreadTime
SCTime_t TmThreadsGetThreadTime(const int idx)
Definition: tm-threads.c:2333
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(void)
Disable all packet threads.
Definition: tm-threads.c:1520
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:89
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:244
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:81
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a threads flow queue
Definition: tm-threads.c:2400
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:81
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2341
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmThreadsProcessDecodePseudoPackets
TmEcode TmThreadsProcessDecodePseudoPackets(ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot)
Definition: tm-threads.c:114
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:56
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:46
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:46
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1301
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:109
util-signal.h
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:79
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:806
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1568
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:2123
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1826
AffinityGetNextCPU
uint16_t AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
Definition: util-affinity.c:279
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:90
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:57
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:2001
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
tv
ThreadVars * tv
Definition: tm-threads.c:2134
strlcat
size_t strlcat(char *, const char *src, size_t siz)
Definition: util-strlcatu.c:45
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:77
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:96
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:208
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:58
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
threading_set_stack_size
uint64_t threading_set_stack_size
Definition: runmodes.c:63
ThreadVars_::perf_public_ctx
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:128
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1244
SCTime_t::secs
uint64_t secs
Definition: util-time.h:41
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts)
Definition: tm-threads.c:2269
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
util-affinity.h
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
SCTIME_FROM_TIMEVAL
#define SCTIME_FROM_TIMEVAL(tv)
Definition: util-time.h:79
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:65
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1671
THV_KILL
#define THV_KILL
Definition: threadvars.h:40
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:943
FlowQueueNew
FlowQueue * FlowQueueNew(void)
Definition: flow-queue.c:35
SCSpinUnlock
#define SCSpinUnlock
Definition: threads-debug.h:237
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:826
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2235
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2185
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
thread-storage.h
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
__attribute__
struct Thread_ __attribute__((aligned(CLS)))
Definition: tm-threads.c:2133
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:125
ThreadVars_::id
int id
Definition: threadvars.h:87
ThreadVars_::type
uint8_t type
Definition: threadvars.h:72
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:309
SCTIME_TO_TIMEVAL
#define SCTIME_TO_TIMEVAL(tv, t)
Definition: util-time.h:97
TmThreadTimeoutLoop
int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
Definition: tm-threads.c:166
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:88
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:85
SCReturn
#define SCReturn
Definition: util-debug.h:273
stream.h
SCCtrlMutexLock
#define SCCtrlMutexLock(mut)
Definition: threads-debug.h:376
Thread_::sys_sec_stamp
SCTime_t sys_sec_stamp
Definition: tm-threads.c:2130
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1599
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:139
Packet_
Definition: decode.h:484
TM_FLAG_DECODE_TM
#define TM_FLAG_DECODE_TM
Definition: tm-modules.h:33
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:133
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:91
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:56
ThreadVars_::thread_group_name
char * thread_group_name
Definition: threadvars.h:67
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:69
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1569
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:74
TmSlot_
Definition: tm-threads.h:53
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:67
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:69
SCTime_t
Definition: util-time.h:40
TmEcode
TmEcode
Definition: tm-threads-common.h:80
name
const char * name
Definition: tm-threads.c:2135
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:90
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
runmodes.h
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:57
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread (MGMT). This function supports only custom...
Definition: tm-threads.c:1103
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:32
TmModule_
Definition: tm-modules.h:43
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:255
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const SCTime_t ts)
Definition: tm-threads.c:2314
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:116
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
SCTIME_CMP_LT
#define SCTIME_CMP_LT(a, b)
Definition: util-time.h:105
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:39
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:37
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:679
StatsPublicThreadContext_::m
SCMutex m
Definition: counters.h:75
SCCtrlMutexUnlock
#define SCCtrlMutexUnlock(mut)
Definition: threads-debug.h:378
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1570
Thread_::type
int type
Definition: tm-threads.c:2125
cnt
uint32_t cnt
Definition: tmqh-packetpool.h:7
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:90
Packet_::flow
struct Flow_ * flow
Definition: decode.h:529
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1190
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:75
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:54
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:852
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:35
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:34
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:106
TmThreadsWaitForUnpause
bool TmThreadsWaitForUnpause(ThreadVars *tv)
Wait for a thread to become unpaused.
Definition: tm-threads.c:363
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:41
ThreadVars_::break_loop
bool break_loop
Definition: threadvars.h:136
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:101
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:868
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:230
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:52
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:72
sys_sec_stamp
SCTime_t sys_sec_stamp
Definition: tm-threads.c:2141
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:77
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
FatalError
#define FatalError(...)
Definition: util-debug.h:502
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax(void)
Definition: tm-threads.c:2382
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:66
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1164
StatsSyncCounters
void StatsSyncCounters(ThreadVars *tv)
Definition: counters.c:445
ThreadStorageSize
unsigned int ThreadStorageSize(void)
Definition: thread-storage.c:25
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:144
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:216
threadvars.h
util-validate.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:232
SCSpinInit
#define SCSpinInit
Definition: threads-debug.h:238
SCThreadRunInitCallbacks
void SCThreadRunInitCallbacks(ThreadVars *tv)
Definition: thread-callbacks.c:48
ThreadsAffinityType_
Definition: util-affinity.h:65
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:37
THV_KILL_PKTACQ
#define THV_KILL_PKTACQ
Definition: threadvars.h:47
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:618
TmSlot_::tm_flags
uint8_t tm_flags
Definition: tm-threads.h:67
Thread_::name
const char * name
Definition: tm-threads.c:2124
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:131
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:50
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:91
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:74
thread_name_verdict
const char * thread_name_verdict
Definition: runmodes.c:69
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:73
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:317
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:84
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:112
SCSpinlock
#define SCSpinlock
Definition: threads-debug.h:234
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:88
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:89
suricata.h
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:463
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:142
Tmq_
Definition: tm-queues.h:29
Thread_::spin
SCSpinlock spin
Definition: tm-threads.c:2132
TmThreadWaitOnThreadRunning
TmEcode TmThreadWaitOnThreadRunning(void)
Waits for all threads to be in a running state.
Definition: tm-threads.c:1893
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2161
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1978
type
int type
Definition: tm-threads.c:2136
ThreadFreeStorage
void ThreadFreeStorage(ThreadVars *tv)
Definition: thread-storage.c:50
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:132
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:83
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:69
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:108
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:274
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:93
SCTIME_INITIALIZER
#define SCTIME_INITIALIZER
Definition: util-time.h:51
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
SCTIME_ADD_SECS
#define SCTIME_ADD_SECS(ts, s)
Definition: util-time.h:64
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:42
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:2122
SCTmThreadsSlotPktAcqLoopFinish
bool SCTmThreadsSlotPktAcqLoopFinish(ThreadVars *tv)
Definition: tm-threads.c:273
SCReturnInt
#define SCReturnInt(x)
Definition: util-debug.h:275
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:120
StatsThreadCleanup
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1304
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:63
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:267
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:102
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:76
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:939
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:359
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:171
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:350