suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2020 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-debug.h"
39 #include "util-privs.h"
40 #include "util-cpu.h"
41 #include "util-optimize.h"
42 #include "util-profiling.h"
43 #include "util-signal.h"
44 #include "queue.h"
45 
46 #ifdef PROFILE_LOCKING
47 thread_local uint64_t mutex_lock_contention;
48 thread_local uint64_t mutex_lock_wait_ticks;
49 thread_local uint64_t mutex_lock_cnt;
50 
51 thread_local uint64_t spin_lock_contention;
52 thread_local uint64_t spin_lock_wait_ticks;
53 thread_local uint64_t spin_lock_cnt;
54 
55 thread_local uint64_t rww_lock_contention;
56 thread_local uint64_t rww_lock_wait_ticks;
57 thread_local uint64_t rww_lock_cnt;
58 
59 thread_local uint64_t rwr_lock_contention;
60 thread_local uint64_t rwr_lock_wait_ticks;
61 thread_local uint64_t rwr_lock_cnt;
62 #endif
63 
64 #ifdef OS_FREEBSD
65 #include <sched.h>
66 #include <sys/param.h>
67 #include <sys/resource.h>
68 #include <sys/cpuset.h>
69 #include <sys/thr.h>
70 #define cpu_set_t cpuset_t
71 #endif /* OS_FREEBSD */
72 
73 /* prototypes */
74 static int SetCPUAffinity(uint16_t cpu);
75 static void TmThreadDeinitMC(ThreadVars *tv);
76 
77 /* root of the threadvars list */
78 ThreadVars *tv_root[TVT_MAX] = { NULL };
79 
80 /* lock to protect tv_root */
82 
83 /**
84  * \brief Check if a thread flag is set.
85  *
86  * \retval 1 flag is set.
87  * \retval 0 flag is not set.
88  */
89 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
90 {
91  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
92 }
93 
94 /**
95  * \brief Set a thread flag.
96  */
97 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
98 {
99  SC_ATOMIC_OR(tv->flags, flag);
100 }
101 
102 /**
103  * \brief Unset a thread flag.
104  */
105 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
106 {
107  SC_ATOMIC_AND(tv->flags, ~flag);
108 }
109 
110 /**
111  * \brief Separate run function so we can call it recursively.
112  */
114 {
115  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
116  PACKET_PROFILING_TMM_START(p, s->tm_id);
117  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
118  PACKET_PROFILING_TMM_END(p, s->tm_id);
119 
120  /* handle error */
121  if (unlikely(r == TM_ECODE_FAILED)) {
122  /* Encountered error. Return packets to packetpool and return */
123  TmThreadsSlotProcessPktFail(tv, s, NULL);
124  return TM_ECODE_FAILED;
125  }
126 
127  /* handle new packets */
128  while (tv->decode_pq.top != NULL) {
129  Packet *extra_p = PacketDequeueNoLock(&tv->decode_pq);
130  if (unlikely(extra_p == NULL))
131  continue;
132 
133  /* see if we need to process the packet */
134  if (s->slot_next != NULL) {
135  r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
136  if (unlikely(r == TM_ECODE_FAILED)) {
137  TmThreadsSlotProcessPktFail(tv, s, extra_p);
138  return TM_ECODE_FAILED;
139  }
140  }
141  tv->tmqh_out(tv, extra_p);
142  }
143  }
144 
145  return TM_ECODE_OK;
146 }
147 
148 /** \internal
149  *
150  * \brief Process flow timeout packets
151  *
152  * Process flow timeout pseudo packets. During shutdown this loop
153  * is run until the flow engine kills the thread and the queue is
154  * empty.
155  */
156 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
157 {
158  TmSlot *fw_slot = tv->tm_flowworker;
159  int r = TM_ECODE_OK;
160 
161  if (tv->stream_pq == NULL || fw_slot == NULL) {
162  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
163  return r;
164  }
165 
166  SCLogDebug("flow end loop starting");
167  while (1) {
169  uint32_t len = tv->stream_pq->len;
171  if (len > 0) {
172  while (len--) {
176  if (likely(p)) {
177  if ((r = TmThreadsSlotProcessPkt(tv, fw_slot, p) != TM_ECODE_OK)) {
178  if (r == TM_ECODE_FAILED)
179  break;
180  }
181  }
182  }
183  } else {
185  break;
186  }
187  SleepUsec(1);
188  }
189  }
190  SCLogDebug("flow end loop complete");
192 
193  return r;
194 }
195 
196 /*
197 
198  pcap/nfq
199 
200  pkt read
201  callback
202  process_pkt
203 
204  pfring
205 
206  pkt read
207  process_pkt
208 
209  slot:
210  setup
211 
212  pkt_ack_loop(tv, slot_data)
213 
214  deinit
215 
216  process_pkt:
217  while(s)
218  run s;
219  queue;
220 
221  */
222 
223 static void *TmThreadsSlotPktAcqLoop(void *td)
224 {
225  ThreadVars *tv = (ThreadVars *)td;
226  TmSlot *s = tv->tm_slots;
227  char run = 1;
228  TmEcode r = TM_ECODE_OK;
229  TmSlot *slot = NULL;
230 
231  /* Set the thread name */
232  if (SCSetThreadName(tv->name) < 0) {
233  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
234  }
235 
236  if (tv->thread_setup_flags != 0)
238 
239  /* Drop the capabilities for this thread */
240  SCDropCaps(tv);
241 
242  PacketPoolInit();
243 
244  /* check if we are setup properly */
245  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
246  SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
247  " PktAcqLoop=%p, tmqh_in=%p,"
248  " tmqh_out=%p",
249  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
251  pthread_exit((void *) -1);
252  return NULL;
253  }
254 
255  for (slot = s; slot != NULL; slot = slot->slot_next) {
256  if (slot->SlotThreadInit != NULL) {
257  void *slot_data = NULL;
258  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
259  if (r != TM_ECODE_OK) {
260  if (r == TM_ECODE_DONE) {
261  EngineDone();
263  goto error;
264  } else {
266  goto error;
267  }
268  }
269  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
270  }
271 
272  /* if the flowworker module is the first, get the threads input queue */
273  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
274  tv->stream_pq = tv->inq->pq;
275  tv->tm_flowworker = slot;
276  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
278  if (tv->flow_queue == NULL) {
280  pthread_exit((void *) -1);
281  return NULL;
282  }
283  /* setup a queue */
284  } else if (slot->tm_id == TMM_FLOWWORKER) {
285  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
286  if (tv->stream_pq_local == NULL)
287  FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
290  tv->tm_flowworker = slot;
291  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
293  if (tv->flow_queue == NULL) {
295  pthread_exit((void *) -1);
296  return NULL;
297  }
298  }
299  }
300 
302 
304 
305  while(run) {
310  }
311 
312  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
313 
314  if (r == TM_ECODE_FAILED) {
316  run = 0;
317  }
319  run = 0;
320  }
321  if (r == TM_ECODE_DONE) {
322  run = 0;
323  }
324  }
326 
328 
329  /* process all pseudo packets the flow timeout may throw at us */
330  TmThreadTimeoutLoop(tv, s);
331 
334 
336 
337  for (slot = s; slot != NULL; slot = slot->slot_next) {
338  if (slot->SlotThreadExitPrintStats != NULL) {
339  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
340  }
341 
342  if (slot->SlotThreadDeinit != NULL) {
343  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
344  if (r != TM_ECODE_OK) {
346  goto error;
347  }
348  }
349  }
350 
351  tv->stream_pq = NULL;
352  SCLogDebug("%s ending", tv->name);
354  pthread_exit((void *) 0);
355  return NULL;
356 
357 error:
358  tv->stream_pq = NULL;
359  pthread_exit((void *) -1);
360  return NULL;
361 }
362 
363 static void *TmThreadsSlotVar(void *td)
364 {
365  ThreadVars *tv = (ThreadVars *)td;
366  TmSlot *s = (TmSlot *)tv->tm_slots;
367  Packet *p = NULL;
368  char run = 1;
369  TmEcode r = TM_ECODE_OK;
370 
371  PacketPoolInit();//Empty();
372 
373  /* Set the thread name */
374  if (SCSetThreadName(tv->name) < 0) {
375  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
376  }
377 
378  if (tv->thread_setup_flags != 0)
380 
381  /* Drop the capabilities for this thread */
382  SCDropCaps(tv);
383 
384  /* check if we are setup properly */
385  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
387  pthread_exit((void *) -1);
388  return NULL;
389  }
390 
391  for (; s != NULL; s = s->slot_next) {
392  if (s->SlotThreadInit != NULL) {
393  void *slot_data = NULL;
394  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
395  if (r != TM_ECODE_OK) {
397  goto error;
398  }
399  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
400  }
401 
402  /* special case: we need to access the stream queue
403  * from the flow timeout code */
404 
405  /* if the flowworker module is the first, get the threads input queue */
406  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
407  tv->stream_pq = tv->inq->pq;
408  tv->tm_flowworker = s;
409  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
411  if (tv->flow_queue == NULL) {
413  pthread_exit((void *) -1);
414  return NULL;
415  }
416  /* setup a queue */
417  } else if (s->tm_id == TMM_FLOWWORKER) {
418  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
419  if (tv->stream_pq_local == NULL)
420  FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
423  tv->tm_flowworker = s;
424  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
426  if (tv->flow_queue == NULL) {
428  pthread_exit((void *) -1);
429  return NULL;
430  }
431  }
432  }
433 
435 
437 
438  s = (TmSlot *)tv->tm_slots;
439 
440  while (run) {
445  }
446 
447  /* input a packet */
448  p = tv->tmqh_in(tv);
449 
450  /* if we didn't get a packet see if we need to do some housekeeping */
451  if (unlikely(p == NULL)) {
452  if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
454  if (p != NULL) {
457  }
458  }
459  }
460 
461  if (p != NULL) {
462  /* run the thread module(s) */
463  r = TmThreadsSlotVarRun(tv, p, s);
464  if (r == TM_ECODE_FAILED) {
467  break;
468  }
469 
470  /* output the packet */
471  tv->tmqh_out(tv, p);
472 
473  /* now handle the stream pq packets */
474  TmThreadsHandleInjectedPackets(tv);
475  }
476 
478  run = 0;
479  }
480  } /* while (run) */
482 
485 
487 
488  s = (TmSlot *)tv->tm_slots;
489 
490  for ( ; s != NULL; s = s->slot_next) {
491  if (s->SlotThreadExitPrintStats != NULL) {
492  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
493  }
494 
495  if (s->SlotThreadDeinit != NULL) {
496  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
497  if (r != TM_ECODE_OK) {
499  goto error;
500  }
501  }
502  }
503 
504  SCLogDebug("%s ending", tv->name);
505  tv->stream_pq = NULL;
507  pthread_exit((void *) 0);
508  return NULL;
509 
510 error:
511  tv->stream_pq = NULL;
512  pthread_exit((void *) -1);
513  return NULL;
514 }
515 
516 static void *TmThreadsManagement(void *td)
517 {
518  ThreadVars *tv = (ThreadVars *)td;
519  TmSlot *s = (TmSlot *)tv->tm_slots;
520  TmEcode r = TM_ECODE_OK;
521 
522  BUG_ON(s == NULL);
523 
524  /* Set the thread name */
525  if (SCSetThreadName(tv->name) < 0) {
526  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
527  }
528 
529  if (tv->thread_setup_flags != 0)
531 
532  /* Drop the capabilities for this thread */
533  SCDropCaps(tv);
534 
535  SCLogDebug("%s starting", tv->name);
536 
537  if (s->SlotThreadInit != NULL) {
538  void *slot_data = NULL;
539  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
540  if (r != TM_ECODE_OK) {
542  pthread_exit((void *) -1);
543  return NULL;
544  }
545  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
546  }
547 
549 
551 
552  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
553  /* handle error */
554  if (r == TM_ECODE_FAILED) {
556  }
557 
560  }
561 
564 
565  if (s->SlotThreadExitPrintStats != NULL) {
566  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
567  }
568 
569  if (s->SlotThreadDeinit != NULL) {
570  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
571  if (r != TM_ECODE_OK) {
573  pthread_exit((void *) -1);
574  return NULL;
575  }
576  }
577 
579  pthread_exit((void *) 0);
580  return NULL;
581 }
582 
583 /**
584  * \brief We set the slot functions.
585  *
586  * \param tv Pointer to the TV to set the slot function for.
587  * \param name Name of the slot variant.
588  * \param fn_p Pointer to a custom slot function. Used only if slot variant
589  * "name" is "custom".
590  *
591  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
592  */
593 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
594 {
595  if (name == NULL) {
596  if (fn_p == NULL) {
597  printf("Both slot name and function pointer can't be NULL inside "
598  "TmThreadSetSlots\n");
599  goto error;
600  } else {
601  name = "custom";
602  }
603  }
604 
605  if (strcmp(name, "varslot") == 0) {
606  tv->tm_func = TmThreadsSlotVar;
607  } else if (strcmp(name, "pktacqloop") == 0) {
608  tv->tm_func = TmThreadsSlotPktAcqLoop;
609  } else if (strcmp(name, "management") == 0) {
610  tv->tm_func = TmThreadsManagement;
611  } else if (strcmp(name, "command") == 0) {
612  tv->tm_func = TmThreadsManagement;
613  } else if (strcmp(name, "custom") == 0) {
614  if (fn_p == NULL)
615  goto error;
616  tv->tm_func = fn_p;
617  } else {
618  printf("Error: Slot \"%s\" not supported\n", name);
619  goto error;
620  }
621 
622  return TM_ECODE_OK;
623 
624 error:
625  return TM_ECODE_FAILED;
626 }
627 
629 {
631  for (int i = 0; i < TVT_MAX; i++) {
632  ThreadVars *tv = tv_root[i];
633  while (tv) {
634  TmSlot *slots = tv->tm_slots;
635  while (slots != NULL) {
636  if (slots == tm_slot) {
638  return tv;
639  }
640  slots = slots->slot_next;
641  }
642  tv = tv->next;
643  }
644  }
646  return NULL;
647 }
648 
649 /**
650  * \brief Appends a new entry to the slots.
651  *
652  * \param tv TV the slot is attached to.
653  * \param tm TM to append.
654  * \param data Data to be passed on to the slot init function.
655  *
656  * \note Nothing is returned; if allocating the slot fails the function returns without appending.
657  */
658 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
659 {
660  TmSlot *slot = SCMalloc(sizeof(TmSlot));
661  if (unlikely(slot == NULL))
662  return;
663  memset(slot, 0, sizeof(TmSlot));
664  SC_ATOMIC_INITPTR(slot->slot_data);
665  slot->SlotThreadInit = tm->ThreadInit;
666  slot->slot_initdata = data;
667  if (tm->Func) {
668  slot->SlotFunc = tm->Func;
669  } else if (tm->PktAcqLoop) {
670  slot->PktAcqLoop = tm->PktAcqLoop;
671  if (tm->PktAcqBreakLoop) {
672  tv->break_loop = true;
673  }
674  } else if (tm->Management) {
675  slot->Management = tm->Management;
676  }
678  slot->SlotThreadDeinit = tm->ThreadDeinit;
679  /* we don't have to check for the return value "-1". We wouldn't have
680  * received a TM as arg, if it didn't exist */
681  slot->tm_id = TmModuleGetIDForTM(tm);
682 
683  tv->tmm_flags |= tm->flags;
684  tv->cap_flags |= tm->cap_flags;
685 
686  if (tv->tm_slots == NULL) {
687  tv->tm_slots = slot;
688  } else {
689  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
690 
691  /* get the last slot */
692  for ( ; a != NULL; a = a->slot_next) {
693  b = a;
694  }
695  /* append the new slot */
696  if (b != NULL) {
697  b->slot_next = slot;
698  }
699  }
700  return;
701 }
702 
703 /**
704  * \brief Returns the slot holding a TM with the particular tm_id.
705  *
706  * \param tm_id TM id of the TM whose slot has to be returned.
707  *
708  * \retval slots Pointer to the slot.
709  */
711 {
713  for (int i = 0; i < TVT_MAX; i++) {
714  ThreadVars *tv = tv_root[i];
715  while (tv) {
716  TmSlot *slots = tv->tm_slots;
717  while (slots != NULL) {
718  if (slots->tm_id == tm_id) {
720  return slots;
721  }
722  slots = slots->slot_next;
723  }
724  tv = tv->next;
725  }
726  }
728  return NULL;
729 }
730 
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
/**
 * \brief Apply a prepared cpu set as the thread affinity.
 *
 * The platform call is chosen at compile time: cpuset_setaffinity() on
 * FreeBSD, thread_policy_set() on Darwin and sched_setaffinity() on the
 * thread id obtained via the gettid syscall everywhere else.
 *
 * \param cs cpu set to apply.
 *
 * \retval 0  the platform call reported success.
 * \retval -1 the platform call failed; a warning was printed to stdout.
 */
