suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2017 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-debug.h"
39 #include "util-privs.h"
40 #include "util-cpu.h"
41 #include "util-optimize.h"
42 #include "util-profiling.h"
43 #include "util-signal.h"
44 #include "queue.h"
45 
46 #ifdef PROFILE_LOCKING
47 __thread uint64_t mutex_lock_contention;
48 __thread uint64_t mutex_lock_wait_ticks;
49 __thread uint64_t mutex_lock_cnt;
50 
51 __thread uint64_t spin_lock_contention;
52 __thread uint64_t spin_lock_wait_ticks;
53 __thread uint64_t spin_lock_cnt;
54 
55 __thread uint64_t rww_lock_contention;
56 __thread uint64_t rww_lock_wait_ticks;
57 __thread uint64_t rww_lock_cnt;
58 
59 __thread uint64_t rwr_lock_contention;
60 __thread uint64_t rwr_lock_wait_ticks;
61 __thread uint64_t rwr_lock_cnt;
62 #endif
63 
64 #ifdef OS_FREEBSD
65 #include <sched.h>
66 #include <sys/param.h>
67 #include <sys/resource.h>
68 #include <sys/cpuset.h>
69 #include <sys/thr.h>
70 #define cpu_set_t cpuset_t
71 #endif /* OS_FREEBSD */
72 
73 /* prototypes */
74 static int SetCPUAffinity(uint16_t cpu);
75 
76 static void TmThreadDeinitMC(ThreadVars *tv);
77 
78 /* root of the threadvars list */
79 ThreadVars *tv_root[TVT_MAX] = { NULL };
80 
81 /* lock to protect tv_root */
83 
84 /**
85  * \brief Check if a thread flag is set.
86  *
87  * \retval 1 flag is set.
88  * \retval 0 flag is not set.
89  */
90 int TmThreadsCheckFlag(ThreadVars *tv, uint16_t flag)
91 {
92  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
93 }
94 
95 /**
96  * \brief Set a thread flag.
97  */
98 void TmThreadsSetFlag(ThreadVars *tv, uint16_t flag)
99 {
100  SC_ATOMIC_OR(tv->flags, flag);
101 }
102 
103 /**
104  * \brief Unset a thread flag.
105  */
106 void TmThreadsUnsetFlag(ThreadVars *tv, uint16_t flag)
107 {
108  SC_ATOMIC_AND(tv->flags, ~flag);
109 }
110 
111 /**
112  * \brief Separate run function so we can call it recursively.
113  *
114  * \todo Deal with post_pq for slots beyond the first.
115  */
117  TmSlot *slot)
118 {
119  TmEcode r;
120  TmSlot *s;
121  Packet *extra_p;
122 
123  for (s = slot; s != NULL; s = s->slot_next) {
124  TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
126 
127  if (unlikely(s->id == 0)) {
128  r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq);
129  } else {
130  r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, NULL);
131  }
132 
134 
135  /* handle error */
136  if (unlikely(r == TM_ECODE_FAILED)) {
137  /* Encountered error. Return packets to packetpool and return */
139 
143 
145  return TM_ECODE_FAILED;
146  }
147 
148  /* handle new packets */
149  while (s->slot_pre_pq.top != NULL) {
150  extra_p = PacketDequeue(&s->slot_pre_pq);
151  if (unlikely(extra_p == NULL))
152  continue;
153 
154  /* see if we need to process the packet */
155  if (s->slot_next != NULL) {
156  r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
157  if (unlikely(r == TM_ECODE_FAILED)) {
159 
163 
164  TmqhOutputPacketpool(tv, extra_p);
166  return TM_ECODE_FAILED;
167  }
168  }
169  tv->tmqh_out(tv, extra_p);
170  }
171  }
172 
173  return TM_ECODE_OK;
174 }
175 
176 #ifndef AFLFUZZ_PCAP_RUNMODE
177 
178 /** \internal
179  *
180  * \brief Process flow timeout packets
181  *
182  * Process flow timeout pseudo packets. During shutdown this loop
183  * is run until the flow engine kills the thread and the queue is
184  * empty.
185  */
186 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
187 {
188  TmSlot *stream_slot = NULL, *slot = NULL;
189  int run = 1;
190  int r = TM_ECODE_OK;
191 
192  for (slot = s; slot != NULL; slot = slot->slot_next) {
193  if (slot->tm_id == TMM_FLOWWORKER)
194  {
195  stream_slot = slot;
196  break;
197  }
198  }
199 
200  if (tv->stream_pq == NULL || stream_slot == NULL) {
201  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, stream_slot);
202  return r;
203  }
204 
205  SCLogDebug("flow end loop starting");
206  while(run) {
207  Packet *p;
208  if (tv->stream_pq->len != 0) {
210  p = PacketDequeue(tv->stream_pq);
212  BUG_ON(p == NULL);
213 
214  if ((r = TmThreadsSlotProcessPkt(tv, stream_slot, p) != TM_ECODE_OK)) {
215  if (r == TM_ECODE_FAILED)
216  run = 0;
217  }
218  } else {
219  SleepUsec(1);
220  }
221 
222  if (tv->stream_pq->len == 0 && TmThreadsCheckFlag(tv, THV_KILL)) {
223  run = 0;
224  }
225  }
226  SCLogDebug("flow end loop complete");
227 
228  return r;
229 }
230 
231 /*
232 
233  pcap/nfq
234 
235  pkt read
236  callback
237  process_pkt
238 
239  pfring
240 
241  pkt read
242  process_pkt
243 
244  slot:
245  setup
246 
247  pkt_ack_loop(tv, slot_data)
248 
249  deinit
250 
251  process_pkt:
252  while(s)
253  run s;
254  queue;
255 
256  */
257 
258 static void *TmThreadsSlotPktAcqLoop(void *td)
259 {
260  ThreadVars *tv = (ThreadVars *)td;
261  TmSlot *s = tv->tm_slots;
262  char run = 1;
263  TmEcode r = TM_ECODE_OK;
264  TmSlot *slot = NULL;
265 
266  /* Set the thread name */
267  if (SCSetThreadName(tv->name) < 0) {
268  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
269  }
270 
271  if (tv->thread_setup_flags != 0)
273 
274  /* Drop the capabilities for this thread */
275  SCDropCaps(tv);
276 
277  PacketPoolInit();
278 
279  /* check if we are setup properly */
280  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
281  SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
282  " PktAcqLoop=%p, tmqh_in=%p,"
283  " tmqh_out=%p",
284  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
286  pthread_exit((void *) -1);
287  return NULL;
288  }
289 
290  for (slot = s; slot != NULL; slot = slot->slot_next) {
291  if (slot->SlotThreadInit != NULL) {
292  void *slot_data = NULL;
293  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
294  if (r != TM_ECODE_OK) {
295  if (r == TM_ECODE_DONE) {
296  EngineDone();
298  goto error;
299  } else {
301  goto error;
302  }
303  }
304  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
305  }
306  memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue));
307  SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL);
308  memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
309  SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
310 
311  /* get the 'pre qeueue' from module before the stream module */
312  if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
313  SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
314  tv->stream_pq = &slot->slot_post_pq;
315  /* if the stream module is the first, get the threads input queue */
316  } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
317  tv->stream_pq = &trans_q[tv->inq->id];
318  SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
319  }
320  }
321 
322  StatsSetupPrivate(tv);
323 
325 
326  while(run) {
327  if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
331  }
332 
333  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
334 
335  if (r == TM_ECODE_FAILED) {
337  run = 0;
338  }
340  run = 0;
341  }
342  if (r == TM_ECODE_DONE) {
343  run = 0;
344  }
345  }
346  StatsSyncCounters(tv);
347 
349 
350  /* process all pseudo packets the flow timeout may throw at us */
351  TmThreadTimeoutLoop(tv, s);
352 
355 
357 
358  for (slot = s; slot != NULL; slot = slot->slot_next) {
359  if (slot->SlotThreadExitPrintStats != NULL) {
360  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
361  }
362 
363  if (slot->SlotThreadDeinit != NULL) {
364  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
365  if (r != TM_ECODE_OK) {
367  goto error;
368  }
369  }
370 
371  BUG_ON(slot->slot_pre_pq.len);
372  BUG_ON(slot->slot_post_pq.len);
373  }
374 
375  tv->stream_pq = NULL;
376  SCLogDebug("%s ending", tv->name);
378  pthread_exit((void *) 0);
379  return NULL;
380 
381 error:
382  tv->stream_pq = NULL;
383  pthread_exit((void *) -1);
384  return NULL;
385 }
386 
387 #endif /* NO AFLFUZZ_PCAP_RUNMODE */
388 
389 #ifdef AFLFUZZ_PCAP_RUNMODE
390 /** \brief simplified loop to speed up AFL
391  *
392  * The loop runs in the caller's thread. No separate thread.
393  */
394 static void *TmThreadsSlotPktAcqLoopAFL(void *td)
395 {
396  SCLogNotice("AFL mode starting");
397 
398  ThreadVars *tv = (ThreadVars *)td;
399  TmSlot *s = tv->tm_slots;
400  char run = 1;
401  TmEcode r = TM_ECODE_OK;
402  TmSlot *slot = NULL;
403 
404  PacketPoolInit();
405 
406  /* check if we are setup properly */
407  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
408  SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
409  " PktAcqLoop=%p, tmqh_in=%p,"
410  " tmqh_out=%p",
411  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
413  return NULL;
414  }
415 
416  for (slot = s; slot != NULL; slot = slot->slot_next) {
417  if (slot->SlotThreadInit != NULL) {
418  void *slot_data = NULL;
419  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
420  if (r != TM_ECODE_OK) {
421  if (r == TM_ECODE_DONE) {
422  EngineDone();
424  goto error;
425  } else {
427  goto error;
428  }
429  }
430  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
431  }
432  memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue));
433  SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL);
434  memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
435  SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
436 
437  /* get the 'pre qeueue' from module before the stream module */
438  if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
439  SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
440  tv->stream_pq = &slot->slot_post_pq;
441  /* if the stream module is the first, get the threads input queue */
442  } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
443  tv->stream_pq = &trans_q[tv->inq->id];
444  SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
445  }
446  }
447 
448  StatsSetupPrivate(tv);
449 
451 
452  while(run) {
453  /* run right away */
454 
455  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
456 
457  if (r == TM_ECODE_FAILED) {
459  run = 0;
460  }
462  run = 0;
463  }
464  if (r == TM_ECODE_DONE) {
465  run = 0;
466  }
467  }
468  StatsSyncCounters(tv);
469 
471 
473 
475 
476  for (slot = s; slot != NULL; slot = slot->slot_next) {
477  if (slot->SlotThreadExitPrintStats != NULL) {
478  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
479  }
480 
481  if (slot->SlotThreadDeinit != NULL) {
482  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
483  if (r != TM_ECODE_OK) {
485  goto error;
486  }
487  }
488 
489  BUG_ON(slot->slot_pre_pq.len);
490  BUG_ON(slot->slot_post_pq.len);
491  }
492 
493  tv->stream_pq = NULL;
494  SCLogDebug("%s ending", tv->name);
496  return NULL;
497 
498 error:
499  tv->stream_pq = NULL;
500  return NULL;
501 }
502 #endif
503 
504 /**
505  * \todo Only the first "slot" currently makes the "post_pq" available
506  * to the thread module.
507  */
508 static void *TmThreadsSlotVar(void *td)
509 {
510  ThreadVars *tv = (ThreadVars *)td;
511  TmSlot *s = (TmSlot *)tv->tm_slots;
512  Packet *p = NULL;
513  char run = 1;
514  TmEcode r = TM_ECODE_OK;
515 
517 
518  /* Set the thread name */
519  if (SCSetThreadName(tv->name) < 0) {
520  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
521  }
522 
523  if (tv->thread_setup_flags != 0)
525 
526  /* Drop the capabilities for this thread */
527  SCDropCaps(tv);
528 
529  /* check if we are setup properly */
530  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
532  pthread_exit((void *) -1);
533  return NULL;
534  }
535 
536  for (; s != NULL; s = s->slot_next) {
537  if (s->SlotThreadInit != NULL) {
538  void *slot_data = NULL;
539  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
540  if (r != TM_ECODE_OK) {
542  goto error;
543  }
544  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
545  }
546  memset(&s->slot_pre_pq, 0, sizeof(PacketQueue));
547  SCMutexInit(&s->slot_pre_pq.mutex_q, NULL);
548  memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
549  SCMutexInit(&s->slot_post_pq.mutex_q, NULL);
550 
551  /* special case: we need to access the stream queue
552  * from the flow timeout code */
553 
554  /* get the 'pre qeueue' from module before the stream module */
555  if (s->slot_next != NULL && (s->slot_next->tm_id == TMM_FLOWWORKER)) {
556  SCLogDebug("pre-stream packetqueue %p (preq)", &s->slot_pre_pq);
557  tv->stream_pq = &s->slot_pre_pq;
558  /* if the stream module is the first, get the threads input queue */
559  } else if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
560  tv->stream_pq = &trans_q[tv->inq->id];
561  SCLogDebug("pre-stream packetqueue %p (inq)", &s->slot_pre_pq);
562  }
563  }
564 
565  StatsSetupPrivate(tv);
566 
568 
569  s = (TmSlot *)tv->tm_slots;
570 
571  while (run) {
572  if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
576  }
577 
578  /* input a packet */
579  p = tv->tmqh_in(tv);
580 
581  if (p != NULL) {
582  /* run the thread module(s) */
583  r = TmThreadsSlotVarRun(tv, p, s);
584  if (r == TM_ECODE_FAILED) {
585  TmqhOutputPacketpool(tv, p);
587  break;
588  }
589 
590  /* output the packet */
591  tv->tmqh_out(tv, p);
592 
593  } /* if (p != NULL) */
594 
595  /* now handle the post_pq packets */
596  TmSlot *slot;
597  for (slot = s; slot != NULL; slot = slot->slot_next) {
598  if (slot->slot_post_pq.top != NULL) {
599  while (1) {
601  Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
603 
604  if (extra_p == NULL)
605  break;
606 
607  if (slot->slot_next != NULL) {
608  r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
609  if (r == TM_ECODE_FAILED) {
613 
614  TmqhOutputPacketpool(tv, extra_p);
616  break;
617  }
618  }
619  /* output the packet */
620  tv->tmqh_out(tv, extra_p);
621  } /* while */
622  } /* if */
623  } /* for */
624 
625  if (TmThreadsCheckFlag(tv, THV_KILL)) {
626  run = 0;
627  }
628  } /* while (run) */
629  StatsSyncCounters(tv);
630 
633 
635 
636  s = (TmSlot *)tv->tm_slots;
637 
638  for ( ; s != NULL; s = s->slot_next) {
639  if (s->SlotThreadExitPrintStats != NULL) {
640  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
641  }
642 
643  if (s->SlotThreadDeinit != NULL) {
644  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
645  if (r != TM_ECODE_OK) {
647  goto error;
648  }
649  }
650  BUG_ON(s->slot_pre_pq.len);
651  BUG_ON(s->slot_post_pq.len);
652  }
653 
654  SCLogDebug("%s ending", tv->name);
655  tv->stream_pq = NULL;
657  pthread_exit((void *) 0);
658  return NULL;
659 
660 error:
661  tv->stream_pq = NULL;
662  pthread_exit((void *) -1);
663  return NULL;
664 }
665 
666 static void *TmThreadsManagement(void *td)
667 {
668  ThreadVars *tv = (ThreadVars *)td;
669  TmSlot *s = (TmSlot *)tv->tm_slots;
670  TmEcode r = TM_ECODE_OK;
671 
672  BUG_ON(s == NULL);
673 
674  /* Set the thread name */
675  if (SCSetThreadName(tv->name) < 0) {
676  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
677  }
678 
679  if (tv->thread_setup_flags != 0)
681 
682  /* Drop the capabilities for this thread */
683  SCDropCaps(tv);
684 
685  SCLogDebug("%s starting", tv->name);
686 
687  if (s->SlotThreadInit != NULL) {
688  void *slot_data = NULL;
689  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
690  if (r != TM_ECODE_OK) {
692  pthread_exit((void *) -1);
693  return NULL;
694  }
695  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
696  }
697  memset(&s->slot_pre_pq, 0, sizeof(PacketQueue));
698  memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
699 
700  StatsSetupPrivate(tv);
701 
703 
704  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
705  /* handle error */
706  if (r == TM_ECODE_FAILED) {
708  }
709 
710  if (TmThreadsCheckFlag(tv, THV_KILL)) {
711  StatsSyncCounters(tv);
712  }
713 
716 
717  if (s->SlotThreadExitPrintStats != NULL) {
718  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
719  }
720 
721  if (s->SlotThreadDeinit != NULL) {
722  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
723  if (r != TM_ECODE_OK) {
725  pthread_exit((void *) -1);
726  return NULL;
727  }
728  }
729 
731  pthread_exit((void *) 0);
732  return NULL;
733 }
734 
735 /**
736  * \brief We set the slot functions.
737  *
738  * \param tv Pointer to the TV to set the slot function for.
739  * \param name Name of the slot variant.
740  * \param fn_p Pointer to a custom slot function. Used only if slot variant
741  * "name" is "custom".
742  *
743  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
744  */
745 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
746 {
747  if (name == NULL) {
748  if (fn_p == NULL) {
749  printf("Both slot name and function pointer can't be NULL inside "
750  "TmThreadSetSlots\n");
751  goto error;
752  } else {
753  name = "custom";
754  }
755  }
756 
757  if (strcmp(name, "varslot") == 0) {
758  tv->tm_func = TmThreadsSlotVar;
759  } else if (strcmp(name, "pktacqloop") == 0) {
760 #ifndef AFLFUZZ_PCAP_RUNMODE
761  tv->tm_func = TmThreadsSlotPktAcqLoop;
762 #else
763  tv->tm_func = TmThreadsSlotPktAcqLoopAFL;
764 #endif
765  } else if (strcmp(name, "management") == 0) {
766  tv->tm_func = TmThreadsManagement;
767  } else if (strcmp(name, "command") == 0) {
768  tv->tm_func = TmThreadsManagement;
769  } else if (strcmp(name, "custom") == 0) {
770  if (fn_p == NULL)
771  goto error;
772  tv->tm_func = fn_p;
773  } else {
774  printf("Error: Slot \"%s\" not supported\n", name);
775  goto error;
776  }
777 
778  return TM_ECODE_OK;
779 
780 error:
781  return TM_ECODE_FAILED;
782 }
783 
785 {
786  ThreadVars *tv;
787  int i;
788 
790 
791  for (i = 0; i < TVT_MAX; i++) {
792  tv = tv_root[i];
793 
794  while (tv) {
795  TmSlot *slots = tv->tm_slots;
796  while (slots != NULL) {
797  if (slots == tm_slot) {
799  return tv;
800  }
801  slots = slots->slot_next;
802  }
803  tv = tv->next;
804  }
805  }
806 
808 
809  return NULL;
810 }
811 
812 /**
813  * \brief Appends a new entry to the slots.
814  *
815  * \param tv TV the slot is attached to.
816  * \param tm TM to append.
817  * \param data Data to be passed on to the slot init function.
818  *
819  * \retval The allocated TmSlot or NULL if there is an error
820  */
821 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
822 {
823  TmSlot *slot = SCMalloc(sizeof(TmSlot));
824  if (unlikely(slot == NULL))
825  return;
826  memset(slot, 0, sizeof(TmSlot));
827  SC_ATOMIC_INIT(slot->slot_data);
828  slot->tv = tv;
829  slot->SlotThreadInit = tm->ThreadInit;
830  slot->slot_initdata = data;
831  SC_ATOMIC_INIT(slot->SlotFunc);
832  (void)SC_ATOMIC_SET(slot->SlotFunc, tm->Func);
833  slot->PktAcqLoop = tm->PktAcqLoop;
834  slot->Management = tm->Management;
836  slot->SlotThreadDeinit = tm->ThreadDeinit;
837  /* we don't have to check for the return value "-1". We wouldn't have
838  * received a TM as arg, if it didn't exist */
839  slot->tm_id = TmModuleGetIDForTM(tm);
840 
841  tv->tmm_flags |= tm->flags;
842  tv->cap_flags |= tm->cap_flags;
843 
844  if (tv->tm_slots == NULL) {
845  tv->tm_slots = slot;
846  slot->id = 0;
847  } else {
848  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
849 
850  /* get the last slot */
851  for ( ; a != NULL; a = a->slot_next) {
852  b = a;
853  }
854  /* append the new slot */
855  if (b != NULL) {
856  b->slot_next = slot;
857  slot->id = b->id + 1;
858  }
859  }
860  return;
861 }
862 
863 /**
864  * \brief Returns the slot holding a TM with the particular tm_id.
865  *
866  * \param tm_id TM id of the TM whose slot has to be returned.
867  *
868  * \retval slots Pointer to the slot.
869  */
871 {
872  ThreadVars *tv = NULL;
873  TmSlot *slots;
874  int i;
875 
877 
878  for (i = 0; i < TVT_MAX; i++) {
879  tv = tv_root[i];
880  while (tv) {
881  slots = tv->tm_slots;
882  while (slots != NULL) {
883  if (slots->tm_id == tm_id) {
885  return slots;
886  }
887  slots = slots->slot_next;
888  }
889  tv = tv->next;
890  }
891  }
892 
894 
895  return NULL;
896 }
897 
898 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
899 static int SetCPUAffinitySet(cpu_set_t *cs)
900 {
901 #if defined OS_FREEBSD
902  int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
903  SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
904 #elif OS_DARWIN
905  int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
906  (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
907 #else
908  pid_t tid = syscall(SYS_gettid);
909  int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
910 #endif /* OS_FREEBSD */
911 
912  if (r != 0) {
913  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
914  strerror(errno));
915  return -1;
916  }
917 
918  return 0;
919 }
920 #endif
921 
922 
923 /**
924  * \brief Set the thread affinity on the calling thread.
925  *
926  * \param cpuid Id of the core/cpu to setup the affinity.
927  *
928  * \retval 0 If all goes well; -1 if something is wrong.
929  */
930 static int SetCPUAffinity(uint16_t cpuid)
931 {
932 #if defined __OpenBSD__ || defined sun
933  return 0;
934 #else
935  int cpu = (int)cpuid;
936 
937 #if defined OS_WIN32 || defined __CYGWIN__
938  DWORD cs = 1 << cpu;
939 
940  int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
941  if (r != 0) {
942  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
943  strerror(errno));
944  return -1;
945  }
946  SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
947  SCGetThreadIdLong(), cpu);
948 
949  return 0;
950 
951 #else
952  cpu_set_t cs;
953 
954  CPU_ZERO(&cs);
955  CPU_SET(cpu, &cs);
956  return SetCPUAffinitySet(&cs);
957 #endif /* windows */
958 #endif /* not supported */
959 }
960 
961 
962 /**
963  * \brief Set the thread options (thread priority).
964  *
965  * \param tv Pointer to the ThreadVars to setup the thread priority.
966  *
967  * \retval TM_ECODE_OK.
968  */
970 {
972  tv->thread_priority = prio;
973 
974  return TM_ECODE_OK;
975 }
976 
977 /**
978  * \brief Adjusting nice value for threads.
979  */
981 {
982  SCEnter();
983 #ifndef __CYGWIN__
984 #ifdef OS_WIN32
985  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
986  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for "
987  "thread %s: %s", tv->name, strerror(errno));
988  } else {
989  SCLogDebug("Priority set to %"PRId32" for thread %s",
990  tv->thread_priority, tv->name);
991  }
992 #else
993  int ret = nice(tv->thread_priority);
994  if (ret == -1) {
995  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value %d "
996  "for thread %s: %s", tv->thread_priority, tv->name,
997  strerror(errno));
998  } else {
999  SCLogDebug("Nice value set to %"PRId32" for thread %s",
1000  tv->thread_priority, tv->name);
1001  }
1002 #endif /* OS_WIN32 */
1003 #endif
1004  SCReturn;
1005 }
1006 
1007 
1008 /**
1009  * \brief Set the thread options (cpu affinity).
1010  *
1011  * \param tv pointer to the ThreadVars to setup the affinity.
1012  * \param cpu cpu on which affinity is set.
1013  *
1014  * \retval TM_ECODE_OK
1015  */
1017 {
1019  tv->cpu_affinity = cpu;
1020 
1021  return TM_ECODE_OK;
1022 }
1023 
1024 
1026 {
1028  return TM_ECODE_OK;
1029 
1030  if (type > MAX_CPU_SET) {
1031  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
1032  return TM_ECODE_FAILED;
1033  }
1034 
1036  tv->cpu_affinity = type;
1037 
1038  return TM_ECODE_OK;
1039 }
1040 
1042 {
1043  if (type >= MAX_CPU_SET) {
1044  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
1045  return 0;
1046  }
1047 
1049 }
1050 
1051 /**
1052  * \brief Set the thread options (cpu affinitythread).
1053  * Priority should be already set by pthread_create.
1054  *
1055  * \param tv pointer to the ThreadVars of the calling thread.
1056  */
1058 {
1060  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
1061  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
1062  SCGetThreadIdLong());
1063  SetCPUAffinity(tv->cpu_affinity);
1064  }
1065 
1066 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
1068  TmThreadSetPrio(tv);
1071  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
1072  int cpu = AffinityGetNextCPU(taf);
1073  SetCPUAffinity(cpu);
1074  /* If CPU is in a set overwrite the default thread prio */
1075  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
1076  tv->thread_priority = PRIO_LOW;
1077  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
1079  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
1080  tv->thread_priority = PRIO_HIGH;
1081  } else {
1082  tv->thread_priority = taf->prio;
1083  }
1084  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
1085  "%d, thread id %lu", tv->thread_priority,
1086  tv->name, cpu, SCGetThreadIdLong());
1087  } else {
1088  SetCPUAffinitySet(&taf->cpu_set);
1089  tv->thread_priority = taf->prio;
1090  SCLogPerf("Setting prio %d for thread \"%s\", "
1091  "thread id %lu", tv->thread_priority,
1092  tv->name, SCGetThreadIdLong());
1093  }
1094  TmThreadSetPrio(tv);
1095  }
1096 #endif
1097 
1098  return TM_ECODE_OK;
1099 }
1100 
1101 /**
1102  * \brief Creates and returns the TV instance for a new thread.
1103  *
1104  * \param name Name of this TV instance
1105  * \param inq_name Incoming queue name
1106  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1107  * \param outq_name Outgoing queue name
1108  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1109  * \param slots String representation for the slot function to be used
1110  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1111  * \param mucond Flag to indicate whether to initialize the condition
1112  * and the mutex variables for this newly created TV.
1113  *
1114  * \retval the newly created TV instance, or NULL on error
1115  */
1116 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
1117  const char *outq_name, const char *outqh_name, const char *slots,
1118  void * (*fn_p)(void *), int mucond)
1119 {
1120  ThreadVars *tv = NULL;
1121  Tmq *tmq = NULL;
1122  Tmqh *tmqh = NULL;
1123 
1124  SCLogDebug("creating thread \"%s\"...", name);
1125 
1126  /* XXX create separate function for this: allocate a thread container */
1127  tv = SCMalloc(sizeof(ThreadVars));
1128  if (unlikely(tv == NULL))
1129  goto error;
1130  memset(tv, 0, sizeof(ThreadVars));
1131 
1132  SC_ATOMIC_INIT(tv->flags);
1133  SCMutexInit(&tv->perf_public_ctx.m, NULL);
1134 
1135  strlcpy(tv->name, name, sizeof(tv->name));
1136 
1137  /* default state for every newly created thread */
1140 
1141  /* set the incoming queue */
1142  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
1143  SCLogDebug("inq_name \"%s\"", inq_name);
1144 
1145  tmq = TmqGetQueueByName(inq_name);
1146  if (tmq == NULL) {
1147  tmq = TmqCreateQueue(inq_name);
1148  if (tmq == NULL)
1149  goto error;
1150  }
1151  SCLogDebug("tmq %p", tmq);
1152 
1153  tv->inq = tmq;
1154  tv->inq->reader_cnt++;
1155  SCLogDebug("tv->inq %p", tv->inq);
1156  }
1157  if (inqh_name != NULL) {
1158  SCLogDebug("inqh_name \"%s\"", inqh_name);
1159 
1160  tmqh = TmqhGetQueueHandlerByName(inqh_name);
1161  if (tmqh == NULL)
1162  goto error;
1163 
1164  tv->tmqh_in = tmqh->InHandler;
1166  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
1167  }
1168 
1169  /* set the outgoing queue */
1170  if (outqh_name != NULL) {
1171  SCLogDebug("outqh_name \"%s\"", outqh_name);
1172 
1173  tmqh = TmqhGetQueueHandlerByName(outqh_name);
1174  if (tmqh == NULL)
1175  goto error;
1176 
1177  tv->tmqh_out = tmqh->OutHandler;
1178  tv->outqh_name = tmqh->name;
1179 
1180  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
1181  SCLogDebug("outq_name \"%s\"", outq_name);
1182 
1183  if (tmqh->OutHandlerCtxSetup != NULL) {
1184  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1185  if (tv->outctx == NULL)
1186  goto error;
1187  tv->outq = NULL;
1188  } else {
1189  tmq = TmqGetQueueByName(outq_name);
1190  if (tmq == NULL) {
1191  tmq = TmqCreateQueue(outq_name);
1192  if (tmq == NULL)
1193  goto error;
1194  }
1195  SCLogDebug("tmq %p", tmq);
1196 
1197  tv->outq = tmq;
1198  tv->outctx = NULL;
1199  tv->outq->writer_cnt++;
1200  }
1201  }
1202  }
1203 
1204  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1205  goto error;
1206  }
1207 
1208  if (mucond != 0)
1209  TmThreadInitMC(tv);
1210 
1211  return tv;
1212 
1213 error:
1214  SCLogError(SC_ERR_THREAD_CREATE, "failed to setup a thread");
1215 
1216  if (tv != NULL)
1217  SCFree(tv);
1218  return NULL;
1219 }
1220 
1221 /**
1222  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1223  * This function doesn't support custom slots, and hence shouldn't be
1224  * supplied \"custom\" as its slot type. All PPT threads are created
1225  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1226  * conditional variables are not used to kill the thread.
1227  *
1228  * \param name Name of this TV instance
1229  * \param inq_name Incoming queue name
1230  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1231  * \param outq_name Outgoing queue name
1232  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1233  * \param slots String representation for the slot function to be used
1234  *
1235  * \retval the newly created TV instance, or NULL on error
1236  */
1237 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1238  const char *inqh_name, const char *outq_name,
1239  const char *outqh_name, const char *slots)
1240 {
1241  ThreadVars *tv = NULL;
1242 
1243  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1244  slots, NULL, 0);
1245 
1246  if (tv != NULL) {
1247  tv->type = TVT_PPT;
1248  tv->id = TmThreadsRegisterThread(tv, tv->type);
1249  }
1250 
1251 
1252  return tv;
1253 }
1254 
1255 /**
1256  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1257  * This function supports only custom slot functions and hence a
1258  * function pointer should be sent as an argument.
1259  *
1260  * \param name Name of this TV instance
1261  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1262  * \param mucond Flag to indicate whether to initialize the condition
1263  * and the mutex variables for this newly created TV.
1264  *
1265  * \retval the newly created TV instance, or NULL on error
1266  */
1267 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1268  int mucond)
1269 {
1270  ThreadVars *tv = NULL;
1271 
1272  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1273 
1274  if (tv != NULL) {
1275  tv->type = TVT_MGMT;
1276  tv->id = TmThreadsRegisterThread(tv, tv->type);
1278  }
1279 
1280  return tv;
1281 }
1282 
1283 /**
1284  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1285  * This function supports only custom slot functions and hence a
1286  * function pointer should be sent as an argument.
1287  *
1288  * \param name Name of this TV instance
1289  * \param module Name of TmModule with MANAGEMENT flag set.
1290  * \param mucond Flag to indicate whether to initialize the condition
1291  * and the mutex variables for this newly created TV.
1292  *
1293  * \retval the newly created TV instance, or NULL on error
1294  */
1295 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1296  int mucond)
1297 {
1298  ThreadVars *tv = NULL;
1299 
1300  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1301 
1302  if (tv != NULL) {
1303  tv->type = TVT_MGMT;
1304  tv->id = TmThreadsRegisterThread(tv, tv->type);
1306 
1307  TmModule *m = TmModuleGetByName(module);
1308  if (m) {
1309  TmSlotSetFuncAppend(tv, m, NULL);
1310  }
1311  }
1312 
1313  return tv;
1314 }
1315 
1316 /**
1317  * \brief Creates and returns the TV instance for a Command thread (CMD).
1318  * This function supports only custom slot functions and hence a
1319  * function pointer should be sent as an argument.
1320  *
1321  * \param name Name of this TV instance
1322  * \param module Name of TmModule with COMMAND flag set.
1323  * \param mucond Flag to indicate whether to initialize the condition
1324  * and the mutex variables for this newly created TV.
1325  *
1326  * \retval the newly created TV instance, or NULL on error
1327  */
1328 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1329  int mucond)
1330 {
1331  ThreadVars *tv = NULL;
1332 
1333  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1334 
1335  if (tv != NULL) {
1336  tv->type = TVT_CMD;
1337  tv->id = TmThreadsRegisterThread(tv, tv->type);
1339 
1340  TmModule *m = TmModuleGetByName(module);
1341  if (m) {
1342  TmSlotSetFuncAppend(tv, m, NULL);
1343  }
1344  }
1345 
1346  return tv;
1347 }
1348 
1349 /**
1350  * \brief Appends this TV to tv_root based on its type
1351  *
1352  * \param type holds the type this TV belongs to.
1353  */
1355 {
1357 
1358  if (tv_root[type] == NULL) {
1359  tv_root[type] = tv;
1360  tv->next = NULL;
1361  tv->prev = NULL;
1362 
1364 
1365  return;
1366  }
1367 
1368  ThreadVars *t = tv_root[type];
1369 
1370  while (t) {
1371  if (t->next == NULL) {
1372  t->next = tv;
1373  tv->prev = t;
1374  tv->next = NULL;
1375  break;
1376  }
1377 
1378  t = t->next;
1379  }
1380 
1382 
1383  return;
1384 }
1385 
1386 /**
1387  * \brief Removes this TV from tv_root based on its type
1388  *
1389  * \param tv The tv instance to remove from the global tv list.
1390  * \param type Holds the type this TV belongs to.
1391  */
1393 {
1395 
1396  if (tv_root[type] == NULL) {
1398 
1399  return;
1400  }
1401 
1402  ThreadVars *t = tv_root[type];
1403  while (t != tv) {
1404  t = t->next;
1405  }
1406 
1407  if (t != NULL) {
1408  if (t->prev != NULL)
1409  t->prev->next = t->next;
1410  if (t->next != NULL)
1411  t->next->prev = t->prev;
1412 
1413  if (t == tv_root[type])
1414  tv_root[type] = t->next;;
1415  }
1416 
1418 
1419  return;
1420 }
1421 
1422 /**
1423  * \brief Kill a thread.
1424  *
1425  * \param tv A ThreadVars instance corresponding to the thread that has to be
1426  * killed.
1427  *
1428  * \retval r 1 killed succesfully
1429  * 0 not yet ready, needs another look
1430  */
1431 static int TmThreadKillThread(ThreadVars *tv)
1432 {
1433  int i = 0;
1434 
1435  BUG_ON(tv == NULL);
1436 
1437  /* kill only once :) */
1438  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1439  return 1;
1440  }
1441 
1442  if (tv->inq != NULL) {
1443  /* we wait till we dry out all the inq packets, before we
1444  * kill this thread. Do note that you should have disabled
1445  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1446  if (!(strlen(tv->inq->name) == strlen("packetpool") &&
1447  strcasecmp(tv->inq->name, "packetpool") == 0)) {
1448  PacketQueue *q = &trans_q[tv->inq->id];
1449  if (q->len != 0) {
1450  return 0;
1451  }
1452  }
1453  }
1454 
1455  /* set the thread flag informing the thread that it needs to be
1456  * terminated */
1459 
1460  /* to be sure, signal more */
1461  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1462  if (tv->InShutdownHandler != NULL) {
1463  tv->InShutdownHandler(tv);
1464  }
1465  if (tv->inq != NULL) {
1466  for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1467  SCCondSignal(&trans_q[tv->inq->id].cond_q);
1468  }
1469  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1470  }
1471 
1472  if (tv->ctrl_cond != NULL ) {
1473  pthread_cond_broadcast(tv->ctrl_cond);
1474  }
1475  return 0;
1476  }
1477 
1478  if (tv->outctx != NULL) {
1480  if (tmqh == NULL)
1481  BUG_ON(1);
1482 
1483  if (tmqh->OutHandlerCtxFree != NULL) {
1484  tmqh->OutHandlerCtxFree(tv->outctx);
1485  tv->outctx = NULL;
1486  }
1487  }
1488 
1489  /* join it and flag it as dead */
1490  pthread_join(tv->t, NULL);
1491  SCLogDebug("thread %s stopped", tv->name);
1493  return 1;
1494 }
1495 
1496 /** \internal
1497  *
1498  * \brief make sure that all packet threads are done processing their
1499  * in-flight packets
1500  */
1501 static void TmThreadDrainPacketThreads(void)
1502 {
1503  ThreadVars *tv = NULL;
1504  struct timeval start_ts;
1505  struct timeval cur_ts;
1506  gettimeofday(&start_ts, NULL);
1507 
1508 again:
1509  gettimeofday(&cur_ts, NULL);
1510  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1511  SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads "
1512  "to process their packets in time");
1513  return;
1514  }
1515 
1517 
1518  /* all receive threads are part of packet processing threads */
1519  tv = tv_root[TVT_PPT];
1520  while (tv) {
1521  if (tv->inq != NULL) {
1522  /* we wait till we dry out all the inq packets, before we
1523  * kill this thread. Do note that you should have disabled
1524  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1525  if (!(strlen(tv->inq->name) == strlen("packetpool") &&
1526  strcasecmp(tv->inq->name, "packetpool") == 0)) {
1527  PacketQueue *q = &trans_q[tv->inq->id];
1528  if (q->len != 0) {
1530 
1531  /* sleep outside lock */
1532  SleepMsec(1);
1533  goto again;
1534  }
1535  }
1536  }
1537 
1538  tv = tv->next;
1539  }
1540 
1542  return;
1543 }
1544 
1545 /**
1546  * \brief Disable all threads having the specified TMs.
1547  *
1548  * Breaks out of the packet acquisition loop, and bumps
1549  * into the 'flow loop', where it will process packets
1550  * from the flow engine's shutdown handling.
1551  */
1553 {
1554  ThreadVars *tv = NULL;
1555  struct timeval start_ts;
1556  struct timeval cur_ts;
1557  gettimeofday(&start_ts, NULL);
1558 
1559 again:
1560  gettimeofday(&cur_ts, NULL);
1561  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1562  FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1563  "thread - \"%s\". Killing engine", tv->name);
1564  }
1565 
1567 
1568  /* all receive threads are part of packet processing threads */
1569  tv = tv_root[TVT_PPT];
1570 
1571  /* we do have to keep in mind that TVs are arranged in the order
1572  * right from receive to log. The moment we fail to find a
1573  * receive TM amongst the slots in a tv, it indicates we are done
1574  * with all receive threads */
1575  while (tv) {
1576  int disable = 0;
1577  TmModule *tm = NULL;
1578  /* obtain the slots for this TV */
1579  TmSlot *slots = tv->tm_slots;
1580  while (slots != NULL) {
1581  tm = TmModuleGetById(slots->tm_id);
1582 
1583  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1584  disable = 1;
1585  break;
1586  }
1587 
1588  slots = slots->slot_next;
1589  continue;
1590  }
1591 
1592  if (disable) {
1593  if (tv->inq != NULL) {
1594  /* we wait till we dry out all the inq packets, before we
1595  * kill this thread. Do note that you should have disabled
1596  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1597  if (!(strlen(tv->inq->name) == strlen("packetpool") &&
1598  strcasecmp(tv->inq->name, "packetpool") == 0)) {
1599  PacketQueue *q = &trans_q[tv->inq->id];
1600  if (q->len != 0) {
1602  /* don't sleep while holding a lock */
1603  SleepMsec(1);
1604  goto again;
1605  }
1606  }
1607  }
1608 
1609  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1610  if (tm && tm->PktAcqBreakLoop != NULL) {
1611  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1612  }
1614 
1615  if (tv->inq != NULL) {
1616  int i;
1617  for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1618  SCCondSignal(&trans_q[tv->inq->id].cond_q);
1619  }
1620  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1621  }
1622 
1623  /* wait for it to enter the 'flow loop' stage */
1624  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1626 
1627  SleepMsec(1);
1628  goto again;
1629  }
1630  }
1631 
1632  tv = tv->next;
1633  }
1634 
1636 
1637  /* finally wait for all packet threads to have
1638  * processed all of their 'live' packets so we
1639  * don't process the last live packets together
1640  * with FFR packets */
1641  TmThreadDrainPacketThreads();
1642  return;
1643 }
1644 
1645 /**
1646  * \brief Disable all threads having the specified TMs.
1647  */
1649 {
1650  ThreadVars *tv = NULL;
1651  struct timeval start_ts;
1652  struct timeval cur_ts;
1653 
1654  /* first drain all packet threads of their packets */
1655  TmThreadDrainPacketThreads();
1656 
1657  gettimeofday(&start_ts, NULL);
1658 again:
1659  gettimeofday(&cur_ts, NULL);
1660  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1661  FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1662  "thread - \"%s\". Killing engine",
1663  tv ? tv->name : "<unknown>");
1664  }
1665 
1667 
1668  /* all receive threads are part of packet processing threads */
1669  tv = tv_root[TVT_PPT];
1670 
1671  /* we do have to keep in mind that TVs are arranged in the order
1672  * right from receive to log. The moment we fail to find a
1673  * receive TM amongst the slots in a tv, it indicates we are done
1674  * with all receive threads */
1675  while (tv) {
1676  if (tv->inq != NULL) {
1677  /* we wait till we dry out all the inq packets, before we
1678  * kill this thread. Do note that you should have disabled
1679  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1680  if (!(strlen(tv->inq->name) == strlen("packetpool") &&
1681  strcasecmp(tv->inq->name, "packetpool") == 0)) {
1682  PacketQueue *q = &trans_q[tv->inq->id];
1683  if (q->len != 0) {
1685  /* don't sleep while holding a lock */
1686  SleepMsec(1);
1687  goto again;
1688  }
1689  }
1690  }
1691 
1692  /* we found our receive TV. Send it a KILL signal. This is all
1693  * we need to do to kill receive threads */
1695 
1696  if (tv->inq != NULL) {
1697  int i;
1698  for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1699  SCCondSignal(&trans_q[tv->inq->id].cond_q);
1700  }
1701  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1702  }
1703 
1704  while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
1706 
1707  SleepMsec(1);
1708  goto again;
1709  }
1710 
1711  tv = tv->next;
1712  }
1713 
1715 
1716  return;
1717 }
1718 
1720 {
1721  ThreadVars *tv = NULL;
1722  TmSlot *slots = NULL;
1723 
1725 
1726  /* all receive threads are part of packet processing threads */
1727  tv = tv_root[TVT_PPT];
1728 
1729  while (tv) {
1730  slots = tv->tm_slots;
1731 
1732  while (slots != NULL) {
1733  TmModule *tm = TmModuleGetById(slots->tm_id);
1734 
1735  char *found = strstr(tm->name, tm_name);
1736  if (found != NULL)
1737  goto end;
1738 
1739  slots = slots->slot_next;
1740  }
1741 
1742  tv = tv->next;
1743  }
1744 
1745  end:
1747  return slots;
1748 }
1749 
1750 #define MIN_WAIT_TIME 100
1751 #define MAX_WAIT_TIME 999999
1753 {
1754  ThreadVars *tv = NULL;
1755  unsigned int sleep_usec = MIN_WAIT_TIME;
1756 
1757  BUG_ON((family < 0) || (family >= TVT_MAX));
1758 
1759 again:
1761  tv = tv_root[family];
1762 
1763  while (tv) {
1764  int r = TmThreadKillThread(tv);
1765  if (r == 0) {
1767  SleepUsec(sleep_usec);
1768  sleep_usec *= 2; /* slowly back off */
1769  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1770  goto again;
1771  }
1772  sleep_usec = MIN_WAIT_TIME; /* reset */
1773 
1774  tv = tv->next;
1775  }
1777 }
1778 #undef MIN_WAIT_TIME
1779 #undef MAX_WAIT_TIME
1780 
1782 {
1783  int i = 0;
1784 
1785  for (i = 0; i < TVT_MAX; i++) {
1787  }
1788 
1789  return;
1790 }
1791 
1792 static void TmThreadFree(ThreadVars *tv)
1793 {
1794  TmSlot *s;
1795  TmSlot *ps;
1796  if (tv == NULL)
1797  return;
1798 
1799  SCLogDebug("Freeing thread '%s'.", tv->name);
1800 
1801  StatsThreadCleanup(tv);
1802 
1803  TmThreadDeinitMC(tv);
1804 
1805  if (tv->thread_group_name) {
1807  }
1808 
1809  if (tv->printable_name) {
1810  SCFree(tv->printable_name);
1811  }
1812 
1813  s = (TmSlot *)tv->tm_slots;
1814  while (s) {
1815  ps = s;
1816  s = s->slot_next;
1817  SCFree(ps);
1818  }
1819 
1821  SCFree(tv);
1822 }
1823 
1824 void TmThreadSetGroupName(ThreadVars *tv, const char *name)
1825 {
1826  char *thread_group_name = NULL;
1827 
1828  if (name == NULL)
1829  return;
1830 
1831  if (tv == NULL)
1832  return;
1833 
1834  thread_group_name = SCStrdup(name);
1835  if (unlikely(thread_group_name == NULL)) {
1836  SCLogError(SC_ERR_RUNMODE, "error allocating memory");
1837  return;
1838  }
1839  tv->thread_group_name = thread_group_name;
1840 }
1841 
1843 {
1844  ThreadVars *tv = NULL;
1845  ThreadVars *ptv = NULL;
1846 
1847  if ((family < 0) || (family >= TVT_MAX))
1848  return;
1849 
1851  tv = tv_root[family];
1852 
1853  while (tv) {
1854  ptv = tv;
1855  tv = tv->next;
1856  TmThreadFree(ptv);
1857  }
1858  tv_root[family] = NULL;
1860 }
1861 
1862 /**
1863  * \brief Spawns a thread associated with the ThreadVars instance tv
1864  *
1865  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1866  */
1868 {
1869  pthread_attr_t attr;
1870  if (tv->tm_func == NULL) {
1871  printf("ERROR: no thread function set\n");
1872  return TM_ECODE_FAILED;
1873  }
1874 
1875  /* Initialize and set thread detached attribute */
1876  pthread_attr_init(&attr);
1877 
1878  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1879 
1880  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1881  if (rc) {
1882  printf("ERROR; return code from pthread_create() is %" PRId32 "\n", rc);
1883  return TM_ECODE_FAILED;
1884  }
1885 
1887 
1888  TmThreadAppend(tv, tv->type);
1889  return TM_ECODE_OK;
1890 }
1891 
1892 /**
1893  * \brief Sets the thread flags for a thread instance(tv)
1894  *
1895  * \param tv Pointer to the thread instance for which the flag has to be set
1896  * \param flags Holds the thread state this thread instance has to be set to
1897  */
1898 #if 0
1899 void TmThreadSetFlags(ThreadVars *tv, uint8_t flags)
1900 {
1901  if (tv != NULL)
1902  tv->flags = flags;
1903 
1904  return;
1905 }
1906 #endif
1907 
1908 /**
1909  * \brief Initializes the mutex and condition variables for this TV
1910  *
1911  * It can be used by a thread to control a wait loop that can also be
1912  * influenced by other threads.
1913  *
1914  * \param tv Pointer to a TV instance
1915  */
1917 {
1918  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1919  SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. "
1920  "Exiting...");
1921  exit(EXIT_FAILURE);
1922  }
1923 
1924  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1925  printf("Error initializing the tv->m mutex\n");
1926  exit(EXIT_FAILURE);
1927  }
1928 
1929  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1930  SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. "
1931  "Exiting...");
1932  exit(EXIT_FAILURE);
1933  }
1934 
1935  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1936  SCLogError(SC_ERR_FATAL, "Error initializing the tv->cond condition "
1937  "variable");
1938  exit(EXIT_FAILURE);
1939  }
1940 
1941  return;
1942 }
1943 
1944 static void TmThreadDeinitMC(ThreadVars *tv)
1945 {
1946  if (tv->ctrl_mutex) {
1948  SCFree(tv->ctrl_mutex);
1949  }
1950  if (tv->ctrl_cond) {
1952  SCFree(tv->ctrl_cond);
1953  }
1954  return;
1955 }
1956 
1957 /**
1958  * \brief Tests if the thread represented in the arg has been unpaused or not.
1959  *
1960  * The function would return if the thread tv has been unpaused or if the
1961  * kill flag for the thread has been set.
1962  *
1963  * \param tv Pointer to the TV instance.
1964  */
1966 {
1967  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1968  SleepUsec(100);
1969 
1970  if (TmThreadsCheckFlag(tv, THV_KILL))
1971  break;
1972  }
1973 
1974  return;
1975 }
1976 
1977 /**
1978  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1979  * the kill flag has been set or not on the thread.
1980  *
1981  * \param tv Pointer to the TV instance.
1982  */
1983 void TmThreadWaitForFlag(ThreadVars *tv, uint16_t flags)
1984 {
1985  while (!TmThreadsCheckFlag(tv, flags)) {
1986  SleepUsec(100);
1987  }
1988 
1989  return;
1990 }
1991 
1992 /**
1993  * \brief Unpauses a thread
1994  *
1995  * \param tv Pointer to a TV instance that has to be unpaused
1996  */
1998 {
2000 
2001  return;
2002 }
2003 
2004 /**
2005  * \brief Unpauses all threads present in tv_root
2006  */
2008 {
2009  ThreadVars *tv = NULL;
2010  int i = 0;
2011 
2013  for (i = 0; i < TVT_MAX; i++) {
2014  tv = tv_root[i];
2015  while (tv != NULL) {
2016  TmThreadContinue(tv);
2017  tv = tv->next;
2018  }
2019  }
2021 
2022  return;
2023 }
2024 
2025 /**
2026  * \brief Pauses a thread
2027  *
2028  * \param tv Pointer to a TV instance that has to be paused
2029  */
2031 {
2033 
2034  return;
2035 }
2036 
2037 /**
2038  * \brief Pauses all threads present in tv_root
2039  */
2041 {
2042  ThreadVars *tv = NULL;
2043  int i = 0;
2044 
2046 
2048  for (i = 0; i < TVT_MAX; i++) {
2049  tv = tv_root[i];
2050  while (tv != NULL) {
2051  TmThreadPause(tv);
2052  tv = tv->next;
2053  }
2054  }
2056 
2057  return;
2058 }
2059 
2060 /**
2061  * \brief Used to check the thread for certain conditions of failure.
2062  */
2064 {
2065  ThreadVars *tv = NULL;
2066  int i = 0;
2067 
2069  for (i = 0; i < TVT_MAX; i++) {
2070  tv = tv_root[i];
2071 
2072  while (tv) {
2073  if (TmThreadsCheckFlag(tv, THV_FAILED)) {
2074  FatalError(SC_ERR_FATAL, "thread %s failed", tv->name);
2075  }
2076  tv = tv->next;
2077  }
2078  }
2080  return;
2081 }
2082 
2083 /**
2084  * \brief Used to check if all threads have finished their initialization. On
2085  * finding an un-initialized thread, it waits till that thread completes
2086  * its initialization, before proceeding to the next thread.
2087  *
2088  * \retval TM_ECODE_OK all initialized properly
2089  * \retval TM_ECODE_FAILED failure
2090  */
2092 {
2093  ThreadVars *tv = NULL;
2094  int i = 0;
2095  uint16_t mgt_num = 0;
2096  uint16_t ppt_num = 0;
2097 
2098  struct timeval start_ts;
2099  struct timeval cur_ts;
2100  gettimeofday(&start_ts, NULL);
2101 
2102 again:
2104  for (i = 0; i < TVT_MAX; i++) {
2105  tv = tv_root[i];
2106  while (tv != NULL) {
2107  if (TmThreadsCheckFlag(tv, (THV_CLOSED|THV_DEAD))) {
2109 
2110  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
2111  "initialize: flags %04x", tv->name,
2112  SC_ATOMIC_GET(tv->flags));
2113  return TM_ECODE_FAILED;
2114  }
2115 
2116  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
2118 
2119  gettimeofday(&cur_ts, NULL);
2120  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
2121  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
2122  "initialize in time: flags %04x", tv->name,
2123  SC_ATOMIC_GET(tv->flags));
2124  return TM_ECODE_FAILED;
2125  }
2126 
2127  /* sleep a little to give the thread some
2128  * time to finish initialization */
2129  SleepUsec(100);
2130  goto again;
2131  }
2132 
2133  if (TmThreadsCheckFlag(tv, THV_FAILED)) {
2135  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
2136  "initialize.", tv->name);
2137  return TM_ECODE_FAILED;
2138  }
2139  if (TmThreadsCheckFlag(tv, THV_CLOSED)) {
2141  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" closed on "
2142  "initialization.", tv->name);
2143  return TM_ECODE_FAILED;
2144  }
2145 
2146  if (i == TVT_MGMT)
2147  mgt_num++;
2148  else if (i == TVT_PPT)
2149  ppt_num++;
2150 
2151  tv = tv->next;
2152  }
2153  }
2155 
2156  SCLogNotice("all %"PRIu16" packet processing threads, %"PRIu16" management "
2157  "threads initialized, engine started.", ppt_num, mgt_num);
2158 
2159  return TM_ECODE_OK;
2160 }
2161 
2162 /**
2163  * \brief Returns the TV for the calling thread.
2164  *
2165  * \retval tv Pointer to the ThreadVars instance for the calling thread;
2166  * NULL on no match
2167  */
2169 {
2170  pthread_t self = pthread_self();
2171  ThreadVars *tv = NULL;
2172  int i = 0;
2173 
2175 
2176  for (i = 0; i < TVT_MAX; i++) {
2177  tv = tv_root[i];
2178  while (tv) {
2179  if (pthread_equal(self, tv->t)) {
2181  return tv;
2182  }
2183  tv = tv->next;
2184  }
2185  }
2186 
2188 
2189  return NULL;
2190 }
2191 
2192 /**
2193  * \brief returns a count of all the threads that match the flag
2194  */
2195 uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
2196 {
2197  ThreadVars *tv = NULL;
2198  int i = 0;
2199  uint32_t cnt = 0;
2200 
2202  for (i = 0; i < TVT_MAX; i++) {
2203  tv = tv_root[i];
2204  while (tv != NULL) {
2205  if ((tv->tmm_flags & flags) == flags)
2206  cnt++;
2207 
2208  tv = tv->next;
2209  }
2210  }
2212  return cnt;
2213 }
2214 
2215 typedef struct Thread_ {
2216  ThreadVars *tv; /**< threadvars structure */
2217  const char *name;
2218  int type;
2219  int in_use; /**< bool to indicate this is in use */
2220 
2221  struct timeval ts; /**< current time of this thread (offline mode) */
2222 } Thread;
2223 
2224 typedef struct Threads_ {
2225  Thread *threads;
2228 } Threads;
2229 
2230 static Threads thread_store = { NULL, 0, 0 };
2231 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2232 
2234 {
2235  Thread *t;
2236  size_t s;
2237 
2238  SCMutexLock(&thread_store_lock);
2239 
2240  for (s = 0; s < thread_store.threads_size; s++) {
2241  t = &thread_store.threads[s];
2242  if (t == NULL || t->in_use == 0)
2243  continue;
2244  SCLogInfo("Thread %"PRIuMAX", %s type %d, tv %p", (uintmax_t)s+1, t->name, t->type, t->tv);
2245  }
2246 
2247  SCMutexUnlock(&thread_store_lock);
2248 }
2249 
2250 #define STEP 32
2251 /**
2252  * \retval id thread id, or 0 if not found
2253  */
2255 {
2256  SCMutexLock(&thread_store_lock);
2257  if (thread_store.threads == NULL) {
2258  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2259  BUG_ON(thread_store.threads == NULL);
2260  thread_store.threads_size = STEP;
2261  }
2262 
2263  size_t s;
2264  for (s = 0; s < thread_store.threads_size; s++) {
2265  if (thread_store.threads[s].in_use == 0) {
2266  Thread *t = &thread_store.threads[s];
2267  t->name = tv->name;
2268  t->type = type;
2269  t->tv = tv;
2270  t->in_use = 1;
2271 
2272  SCMutexUnlock(&thread_store_lock);
2273  return (int)(s+1);
2274  }
2275  }
2276 
2277  /* if we get here the array is completely filled */
2278  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2279  BUG_ON(newmem == NULL);
2280  thread_store.threads = newmem;
2281  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2282 
2283  Thread *t = &thread_store.threads[thread_store.threads_size];
2284  t->name = tv->name;
2285  t->type = type;
2286  t->tv = tv;
2287  t->in_use = 1;
2288 
2289  s = thread_store.threads_size;
2290  thread_store.threads_size += STEP;
2291 
2292  SCMutexUnlock(&thread_store_lock);
2293  return (int)(s+1);
2294 }
2295 #undef STEP
2296 
2297 void TmThreadsUnregisterThread(const int id)
2298 {
2299  SCMutexLock(&thread_store_lock);
2300  if (id <= 0 || id > (int)thread_store.threads_size) {
2301  SCMutexUnlock(&thread_store_lock);
2302  return;
2303  }
2304 
2305  /* id is one higher than index */
2306  int idx = id - 1;
2307 
2308  /* reset thread_id, which serves as clearing the record */
2309  thread_store.threads[idx].in_use = 0;
2310 
2311  /* check if we have at least one registered thread left */
2312  size_t s;
2313  for (s = 0; s < thread_store.threads_size; s++) {
2314  Thread *t = &thread_store.threads[s];
2315  if (t->in_use == 1) {
2316  goto end;
2317  }
2318  }
2319 
2320  /* if we get here no threads are registered */
2321  SCFree(thread_store.threads);
2322  thread_store.threads = NULL;
2323  thread_store.threads_size = 0;
2324  thread_store.threads_cnt = 0;
2325 
2326 end:
2327  SCMutexUnlock(&thread_store_lock);
2328 }
2329 
2330 void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
2331 {
2332  SCMutexLock(&thread_store_lock);
2333  if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2334  SCMutexUnlock(&thread_store_lock);
2335  return;
2336  }
2337 
2338  int idx = id - 1;
2339  Thread *t = &thread_store.threads[idx];
2340  t->ts.tv_sec = ts->tv_sec;
2341  t->ts.tv_usec = ts->tv_usec;
2342  SCMutexUnlock(&thread_store_lock);
2343 }
2344 
2345 #define COPY_TIMESTAMP(src,dst) ((dst)->tv_sec = (src)->tv_sec, (dst)->tv_usec = (src)->tv_usec) // XXX unify with flow-util.h
2346 void TmreadsGetMinimalTimestamp(struct timeval *ts)
2347 {
2348  struct timeval local, nullts;
2349  memset(&local, 0, sizeof(local));
2350  memset(&nullts, 0, sizeof(nullts));
2351  int set = 0;
2352  size_t s;
2353 
2354  SCMutexLock(&thread_store_lock);
2355  for (s = 0; s < thread_store.threads_size; s++) {
2356  Thread *t = &thread_store.threads[s];
2357  if (t == NULL || t->in_use == 0)
2358  continue;
2359  if (!(timercmp(&t->ts, &nullts, ==))) {
2360  if (!set) {
2361  local.tv_sec = t->ts.tv_sec;
2362  local.tv_usec = t->ts.tv_usec;
2363  set = 1;
2364  } else {
2365  if (timercmp(&t->ts, &local, <)) {
2366  COPY_TIMESTAMP(&t->ts, &local);
2367  }
2368  }
2369  }
2370  }
2371  SCMutexUnlock(&thread_store_lock);
2372  COPY_TIMESTAMP(&local, ts);
2373  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2374 }
2375 #undef COPY_TIMESTAMP
2376 
2377 /**
2378  * \retval r 1 if packet was accepted, 0 otherwise
2379  * \note if packet was not accepted, it's still the responsibility
2380  * of the caller.
2381  */
2382 int TmThreadsInjectPacketsById(Packet **packets, const int id)
2383 {
2384  if (id <= 0 || id > (int)thread_store.threads_size)
2385  return 0;
2386 
2387  int idx = id - 1;
2388 
2389  Thread *t = &thread_store.threads[idx];
2390  ThreadVars *tv = t->tv;
2391 
2392  if (tv == NULL || tv->stream_pq == NULL)
2393  return 0;
2394 
2395  SCMutexLock(&tv->stream_pq->mutex_q);
2396  while (*packets != NULL) {
2397  PacketEnqueue(tv->stream_pq, *packets);
2398  packets++;
2399  }
2401 
2402  /* wake up listening thread(s) if necessary */
2403  if (tv->inq != NULL) {
2404  SCCondSignal(&trans_q[tv->inq->id].cond_q);
2405  }
2406  return 1;
2407 }
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value from our atomic variable.
Definition: util-atomic.h:154
Packet *(* InHandler)(ThreadVars *)
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:88
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:821
#define THREAD_SET_PRIORITY
Definition: threadvars.h:117
const char * name
#define SCMutex
#define SCDropCaps(...)
Definition: util-privs.h:37
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:56
uint16_t flags
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:431
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:1025
#define SCCtrlMutexDestroy
#define SCLogDebug(...)
Definition: util-debug.h:335
void TmThreadSetFlags(ThreadVars *, uint8_t)
uint8_t type
Definition: threadvars.h:92
int AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
__thread uint64_t rww_lock_contention
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:1041
uint8_t cap_flags
Definition: tm-modules.h:67
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:2063
void TmThreadPause(ThreadVars *tv)
Pauses a thread.
Definition: tm-threads.c:2030
SCCondT cond_q
Definition: decode.h:628
uint16_t writer_cnt
Definition: tm-queues.h:31
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:64
int thread_priority
Definition: threadvars.h:96
#define BUG_ON(x)
#define THV_KILL_PKTACQ
Definition: threadvars.h:47
uint8_t flags
Definition: tm-modules.h:70
#define SCSetThreadName(n)
Definition: threads.h:299
#define THV_RUNNING_DONE
Definition: threadvars.h:45
#define STEP
Definition: tm-threads.c:2250
struct ThreadVars_ * prev
Definition: threadvars.h:112
void TmThreadDisablePacketThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1648
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1267
int TmThreadsInjectPacketsById(Packet **packets, const int id)
Definition: tm-threads.c:2382
#define unlikely(expr)
Definition: util-optimize.h:35
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1310
TmEcode(* Func)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *)
Definition: tm-modules.h:52
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:969
pthread_t t
Definition: threadvars.h:58
TmSlot * TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name)
Definition: tm-threads.c:1719
#define THV_DEAD
Definition: threadvars.h:54
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:60
uint16_t id
Definition: tm-queues.h:29
#define SCCtrlCondInit
#define MIN(x, y)
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:107
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn&#39;t support custo...
Definition: tm-threads.c:1237
int type
Definition: tm-threads.c:2218
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1824
#define THV_PAUSED
Definition: threadvars.h:38
#define COPY_TIMESTAMP(src, dst)
Definition: tm-threads.c:2345
__thread uint64_t spin_lock_wait_ticks
struct Thread_ Thread
#define THV_FAILED
Definition: threadvars.h:40
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:199
void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
Definition: tm-threads.c:2330
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value from our atomic variable.
Definition: util-atomic.h:141
__thread uint64_t spin_lock_contention
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1328
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1842
#define THREAD_SET_AFFINITY
Definition: threadvars.h:116
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1997
void(* InShutdownHandler)(struct ThreadVars_ *)
Definition: threadvars.h:79
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
#define SCMutexLock(mut)
__thread uint64_t rwr_lock_wait_ticks
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread...
Definition: tm-threads.c:2091
ThreadVars * TmThreadsGetCallingThread(void)
Returns the TV for the calling thread.
Definition: tm-threads.c:2168
char * thread_group_name
Definition: threadvars.h:61
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:106
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:31
uint8_t thread_setup_flags
Definition: threadvars.h:89
void TmThreadsListThreads(void)
Definition: tm-threads.c:2233
void(* OutHandlerCtxFree)(void *)
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
void TmThreadsSetFlag(ThreadVars *tv, uint16_t flag)
Set a thread flag.
Definition: tm-threads.c:98
#define SC_ATOMIC_INIT(name)
Initialize the previously declared atomic variable and it&#39;s lock.
Definition: util-atomic.h:81
#define SCCalloc(nm, a)
Definition: util-mem.h:197
struct TmSlot_ * tm_slots
Definition: threadvars.h:84
#define SCMutexUnlock(mut)
void TmThreadWaitForFlag(ThreadVars *tv, uint16_t flags)
Waits till the specified flag(s) is(are) set. We don&#39;t bother if the kill flag has been set or not on...
Definition: tm-threads.c:1983
#define THV_INIT_DONE
Definition: threadvars.h:36
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tm-queues.h:27
void TmThreadRemove(ThreadVars *tv, int type)
Removes this TV from tv_root based on its type.
Definition: tm-threads.c:1392
#define SCCtrlCondDestroy
#define THV_CLOSED
Definition: threadvars.h:41
SCMutex tv_root_lock
Definition: tm-threads.c:82
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
#define SCMUTEX_INITIALIZER
uint32_t len
Definition: decode.h:623
uint8_t type
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1752
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:294
#define PACKET_PROFILING_TMM_END(p, id)
struct ThreadVars_ * next
Definition: threadvars.h:111
#define SCGetThreadIdLong(...)
Definition: threads.h:253
const void * slot_initdata
Definition: tm-threads.h:67
#define SleepMsec(msec)
Definition: tm-threads.h:44
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:80
__thread uint64_t mutex_lock_wait_ticks
size_t threads_size
Definition: tm-threads.c:2226
PacketQueue slot_post_pq
Definition: tm-threads.h:78
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
#define SCEnter(...)
Definition: util-debug.h:337
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:90
#define PACKET_PROFILING_TMM_START(p, id)
Packet * top
Definition: decode.h:621
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1295
int tm_id
Definition: tm-threads.h:81
struct TmSlot_ * slot_next
Definition: tm-threads.h:87
#define THV_USE
Definition: threadvars.h:35
void TmThreadPauseThreads()
Pauses all threads present in tv_root.
Definition: tm-threads.c:2040
void TmThreadContinueThreads()
Unpauses all threads present in tv_root.
Definition: tm-threads.c:2007
void PacketPoolInitEmpty(void)
void TmThreadsUnsetFlag(ThreadVars *tv, uint16_t flag)
Unset a thread flag.
Definition: tm-threads.c:106
PacketQueue slot_pre_pq
Definition: tm-threads.h:73
Tmq * outq
Definition: threadvars.h:73
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1354
uint8_t tmm_flags
Definition: threadvars.h:66
#define SCMutexInit(mut, mutattrs)
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:116
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2254
void PacketPoolDestroy(void)
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:62
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
#define THV_PAUSE
Definition: threadvars.h:37
void(* OutHandler)(ThreadVars *, Packet *)
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:63
struct Threads_ Threads
#define THV_KILL
Definition: threadvars.h:39
#define SCLogWarning(err_code,...)
Macro used to log WARNING messages.
Definition: util-debug.h:281
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:118
__thread uint64_t mutex_lock_contention
__thread uint64_t rwr_lock_cnt
#define SCRealloc(x, a)
Definition: util-mem.h:182
uint16_t cpu_affinity
Definition: threadvars.h:94
#define THV_FLOW_LOOP
Definition: threadvars.h:48
TmSlot * TmSlotGetSlotForTM(int tm_id)
Returns the slot holding a TM with the particular tm_id.
Definition: tm-threads.c:870
void PacketPoolInit(void)
void TmThreadKillThreads(void)
Definition: tm-threads.c:1781
uint16_t reader_cnt
Definition: tm-queues.h:30
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:34
const char * name
Definition: tm-modules.h:44
Tmqh * TmqhGetQueueHandlerByName(const char *name)
#define MAX_WAIT_TIME
Definition: tm-threads.c:1751
void TmreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2346
#define SCMalloc(a)
Definition: util-mem.h:166
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:207
ThreadVars * tv
Definition: tm-threads.h:55
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2297
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:254
ThreadVars * tv
Definition: tm-threads.c:2216
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:101
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:1057
#define SCFree(a)
Definition: util-mem.h:228
void *(* tm_func)(void *)
Definition: threadvars.h:83
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:269
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1552
#define SCCondSignal
const char * outqh_name
Definition: threadvars.h:75
char * printable_name
Definition: threadvars.h:60
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2195
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:980
#define SleepUsec(usec)
Definition: tm-threads.h:43
#define MIN_WAIT_TIME
Definition: tm-threads.c:1750
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:107
PacketQueue trans_q[256]
Definition: suricata.h:132
#define SCLogPerf(...)
Definition: util-debug.h:261
#define FatalError(x,...)
Definition: util-debug.h:539
ThreadVars * TmThreadsGetTVContainingSlot(TmSlot *tm_slot)
Definition: tm-threads.c:784
__thread uint64_t rwr_lock_contention
__thread uint64_t rww_lock_cnt
void(* InShutdownHandler)(ThreadVars *)
TmEcode(* TmSlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *)
Definition: tm-threads.h:50
SCMutex m
Definition: flow-hash.h:105
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1201
__thread uint64_t rww_lock_wait_ticks
void * outctx
Definition: threadvars.h:74
struct timeval ts
Definition: tm-threads.c:2221
int TmThreadsCheckFlag(ThreadVars *tv, uint16_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:90
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:192
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:1016
#define StatsSyncCounters(tv)
Definition: counters.h:133
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:167
int in_use
Definition: tm-threads.c:2219
void TmThreadInitMC(ThreadVars *tv)
Sets the thread flags for a thread instance(tv)
Definition: tm-threads.c:1916
const char * name
Definition: tm-threads.c:2217
#define SCStrdup(a)
Definition: util-mem.h:212
char name[16]
Definition: threadvars.h:59
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:51
Thread * threads
Definition: tm-threads.c:2225
struct PacketQueue_ * stream_pq
Definition: threadvars.h:87
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:79
#define SCReturn
Definition: util-debug.h:339
#define THV_DEINIT
Definition: threadvars.h:44
Per thread variable structure.
Definition: threadvars.h:57
__thread uint64_t mutex_lock_cnt
#define SCCtrlMutexInit(mut, mutattr)
char * name
Definition: tm-queues.h:28
void *(* OutHandlerCtxSetup)(const char *)
Tmq * inq
Definition: threadvars.h:72
uint8_t cap_flags
Definition: threadvars.h:109
int id
Definition: tm-threads.h:84
int threading_set_cpu_affinity
Definition: runmodes.h:112
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:78
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1867
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:1116
Tmq * TmqCreateQueue(const char *name)
Definition: tm-queues.c:36
int threads_cnt
Definition: tm-threads.c:2227
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:139
__thread uint64_t spin_lock_cnt
SCMutex mutex_q
Definition: decode.h:627
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1965
void TmqhReleasePacketsToPacketPool(PacketQueue *pq)
Release all the packets in the queue back to the packetpool. Mainly used by threads that have failed...