suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2017 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-debug.h"
39 #include "util-privs.h"
40 #include "util-cpu.h"
41 #include "util-optimize.h"
42 #include "util-profiling.h"
43 #include "util-signal.h"
44 #include "queue.h"
45 
46 #ifdef PROFILE_LOCKING
47 __thread uint64_t mutex_lock_contention;
48 __thread uint64_t mutex_lock_wait_ticks;
49 __thread uint64_t mutex_lock_cnt;
50 
51 __thread uint64_t spin_lock_contention;
52 __thread uint64_t spin_lock_wait_ticks;
53 __thread uint64_t spin_lock_cnt;
54 
55 __thread uint64_t rww_lock_contention;
56 __thread uint64_t rww_lock_wait_ticks;
57 __thread uint64_t rww_lock_cnt;
58 
59 __thread uint64_t rwr_lock_contention;
60 __thread uint64_t rwr_lock_wait_ticks;
61 __thread uint64_t rwr_lock_cnt;
62 #endif
63 
64 #ifdef OS_FREEBSD
65 #include <sched.h>
66 #include <sys/param.h>
67 #include <sys/resource.h>
68 #include <sys/cpuset.h>
69 #include <sys/thr.h>
70 #define cpu_set_t cpuset_t
71 #endif /* OS_FREEBSD */
72 
73 /* prototypes */
74 static int SetCPUAffinity(uint16_t cpu);
75 static void TmThreadDeinitMC(ThreadVars *tv);
76 
77 /* root of the threadvars list */
78 ThreadVars *tv_root[TVT_MAX] = { NULL };
79 
80 /* lock to protect tv_root */
82 
83 /**
84  * \brief Check if a thread flag is set.
85  *
86  * \retval 1 flag is set.
87  * \retval 0 flag is not set.
88  */
89 int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
90 {
91  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
92 }
93 
94 /**
95  * \brief Set a thread flag.
96  */
97 void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
98 {
99  SC_ATOMIC_OR(tv->flags, flag);
100 }
101 
102 /**
103  * \brief Unset a thread flag.
104  */
105 void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
106 {
107  SC_ATOMIC_AND(tv->flags, ~flag);
108 }
109 
110 /**
111  * \brief Separate run function so we can call it recursively.
112  */
114 {
115  for (TmSlot *s = slot; s != NULL; s = s->slot_next) {
116  PACKET_PROFILING_TMM_START(p, s->tm_id);
117  TmEcode r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data));
118  PACKET_PROFILING_TMM_END(p, s->tm_id);
119 
120  /* handle error */
121  if (unlikely(r == TM_ECODE_FAILED)) {
122  /* Encountered error. Return packets to packetpool and return */
123  TmThreadsSlotProcessPktFail(tv, s, NULL);
124  return TM_ECODE_FAILED;
125  }
126 
127  /* handle new packets */
128  while (tv->decode_pq.top != NULL) {
129  Packet *extra_p = PacketDequeueNoLock(&tv->decode_pq);
130  if (unlikely(extra_p == NULL))
131  continue;
132 
133  /* see if we need to process the packet */
134  if (s->slot_next != NULL) {
135  r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
136  if (unlikely(r == TM_ECODE_FAILED)) {
137  TmThreadsSlotProcessPktFail(tv, s, extra_p);
138  return TM_ECODE_FAILED;
139  }
140  }
141  tv->tmqh_out(tv, extra_p);
142  }
143  }
144 
145  return TM_ECODE_OK;
146 }
147 
148 /** \internal
149  *
150  * \brief Process flow timeout packets
151  *
152  * Process flow timeout pseudo packets. During shutdown this loop
153  * is run until the flow engine kills the thread and the queue is
154  * empty.
155  */
156 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
157 {
158  TmSlot *fw_slot = tv->tm_flowworker;
159  int r = TM_ECODE_OK;
160 
161  if (tv->stream_pq == NULL || fw_slot == NULL) {
162  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
163  return r;
164  }
165 
166  SCLogDebug("flow end loop starting");
167  while (1) {
169  uint32_t len = tv->stream_pq->len;
171  if (len > 0) {
172  while (len--) {
176  if (likely(p)) {
177  if ((r = TmThreadsSlotProcessPkt(tv, fw_slot, p) != TM_ECODE_OK)) {
178  if (r == TM_ECODE_FAILED)
179  break;
180  }
181  }
182  }
183  } else {
185  break;
186  }
187  SleepUsec(1);
188  }
189  }
190  SCLogDebug("flow end loop complete");
192 
193  return r;
194 }
195 
196 /*
197 
198  pcap/nfq
199 
200  pkt read
201  callback
202  process_pkt
203 
204  pfring
205 
206  pkt read
207  process_pkt
208 
209  slot:
210  setup
211 
212  pkt_ack_loop(tv, slot_data)
213 
214  deinit
215 
216  process_pkt:
217  while(s)
218  run s;
219  queue;
220 
221  */
222 
223 static void *TmThreadsSlotPktAcqLoop(void *td)
224 {
225  ThreadVars *tv = (ThreadVars *)td;
226  TmSlot *s = tv->tm_slots;
227  char run = 1;
228  TmEcode r = TM_ECODE_OK;
229  TmSlot *slot = NULL;
230 
231  /* Set the thread name */
232  if (SCSetThreadName(tv->name) < 0) {
233  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
234  }
235 
236  if (tv->thread_setup_flags != 0)
238 
239  /* Drop the capabilities for this thread */
240  SCDropCaps(tv);
241 
242  PacketPoolInit();
243 
244  /* check if we are setup properly */
245  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
246  SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
247  " PktAcqLoop=%p, tmqh_in=%p,"
248  " tmqh_out=%p",
249  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
251  pthread_exit((void *) -1);
252  return NULL;
253  }
254 
255  for (slot = s; slot != NULL; slot = slot->slot_next) {
256  if (slot->SlotThreadInit != NULL) {
257  void *slot_data = NULL;
258  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
259  if (r != TM_ECODE_OK) {
260  if (r == TM_ECODE_DONE) {
261  EngineDone();
263  goto error;
264  } else {
266  goto error;
267  }
268  }
269  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
270  }
271 
272  /* if the flowworker module is the first, get the threads input queue */
273  if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
274  tv->stream_pq = tv->inq->pq;
275  tv->tm_flowworker = slot;
276  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
277  /* setup a queue */
278  } else if (slot->tm_id == TMM_FLOWWORKER) {
279  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
280  if (tv->stream_pq_local == NULL)
281  FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
284  tv->tm_flowworker = slot;
285  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
286  }
287  }
288 
290 
292 
293  while(run) {
298  }
299 
300  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
301 
302  if (r == TM_ECODE_FAILED) {
304  run = 0;
305  }
307  run = 0;
308  }
309  if (r == TM_ECODE_DONE) {
310  run = 0;
311  }
312  }
314 
316 
317  /* process all pseudo packets the flow timeout may throw at us */
318  TmThreadTimeoutLoop(tv, s);
319 
322 
324 
325  for (slot = s; slot != NULL; slot = slot->slot_next) {
326  if (slot->SlotThreadExitPrintStats != NULL) {
327  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
328  }
329 
330  if (slot->SlotThreadDeinit != NULL) {
331  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
332  if (r != TM_ECODE_OK) {
334  goto error;
335  }
336  }
337  }
338 
339  tv->stream_pq = NULL;
340  SCLogDebug("%s ending", tv->name);
342  pthread_exit((void *) 0);
343  return NULL;
344 
345 error:
346  tv->stream_pq = NULL;
347  pthread_exit((void *) -1);
348  return NULL;
349 }
350 
351 static void *TmThreadsSlotVar(void *td)
352 {
353  ThreadVars *tv = (ThreadVars *)td;
354  TmSlot *s = (TmSlot *)tv->tm_slots;
355  Packet *p = NULL;
356  char run = 1;
357  TmEcode r = TM_ECODE_OK;
358 
360 
361  /* Set the thread name */
362  if (SCSetThreadName(tv->name) < 0) {
363  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
364  }
365 
366  if (tv->thread_setup_flags != 0)
368 
369  /* Drop the capabilities for this thread */
370  SCDropCaps(tv);
371 
372  /* check if we are setup properly */
373  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
375  pthread_exit((void *) -1);
376  return NULL;
377  }
378 
379  for (; s != NULL; s = s->slot_next) {
380  if (s->SlotThreadInit != NULL) {
381  void *slot_data = NULL;
382  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
383  if (r != TM_ECODE_OK) {
385  goto error;
386  }
387  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
388  }
389 
390  /* special case: we need to access the stream queue
391  * from the flow timeout code */
392 
393  /* if the flowworker module is the first, get the threads input queue */
394  if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
395  tv->stream_pq = tv->inq->pq;
396  tv->tm_flowworker = s;
397  SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
398  /* setup a queue */
399  } else if (s->tm_id == TMM_FLOWWORKER) {
400  tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
401  if (tv->stream_pq_local == NULL)
402  FatalError(SC_ERR_MEM_ALLOC, "failed to alloc PacketQueue");
405  tv->tm_flowworker = s;
406  SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
407  }
408  }
409 
411 
413 
414  s = (TmSlot *)tv->tm_slots;
415 
416  while (run) {
421  }
422 
423  /* input a packet */
424  p = tv->tmqh_in(tv);
425 
426  if (p != NULL) {
427  /* run the thread module(s) */
428  r = TmThreadsSlotVarRun(tv, p, s);
429  if (r == TM_ECODE_FAILED) {
432  break;
433  }
434 
435  /* output the packet */
436  tv->tmqh_out(tv, p);
437 
438  /* now handle the stream pq packets */
439  TmThreadsHandleInjectedPackets(tv);
440  }
441 
443  run = 0;
444  }
445  } /* while (run) */
447 
450 
452 
453  s = (TmSlot *)tv->tm_slots;
454 
455  for ( ; s != NULL; s = s->slot_next) {
456  if (s->SlotThreadExitPrintStats != NULL) {
457  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
458  }
459 
460  if (s->SlotThreadDeinit != NULL) {
461  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
462  if (r != TM_ECODE_OK) {
464  goto error;
465  }
466  }
467  }
468 
469  SCLogDebug("%s ending", tv->name);
470  tv->stream_pq = NULL;
472  pthread_exit((void *) 0);
473  return NULL;
474 
475 error:
476  tv->stream_pq = NULL;
477  pthread_exit((void *) -1);
478  return NULL;
479 }
480 
481 static void *TmThreadsManagement(void *td)
482 {
483  ThreadVars *tv = (ThreadVars *)td;
484  TmSlot *s = (TmSlot *)tv->tm_slots;
485  TmEcode r = TM_ECODE_OK;
486 
487  BUG_ON(s == NULL);
488 
489  /* Set the thread name */
490  if (SCSetThreadName(tv->name) < 0) {
491  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
492  }
493 
494  if (tv->thread_setup_flags != 0)
496 
497  /* Drop the capabilities for this thread */
498  SCDropCaps(tv);
499 
500  SCLogDebug("%s starting", tv->name);
501 
502  if (s->SlotThreadInit != NULL) {
503  void *slot_data = NULL;
504  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
505  if (r != TM_ECODE_OK) {
507  pthread_exit((void *) -1);
508  return NULL;
509  }
510  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
511  }
512 
514 
516 
517  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
518  /* handle error */
519  if (r == TM_ECODE_FAILED) {
521  }
522 
525  }
526 
529 
530  if (s->SlotThreadExitPrintStats != NULL) {
531  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
532  }
533 
534  if (s->SlotThreadDeinit != NULL) {
535  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
536  if (r != TM_ECODE_OK) {
538  pthread_exit((void *) -1);
539  return NULL;
540  }
541  }
542 
544  pthread_exit((void *) 0);
545  return NULL;
546 }
547 
548 /**
549  * \brief We set the slot functions.
550  *
551  * \param tv Pointer to the TV to set the slot function for.
552  * \param name Name of the slot variant.
553  * \param fn_p Pointer to a custom slot function. Used only if slot variant
554  * "name" is "custom".
555  *
556  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
557  */
558 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
559 {
560  if (name == NULL) {
561  if (fn_p == NULL) {
562  printf("Both slot name and function pointer can't be NULL inside "
563  "TmThreadSetSlots\n");
564  goto error;
565  } else {
566  name = "custom";
567  }
568  }
569 
570  if (strcmp(name, "varslot") == 0) {
571  tv->tm_func = TmThreadsSlotVar;
572  } else if (strcmp(name, "pktacqloop") == 0) {
573  tv->tm_func = TmThreadsSlotPktAcqLoop;
574  } else if (strcmp(name, "management") == 0) {
575  tv->tm_func = TmThreadsManagement;
576  } else if (strcmp(name, "command") == 0) {
577  tv->tm_func = TmThreadsManagement;
578  } else if (strcmp(name, "custom") == 0) {
579  if (fn_p == NULL)
580  goto error;
581  tv->tm_func = fn_p;
582  } else {
583  printf("Error: Slot \"%s\" not supported\n", name);
584  goto error;
585  }
586 
587  return TM_ECODE_OK;
588 
589 error:
590  return TM_ECODE_FAILED;
591 }
592 
594 {
596  for (int i = 0; i < TVT_MAX; i++) {
597  ThreadVars *tv = tv_root[i];
598  while (tv) {
599  TmSlot *slots = tv->tm_slots;
600  while (slots != NULL) {
601  if (slots == tm_slot) {
603  return tv;
604  }
605  slots = slots->slot_next;
606  }
607  tv = tv->next;
608  }
609  }
611  return NULL;
612 }
613 
614 /**
615  * \brief Appends a new entry to the slots.
616  *
617  * \param tv TV the slot is attached to.
618  * \param tm TM to append.
619  * \param data Data to be passed on to the slot init function.
620  *
621  * \retval The allocated TmSlot or NULL if there is an error
622  */
623 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
624 {
625  TmSlot *slot = SCMalloc(sizeof(TmSlot));
626  if (unlikely(slot == NULL))
627  return;
628  memset(slot, 0, sizeof(TmSlot));
629  SC_ATOMIC_INIT(slot->slot_data);
630  slot->SlotThreadInit = tm->ThreadInit;
631  slot->slot_initdata = data;
632  if (tm->Func) {
633  slot->SlotFunc = tm->Func;
634  } else if (tm->PktAcqLoop) {
635  slot->PktAcqLoop = tm->PktAcqLoop;
636  } else if (tm->Management) {
637  slot->Management = tm->Management;
638  }
640  slot->SlotThreadDeinit = tm->ThreadDeinit;
641  /* we don't have to check for the return value "-1". We wouldn't have
642  * received a TM as arg, if it didn't exist */
643  slot->tm_id = TmModuleGetIDForTM(tm);
644 
645  tv->tmm_flags |= tm->flags;
646  tv->cap_flags |= tm->cap_flags;
647 
648  if (tv->tm_slots == NULL) {
649  tv->tm_slots = slot;
650  } else {
651  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
652 
653  /* get the last slot */
654  for ( ; a != NULL; a = a->slot_next) {
655  b = a;
656  }
657  /* append the new slot */
658  if (b != NULL) {
659  b->slot_next = slot;
660  }
661  }
662  return;
663 }
664 
665 /**
666  * \brief Returns the slot holding a TM with the particular tm_id.
667  *
668  * \param tm_id TM id of the TM whose slot has to be returned.
669  *
670  * \retval slots Pointer to the slot.
671  */
673 {
675  for (int i = 0; i < TVT_MAX; i++) {
676  ThreadVars *tv = tv_root[i];
677  while (tv) {
678  TmSlot *slots = tv->tm_slots;
679  while (slots != NULL) {
680  if (slots->tm_id == tm_id) {
682  return slots;
683  }
684  slots = slots->slot_next;
685  }
686  tv = tv->next;
687  }
688  }
690  return NULL;
691 }
692 
693 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
694 static int SetCPUAffinitySet(cpu_set_t *cs)
695 {
696 #if defined OS_FREEBSD
697  int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
698  SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
699 #elif OS_DARWIN
700  int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
701  (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
702 #else
703  pid_t tid = syscall(SYS_gettid);
704  int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
705 #endif /* OS_FREEBSD */
706 
707  if (r != 0) {
708  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
709  strerror(errno));
710  return -1;
711  }
712 
713  return 0;
714 }
715 #endif
716 
717 
718 /**
719  * \brief Set the thread affinity on the calling thread.
720  *
721  * \param cpuid Id of the core/cpu to setup the affinity.
722  *
723  * \retval 0 If all goes well; -1 if something is wrong.
724  */
725 static int SetCPUAffinity(uint16_t cpuid)
726 {
727 #if defined __OpenBSD__ || defined sun
728  return 0;
729 #else
730  int cpu = (int)cpuid;
731 
732 #if defined OS_WIN32 || defined __CYGWIN__
733  DWORD cs = 1 << cpu;
734 
735  int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
736  if (r != 0) {
737  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
738  strerror(errno));
739  return -1;
740  }
741  SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
742  SCGetThreadIdLong(), cpu);
743 
744  return 0;
745 
746 #else
747  cpu_set_t cs;
748 
749  CPU_ZERO(&cs);
750  CPU_SET(cpu, &cs);
751  return SetCPUAffinitySet(&cs);
752 #endif /* windows */
753 #endif /* not supported */
754 }
755 
756 
757 /**
758  * \brief Set the thread options (thread priority).
759  *
760  * \param tv Pointer to the ThreadVars to setup the thread priority.
761  *
762  * \retval TM_ECODE_OK.
763  */
765 {
767  tv->thread_priority = prio;
768 
769  return TM_ECODE_OK;
770 }
771 
772 /**
773  * \brief Adjusting nice value for threads.
774  */
776 {
777  SCEnter();
778 #ifndef __CYGWIN__
779 #ifdef OS_WIN32
780  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
781  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for "
782  "thread %s: %s", tv->name, strerror(errno));
783  } else {
784  SCLogDebug("Priority set to %"PRId32" for thread %s",
786  }
787 #else
788  int ret = nice(tv->thread_priority);
789  if (ret == -1) {
790  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value %d "
791  "for thread %s: %s", tv->thread_priority, tv->name,
792  strerror(errno));
793  } else {
794  SCLogDebug("Nice value set to %"PRId32" for thread %s",
796  }
797 #endif /* OS_WIN32 */
798 #endif
799  SCReturn;
800 }
801 
802 
803 /**
804  * \brief Set the thread options (cpu affinity).
805  *
806  * \param tv pointer to the ThreadVars to setup the affinity.
807  * \param cpu cpu on which affinity is set.
808  *
809  * \retval TM_ECODE_OK
810  */
812 {
814  tv->cpu_affinity = cpu;
815 
816  return TM_ECODE_OK;
817 }
818 
819 
821 {
823  return TM_ECODE_OK;
824 
825  if (type > MAX_CPU_SET) {
826  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
827  return TM_ECODE_FAILED;
828  }
829 
831  tv->cpu_affinity = type;
832 
833  return TM_ECODE_OK;
834 }
835 
837 {
838  if (type >= MAX_CPU_SET) {
839  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
840  return 0;
841  }
842 
844 }
845 
846 /**
847  * \brief Set the thread options (cpu affinitythread).
848  * Priority should be already set by pthread_create.
849  *
850  * \param tv pointer to the ThreadVars of the calling thread.
851  */
853 {
855  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
856  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
858  SetCPUAffinity(tv->cpu_affinity);
859  }
860 
861 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
866  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
867  int cpu = AffinityGetNextCPU(taf);
868  SetCPUAffinity(cpu);
869  /* If CPU is in a set overwrite the default thread prio */
870  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
872  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
874  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
876  } else {
877  tv->thread_priority = taf->prio;
878  }
879  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
880  "%d, thread id %lu", tv->thread_priority,
881  tv->name, cpu, SCGetThreadIdLong());
882  } else {
883  SetCPUAffinitySet(&taf->cpu_set);
884  tv->thread_priority = taf->prio;
885  SCLogPerf("Setting prio %d for thread \"%s\", "
886  "thread id %lu", tv->thread_priority,
888  }
890  }
891 #endif
892 
893  return TM_ECODE_OK;
894 }
895 
896 /**
897  * \brief Creates and returns the TV instance for a new thread.
898  *
899  * \param name Name of this TV instance
900  * \param inq_name Incoming queue name
901  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
902  * \param outq_name Outgoing queue name
903  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
904  * \param slots String representation for the slot function to be used
905  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
906  * \param mucond Flag to indicate whether to initialize the condition
907  * and the mutex variables for this newly created TV.
908  *
909  * \retval the newly created TV instance, or NULL on error
910  */
911 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
912  const char *outq_name, const char *outqh_name, const char *slots,
913  void * (*fn_p)(void *), int mucond)
914 {
915  ThreadVars *tv = NULL;
916  Tmq *tmq = NULL;
917  Tmqh *tmqh = NULL;
918 
919  SCLogDebug("creating thread \"%s\"...", name);
920 
921  /* XXX create separate function for this: allocate a thread container */
922  tv = SCMalloc(sizeof(ThreadVars));
923  if (unlikely(tv == NULL))
924  goto error;
925  memset(tv, 0, sizeof(ThreadVars));
926 
927  SC_ATOMIC_INIT(tv->flags);
928  SCMutexInit(&tv->perf_public_ctx.m, NULL);
929 
930  strlcpy(tv->name, name, sizeof(tv->name));
931 
932  /* default state for every newly created thread */
935 
936  /* set the incoming queue */
937  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
938  SCLogDebug("inq_name \"%s\"", inq_name);
939 
940  tmq = TmqGetQueueByName(inq_name);
941  if (tmq == NULL) {
942  tmq = TmqCreateQueue(inq_name);
943  if (tmq == NULL)
944  goto error;
945  }
946  SCLogDebug("tmq %p", tmq);
947 
948  tv->inq = tmq;
949  tv->inq->reader_cnt++;
950  SCLogDebug("tv->inq %p", tv->inq);
951  }
952  if (inqh_name != NULL) {
953  SCLogDebug("inqh_name \"%s\"", inqh_name);
954 
955  int id = TmqhNameToID(inqh_name);
956  if (id <= 0) {
957  goto error;
958  }
959  tmqh = TmqhGetQueueHandlerByName(inqh_name);
960  if (tmqh == NULL)
961  goto error;
962 
963  tv->tmqh_in = tmqh->InHandler;
964  tv->inq_id = (uint8_t)id;
965  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
966  }
967 
968  /* set the outgoing queue */
969  if (outqh_name != NULL) {
970  SCLogDebug("outqh_name \"%s\"", outqh_name);
971 
972  int id = TmqhNameToID(outqh_name);
973  if (id <= 0) {
974  goto error;
975  }
976 
977  tmqh = TmqhGetQueueHandlerByName(outqh_name);
978  if (tmqh == NULL)
979  goto error;
980 
981  tv->tmqh_out = tmqh->OutHandler;
982  tv->outq_id = (uint8_t)id;
983 
984  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
985  SCLogDebug("outq_name \"%s\"", outq_name);
986 
987  if (tmqh->OutHandlerCtxSetup != NULL) {
988  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
989  if (tv->outctx == NULL)
990  goto error;
991  tv->outq = NULL;
992  } else {
993  tmq = TmqGetQueueByName(outq_name);
994  if (tmq == NULL) {
995  tmq = TmqCreateQueue(outq_name);
996  if (tmq == NULL)
997  goto error;
998  }
999  SCLogDebug("tmq %p", tmq);
1000 
1001  tv->outq = tmq;
1002  tv->outctx = NULL;
1003  tv->outq->writer_cnt++;
1004  }
1005  }
1006  }
1007 
1008  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1009  goto error;
1010  }
1011 
1012  if (mucond != 0)
1013  TmThreadInitMC(tv);
1014 
1015  return tv;
1016 
1017 error:
1018  SCLogError(SC_ERR_THREAD_CREATE, "failed to setup a thread");
1019 
1020  if (tv != NULL)
1021  SCFree(tv);
1022  return NULL;
1023 }
1024 
1025 /**
1026  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1027  * This function doesn't support custom slots, and hence shouldn't be
1028  * supplied \"custom\" as its slot type. All PPT threads are created
1029  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1030  * conditional variables are not used to kill the thread.
1031  *
1032  * \param name Name of this TV instance
1033  * \param inq_name Incoming queue name
1034  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1035  * \param outq_name Outgoing queue name
1036  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1037  * \param slots String representation for the slot function to be used
1038  *
1039  * \retval the newly created TV instance, or NULL on error
1040  */
1041 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1042  const char *inqh_name, const char *outq_name,
1043  const char *outqh_name, const char *slots)
1044 {
1045  ThreadVars *tv = NULL;
1046 
1047  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1048  slots, NULL, 0);
1049 
1050  if (tv != NULL) {
1051  tv->type = TVT_PPT;
1053  }
1054 
1055 
1056  return tv;
1057 }
1058 
1059 /**
1060  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1061  * This function supports only custom slot functions and hence a
1062  * function pointer should be sent as an argument.
1063  *
1064  * \param name Name of this TV instance
1065  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1066  * \param mucond Flag to indicate whether to initialize the condition
1067  * and the mutex variables for this newly created TV.
1068  *
1069  * \retval the newly created TV instance, or NULL on error
1070  */
1071 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1072  int mucond)
1073 {
1074  ThreadVars *tv = NULL;
1075 
1076  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1077 
1078  if (tv != NULL) {
1079  tv->type = TVT_MGMT;
1082  }
1083 
1084  return tv;
1085 }
1086 
1087 /**
1088  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1089  * This function supports only custom slot functions and hence a
1090  * function pointer should be sent as an argument.
1091  *
1092  * \param name Name of this TV instance
1093  * \param module Name of TmModule with MANAGEMENT flag set.
1094  * \param mucond Flag to indicate whether to initialize the condition
1095  * and the mutex variables for this newly created TV.
1096  *
1097  * \retval the newly created TV instance, or NULL on error
1098  */
1099 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1100  int mucond)
1101 {
1102  ThreadVars *tv = NULL;
1103 
1104  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1105 
1106  if (tv != NULL) {
1107  tv->type = TVT_MGMT;
1110 
1111  TmModule *m = TmModuleGetByName(module);
1112  if (m) {
1113  TmSlotSetFuncAppend(tv, m, NULL);
1114  }
1115  }
1116 
1117  return tv;
1118 }
1119 
1120 /**
1121  * \brief Creates and returns the TV instance for a Command thread (CMD).
1122  * This function supports only custom slot functions and hence a
1123  * function pointer should be sent as an argument.
1124  *
1125  * \param name Name of this TV instance
1126  * \param module Name of TmModule with COMMAND flag set.
1127  * \param mucond Flag to indicate whether to initialize the condition
1128  * and the mutex variables for this newly created TV.
1129  *
1130  * \retval the newly created TV instance, or NULL on error
1131  */
1132 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1133  int mucond)
1134 {
1135  ThreadVars *tv = NULL;
1136 
1137  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1138 
1139  if (tv != NULL) {
1140  tv->type = TVT_CMD;
1143 
1144  TmModule *m = TmModuleGetByName(module);
1145  if (m) {
1146  TmSlotSetFuncAppend(tv, m, NULL);
1147  }
1148  }
1149 
1150  return tv;
1151 }
1152 
1153 /**
1154  * \brief Appends this TV to tv_root based on its type
1155  *
1156  * \param type holds the type this TV belongs to.
1157  */
1159 {
1161 
1162  if (tv_root[type] == NULL) {
1163  tv_root[type] = tv;
1164  tv->next = NULL;
1165 
1167 
1168  return;
1169  }
1170 
1171  ThreadVars *t = tv_root[type];
1172 
1173  while (t) {
1174  if (t->next == NULL) {
1175  t->next = tv;
1176  tv->next = NULL;
1177  break;
1178  }
1179 
1180  t = t->next;
1181  }
1182 
1184 
1185  return;
1186 }
1187 
1188 static bool ThreadStillHasPackets(ThreadVars *tv)
1189 {
1190  if (tv->inq != NULL && !tv->inq->is_packet_pool) {
1191  /* we wait till we dry out all the inq packets, before we
1192  * kill this thread. Do note that you should have disabled
1193  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1194  PacketQueue *q = tv->inq->pq;
1195  SCMutexLock(&q->mutex_q);
1196  uint32_t len = q->len;
1197  SCMutexUnlock(&q->mutex_q);
1198  if (len != 0) {
1199  return true;
1200  }
1201  }
1202 
1203  if (tv->stream_pq != NULL) {
1205  uint32_t len = tv->stream_pq->len;
1207 
1208  if (len != 0) {
1209  return true;
1210  }
1211  }
1212  return false;
1213 }
1214 
1215 /**
1216  * \brief Kill a thread.
1217  *
1218  * \param tv A ThreadVars instance corresponding to the thread that has to be
1219  * killed.
1220  *
1221  * \retval r 1 killed succesfully
1222  * 0 not yet ready, needs another look
1223  */
1224 static int TmThreadKillThread(ThreadVars *tv)
1225 {
1226  BUG_ON(tv == NULL);
1227 
1228  /* kill only once :) */
1229  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1230  return 1;
1231  }
1232 
1233  /* set the thread flag informing the thread that it needs to be
1234  * terminated */
1237 
1238  /* to be sure, signal more */
1239  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1240  if (tv->inq_id != TMQH_NOT_SET) {
1242  if (qh != NULL && qh->InShutdownHandler != NULL) {
1243  qh->InShutdownHandler(tv);
1244  }
1245  }
1246  if (tv->inq != NULL) {
1247  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1248  SCCondSignal(&tv->inq->pq->cond_q);
1249  }
1250  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1251  }
1252 
1253  if (tv->ctrl_cond != NULL ) {
1254  pthread_cond_broadcast(tv->ctrl_cond);
1255  }
1256  return 0;
1257  }
1258 
1259  if (tv->outctx != NULL) {
1260  if (tv->outq_id != TMQH_NOT_SET) {
1262  if (qh != NULL && qh->OutHandlerCtxFree != NULL) {
1263  qh->OutHandlerCtxFree(tv->outctx);
1264  tv->outctx = NULL;
1265  }
1266  }
1267  }
1268 
1269  /* join it and flag it as dead */
1270  pthread_join(tv->t, NULL);
1271  SCLogDebug("thread %s stopped", tv->name);
1273  return 1;
1274 }
1275 
1276 /** \internal
1277  *
1278  * \brief make sure that all packet threads are done processing their
1279  * in-flight packets, including 'injected' flow packets.
1280  */
1281 static void TmThreadDrainPacketThreads(void)
1282 {
1283  ThreadVars *tv = NULL;
1284  struct timeval start_ts;
1285  struct timeval cur_ts;
1286  gettimeofday(&start_ts, NULL);
1287 
1288 again:
1289  gettimeofday(&cur_ts, NULL);
1290  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1291  SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads "
1292  "to process their packets in time");
1293  return;
1294  }
1295 
1297 
1298  /* all receive threads are part of packet processing threads */
1299  tv = tv_root[TVT_PPT];
1300  while (tv) {
1301  if (ThreadStillHasPackets(tv)) {
1302  /* we wait till we dry out all the inq packets, before we
1303  * kill this thread. Do note that you should have disabled
1304  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1306 
1307  /* sleep outside lock */
1308  SleepMsec(1);
1309  goto again;
1310  }
1311  tv = tv->next;
1312  }
1313 
1315  return;
1316 }
1317 
1318 /**
1319  * \brief Disable all threads having the specified TMs.
1320  *
1321  * Breaks out of the packet acquisition loop, and bumps
1322  * into the 'flow loop', where it will process packets
1323  * from the flow engine's shutdown handling.
1324  */
1326 {
1327  ThreadVars *tv = NULL;
1328  struct timeval start_ts;
1329  struct timeval cur_ts;
1330  gettimeofday(&start_ts, NULL);
1331 
1332 again:
1333  gettimeofday(&cur_ts, NULL);
1334  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1335  FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1336  "thread - \"%s\". Killing engine", tv->name);
1337  }
1338 
1340 
1341  /* all receive threads are part of packet processing threads */
1342  tv = tv_root[TVT_PPT];
1343 
1344  /* we do have to keep in mind that TVs are arranged in the order
1345  * right from receive to log. The moment we fail to find a
1346  * receive TM amongst the slots in a tv, it indicates we are done
1347  * with all receive threads */
1348  while (tv) {
1349  int disable = 0;
1350  TmModule *tm = NULL;
1351  /* obtain the slots for this TV */
1352  TmSlot *slots = tv->tm_slots;
1353  while (slots != NULL) {
1354  tm = TmModuleGetById(slots->tm_id);
1355 
1356  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1357  disable = 1;
1358  break;
1359  }
1360 
1361  slots = slots->slot_next;
1362  continue;
1363  }
1364 
1365  if (disable) {
1366  if (ThreadStillHasPackets(tv)) {
1367  /* we wait till we dry out all the inq packets, before we
1368  * kill this thread. Do note that you should have disabled
1369  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1371  /* don't sleep while holding a lock */
1372  SleepMsec(1);
1373  goto again;
1374  }
1375 
1376  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1377  if (tm && tm->PktAcqBreakLoop != NULL) {
1378  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1379  }
1381 
1382  if (tv->inq != NULL) {
1383  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1384  SCCondSignal(&tv->inq->pq->cond_q);
1385  }
1386  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1387  }
1388 
1389  /* wait for it to enter the 'flow loop' stage */
1390  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1392 
1393  SleepMsec(1);
1394  goto again;
1395  }
1396  }
1397 
1398  tv = tv->next;
1399  }
1400 
1402 
1403  /* finally wait for all packet threads to have
1404  * processed all of their 'live' packets so we
1405  * don't process the last live packets together
1406  * with FFR packets */
1407  TmThreadDrainPacketThreads();
1408  return;
1409 }
1410 
1411 static void TmThreadDebugValidateNoMorePackets(void)
1412 {
1413 #ifdef DEBUG_VALIDATION
1415  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1416  if (ThreadStillHasPackets(tv)) {
1419  abort();
1420  }
1421  }
1423 #endif
1424 }
1425 
1426 /**
1427  * \brief Disable all packet threads
1428  */
1430 {
1431  struct timeval start_ts;
1432  struct timeval cur_ts;
1433 
1434  /* first drain all packet threads of their packets */
1435  TmThreadDrainPacketThreads();
1436 
1437  /* since all the threads possibly able to produce more packets
1438  * are now gone or inactive, we should see no packets anywhere
1439  * anymore. */
1440  TmThreadDebugValidateNoMorePackets();
1441 
1442  gettimeofday(&start_ts, NULL);
1443 again:
1444  gettimeofday(&cur_ts, NULL);
1445  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1446  FatalError(SC_ERR_FATAL, "Engine unable to disable packet "
1447  "threads. Killing engine");
1448  }
1449 
1450  /* loop through the packet threads and kill them */
1452  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
1454 
1455  /* separate worker threads (autofp) will still wait at their
1456  * input queues. So nudge them here so they will observe the
1457  * THV_KILL flag. */
1458  if (tv->inq != NULL) {
1459  for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1460  SCCondSignal(&tv->inq->pq->cond_q);
1461  }
1462  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1463  }
1464 
1467 
1468  SleepMsec(1);
1469  goto again;
1470  }
1471  }
1473  return;
1474 }
1475 
1477 {
1478  ThreadVars *tv = NULL;
1479  TmSlot *slots = NULL;
1480 
1482 
1483  /* all receive threads are part of packet processing threads */
1484  tv = tv_root[TVT_PPT];
1485 
1486  while (tv) {
1487  slots = tv->tm_slots;
1488 
1489  while (slots != NULL) {
1490  TmModule *tm = TmModuleGetById(slots->tm_id);
1491 
1492  char *found = strstr(tm->name, tm_name);
1493  if (found != NULL)
1494  goto end;
1495 
1496  slots = slots->slot_next;
1497  }
1498 
1499  tv = tv->next;
1500  }
1501 
1502  end:
1504  return slots;
1505 }
1506 
1507 #define MIN_WAIT_TIME 100
1508 #define MAX_WAIT_TIME 999999
1510 {
1511  ThreadVars *tv = NULL;
1512  unsigned int sleep_usec = MIN_WAIT_TIME;
1513 
1514  BUG_ON((family < 0) || (family >= TVT_MAX));
1515 
1516 again:
1518  tv = tv_root[family];
1519 
1520  while (tv) {
1521  int r = TmThreadKillThread(tv);
1522  if (r == 0) {
1524  SleepUsec(sleep_usec);
1525  sleep_usec *= 2; /* slowly back off */
1526  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1527  goto again;
1528  }
1529  sleep_usec = MIN_WAIT_TIME; /* reset */
1530 
1531  tv = tv->next;
1532  }
1534 }
1535 #undef MIN_WAIT_TIME
1536 #undef MAX_WAIT_TIME
1537 
1539 {
1540  int i = 0;
1541 
1542  for (i = 0; i < TVT_MAX; i++) {
1544  }
1545 
1546  return;
1547 }
1548 
1549 static void TmThreadFree(ThreadVars *tv)
1550 {
1551  TmSlot *s;
1552  TmSlot *ps;
1553  if (tv == NULL)
1554  return;
1555 
1556  SCLogDebug("Freeing thread '%s'.", tv->name);
1557 
1559 
1560  TmThreadDeinitMC(tv);
1561 
1562  if (tv->thread_group_name) {
1564  }
1565 
1566  if (tv->printable_name) {
1568  }
1569 
1570  if (tv->stream_pq_local) {
1574  }
1575 
1576  s = (TmSlot *)tv->tm_slots;
1577  while (s) {
1578  ps = s;
1579  s = s->slot_next;
1580  SCFree(ps);
1581  }
1582 
1584  SCFree(tv);
1585 }
1586 
1587 void TmThreadSetGroupName(ThreadVars *tv, const char *name)
1588 {
1589  char *thread_group_name = NULL;
1590 
1591  if (name == NULL)
1592  return;
1593 
1594  if (tv == NULL)
1595  return;
1596 
1597  thread_group_name = SCStrdup(name);
1598  if (unlikely(thread_group_name == NULL)) {
1599  SCLogError(SC_ERR_RUNMODE, "error allocating memory");
1600  return;
1601  }
1602  tv->thread_group_name = thread_group_name;
1603 }
1604 
1606 {
1607  ThreadVars *tv = NULL;
1608  ThreadVars *ptv = NULL;
1609 
1610  if ((family < 0) || (family >= TVT_MAX))
1611  return;
1612 
1614  tv = tv_root[family];
1615 
1616  while (tv) {
1617  ptv = tv;
1618  tv = tv->next;
1619  TmThreadFree(ptv);
1620  }
1621  tv_root[family] = NULL;
1623 }
1624 
1625 /**
1626  * \brief Spawns a thread associated with the ThreadVars instance tv
1627  *
1628  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1629  */
1631 {
1632  pthread_attr_t attr;
1633  if (tv->tm_func == NULL) {
1634  printf("ERROR: no thread function set\n");
1635  return TM_ECODE_FAILED;
1636  }
1637 
1638  /* Initialize and set thread detached attribute */
1639  pthread_attr_init(&attr);
1640 
1641  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1642 
1643  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1644  if (rc) {
1645  printf("ERROR; return code from pthread_create() is %" PRId32 "\n", rc);
1646  return TM_ECODE_FAILED;
1647  }
1648 
1650 
1651  TmThreadAppend(tv, tv->type);
1652  return TM_ECODE_OK;
1653 }
1654 
1655 /**
1656  * \brief Initializes the mutex and condition variables for this TV
1657  *
1658  * It can be used by a thread to control a wait loop that can also be
1659  * influenced by other threads.
1660  *
1661  * \param tv Pointer to a TV instance
1662  */
1664 {
1665  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1666  SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. "
1667  "Exiting...");
1668  exit(EXIT_FAILURE);
1669  }
1670 
1671  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1672  printf("Error initializing the tv->m mutex\n");
1673  exit(EXIT_FAILURE);
1674  }
1675 
1676  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1677  SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. "
1678  "Exiting...");
1679  exit(EXIT_FAILURE);
1680  }
1681 
1682  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1683  SCLogError(SC_ERR_FATAL, "Error initializing the tv->cond condition "
1684  "variable");
1685  exit(EXIT_FAILURE);
1686  }
1687 
1688  return;
1689 }
1690 
1691 static void TmThreadDeinitMC(ThreadVars *tv)
1692 {
1693  if (tv->ctrl_mutex) {
1695  SCFree(tv->ctrl_mutex);
1696  }
1697  if (tv->ctrl_cond) {
1699  SCFree(tv->ctrl_cond);
1700  }
1701  return;
1702 }
1703 
1704 /**
1705  * \brief Tests if the thread represented in the arg has been unpaused or not.
1706  *
1707  * The function would return if the thread tv has been unpaused or if the
1708  * kill flag for the thread has been set.
1709  *
1710  * \param tv Pointer to the TV instance.
1711  */
1713 {
1714  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1715  SleepUsec(100);
1716 
1718  break;
1719  }
1720 
1721  return;
1722 }
1723 
1724 /**
1725  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1726  * the kill flag has been set or not on the thread.
1727  *
1728  * \param tv Pointer to the TV instance.
1729  */
1731 {
1732  while (!TmThreadsCheckFlag(tv, flags)) {
1733  SleepUsec(100);
1734  }
1735 
1736  return;
1737 }
1738 
1739 /**
1740  * \brief Unpauses a thread
1741  *
1742  * \param tv Pointer to a TV instance that has to be unpaused
1743  */
1745 {
1747 
1748  return;
1749 }
1750 
1751 /**
1752  * \brief Unpauses all threads present in tv_root
1753  */
1755 {
1757  for (int i = 0; i < TVT_MAX; i++) {
1758  ThreadVars *tv = tv_root[i];
1759  while (tv != NULL) {
1761  tv = tv->next;
1762  }
1763  }
1765  return;
1766 }
1767 
1768 /**
1769  * \brief Pauses a thread
1770  *
1771  * \param tv Pointer to a TV instance that has to be paused
1772  */
1774 {
1776  return;
1777 }
1778 
1779 /**
1780  * \brief Pauses all threads present in tv_root
1781  */
1783 {
1785 
1787  for (int i = 0; i < TVT_MAX; i++) {
1788  ThreadVars *tv = tv_root[i];
1789  while (tv != NULL) {
1790  TmThreadPause(tv);
1791  tv = tv->next;
1792  }
1793  }
1795 }
1796 
1797 /**
1798  * \brief Used to check the thread for certain conditions of failure.
1799  */
1801 {
1803  for (int i = 0; i < TVT_MAX; i++) {
1804  ThreadVars *tv = tv_root[i];
1805  while (tv) {
1807  FatalError(SC_ERR_FATAL, "thread %s failed", tv->name);
1808  }
1809  tv = tv->next;
1810  }
1811  }
1813  return;
1814 }
1815 
1816 /**
1817  * \brief Used to check if all threads have finished their initialization. On
1818  * finding an un-initialized thread, it waits till that thread completes
1819  * its initialization, before proceeding to the next thread.
1820  *
1821  * \retval TM_ECODE_OK all initialized properly
1822  * \retval TM_ECODE_FAILED failure
1823  */
1825 {
1826  uint16_t mgt_num = 0;
1827  uint16_t ppt_num = 0;
1828 
1829  struct timeval start_ts;
1830  struct timeval cur_ts;
1831  gettimeofday(&start_ts, NULL);
1832 
1833 again:
1835  for (int i = 0; i < TVT_MAX; i++) {
1836  ThreadVars *tv = tv_root[i];
1837  while (tv != NULL) {
1840 
1841  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1842  "initialize: flags %04x", tv->name,
1843  SC_ATOMIC_GET(tv->flags));
1844  return TM_ECODE_FAILED;
1845  }
1846 
1847  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
1849 
1850  gettimeofday(&cur_ts, NULL);
1851  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
1852  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1853  "initialize in time: flags %04x", tv->name,
1854  SC_ATOMIC_GET(tv->flags));
1855  return TM_ECODE_FAILED;
1856  }
1857 
1858  /* sleep a little to give the thread some
1859  * time to finish initialization */
1860  SleepUsec(100);
1861  goto again;
1862  }
1863 
1866  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
1867  "initialize.", tv->name);
1868  return TM_ECODE_FAILED;
1869  }
1872  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" closed on "
1873  "initialization.", tv->name);
1874  return TM_ECODE_FAILED;
1875  }
1876 
1877  if (i == TVT_MGMT)
1878  mgt_num++;
1879  else if (i == TVT_PPT)
1880  ppt_num++;
1881 
1882  tv = tv->next;
1883  }
1884  }
1886 
1887  SCLogNotice("all %"PRIu16" packet processing threads, %"PRIu16" management "
1888  "threads initialized, engine started.", ppt_num, mgt_num);
1889 
1890  return TM_ECODE_OK;
1891 }
1892 
1893 /**
1894  * \brief Returns the TV for the calling thread.
1895  *
1896  * \retval tv Pointer to the ThreadVars instance for the calling thread;
1897  * NULL on no match
1898  */
1900 {
1901  pthread_t self = pthread_self();
1902 
1904  for (int i = 0; i < TVT_MAX; i++) {
1905  ThreadVars *tv = tv_root[i];
1906  while (tv) {
1907  if (pthread_equal(self, tv->t)) {
1909  return tv;
1910  }
1911  tv = tv->next;
1912  }
1913  }
1915  return NULL;
1916 }
1917 
1918 /**
1919  * \brief returns a count of all the threads that match the flag
1920  */
1922 {
1923  uint32_t cnt = 0;
1925  for (int i = 0; i < TVT_MAX; i++) {
1926  ThreadVars *tv = tv_root[i];
1927  while (tv != NULL) {
1928  if ((tv->tmm_flags & flags) == flags)
1929  cnt++;
1930 
1931  tv = tv->next;
1932  }
1933  }
1935  return cnt;
1936 }
1937 
1938 static void TmThreadDoDumpSlots(const ThreadVars *tv)
1939 {
1940  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
1942  SCLogNotice("tv %p: -> slot %p tm_id %d name %s",
1943  tv, s, s->tm_id, m->name);
1944  }
1945 }
1946 
1948 {
1950  for (int i = 0; i < TVT_MAX; i++) {
1951  ThreadVars *tv = tv_root[i];
1952  while (tv != NULL) {
1953  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
1954  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
1955  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
1956  if (tv->inq && tv->stream_pq == tv->inq->pq) {
1957  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
1958  } else if (tv->stream_pq_local != NULL) {
1959  for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
1960  SCLogNotice("tv %p: ==> stream_pq_local: pq.len %u packet src %s",
1961  tv, tv->stream_pq_local->len, PktSrcToString(xp->pkt_src));
1962  }
1963  }
1964  for (Packet *xp = tv->decode_pq.top; xp != NULL; xp = xp->next) {
1965  SCLogNotice("tv %p: ==> decode_pq: decode_pq.len %u packet src %s",
1966  tv, tv->decode_pq.len, PktSrcToString(xp->pkt_src));
1967  }
1968  TmThreadDoDumpSlots(tv);
1969  tv = tv->next;
1970  }
1971  }
1974 }
1975 
1976 typedef struct Thread_ {
1977  ThreadVars *tv; /**< threadvars structure */
1978  const char *name;
1979  int type;
1980  int in_use; /**< bool to indicate this is in use */
1981 
1982  struct timeval pktts; /**< current packet time of this thread
1983  * (offline mode) */
1984  uint32_t sys_sec_stamp; /**< timestamp in seconds of the real system
1985  * time when the pktts was last updated. */
1986 } Thread;
1987 
1988 typedef struct Threads_ {
1992 } Threads;
1993 
1994 static Threads thread_store = { NULL, 0, 0 };
1995 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
1996 
1998 {
1999  SCMutexLock(&thread_store_lock);
2000  for (size_t s = 0; s < thread_store.threads_size; s++) {
2001  Thread *t = &thread_store.threads[s];
2002  if (t == NULL || t->in_use == 0)
2003  continue;
2004 
2005  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2006  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2007  if (t->tv) {
2008  ThreadVars *tv = t->tv;
2009  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2010  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2011  tv, tv->type, tv->name, tv->tmm_flags, flags);
2012  }
2013  }
2014  SCMutexUnlock(&thread_store_lock);
2015 }
2016 
2017 #define STEP 32
2018 /**
2019  * \retval id thread id, or 0 if not found
2020  */
2022 {
2023  SCMutexLock(&thread_store_lock);
2024  if (thread_store.threads == NULL) {
2025  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2026  BUG_ON(thread_store.threads == NULL);
2027  thread_store.threads_size = STEP;
2028  }
2029 
2030  size_t s;
2031  for (s = 0; s < thread_store.threads_size; s++) {
2032  if (thread_store.threads[s].in_use == 0) {
2033  Thread *t = &thread_store.threads[s];
2034  t->name = tv->name;
2035  t->type = type;
2036  t->tv = tv;
2037  t->in_use = 1;
2038 
2039  SCMutexUnlock(&thread_store_lock);
2040  return (int)(s+1);
2041  }
2042  }
2043 
2044  /* if we get here the array is completely filled */
2045  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2046  BUG_ON(newmem == NULL);
2047  thread_store.threads = newmem;
2048  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2049 
2050  Thread *t = &thread_store.threads[thread_store.threads_size];
2051  t->name = tv->name;
2052  t->type = type;
2053  t->tv = tv;
2054  t->in_use = 1;
2055 
2056  s = thread_store.threads_size;
2057  thread_store.threads_size += STEP;
2058 
2059  SCMutexUnlock(&thread_store_lock);
2060  return (int)(s+1);
2061 }
2062 #undef STEP
2063 
2064 void TmThreadsUnregisterThread(const int id)
2065 {
2066  SCMutexLock(&thread_store_lock);
2067  if (id <= 0 || id > (int)thread_store.threads_size) {
2068  SCMutexUnlock(&thread_store_lock);
2069  return;
2070  }
2071 
2072  /* id is one higher than index */
2073  int idx = id - 1;
2074 
2075  /* reset thread_id, which serves as clearing the record */
2076  thread_store.threads[idx].in_use = 0;
2077 
2078  /* check if we have at least one registered thread left */
2079  size_t s;
2080  for (s = 0; s < thread_store.threads_size; s++) {
2081  Thread *t = &thread_store.threads[s];
2082  if (t->in_use == 1) {
2083  goto end;
2084  }
2085  }
2086 
2087  /* if we get here no threads are registered */
2088  SCFree(thread_store.threads);
2089  thread_store.threads = NULL;
2090  thread_store.threads_size = 0;
2091  thread_store.threads_cnt = 0;
2092 
2093 end:
2094  SCMutexUnlock(&thread_store_lock);
2095 }
2096 
2097 void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
2098 {
2099  SCMutexLock(&thread_store_lock);
2100  if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2101  SCMutexUnlock(&thread_store_lock);
2102  return;
2103  }
2104 
2105  int idx = id - 1;
2106  Thread *t = &thread_store.threads[idx];
2107  t->pktts = *ts;
2108  struct timeval systs;
2109  gettimeofday(&systs, NULL);
2110  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2111  SCMutexUnlock(&thread_store_lock);
2112 }
2113 
2115 {
2116  bool ready = true;
2117  SCMutexLock(&thread_store_lock);
2118  for (size_t s = 0; s < thread_store.threads_size; s++) {
2119  Thread *t = &thread_store.threads[s];
2120  if (!t->in_use)
2121  break;
2122  if (t->sys_sec_stamp == 0) {
2123  ready = false;
2124  break;
2125  }
2126  }
2127  SCMutexUnlock(&thread_store_lock);
2128  return ready;
2129 }
2130 
2131 void TmThreadsInitThreadsTimestamp(const struct timeval *ts)
2132 {
2133  struct timeval systs;
2134  gettimeofday(&systs, NULL);
2135  SCMutexLock(&thread_store_lock);
2136  for (size_t s = 0; s < thread_store.threads_size; s++) {
2137  Thread *t = &thread_store.threads[s];
2138  if (!t->in_use)
2139  break;
2140  t->pktts = *ts;
2141  t->sys_sec_stamp = (uint32_t)systs.tv_sec;
2142  }
2143  SCMutexUnlock(&thread_store_lock);
2144 }
2145 
2146 void TmThreadsGetMinimalTimestamp(struct timeval *ts)
2147 {
2148  struct timeval local, nullts;
2149  memset(&local, 0, sizeof(local));
2150  memset(&nullts, 0, sizeof(nullts));
2151  int set = 0;
2152  size_t s;
2153  struct timeval systs;
2154  gettimeofday(&systs, NULL);
2155 
2156  SCMutexLock(&thread_store_lock);
2157  for (s = 0; s < thread_store.threads_size; s++) {
2158  Thread *t = &thread_store.threads[s];
2159  if (t->in_use == 0)
2160  break;
2161  if (!(timercmp(&t->pktts, &nullts, ==))) {
2162  /* ignore sleeping threads */
2163  if (t->sys_sec_stamp + 1 < (uint32_t)systs.tv_sec)
2164  continue;
2165 
2166  if (!set) {
2167  local = t->pktts;
2168  set = 1;
2169  } else {
2170  if (timercmp(&t->pktts, &local, <)) {
2171  local = t->pktts;
2172  }
2173  }
2174  }
2175  }
2176  SCMutexUnlock(&thread_store_lock);
2177  *ts = local;
2178  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2179 }
2180 
2181 /**
2182  * \retval r 1 if packet was accepted, 0 otherwise
2183  * \note if packet was not accepted, it's still the responsibility
2184  * of the caller.
2185  */
2186 int TmThreadsInjectPacketsById(Packet **packets, const int id)
2187 {
2188  if (id <= 0 || id > (int)thread_store.threads_size)
2189  return 0;
2190 
2191  int idx = id - 1;
2192 
2193  Thread *t = &thread_store.threads[idx];
2194  ThreadVars *tv = t->tv;
2195 
2196  if (tv == NULL || tv->stream_pq == NULL)
2197  return 0;
2198 
2200  while (*packets != NULL) {
2201  PacketEnqueue(tv->stream_pq, *packets);
2202  packets++;
2203  }
2205 
2206  /* wake up listening thread(s) if necessary */
2207  if (tv->inq != NULL) {
2208  SCCondSignal(&tv->inq->pq->cond_q);
2209  }
2210  return 1;
2211 }
TmThreadSetCPUAffinity
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:811
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:67
rwr_lock_cnt
__thread uint64_t rwr_lock_cnt
ThreadsAffinityType_::medprio_cpu
cpu_set_t medprio_cpu
Definition: util-affinity.h:75
TmSlot_::tm_id
int tm_id
Definition: tm-threads.h:72
Tmq_::writer_cnt
uint16_t writer_cnt
Definition: tm-queues.h:34
TmThreadsTimeSubsysIsReady
bool TmThreadsTimeSubsysIsReady(void)
Definition: tm-threads.c:2114
TmThreadInitMC
void TmThreadInitMC(ThreadVars *tv)
Initializes the mutex and condition variables for this TV.
Definition: tm-threads.c:1663
tm-threads.h
len
uint8_t len
Definition: app-layer-dnp3.h:4
ts
uint64_t ts
Definition: source-erf-file.c:2
THV_USE
#define THV_USE
Definition: threadvars.h:36
TmThreadsSlotVarRun
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:113
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1630
Threads
struct Threads_ Threads
TmThreadCreateMgmtThreadByName
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1099
SCStrdup
#define SCStrdup(a)
Definition: util-mem.h:268
TmqhNameToID
int TmqhNameToID(const char *name)
Definition: tm-queuehandlers.c:53
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:852
Tmq_::id
uint16_t id
Definition: tm-queues.h:32
ThreadVars_::name
char name[16]
Definition: threadvars.h:64
SC_ATOMIC_INIT
#define SC_ATOMIC_INIT(name)
Initialize the previously declared atomic variable and it's lock.
Definition: util-atomic.h:81
TmThreadCreatePacketHandler
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1041
Thread_::pktts
struct timeval pktts
Definition: tm-threads.c:1982
SCFree
#define SCFree(a)
Definition: util-mem.h:322
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:207
SCCtrlMutexDestroy
#define SCCtrlMutexDestroy
Definition: threads-debug.h:379
rww_lock_wait_ticks
__thread uint64_t rww_lock_wait_ticks
TmThreadSetGroupName
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1587
TmThreadsGetCallingThread
ThreadVars * TmThreadsGetCallingThread(void)
Returns the TV for the calling thread.
Definition: tm-threads.c:1899
ThreadVars_::outctx
void * outctx
Definition: threadvars.h:104
THV_FLOW_LOOP
#define THV_FLOW_LOOP
Definition: threadvars.h:48
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:335
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:97
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1730
PacketEnqueue
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:173
TmThreadsGetTVContainingSlot
ThreadVars * TmThreadsGetTVContainingSlot(TmSlot *tm_slot)
Definition: tm-threads.c:593
PacketQueue_
simple fifo queue for packets with mutex and cond Calling the mutex or triggering the cond is respons...
Definition: packet-queue.h:47
TmSlot_::SlotFunc
TmSlotFunc SlotFunc
Definition: tm-threads.h:55
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:45
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:80
ThreadsAffinityType_::lowprio_cpu
cpu_set_t lowprio_cpu
Definition: util-affinity.h:74
threads.h
Tmq_::pq
PacketQueue * pq
Definition: tm-queues.h:35
ThreadVars_::t
pthread_t t
Definition: threadvars.h:58
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:299
Tmqh_::OutHandler
void(* OutHandler)(ThreadVars *, Packet *)
Definition: tm-queuehandlers.h:40
spin_lock_cnt
__thread uint64_t spin_lock_cnt
AffinityGetNextCPU
int AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
Definition: util-affinity.c:297
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:1921
ThreadVars_::outq
Tmq * outq
Definition: threadvars.h:103
TmThreadsInitThreadsTimestamp
void TmThreadsInitThreadsTimestamp(const struct timeval *ts)
Definition: tm-threads.c:2131
StatsSetupPrivate
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1194
Tmq_::is_packet_pool
bool is_packet_pool
Definition: tm-queues.h:31
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
ThreadVars_::stream_pq_local
struct PacketQueue_ * stream_pq_local
Definition: threadvars.h:114
MIN
#define MIN(x, y)
Definition: suricata-common.h:360
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:78
ThreadVars_::cpu_affinity
uint16_t cpu_affinity
Definition: threadvars.h:73
TmThreadDisableReceiveThreads
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1325
util-privs.h
SCCtrlCondDestroy
#define SCCtrlCondDestroy
Definition: threads-debug.h:387
SC_ERR_THREAD_CREATE
@ SC_ERR_THREAD_CREATE
Definition: util-error.h:78
SCMUTEX_INITIALIZER
#define SCMUTEX_INITIALIZER
Definition: threads-debug.h:121
TmThreadSetThreadPriority
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:764
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:37
m
SCMutex m
Definition: flow-hash.h:5
SleepUsec
#define SleepUsec(usec)
Definition: tm-threads.h:43
THREAD_SET_PRIORITY
#define THREAD_SET_PRIORITY
Definition: threadvars.h:136
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:449
SC_ERR_SHUTDOWN
@ SC_ERR_SHUTDOWN
Definition: util-error.h:220
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:79
SC_ERR_RUNMODE
@ SC_ERR_RUNMODE
Definition: util-error.h:219
Threads_::threads
Thread * threads
Definition: tm-threads.c:1989
TVT_MGMT
@ TVT_MGMT
Definition: tm-threads-common.h:86
tmqh-packetpool.h
STEP
#define STEP
Definition: tm-threads.c:2017
Thread_::in_use
int in_use
Definition: tm-threads.c:1980
THV_PAUSE
#define THV_PAUSE
Definition: threadvars.h:38
TmThreadDisablePacketThreads
void TmThreadDisablePacketThreads(void)
Disable all packet threads.
Definition: tm-threads.c:1429
TmModule_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
EXCLUSIVE_AFFINITY
@ EXCLUSIVE_AFFINITY
Definition: util-affinity.h:60
PacketPoolInit
void PacketPoolInit(void)
Definition: tmqh-packetpool.c:370
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:78
Tmqh_::InHandler
Packet *(* InHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:38
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:80
TmThreadsInjectPacketsById
int TmThreadsInjectPacketsById(Packet **packets, const int id)
Definition: tm-threads.c:2186
TmThreadsGetMinimalTimestamp
void TmThreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2146
TmThreadContinueThreads
void TmThreadContinueThreads()
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1754
strlcpy
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
PacketQueue_::mutex_q
SCMutex mutex_q
Definition: packet-queue.h:54
TmModuleGetByName
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:53
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:46
TmThreadsUnsetFlag
void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
Unset a thread flag.
Definition: tm-threads.c:105
util-signal.h
SC_ERR_THREAD_INIT
@ SC_ERR_THREAD_INIT
Definition: util-error.h:79
Tmqh_::InShutdownHandler
void(* InShutdownHandler)(ThreadVars *)
Definition: tm-queuehandlers.h:39
rwr_lock_contention
__thread uint64_t rwr_lock_contention
SCCtrlCondInit
#define SCCtrlCondInit
Definition: threads-debug.h:383
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:78
TmThreadSetPrio
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:775
MIN_WAIT_TIME
#define MIN_WAIT_TIME
Definition: tm-threads.c:1507
mutex_lock_cnt
__thread uint64_t mutex_lock_cnt
Thread_::tv
ThreadVars * tv
Definition: tm-threads.c:1977
TmThreadContinue
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1744
util-debug.h
TmSlot_::PktAcqLoop
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:56
type
uint8_t type
Definition: decode-icmpv4.h:2
TmThreadWaitOnThreadInit
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread,...
Definition: tm-threads.c:1824
Threads_::threads_cnt
int threads_cnt
Definition: tm-threads.c:1991
TmModule_::PktAcqBreakLoop
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
util-cpu.h
ThreadsAffinityType_::hiprio_cpu
cpu_set_t hiprio_cpu
Definition: util-affinity.h:76
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:95
TmSlotGetSlotForTM
TmSlot * TmSlotGetSlotForTM(int tm_id)
Returns the slot holding a TM with the particular tm_id.
Definition: tm-threads.c:672
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:206
TmSlot_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:57
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
TmThreadPauseThreads
void TmThreadPauseThreads()
Pauses all threads present in tv_root.
Definition: tm-threads.c:1782
ThreadVars_::perf_public_ctx
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:125
TmThreadDumpThreads
void TmThreadDumpThreads(void)
Definition: tm-threads.c:1947
SCEnter
#define SCEnter(...)
Definition: util-debug.h:337
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:57
SCRealloc
#define SCRealloc(x, a)
Definition: util-mem.h:238
TmqGetQueueByName
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:59
TmThreadTestThreadUnPaused
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1712
TmModule_::Management
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:52
TmThreadClearThreadsFamily
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1605
THV_KILL
#define THV_KILL
Definition: threadvars.h:40
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:85
TmThreadCreate
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:911
PktSrcToString
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:669
TmThreadsUnregisterThread
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2064
TmThreadsRegisterThread
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2021
threading_set_cpu_affinity
int threading_set_cpu_affinity
Definition: runmodes.c:58
Tmqh_::OutHandlerCtxFree
void(* OutHandlerCtxFree)(void *)
Definition: tm-queuehandlers.h:42
SC_ERR_INVALID_ARGUMENT
@ SC_ERR_INVALID_ARGUMENT
Definition: util-error.h:43
SCMalloc
#define SCMalloc(a)
Definition: util-mem.h:222
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:122
ThreadVars_::id
int id
Definition: threadvars.h:86
ThreadVars_::type
uint8_t type
Definition: threadvars.h:71
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:265
TmModuleGetIDForTM
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:109
util-profiling.h
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:81
rwr_lock_wait_ticks
__thread uint64_t rwr_lock_wait_ticks
SCReturn
#define SCReturn
Definition: util-debug.h:339
stream.h
TmThreadKillThreads
void TmThreadKillThreads(void)
Definition: tm-threads.c:1538
PACKET_PROFILING_TMM_END
#define PACKET_PROFILING_TMM_END(p, id)
Definition: util-profiling.h:158
Packet_
Definition: decode.h:408
SCCalloc
#define SCCalloc(nm, a)
Definition: util-mem.h:253
MAX_CPU_SET
@ MAX_CPU_SET
Definition: util-affinity.h:55
ThreadVars_::ctrl_cond
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:130
rww_lock_cnt
__thread uint64_t rww_lock_cnt
ThreadVars_::thread_group_name
char * thread_group_name
Definition: threadvars.h:66
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:90
MAX_WAIT_TIME
#define MAX_WAIT_TIME
Definition: tm-threads.c:1508
ThreadsAffinityType_::cpu_set
cpu_set_t cpu_set
Definition: util-affinity.h:73
TmSlot_
Definition: tm-threads.h:52
ThreadsAffinityType_::mode_flag
uint8_t mode_flag
Definition: util-affinity.h:66
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:68
TmEcode
TmEcode
Definition: tm-threads-common.h:77
queue.h
TmModule_::name
const char * name
Definition: tm-modules.h:44
runmodes.h
PacketQueue_::cond_q
SCCondT cond_q
Definition: packet-queue.h:55
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1071
SCMutexInit
#define SCMutexInit(mut, mutattrs)
Definition: threads-debug.h:116
TM_FLAG_RECEIVE_TM
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:31
TmThreadPause
void TmThreadPause(ThreadVars *tv)
Pauses a thread.
Definition: tm-threads.c:1773
TmModule_
Definition: tm-modules.h:43
SCGetThreadIdLong
#define SCGetThreadIdLong(...)
Definition: threads.h:253
ThreadVars_::stream_pq
struct PacketQueue_ * stream_pq
Definition: threadvars.h:113
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
tm-queuehandlers.h
THV_PAUSED
#define THV_PAUSED
Definition: threadvars.h:39
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:37
TmSlotSetFuncAppend
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:623
StatsPublicThreadContext_::m
SCMutex m
Definition: counters.h:72
TmThreadGetFirstTmSlotForPartialPattern
TmSlot * TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name)
Definition: tm-threads.c:1476
TmThreadKillThreadsFamily
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1509
Thread_::sys_sec_stamp
uint32_t sys_sec_stamp
Definition: tm-threads.c:1984
Thread_::type
int type
Definition: tm-threads.c:1979
Thread
struct Thread_ Thread
SCCondSignal
#define SCCondSignal
Definition: threads-debug.h:139
ThreadVars_::inq
Tmq * inq
Definition: threadvars.h:89
TmThreadAppend
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1158
ThreadVars_::thread_priority
int thread_priority
Definition: threadvars.h:74
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:44
flags
uint8_t flags
Definition: decode-gre.h:2
THV_DEAD
#define THV_DEAD
Definition: threadvars.h:54
suricata-common.h
TmThreadSetCPU
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:820
Threads_
Definition: tm-threads.c:1988
tm-queues.h
PacketQueueNoLock_::top
struct Packet_ * top
Definition: packet-queue.h:33
thread_affinity
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:34
ThreadVars_::tmqh_out
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:105
THV_FAILED
#define THV_FAILED
Definition: threadvars.h:41
ThreadVars_::tm_flowworker
struct TmSlot_ * tm_flowworker
Definition: threadvars.h:100
TmThreadGetNbThreads
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:836
SCLogPerf
#define SCLogPerf(...)
Definition: util-debug.h:261
PacketQueue_::len
uint32_t len
Definition: packet-queue.h:50
TmSlot_::SlotThreadInit
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:65
SCLogError
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:294
TmThreadsSetThreadTimestamp
void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
Definition: tm-threads.c:2097
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
TmSlot_::slot_initdata
const void * slot_initdata
Definition: tm-threads.h:70
FatalError
#define FatalError(x,...)
Definition: util-debug.h:569
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:65
TmThreadCreateCmdThreadByName
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1132
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
THREAD_SET_AFFTYPE
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:137
util-optimize.h
TmModule_::ThreadExitPrintStats
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
PacketDequeue
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:212
threadvars.h
ThreadsAffinityType_
Definition: util-affinity.h:64
spin_lock_contention
__thread uint64_t spin_lock_contention
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:35
THV_KILL_PKTACQ
#define THV_KILL_PKTACQ
Definition: threadvars.h:47
Packet_::next
struct Packet_ * next
Definition: decode.h:571
Thread_::name
const char * name
Definition: tm-threads.c:1978
ThreadsAffinityType_::nb_threads
int nb_threads
Definition: util-affinity.h:68
PACKET_PROFILING_TMM_START
#define PACKET_PROFILING_TMM_START(p, id)
Definition: util-profiling.h:150
PacketQueue_::top
struct Packet_ * top
Definition: packet-queue.h:48
ThreadVars_::tmqh_in
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:90
Tmqh_::OutHandlerCtxSetup
void *(* OutHandlerCtxSetup)(const char *)
Definition: tm-queuehandlers.h:41
TmqCreateQueue
Tmq * TmqCreateQueue(const char *name)
SCLogWarning
#define SCLogWarning(err_code,...)
Macro used to log WARNING messages.
Definition: util-debug.h:281
TmSlot_::SlotThreadDeinit
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:67
PRIO_MEDIUM
@ PRIO_MEDIUM
Definition: threads.h:87
SC_ERR_FATAL
@ SC_ERR_FATAL
Definition: util-error.h:203
TVT_MAX
@ TVT_MAX
Definition: tm-threads-common.h:88
SC_ERR_THREAD_NICE_PRIO
@ SC_ERR_THREAD_NICE_PRIO
Definition: util-error.h:77
TmSlot_::SlotThreadExitPrintStats
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:66
TmqhGetQueueHandlerByID
Tmqh * TmqhGetQueueHandlerByID(const int id)
Definition: tm-queuehandlers.c:77
ThreadVars_::outq_id
uint8_t outq_id
Definition: threadvars.h:83
MANAGEMENT_CPU_SET
@ MANAGEMENT_CPU_SET
Definition: util-affinity.h:54
PRIO_LOW
@ PRIO_LOW
Definition: threads.h:86
ThreadVars_::decode_pq
PacketQueueNoLock decode_pq
Definition: threadvars.h:109
mutex_lock_contention
__thread uint64_t mutex_lock_contention
SCCtrlMutexInit
#define SCCtrlMutexInit(mut, mutattr)
Definition: threads-debug.h:375
TVT_CMD
@ TVT_CMD
Definition: tm-threads-common.h:87
rww_lock_contention
__thread uint64_t rww_lock_contention
SC_ERR_MEM_ALLOC
@ SC_ERR_MEM_ALLOC
Definition: util-error.h:31
suricata.h
EngineDone
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:428
THREAD_SET_AFFINITY
#define THREAD_SET_AFFINITY
Definition: threadvars.h:135
PacketPoolInitEmpty
void PacketPoolInitEmpty(void)
Definition: tmqh-packetpool.c:351
Tmq_
Definition: tm-queues.h:29
StatsSyncCounters
#define StatsSyncCounters(tv)
Definition: counters.h:134
TmThreadsListThreads
void TmThreadsListThreads(void)
Definition: tm-threads.c:1997
TMQH_NOT_SET
@ TMQH_NOT_SET
Definition: tm-queuehandlers.h:28
likely
#define likely(expr)
Definition: util-optimize.h:32
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:61
TmThreadCheckThreadState
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:1800
ThreadVars_::ctrl_mutex
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:129
ThreadVars_::inq_id
uint8_t inq_id
Definition: threadvars.h:82
ThreadsAffinityType_::prio
int prio
Definition: util-affinity.h:67
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:192
PacketPoolDestroy
void PacketPoolDestroy(void)
Definition: tmqh-packetpool.c:407
spin_lock_wait_ticks
__thread uint64_t spin_lock_wait_ticks
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:89
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:269
TmqhGetQueueHandlerByName
Tmqh * TmqhGetQueueHandlerByName(const char *name)
Definition: tm-queuehandlers.c:65
PRIO_HIGH
@ PRIO_HIGH
Definition: threads.h:88
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:42
Thread_
Definition: tm-threads.c:1976
SCMutexDestroy
#define SCMutexDestroy
Definition: threads-debug.h:120
Threads_::threads_size
size_t threads_size
Definition: tm-threads.c:1990
StatsThreadCleanup
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1293
Tmq_::reader_cnt
uint16_t reader_cnt
Definition: tm-queues.h:33
ThreadVars_::tm_func
void *(* tm_func)(void *)
Definition: threadvars.h:62
SCMutex
#define SCMutex
Definition: threads-debug.h:114
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:70
SC_ATOMIC_AND
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value from our atomic variable.
Definition: util-atomic.h:141
Tmqh_
Definition: tm-queuehandlers.h:36
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:198
mutex_lock_wait_ticks
__thread uint64_t mutex_lock_wait_ticks
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value from our atomic variable.
Definition: util-atomic.h:154