static int SetCPUAffinitySet(cpu_set_t *cs)
{
    int rc;

#if defined OS_FREEBSD
    rc = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
            SCGetThreadIdLong(), sizeof(cpu_set_t), cs);
#elif OS_DARWIN
    rc = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
            (void *)cs, THREAD_AFFINITY_POLICY_COUNT);
#else
    const pid_t self_tid = syscall(SYS_gettid);
    rc = sched_setaffinity(self_tid, sizeof(cpu_set_t), cs);
#endif

    if (rc == 0) {
        return 0;
    }

    printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", rc,
            strerror(errno));
    return -1;
}
#endif
754 
755 
/**
 * \brief Set the thread affinity on the calling thread.
 *
 * On OpenBSD and Solaris this is a no-op that reports success. On Windows
 * and Cygwin a single-bit affinity mask is handed to
 * SetThreadAffinityMask(); on all other platforms a cpu set holding only
 * \a cpuid is built and passed to SetCPUAffinitySet().
 *
 * The cpu id is validated before the mask/set is built: an id that does
 * not fit the platform's affinity mask is rejected instead of being
 * shifted or indexed out of range.
 *
 * \param cpuid Id of the core/cpu to setup the affinity.
 *
 * \retval 0 If all goes well; -1 if something is wrong (cpu id out of
 *           range, or the platform call failed).
 */
static int SetCPUAffinity(uint16_t cpuid)
{
#if defined __OpenBSD__ || defined sun
    return 0;
#else
    int cpu = (int)cpuid;

#if defined OS_WIN32 || defined __CYGWIN__
    /* the affinity mask is a DWORD_PTR bitmask: shifting by its width or
     * more is undefined behaviour, so refuse ids that don't fit */
    if ((size_t)cpu >= sizeof(DWORD_PTR) * 8) {
        printf("Warning: cpu id %d is out of range for the affinity mask\n", cpu);
        return -1;
    }
    DWORD_PTR cs = (DWORD_PTR)1 << cpu;

    int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
    if (r != 0) {
        printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
               strerror(errno));
        return -1;
    }
    SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
               SCGetThreadIdLong(), cpu);

    return 0;

#else
    cpu_set_t cs;

#ifdef CPU_SETSIZE
    /* not every CPU_SET implementation bounds-checks its index, so an id
     * past the end of the set could write outside of 'cs' */
    if (cpu >= CPU_SETSIZE) {
        printf("Warning: cpu id %d is out of range for the affinity mask\n", cpu);
        return -1;
    }
#endif

    CPU_ZERO(&cs);
    CPU_SET(cpu, &cs);
    return SetCPUAffinitySet(&cs);
#endif /* windows */
#endif /* not supported */
}
793 
794 
795 /**
796  * \brief Set the thread options (thread priority).
797  *
798  * \param tv Pointer to the ThreadVars to setup the thread priority.
799  *
800  * \retval TM_ECODE_OK.
801  */
803 {
805  tv->thread_priority = prio;
806 
807  return TM_ECODE_OK;
808 }
809 
810 /**
811  * \brief Adjusting nice value for threads.
812  */
814 {
815  SCEnter();
816 #ifndef __CYGWIN__
817 #ifdef OS_WIN32
818  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
819  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for "
820  "thread %s: %s", tv->name, strerror(errno));
821  } else {
822  SCLogDebug("Priority set to %"PRId32" for thread %s",
824  }
825 #else
826  int ret = nice(tv->thread_priority);
827  if (ret == -1) {
828  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value %d "
829  "for thread %s: %s", tv->thread_priority, tv->name,
830  strerror(errno));
831  } else {
832  SCLogDebug("Nice value set to %"PRId32" for thread %s",
834  }
835 #endif /* OS_WIN32 */
836 #endif
837  SCReturn;
838 }
839 
840 
841 /**
842  * \brief Set the thread options (cpu affinity).
843  *
844  * \param tv pointer to the ThreadVars to setup the affinity.
845  * \param cpu cpu on which affinity is set.
846  *
847  * \retval TM_ECODE_OK
848  */
850 {
852  tv->cpu_affinity = cpu;
853 
854  return TM_ECODE_OK;
855 }
856 
857 
859 {
861  return TM_ECODE_OK;
862 
863  if (type > MAX_CPU_SET) {
864  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
865  return TM_ECODE_FAILED;
866  }
867 
869  tv->cpu_affinity = type;
870 
871  return TM_ECODE_OK;
872 }
873 
875 {
876  if (type >= MAX_CPU_SET) {
877  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
878  return 0;
879  }
880 
882 }
883 
884 /**
885  * \brief Set the thread options (cpu affinitythread).
886  * Priority should be already set by pthread_create.
887  *
888  * \param tv pointer to the ThreadVars of the calling thread.
889  */
891 {
893  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
894  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
896  SetCPUAffinity(tv->cpu_affinity);
897  }
898 
899 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
904  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
905  int cpu = AffinityGetNextCPU(taf);
906  SetCPUAffinity(cpu);
907  /* If CPU is in a set overwrite the default thread prio */
908  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
910  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
912  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
914  } else {
915  tv->thread_priority = taf->prio;
916  }
917  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
918  "%d, thread id %lu", tv->thread_priority,
919  tv->name, cpu, SCGetThreadIdLong());
920  } else {
921  SetCPUAffinitySet(&taf->cpu_set);
922  tv->thread_priority = taf->prio;
923  SCLogPerf("Setting prio %d for thread \"%s\", "
924  "thread id %lu", tv->thread_priority,
926  }
928  }
929 #endif
930 
931  return TM_ECODE_OK;
932 }
933 
934 /**
935  * \brief Creates and returns the TV instance for a new thread.
936  *
937  * \param name Name of this TV instance
938  * \param inq_name Incoming queue name
939  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
940  * \param outq_name Outgoing queue name
941  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
942  * \param slots String representation for the slot function to be used
943  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
944  * \param mucond Flag to indicate whether to initialize the condition
945  * and the mutex variables for this newly created TV.
946  *
947  * \retval the newly created TV instance, or NULL on error
948  */
949 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
950  const char *outq_name, const char *outqh_name, const char *slots,
951  void * (*fn_p)(void *), int mucond)
952 {
953  ThreadVars *tv = NULL;
954  Tmq *tmq = NULL;
955  Tmqh *tmqh = NULL;
956 
957  SCLogDebug("creating thread \"%s\"...", name);
958 
959  /* XXX create separate function for this: allocate a thread container */
960  tv = SCMalloc(sizeof(ThreadVars));
961  if (unlikely(tv == NULL))
962  goto error;
963  memset(tv, 0, sizeof(ThreadVars));
964 
965  SC_ATOMIC_INIT(tv->flags);
966  SCMutexInit(&tv->perf_public_ctx.m, NULL);
967 
968  strlcpy(tv->name, name, sizeof(tv->name));
969 
970  /* default state for every newly created thread */
973 
974  /* set the incoming queue */
975  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
976  SCLogDebug("inq_name \"%s\"", inq_name);
977 
978  tmq = TmqGetQueueByName(inq_name);
979  if (tmq == NULL) {
980  tmq = TmqCreateQueue(inq_name);
981  if (tmq == NULL)
982  goto error;
983  }
984  SCLogDebug("tmq %p", tmq);
985 
986  tv->inq = tmq;
987  tv->inq->reader_cnt++;
988  SCLogDebug("tv->inq %p", tv->inq);
989  }
990  if (inqh_name != NULL) {
991  SCLogDebug("inqh_name \"%s\"", inqh_name);
992 
993  int id = TmqhNameToID(inqh_name);
994  if (id <= 0) {
995  goto error;
996  }
997  tmqh = TmqhGetQueueHandlerByName(inqh_name);
998  if (tmqh == NULL)
999  goto error;
1000 
1001  tv->tmqh_in = tmqh->InHandler;
1002  tv->inq_id = (uint8_t)id;
1003  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
1004  }
1005 
1006  /* set the outgoing queue */
1007  if (outqh_name != NULL) {
1008  SCLogDebug("outqh_name \"%s\"", outqh_name);
1009 
1010  int id = TmqhNameToID(outqh_name);
1011  if (id <= 0) {
1012  goto error;
1013  }
1014 
1015  tmqh = TmqhGetQueueHandlerByName(outqh_name);
1016  if (tmqh == NULL)
1017  goto error;
1018 
1019  tv->tmqh_out = tmqh->OutHandler;
1020  tv->outq_id = (uint8_t)id;
1021 
1022  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
1023  SCLogDebug("outq_name \"%s\"", outq_name);
1024 
1025  if (tmqh->OutHandlerCtxSetup != NULL) {
1026  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1027  if (tv->outctx == NULL)
1028  goto error;
1029  tv->outq = NULL;
1030  } else {
1031  tmq = TmqGetQueueByName(outq_name);
1032  if (tmq == NULL) {
1033  tmq = TmqCreateQueue(outq_name);
1034  if (tmq == NULL)
1035  goto error;
1036  }
1037  SCLogDebug("tmq %p", tmq);
1038 
1039  tv->outq = tmq;
1040  tv->outctx = NULL;
1041  tv->outq->writer_cnt++;
1042  }
1043  }
1044  }
1045 
1046  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1047  goto error;
1048  }
1049 
1050  if (mucond != 0)
1051  TmThreadInitMC(tv);
1052 
1053  return tv;
1054 
1055 error:
1056  SCLogError(SC_ERR_THREAD_CREATE, "failed to setup a thread");
1057 
1058  if (tv != NULL)
1059  SCFree(tv);
1060  return NULL;
1061 }
1062 
1063 /**
1064  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1065  * This function doesn't support custom slots, and hence shouldn't be
1066  * supplied \"custom\" as its slot type. All PPT threads are created
1067  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1068  * conditional variables are not used to kill the thread.
1069  *
1070  * \param name Name of this TV instance
1071  * \param inq_name Incoming queue name
1072  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1073  * \param outq_name Outgoing queue name
1074  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1075  * \param slots String representation for the slot function to be used
1076  *
1077  * \retval the newly created TV instance, or NULL on error
1078  */
1079 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1080  const char *inqh_name, const char *outq_name,
1081  const char *outqh_name, const char *slots)
1082 {
1083  ThreadVars *tv = NULL;
1084 
1085  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1086  slots, NULL, 0);
1087 
1088  if (tv != NULL) {
1089  tv->type = TVT_PPT;
1091  }
1092 
1093 
1094  return tv;
1095 }
1096 
1097 /**
1098  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1099  * This function supports only custom slot functions and hence a
1100  * function pointer should be sent as an argument.
1101  *
1102  * \param name Name of this TV instance
1103  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1104  * \param mucond Flag to indicate whether to initialize the condition
1105  * and the mutex variables for this newly created TV.
1106  *
1107  * \retval the newly created TV instance, or NULL on error
1108  */
1109 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1110  int mucond)
1111 {
1112  ThreadVars *tv = NULL;
1113 
1114  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1115 
1116  if (tv != NULL) {
1117  tv->type = TVT_MGMT;
1120  }
1121 
1122  return tv;
1123 }
1124 
1125 /**
1126  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1127  * The thread is created with the "management" slot function; the TmModule
1128  * looked up by \a module is appended as its slot if such a module exists.
1129  *
1130  * \param name Name of this TV instance
1131  * \param module Name of TmModule with MANAGEMENT flag set.
1132  * \param mucond Flag to indicate whether to initialize the condition
1133  * and the mutex variables for this newly created TV.
1134  *
1135  * \retval the newly created TV instance, or NULL on error
1136  */
1137 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1138  int mucond)
1139 {
1140  ThreadVars *tv = NULL;
1141 
1142  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1143 
1144  if (tv != NULL) {
1145  tv->type = TVT_MGMT;
1148 
1149  TmModule *m = TmModuleGetByName(module);
1150  if (m) {
1151  TmSlotSetFuncAppend(tv, m, NULL);
1152  }
1153  }
1154 
1155  return tv;
1156 }
1157 
1158 /**
1159  * \brief Creates and returns the TV instance for a Command thread (CMD).
1160  * The thread is created with the "command" slot function; the TmModule
1161  * looked up by \a module is appended as its slot if such a module exists.
1162  *
1163  * \param name Name of this TV instance
1164  * \param module Name of TmModule with COMMAND flag set.
1165  * \param mucond Flag to indicate whether to initialize the condition
1166  * and the mutex variables for this newly created TV.
1167  *
1168  * \retval the newly created TV instance, or NULL on error
1169  */
1170 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1171  int mucond)
1172 {
1173  ThreadVars *tv = NULL;
1174 
1175  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1176 
1177  if (tv != NULL) {
1178  tv->type = TVT_CMD;
1181 
1182  TmModule *m = TmModuleGetByName(module);
1183  if (m) {
1184  TmSlotSetFuncAppend(tv, m, NULL);
1185  }
1186  }
1187 
1188  return tv;
1189 }
1190 
1191 /**
1192  * \brief Appends this TV to tv_root based on its type
1193  *
1194  * \param type holds the type this TV belongs to.
1195  */
1197 {
1199 
1200  if (tv_root[type] == NULL) {
1201  tv_root[type] = tv;
1202  tv->next = NULL;
1203 
1205 
1206  return;
1207  }
1208 
1209  ThreadVars *t = tv_root[type];
1210 
1211  while (t) {
1212  if (t->next == NULL) {
1213  t->next = tv;
1214  tv->next = NULL;
1215  break;
1216  }
1217 
1218  t = t->next;
1219  }
1220 
1222 
1223  return;
1224 }
1225 
1226 static bool ThreadStillHasPackets(ThreadVars *tv)
1227 {
1228  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1229  /* we wait till we dry out all the inq packets, before we
1230  * kill this thread. Do note that you should have disabled
1231  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1232  PacketQueue *q = tv->inq->pq;
1233  SCMutexLock(&q->mutex_q);
1234  uint32_t len = q->len;
1235  SCMutexUnlock(&q->mutex_q);
1236  if (len != 0) {
1237  return true;
1238  }
1239  }
1240 
1241  if (tv->stream_pq != NULL) {
1243  uint32_t len = tv->stream_pq->len;
1245 
1246  if (len != 0) {
1247  return true;
1248  }
1249  }
1250  return false;
1251 }
1252 
1253 /**
1254  * \brief Kill a thread.
1255  *
1256  * \param tv A ThreadVars instance corresponding to the thread that has to be
1257  * killed.
1258  *
1259  * \retval r 1 killed successfully
1260  * 0 not yet ready, needs another look
1261  */
1262 static int TmThreadKillThread(ThreadVars *tv)
1263 {
1264  BUG_ON(tv == NULL);
1265 
1266  /* kill only once :) */
1267  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1268  return 1;
1269  }
1270 
1271  /* set the thread flag informing the thread that it needs to be
1272  * terminated */
1275 
1276  /* to be sure, signal more */
1277  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1278  if (tv->inq_id != TMQH_NOT_SET) {
1280  if (qh != NULL && qh->InShutdownHandler != NULL) {
1281  qh->InShutdownHandler(tv);
1282  }
1283  }
1284  if (tv->inq != NULL) {
1285  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1286  SCCondSignal(&tv->inq->pq->cond_q);
1287  }
1288  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1289  }
1290 
1291  if (tv->ctrl_cond != NULL ) {
1292  pthread_cond_broadcast(tv->ctrl_cond);
1293  }
1294  return 0;
1295  }
1296 
1297  if (tv->outctx != NULL) {
1298  if (tv->outq_id != TMQH_NOT_SET) {
1300  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1301  qh->OutHandlerCtxFree(tv->outctx);
1302  tv->outctx = NULL;
1303  }
1304  }
1305  }
1306 
1307  /* join it and flag it as dead */
1308  pthread_join(tv->t, NULL);
1309  SCLogDebug("thread %s stopped", tv->name);
1311  return 1;
1312 }
1313 
1314 /** \internal
1315  *
1316  * \brief make sure that all packet threads are done processing their
1317  * in-flight packets, including 'injected' flow packets.
1318  */
1319 static void TmThreadDrainPacketThreads(void)
1320 {
1321  ThreadVars *tv = NULL;
1322  struct timeval start_ts;
1323  struct timeval cur_ts;
1324  gettimeofday(&start_ts, NULL);
1325 
1326 again:
1327  gettimeofday(&cur_ts, NULL);
1328  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1329  SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads "
1330  "to process their packets in time");
1331  return;
1332  }
1333 
1335 
1336  /* all receive threads are part of packet processing threads */
1337  tv = tv_root[TVT_PPT];
1338  while (tv) {
1339  if (ThreadStillHasPackets(tv)) {
1340  /* we wait till we dry out all the inq packets, before we
1341  * kill this thread. Do note that you should have disabled
1342  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1344 
1345  /* sleep outside lock */
1346  SleepMsec(1);
1347  goto again;
1348  }
1349  if (tv->flow_queue) {
1351  bool fq_done = (tv->flow_queue->qlen == 0);
1353  if (!fq_done) {
1355 
1356  Packet *p = PacketGetFromAlloc();
1357  if (p != NULL) {
1360  PacketQueue *q = tv->stream_pq;
1361  SCMutexLock(&q->mutex_q);
1362  PacketEnqueue(q, p);
1363  SCCondSignal(&q->cond_q);
1364  SCMutexUnlock(&q->mutex_q);
1365  }
1366 
1367  /* don't sleep while holding a lock */
1368  SleepMsec(1);
1369  goto again;
1370  }
1371  }
1372  tv = tv->next;
1373  }
1374 
1376  return;
1377 }
1378 
1379 /**
1380  * \brief Disable all threads having the specified TMs.
1381  *
1382  * Breaks out of the packet acquisition loop, and bumps
1383  * into the 'flow loop', where it will process packets
1384  * from the flow engine's shutdown handling.
1385  */
1387 {
1388  ThreadVars *tv = NULL;
1389  struct timeval start_ts;
1390  struct timeval cur_ts;
1391  gettimeofday(&start_ts, NULL);
1392 
1393 again:
1394  gettimeofday(&cur_ts, NULL);
1395  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1396  FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1397  "thread - \"%s\". Killing engine", tv->name);
1398  }
1399 
1401 
1402  /* all receive threads are part of packet processing threads */
1403  tv = tv_root[TVT_PPT];
1404 
1405  /* we do have to keep in mind that TVs are arranged in the order
1406  * right from receive to log. The moment we fail to find a
1407  * receive TM amongst the slots in a tv, it indicates we are done
1408  * with all receive threads */
1409  while (tv) {
1410  int disable = 0;
1411  TmModule *tm = NULL;
1412  /* obtain the slots for this TV */
1413  TmSlot *slots = tv->tm_slots;
1414  while (slots != NULL) {
1415  tm = TmModuleGetById(slots->tm_id);
1416 
1417  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1418  disable = 1;
1419  break;
1420  }
1421 
1422  slots = slots->slot_next;
1423  continue;
1424  }
1425 
1426  if (disable) {
1427  if (ThreadStillHasPackets(tv)) {
1428  /* we wait till we dry out all the inq packets, before we
1429  * kill this thread. Do note that you should have disabled
1430  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1432  /* don't sleep while holding a lock */
1433  SleepMsec(1);
1434  goto again;
1435  }
1436 
1437  if (tv->flow_queue) {
1439  bool fq_done = (tv->flow_queue->qlen == 0);
1441  if (!fq_done) {
1443 
1444  Packet *p = PacketGetFromAlloc();
1445  if (p != NULL) {
1448  PacketQueue *q = tv->stream_pq;
1449  SCMutexLock(&q->mutex_q);
1450  PacketEnqueue(q, p);
1451  SCCondSignal(&q->cond_q);
1452  SCMutexUnlock(&q->mutex_q);
1453  }
1454 
1455  /* don't sleep while holding a lock */
1456  SleepMsec(1);
1457  goto again;
1458  }
1459  }
1460 
1461  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1462  if (tm && tm->PktAcqBreakLoop != NULL) {
1463  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1464  }
1466 
1467  if (tv->inq != NULL) {
1468  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1469  SCCondSignal(&tv->inq->pq->cond_q);
1470  }
1471  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1472  }
1473 
1474  /* wait for it to enter the 'flow loop' stage */
1475  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1477 
1478  SleepMsec(1);
1479  goto again;
1480  }
1481  }
1482 
1483  tv = tv->next;
1484  }
1485 
1487 
1488  /* finally wait for all packet threads to have
1489  * processed all of their 'live' packets so we
1490  * don't process the last live packets together
1491  * with FFR packets */
1492  TmThreadDrainPacketThreads();
1493  return;
1494 }
1495 
1496 static void TmThreadDebugValidateNoMorePackets(void)
1497 {
1498 #ifdef DEBUG_VALIDATION
1500  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1501  if (ThreadStillHasPackets(tv)) {
1504  abort();
1505  }
1506  }
1508 #endif
1509 }
1510 
1511 /**
1512  * \brief Disable all packet threads
1513  */
1515 {
1516  struct timeval start_ts;
1517  struct timeval cur_ts;
1518 
1519  /* first drain all packet threads of their packets */
1520  TmThreadDrainPacketThreads();
1521 
1522  /* since all the threads possibly able to produce more packets
1523  * are now gone or inactive, we should see no packets anywhere
1524  * anymore. */
1525  TmThreadDebugValidateNoMorePackets();
1526 
1527  gettimeofday(&start_ts, NULL);
1528 again:
1529  gettimeofday(&cur_ts, NULL);
1530  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1531  FatalError(SC_ERR_FATAL, "Engine unable to disable packet "
1532  "threads. Killing engine");
1533  }
1534 
1535  /* loop through the packet threads and kill them */
1537  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1539 
1540  /* separate worker threads (autofp) will still wait at their
1541  * input queues. So nudge them here so they will observe the
1542  * THV_KILL flag. */
1543  if (tv->inq != NULL) {
1544  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1545  SCCondSignal(&tv->inq->pq->cond_q);
1546  }
1547  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1548  }
1549 
1552 
1553  SleepMsec(1);
1554  goto again;
1555  }
1556  }
1558  return;
1559 }
1560 
1562 {
1563  ThreadVars *tv = NULL;
1564  TmSlot *slots = NULL;
1565 
1567 
1568  /* all receive threads are part of packet processing threads */
1569  tv = tv_root[TVT_PPT];
1570 
1571  while (tv) {
1572  slots = tv->tm_slots;
1573 
1574  while (slots != NULL) {
1575  TmModule *tm = TmModuleGetById(slots->tm_id);
1576 
1577  char *found = strstr(tm->name, tm_name);
1578  if (found != NULL)
1579  goto end;
1580 
1581  slots = slots->slot_next;
1582  }
1583 
1584  tv = tv->next;
1585  }
1586 
1587  end:
1589  return slots;
1590 }
1591 
1592 #define MIN_WAIT_TIME 100
1593 #define MAX_WAIT_TIME 999999
1595 {
1596  ThreadVars *tv = NULL;
1597  unsigned int sleep_usec = MIN_WAIT_TIME;
1598 
1599  BUG_ON((family < 0) || (family >= TVT_MAX));
1600 
1601 again:
1603  tv = tv_root[family];
1604 
1605  while (tv) {
1606  int r = TmThreadKillThread(tv);
1607  if (r == 0) {
1609  SleepUsec(sleep_usec);
1610  sleep_usec *= 2; /* slowly back off */
1611  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1612  goto again;
1613  }
1614  sleep_usec = MIN_WAIT_TIME; /* reset */
1615 
1616  tv = tv->next;
1617  }
1619 }
1620 #undef MIN_WAIT_TIME
1621 #undef MAX_WAIT_TIME
1622 
1624 {
1625  int i = 0;
1626 
1627  for (i = 0; i < TVT_MAX; i++) {
1629  }
1630 
1631  return;
1632 }
1633 
1634 static void TmThreadFree(ThreadVars *tv)
1635 {
1636  TmSlot *s;
1637  TmSlot *ps;
1638  if (tv == NULL)
1639  return;
1640 
1641  SCLogDebug("Freeing thread '%s'.", tv->name);
1642 
1643  if (tv->flow_queue) {
1644  BUG_ON(tv->flow_queue->qlen != 0);
1645  SCFree(tv->flow_queue);
1646  }
1647 
1649 
1650  TmThreadDeinitMC(tv);
1651 
1652  if (tv->thread_group_name) {
1654  }
1655 
1656  if (tv->printable_name) {
1658  }
1659 
1660  if (tv->stream_pq_local) {
1664  }
1665 
1666  s = (TmSlot *)tv->tm_slots;
1667  while (s) {
1668  ps = s;
1669  s = s->slot_next;
1670  SCFree(ps);
1671  }
1672 
1674  SCFree(tv);
1675 }
1676 
1677 void TmThreadSetGroupName(ThreadVars *tv, const char *name)
1678 {
1679  char *thread_group_name = NULL;
1680 
1681  if (name == NULL)
1682  return;
1683 
1684  if (tv == NULL)
1685  return;
1686 
1687  thread_group_name = SCStrdup(name);
1688  if (unlikely(thread_group_name == NULL)) {
1689  SCLogError(SC_ERR_RUNMODE, "error allocating memory");
1690  return;
1691  }
1692  tv->thread_group_name = thread_group_name;
1693 }
1694 
1696 {
1697  ThreadVars *tv = NULL;
1698  ThreadVars *ptv = NULL;
1699 
1700  if ((family < 0) || (family >= TVT_MAX))
1701  return;
1702 
1704  tv = tv_root[family];
1705 
1706  while (tv) {
1707  ptv = tv;
1708  tv = tv->next;
1709  TmThreadFree(ptv);
1710  }
1711  tv_root[family] = NULL;
1713 }
1714 
1715 /**
1716  * \brief Spawns a thread associated with the ThreadVars instance tv
1717  *
1718  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1719  */
1721 {
1722  pthread_attr_t attr;
1723  if (tv->tm_func == NULL) {
1724  printf("ERROR: no thread function set\n");
1725  return TM_ECODE_FAILED;
1726  }
1727 
1728  /* Initialize the thread attributes and request a joinable thread */
1729  pthread_attr_init(&attr);
1730 
1731  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1732 
1733  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1734  if (rc) {
1735  printf("ERROR; return code from pthread_create() is %" PRId32 "\n", rc);
1736  return TM_ECODE_FAILED;
1737  }
1738 
1740 
1741  TmThreadAppend(tv, tv->type);
1742  return TM_ECODE_OK;
1743 }
1744 
1745 /**
1746  * \brief Initializes the mutex and condition variables for this TV
1747  *
1748  * It can be used by a thread to control a wait loop that can also be
1749  * influenced by other threads.
1750  *
1751  * \param tv Pointer to a TV instance
1752  */
1754 {
1755  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1757  "Fatal error encountered in TmThreadInitMC. "
1758  "Exiting...");
1759  }
1760 
1761  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1762  printf("Error initializing the tv->m mutex\n");
1763  exit(EXIT_FAILURE);
1764  }
1765 
1766  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1768  "Fatal error encountered in TmThreadInitMC. "
1769  "Exiting...");
1770  }
1771 
1772  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1773  FatalError(SC_ERR_FATAL, "Error initializing the tv->cond condition "
1774  "variable");
1775  }
1776 
1777  return;
1778 }
1779 
1780 static void TmThreadDeinitMC(ThreadVars *tv)
1781 {
1782  if (tv->ctrl_mutex) {
1784  SCFree(tv->ctrl_mutex);
1785  }
1786  if (tv->ctrl_cond) {
1788  SCFree(tv->ctrl_cond);
1789  }
1790  return;
1791 }
1792 
1793 /**
1794  * \brief Tests if the thread represented in the arg has been unpaused or not.
1795  *
1796  * The function would return if the thread tv has been unpaused or if the
1797  * kill flag for the thread has been set.
1798  *
1799  * \param tv Pointer to the TV instance.
1800  */
1802 {
1803  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1804  SleepUsec(100);
1805 
1807  break;
1808  }
1809 
1810  return;
1811 }
1812 
1813 /**
1814  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1815  * the kill flag has been set or not on the thread.
1816  *
1817  * \param tv Pointer to the TV instance.
1818  */
1820 {
1821  while (!TmThreadsCheckFlag(tv, flags)) {
1822  SleepUsec(100);
1823  }
1824 
1825  return;
1826 }
1827 
1828 /**
1829  * \brief Unpauses a thread
1830  *
1831  * \param tv Pointer to a TV instance that has to be unpaused
1832  */
1834 {
1836 
1837  return;
1838 }
1839 
1840 /**
1841  * \brief Unpauses all threads present in tv_root
1842  */
1844 {
1846  for (int i = 0; i < TVT_MAX; i++) {
1847  ThreadVars *tv = tv_root[i];
1848  while (tv != NULL) {
1850  tv = tv->next;
1851  }
1852  }
1854  return;
1855 }
1856 
1857 /**
1858  * \brief Pauses a thread
1859  *
1860  * \param tv Pointer to a TV instance that has to be paused
1861  */
1863 {
1865  return;
1866 }
1867 
1868 /**
1869  * \brief Pauses all threads present in tv_root
1870  */
1872 {
1874 
1876  for (int i = 0; i < TVT_MAX; i++) {
1877  ThreadVars *tv = tv_root[i];
1878  while (tv != NULL) {
1879  TmThreadPause(tv);
1880  tv = tv->next;
1881  }
1882  }
1884 }
1885 
1886 /**
1887  * \brief Used to check the thread for certain conditions of failure.
1888  */
1890 {
1892  for (int i = 0; i < TVT_MAX; i++) {
1893  ThreadVars *tv = tv_root[i];
1894  while (tv) {
1896  FatalError(SC_ERR_FATAL, "thread %s failed", tv->name);
1897  }
1898  tv = tv->next;
1899  }
1900  }
1902  return;
1903 }
1904 
1905 /**
1906  * \brief Used to check if all threads have finished their initialization. On
1907  * finding an un-initialized thread, it waits till that thread completes
1908  * its initialization, before proceeding to the next thread.
1909  *
1910  * \retval TM_ECODE_OK all initialized properly
1911  * \retval TM_ECODE_FAILED failure
1912  */
1914 {
1915  uint16_t RX_num = 0;
1916  uint16_t W_num = 0;
1917  uint16_t FM_num = 0;
1918  uint16_t FR_num = 0;
1919  uint16_t TX_num = 0;
1920 
1921  struct timeval start_ts;
1922  struct timeval cur_ts;
1923  gettimeofday(&start_ts, NULL);
1924 
1925 again:
1927  for (int i = 0; i < TVT_MAX; i++) {
1928  ThreadVars *tv = tv_root[i];
1929  while (tv != NULL) {
1932 
1933  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1934  "initialize: flags %04x", tv->name,
1935  SC_ATOMIC_GET(tv->flags));
1936  return TM_ECODE_FAILED;
1937  }
1938 
1939  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
1941 
1942  gettimeofday(&cur_ts, NULL);
1943  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
1944  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1945  "initialize in time: flags %04x", tv->name,
1946  SC_ATOMIC_GET(tv->flags));
1947  return TM_ECODE_FAILED;
1948  }
1949 
1950  /* sleep a little to give the thread some
1951  * time to finish initialization */
1952  SleepUsec(100);
1953  goto again;
1954  }
1955 
1958  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1959  "initialize.", tv->name);
1960  return TM_ECODE_FAILED;
1961  }
1964  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" closed on "
1965  "initialization.", tv->name);
1966  return TM_ECODE_FAILED;
1967  }
1968 
1969  if (strncmp(thread_name_autofp, tv->name, strlen(thread_name_autofp)) == 0)
1970  RX_num++;
1971  else if (strncmp(thread_name_workers, tv->name, strlen(thread_name_workers)) == 0)
1972  W_num++;
1973  else if (strncmp(thread_name_verdict, tv->name, strlen(thread_name_verdict)) == 0)
1974  TX_num++;
1975  else if (strncmp(thread_name_flow_mgr, tv->name, strlen(thread_name_flow_mgr)) == 0)
1976  FM_num++;
1977  else if (strncmp(thread_name_flow_rec, tv->name, strlen(thread_name_flow_rec)) == 0)
1978  FR_num++;
1979 
1980  tv = tv->next;
1981  }
1982  }
1984 
1985  /* Construct a welcome string displaying
1986  * initialized thread types and counts */
1987  uint16_t app_len = 32;
1988  uint16_t buf_len = 256;
1989 
1990  char append_str[app_len];
1991  char thread_counts[buf_len];
1992 
1993  strlcpy(thread_counts, "Threads created -> ", strlen("Threads created -> ") + 1);
1994  if (RX_num > 0) {
1995  snprintf(append_str, app_len, "RX: %u ", RX_num);
1996  strlcat(thread_counts, append_str, buf_len);
1997  }
1998  if (W_num > 0) {
1999  snprintf(append_str, app_len, "W: %u ", W_num);
2000  strlcat(thread_counts, append_str, buf_len);
2001  }
2002  if (TX_num > 0) {
2003  snprintf(append_str, app_len, "TX: %u ", TX_num);
2004  strlcat(thread_counts, append_str, buf_len);
2005  }
2006  if (FM_num > 0) {
2007  snprintf(append_str, app_len, "FM: %u ", FM_num);
2008  strlcat(thread_counts, append_str, buf_len);
2009  }
2010  if (FR_num > 0) {
2011  snprintf(append_str, app_len, "FR: %u ", FR_num);
2012  strlcat(thread_counts, append_str, buf_len);
2013  }
2014  snprintf(append_str, app_len, " Engine started.");
2015  strlcat(thread_counts, append_str, buf_len);
2016  SCLogNotice("%s", thread_counts);
2017 
2018  return TM_ECODE_OK;
2019 }
2020 
2021 /**
2022  * \brief Returns the TV for the calling thread.
2023  *
2024  * \retval tv Pointer to the ThreadVars instance for the calling thread;
2025  * NULL on no match
2026  */
2028 {
2029  pthread_t self = pthread_self();
2030 
2032  for (int i = 0; i < TVT_MAX; i++) {
2033  ThreadVars *tv = tv_root[i];
2034  while (tv) {
2035  if (pthread_equal(self, tv->t)) {
2037  return tv;
2038  }
2039  tv = tv->next;
2040  }
2041  }
2043  return NULL;
2044 }
2045 
2046 /**
2047  * \brief returns a count of all the threads that match the flag
2048  */
2050 {
2051  uint32_t cnt = 0;
2053  for (int i = 0; i < TVT_MAX; i++) {
2054  ThreadVars *tv = tv_root[i];
2055  while (tv != NULL) {
2056  if ((tv->tmm_flags & flags) == flags)
2057  cnt++;
2058 
2059  tv = tv->next;
2060  }
2061  }
2063  return cnt;
2064 }
2065 
2066 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2067 {
2068  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2070  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
2071  tv, s, s->tm_id, m->name);
2072  }
2073 }
2074 
2076 {
2078  for (int i = 0; i < TVT_MAX; i++) {
2079  ThreadVars *tv = tv_root[i];
2080  while (tv != NULL) {
2081  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2082  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2083  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2084  if (tv->inq && tv->stream_pq == tv->inq->pq) {
2085  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2086  } else if (tv->stream_pq_local != NULL) {
2087  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
2088  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
2089  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
2090  }
2091  }
2092  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
2093  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
2094  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
2095  }
2096  TmThreadDoDumpSlots(tv);
2097  tv = tv->next;
2098  }
2099  }
2102 }
2103 
2104 typedef struct Thread_ {
2105  ThreadVars *tv; /**< threadvars structure */
2106  const char *name;
2107  int type;
2108  int in_use; /**< bool to indicate this is in use */
2109 
2110  struct timeval pktts; /**< current packet time of this thread
2111  * (offline mode) */
2112  uint32_t sys_sec_stamp; /**< timestamp in seconds of the real system
2113  * time when the pktts was last updated. */
2115 
2116 typedef struct Threads_ {
2121 
2122 static Threads thread_store = { NULL, 0, 0 };
2123 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2124 
2126 {
2127  SCMutexLock(&thread_store_lock);
2128  for (size_t s = 0; s < thread_store.threads_size; s++) {
2129  Thread *t = &thread_store.threads[s];
2130  if (t == NULL || t->in_use == 0)
2131  continue;
2132 
2133  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2134  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2135  if (t->tv) {
2136  ThreadVars *tv = t->tv;
2137  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2138  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2139  tv, tv->type, tv->name, tv->tmm_flags, flags);
2140  }
2141  }
2142  SCMutexUnlock(&thread_store_lock);
2143 }
2144 
2145 #define STEP 32
2146 /**
2147  * \retval id thread id, or 0 if not found
2148  */
2150 {
2151  SCMutexLock(&thread_store_lock);
2152  if (thread_store.threads == NULL) {
2153  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2154  BUG_ON(thread_store.threads == NULL);
2155  thread_store.threads_size = STEP;
2156  }
2157 
2158  size_t s;
2159  for (s = 0; s < thread_store.threads_size; s++) {
2160  if (thread_store.threads[s].in_use == 0) {
2161  Thread *t = &thread_store.threads[s];
2162  t->name = tv->name;
2163  t->type = type;
2164  t->tv = tv;
2165  t->in_use = 1;
2166 
2167  SCMutexUnlock(&thread_store_lock);
2168  return (int)(s+1);
2169  }
2170  }
2171 
2172  /* if we get here the array is completely filled */
2173  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2174  BUG_ON(newmem == NULL);
2175  thread_store.threads = newmem;
2176  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2177 
2178  Thread *t = &thread_store.threads[thread_store.threads_size];
2179  t->name = tv->name;
2180  t->type = type;
2181  t->tv = tv;
2182  t->in_use = 1;
2183 
2184  s = thread_store.threads_size;
2185  thread_store.threads_size += STEP;
2186 
2187  SCMutexUnlock(&thread_store_lock);
2188  return (int)(s+1);
2189 }
2190 #undef STEP
2191 
2192 void TmThreadsUnregisterThread(const int id)
2193 {
2194  SCMutexLock(&thread_store_lock);
2195  if (id <= 0 || id > (int)thread_store.threads_size) {
2196  SCMutexUnlock(&thread_store_lock);
2197  return;
2198  }
2199 
2200  /* id is one higher than index */
2201  int idx = id - 1;
2202 
2203  /* reset thread_id, which serves as clearing the record */
2204  thread_store.threads[idx].in_use = 0;
2205 
2206  /* check if we have at least one registered thread left */
2207  size_t s;
2208  for (s = 0; s < thread_store.threads_size; s++) {
2209  Thread *t = &thread_store.threads[s];
2210  if (t->in_use == 1) {
2211  goto end;
2212  }
2213  }
2214 
2215  /* if we get here no threads are registered */
2216  SCFree(thread_store.threads);
2217  thread_store.threads = NULL;
2218  thread_store.threads_size = 0;
2219  thread_store.threads_cnt = 0;
2220 
2221 end:
2222  SCMutexUnlock(&thread_store_lock);
2223 }
2224 
2225 void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
2226 {
2227  SCMutexLock(&thread_store_lock);
2228  if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2229  SCMutexUnlock(&thread_store_lock);
2230  return;
2231  }
2232 
2233  int idx = id - 1;
2234  Thread *t = &thread_store.threads[idx];
2235  t->pktts = *ts;
2236  struct timeval systs;
2237  gettimeofday(&systs, NULL);
2238  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2239  SCMutexUnlock(&thread_store_lock);
2240 }
2241 
2243 {
2244  bool ready = true;
2245  SCMutexLock(&thread_store_lock);
2246  for (size_t s = 0; s < thread_store.threads_size; s++) {
2247  Thread *t = &thread_store.threads[s];
2248  if (!t->in_use)
2249  break;
2250  if (t->sys_sec_stamp == 0) {
2251  ready = false;
2252  break;
2253  }
2254  }
2255  SCMutexUnlock(&thread_store_lock);
2256  return ready;
2257 }
2258 
2259 void TmThreadsInitThreadsTimestamp(const struct timeval *ts)
2260 {
2261  struct timeval systs;
2262  gettimeofday(&systs, NULL);
2263  SCMutexLock(&thread_store_lock);
2264  for (size_t s = 0; s < thread_store.threads_size; s++) {
2265  Thread *t = &thread_store.threads[s];
2266  if (!t->in_use)
2267  break;
2268  t->pktts = *ts;
2269  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2270  }
2271  SCMutexUnlock(&thread_store_lock);
2272 }
2273 
2274 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2275 {
2276  struct timeval local, nullts;
2277  memset(&local, 0, sizeof(local));
2278  memset(&nullts, 0, sizeof(nullts));
2279  int set = 0;
2280  size_t s;
2281  struct timeval systs;
2282  gettimeofday(&systs, NULL);
2283 
2284  SCMutexLock(&thread_store_lock);
2285  for (s = 0; s < thread_store.threads_size; s++) {
2286  Thread *t = &thread_store.threads[s];
2287  if (t->in_use == 0)
2288  break;
2289  if (!(timercmp(&t->pktts, &nullts, ==))) {
2290  /* ignore sleeping threads */
2291  if (t->sys_sec_stamp + 1 < (uint32_t)systs.tv_sec)
2292  continue;
2293 
2294  if (!set) {
2295  local = t->pktts;
2296  set = 1;
2297  } else {
2298  if (timercmp(&t->pktts, &local, <)) {
2299  local = t->pktts;
2300  }
2301  }
2302  }
2303  }
2304  SCMutexUnlock(&thread_store_lock);
2305  *ts = local;
2306  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2307 }
2308 
2310 {
2311  uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
2312  int thread_max = TmThreadGetNbThreads(WORKER_CPU_SET);
2313  /* always create at least one thread */
2314  if (thread_max == 0)
2315  thread_max = ncpus * threading_detect_ratio;
2316  if (thread_max < 1)
2317  thread_max = 1;
2318  if (thread_max > 1024) {
2319  SCLogWarning(SC_ERR_RUNMODE, "limited number of 'worker' threads to 1024. Wanted %d", thread_max);
2320  thread_max = 1024;
2321  }
2322  return thread_max;
2323 }
2324 
2325 static inline void ThreadBreakLoop(ThreadVars *tv)
2326 {
2327  if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
2328  return;
2329  }
2330  /* find the correct slot */
2331  TmSlot *s = tv->tm_slots;
2332  TmModule *tm = TmModuleGetById(s->tm_id);
2333  if (tm->flags & TM_FLAG_RECEIVE_TM) {
2334  /* if the method supports it, BreakLoop. Otherwise we rely on
2335  * the capture method's recv timeout */
2336  if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
2337  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
2338  }
2339  }
2340 }
2341 
2342 /**
2343  * \retval r 1 if packet was accepted, 0 otherwise
2344  * \note if packet was not accepted, it's still the responsibility
2345  * of the caller.
2346  */
2347 int TmThreadsInjectPacketsById(Packet **packets, const int id)
2348 {
2349  if (id <= 0 || id > (int)thread_store.threads_size)
2350  return 0;
2351 
2352  int idx = id - 1;
2353 
2354  Thread *t = &thread_store.threads[idx];
2355  ThreadVars *tv = t->tv;
2356 
2357  if (tv == NULL || tv->stream_pq == NULL)
2358  return 0;
2359 
2361  while (*packets != NULL) {
2362  PacketEnqueue(tv->stream_pq, *packets);
2363  packets++;
2364  }
2366 
2367  /* wake up listening thread(s) if necessary */
2368  if (tv->inq != NULL) {
2369  SCCondSignal(&tv->inq->pq->cond_q);
2370  } else if (tv->break_loop) {
2371  ThreadBreakLoop(tv);
2372  }
2373  return 1;
2374 }
2375 
2376 /** \brief inject a flow into a threads flow queue
2377  */
2378 void TmThreadsInjectFlowById(Flow *f, const int id)
2379 {
2380  BUG_ON(id <= 0 || id > (int)thread_store.threads_size);
2381 
2382  int idx = id - 1;
2383 
2384  Thread *t = &thread_store.threads[idx];
2385  ThreadVars *tv = t->tv;
2386 
2387  BUG_ON(tv == NULL || tv->flow_queue == NULL);
2388 
2389  FlowEnqueue(tv->flow_queue, f);
2390 
2391  /* wake up listening thread(s) if necessary */
2392  if (tv->inq != NULL) {
2393  SCCondSignal(&tv->inq->pq->cond_q);
2394  } else if (tv->break_loop) {
2395  ThreadBreakLoop(tv);
2396  }
2397 }
thread_name_workers
const char * thread_name_workers
Definition: runmodes.c:65
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:849
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:67
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:75
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:72
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:133
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2242
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1753
tm-threads.h
spin_lock_cnt
thread_local uint64_t spin_lock_cnt
ThreadsAffinityType_::nb_threads
uint32_t nb_threads
Definition: util-affinity.h:68
len
uint8_t len
Definition: app-layer-dnp3.h:2
ts
uint64_t ts
Definition: source-erf-file.c:54
THV_USE
#define THV_USE
Definition: threadvars.h:37
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:113
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1720
Threads
struct Threads_ Threads
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1137
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:890
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
thread_name_flow_mgr
const char * thread_name_flow_mgr
Definition: runmodes.c:67
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
wrapper for initializing an atomic variable.
Definition: util-atomic.h:315
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:55
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:60
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1079
Thread_::pktts
struct timeval pktts
Definition: tm-threads.c:2110
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:387
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:379
TmThreadSetGroupName
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1677
TmThreadsGetCallingThread
ThreadVars * TmThreadsGetCallingThread(void)
Returns the TV for the calling thread.
Definition: tm-threads.c:2027
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:105
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:49
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:298
rwr_lock_cnt
thread_local uint64_t rwr_lock_cnt
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:97
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1819
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:173
TmThreadsGetTVContainingSlot
ThreadVars * TmThreadsGetTVContainingSlot(TmSlot *tm_slot)
Definition: tm-threads.c:628
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:47
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:55
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:46
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:82
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:61
Packet_::flags
uint32_t flags
Definition: decode.h:449
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:74
threads.h
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
Flow_
Flow data structure.
Definition: flow.h:353
ThreadVars_::t
pthread_t t
Definition: threadvars.h:59
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:303
thread_name_flow_rec
const char * thread_name_flow_rec
Definition: runmodes.c:68
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
AffinityGetNextCPU
int AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
Definition: util-affinity.c:297
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2049
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:104
thread_name_autofp
const char * thread_name_autofp
Definition: runmodes.c:63
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const struct timeval *ts)
Definition: tm-threads.c:2259
StatsSetupPrivate
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1197
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:115
MIN
#define MIN(x, y)
Definition: suricata-common.h:368
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:78
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:74
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1386
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:387
SC_ERR_THREAD_CREATE
@ SC_ERR_THREAD_CREATE
Definition: util-error.h:78
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:121
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:802
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:37
m
SCMutex m
Definition: flow-hash.h:6
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:43
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:140
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:375
SC_ERR_SHUTDOWN
@ SC_ERR_SHUTDOWN
Definition: util-error.h:220
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:81
rww_lock_contention
thread_local uint64_t rww_lock_contention
SC_ERR_RUNMODE
@ SC_ERR_RUNMODE
Definition: util-error.h:219
Threads_::threads
Thread * threads
Definition: tm-threads.c:2117
FlowEnqueue
void FlowEnqueue(FlowQueue *q, Flow *f)
add a flow to a queue
Definition: flow-queue.c:174
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2145
Thread_::in_use
int in_use
Definition: tm-threads.c:2108
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:39
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(void)
Disable all packet threads.
Definition: tm-threads.c:1514
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:302
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:80
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
TmThreadsInjectFlowById
void TmThreadsInjectFlowById(Flow *f, const int id)
inject a flow into a thread's flow queue
Definition: tm-threads.c:2378
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:81
TmThreadsInjectPacketsById
int TmThreadsInjectPacketsById(Packet **packets, const int id)
Definition: tm-threads.c:2347
rwr_lock_contention
thread_local uint64_t rwr_lock_contention
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2274
FQLOCK_LOCK
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:73
TmThreadContinueThreads
void TmThreadContinueThreads()
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1843
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:87
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:54
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:53
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:90
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:47
spin_lock_wait_ticks
thread_local uint64_t spin_lock_wait_ticks
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1204
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:105
util-signal.h
SC_ERR_THREAD_INIT
@ SC_ERR_THREAD_INIT
Definition: util-error.h:79
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:79
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:813
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1592
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:2105
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1833
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:56
type
uint8_t type
Definition: decode-icmpv4.h:0
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:1913
Threads_::threads_cnt
int threads_cnt
Definition: tm-threads.c:2119
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
strlcat
size_t strlcat(char *, const char *src, size_t siz)
Definition: util-strlcatu.c:45
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:76
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:96
TmSlotGetSlotForTM
TmSlot * TmSlotGetSlotForTM(int tm_id)
Returns the slot holding a TM with the particular tm_id.
Definition: tm-threads.c:710
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:91
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:206
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:57
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
TmThreadPauseThreads
void TmThreadPauseThreads()
Pauses all threads present in tv_root.
Definition: tm-threads.c:1871
ThreadVars_::perf_public_ctx
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:126
PKT_PSEUDO_STREAM_END
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1154
TmThreadDumpThreads
void TmThreadDumpThreads(void)
Definition: tm-threads.c:2075
SCEnter
#define SCEnter(...)
Definition: util-debug.h:300
rww_lock_cnt
thread_local uint64_t rww_lock_cnt
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
mutex_lock_contention
thread_local uint64_t mutex_lock_contention
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1801
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
spin_lock_contention
thread_local uint64_t spin_lock_contention
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1695
THV_KILL
#define THV_KILL
Definition: threadvars.h:41
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:949
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:701
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2192
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2149
threading_set_cpu_affinity
int threading_set_cpu_affinity
Definition: runmodes.c:60
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
SC_ERR_INVALID_ARGUMENT
@ SC_ERR_INVALID_ARGUMENT
Definition: util-error.h:43
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:123
ThreadVars_::id
int id
Definition: threadvars.h:87
ThreadVars_::type
uint8_t type
Definition: threadvars.h:72
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:277
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:109
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:81
SCReturn
#define SCReturn
Definition: util-debug.h:302
stream.h
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1623
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:158
Packet_
Definition: decode.h:414
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:131
ThreadVars_::thread_group_name
char * thread_group_name
Definition: threadvars.h:67
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:90
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1593
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:73
TmSlot_
Definition: tm-threads.h:52
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:66
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:69
TmEcode
TmEcode
Definition: tm-threads-common.h:79
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:90
rww_lock_wait_ticks
thread_local uint64_t rww_lock_wait_ticks
queue.h
TmModule_::name
const char * name
Definition: tm-modules.h:44
runmodes.h
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:55
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread (MGMT). This function supports only custom...
Definition: tm-threads.c:1109
rwr_lock_wait_ticks
thread_local uint64_t rwr_lock_wait_ticks
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:31
TmThreadPause
void TmThreadPause(ThreadVars *tv)
Pauses a thread.
Definition: tm-threads.c:1862
TmModule_
Definition: tm-modules.h:43
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:257
SCRealloc
#define SCRealloc(ptr, sz)
Definition: util-mem.h:50
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:114
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:40
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:38
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:658
StatsPublicThreadContext_::m
SCMutex m
Definition: counters.h:72
TmThreadGetFirstTmSlotForPartialPattern
TmSlot * TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name)
Definition: tm-threads.c:1561
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1594
Thread_::sys_sec_stamp
uint32_t sys_sec_stamp
Definition: tm-threads.c:2112
Thread_::type
int type
Definition: tm-threads.c:2107
Thread
struct Thread_ Thread
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:90
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1196
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:75
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:44
flags
uint8_t flags
Definition: decode-gre.h:0
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:55
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:858
Threads_
Definition: tm-threads.c:2116
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:33
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:35
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:106
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:42
ThreadVars_::break_loop
bool break_loop
Definition: threadvars.h:134
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:101
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:874
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:224
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:50
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:89
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:65
SCLogError
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:257
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
Definition: tm-threads.c:2225
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:70
SCStrdup
#define SCStrdup(s)
Definition: util-mem.h:56
FatalError
#define FatalError(x,...)
Definition: util-debug.h:532
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:92
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:66
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1170
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:29
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:141
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:212
threadvars.h
PacketGetFromAlloc
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:150
ThreadsAffinityType_
Definition: util-affinity.h:64
mutex_lock_wait_ticks
thread_local uint64_t mutex_lock_wait_ticks
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:35
THV_KILL_PKTACQ
#define THV_KILL_PKTACQ
Definition: threadvars.h:48
SCMalloc
#define SCMalloc(sz)
Definition: util-mem.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:581
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:88
Thread_::name
const char * name
Definition: tm-threads.c:2106
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:150
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:48
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:91
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCLogWarning
#define SCLogWarning(err_code,...)
Macro used to log WARNING messages.
Definition: util-debug.h:244
PKT_SRC_DETECT_RELOAD_FLUSH
@ PKT_SRC_DETECT_RELOAD_FLUSH
Definition: decode.h:60
SCFree
#define SCFree(p)
Definition: util-mem.h:61
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:67
SC_ERR_FATAL
@ SC_ERR_FATAL
Definition: util-error.h:203
thread_name_verdict
const char * thread_name_verdict
Definition: runmodes.c:66
SC_ERR_THREAD_NICE_PRIO
@ SC_ERR_THREAD_NICE_PRIO
Definition: util-error.h:77
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:66
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:318
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:84
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:110
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
SC_ERR_MEM_ALLOC
@ SC_ERR_MEM_ALLOC
Definition: util-error.h:31
FlowQueueNew
FlowQueue * FlowQueueNew()
Definition: flow-queue.c:36
suricata.h
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:54
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:400
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:139
Tmq_
Definition: tm-queues.h:29
StatsSyncCounters
#define StatsSyncCounters(tv)
Definition: counters.h:134
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:2125
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:61
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1889
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:130
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:83
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:67
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:376
UtilCpuGetNumProcessorsOnline
uint16_t UtilCpuGetNumProcessorsOnline(void)
Get the number of cpus online in the system.
Definition: util-cpu.c:106
TmThreadsGetWorkerThreadMax
uint16_t TmThreadsGetWorkerThreadMax()
Definition: tm-threads.c:2309
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:335
mutex_lock_cnt
thread_local uint64_t mutex_lock_cnt
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:89
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:232
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:43
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
Thread_
Definition: tm-threads.c:2104
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:120
Threads_::threads_size
size_t threads_size
Definition: tm-threads.c:2118
StatsThreadCleanup
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1296
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:63
SCMutex
#define SCMutex
Definition: threads-debug.h:114
PacketGetFromQueueOrAlloc
Packet * PacketGetFromQueueOrAlloc(void)
Get a packet. We try to get a packet from the packetpool first, but if that is empty we alloc a packe...
Definition: decode.c:186
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:70
threading_detect_ratio
float threading_detect_ratio
Definition: runmodes.c:913
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value to our atomic variable.
Definition: util-atomic.h:360
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:196
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:351
WORKER_CPU_SET
@ WORKER_CPU_SET
Definition: util-affinity.h:52
FQLOCK_UNLOCK
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:75