suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2017 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-debug.h"
39 #include "util-privs.h"
40 #include "util-cpu.h"
41 #include "util-optimize.h"
42 #include "util-profiling.h"
43 #include "util-signal.h"
44 #include "queue.h"
45 
46 #ifdef PROFILE_LOCKING
47 __thread uint64_t mutex_lock_contention;
48 __thread uint64_t mutex_lock_wait_ticks;
49 __thread uint64_t mutex_lock_cnt;
50 
51 __thread uint64_t spin_lock_contention;
52 __thread uint64_t spin_lock_wait_ticks;
53 __thread uint64_t spin_lock_cnt;
54 
55 __thread uint64_t rww_lock_contention;
56 __thread uint64_t rww_lock_wait_ticks;
57 __thread uint64_t rww_lock_cnt;
58 
59 __thread uint64_t rwr_lock_contention;
60 __thread uint64_t rwr_lock_wait_ticks;
61 __thread uint64_t rwr_lock_cnt;
62 #endif
63 
64 #ifdef OS_FREEBSD
65 #include <sched.h>
66 #include <sys/param.h>
67 #include <sys/resource.h>
68 #include <sys/cpuset.h>
69 #include <sys/thr.h>
70 #define cpu_set_t cpuset_t
71 #endif /* OS_FREEBSD */
72 
73 /* prototypes */
74 static int SetCPUAffinity(uint16_t cpu);
75 
76 static void TmThreadDeinitMC(ThreadVars *tv);
77 
78 /* root of the threadvars list */
79 ThreadVars *tv_root[TVT_MAX] = { NULL };
80 
81 /* lock to protect tv_root */
83 
84 /**
85  * \brief Check if a thread flag is set.
86  *
87  * \retval 1 flag is set.
88  * \retval 0 flag is not set.
89  */
90 int TmThreadsCheckFlag(ThreadVars *tv, uint16_t flag)
91 {
92  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
93 }
94 
95 /**
96  * \brief Set a thread flag.
97  */
98 void TmThreadsSetFlag(ThreadVars *tv, uint16_t flag)
99 {
100  SC_ATOMIC_OR(tv->flags, flag);
101 }
102 
103 /**
104  * \brief Unset a thread flag.
105  */
106 void TmThreadsUnsetFlag(ThreadVars *tv, uint16_t flag)
107 {
108  SC_ATOMIC_AND(tv->flags, ~flag);
109 }
110 
111 /**
112  * \brief Separate run function so we can call it recursively.
113  *
114  * \todo Deal with post_pq for slots beyond the first.
115  */
117  TmSlot *slot)
118 {
119  TmEcode r;
120  TmSlot *s;
121  Packet *extra_p;
122 
123  for (s = slot; s != NULL; s = s->slot_next) {
124  TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
126 
127  if (unlikely(s->id == 0)) {
128  r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq);
129  } else {
130  r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, NULL);
131  }
132 
134 
135  /* handle error */
136  if (unlikely(r == TM_ECODE_FAILED)) {
137  /* Encountered error. Return packets to packetpool and return */
139 
143 
145  return TM_ECODE_FAILED;
146  }
147 
148  /* handle new packets */
149  while (s->slot_pre_pq.top != NULL) {
150  extra_p = PacketDequeue(&s->slot_pre_pq);
151  if (unlikely(extra_p == NULL))
152  continue;
153 
154  /* see if we need to process the packet */
155  if (s->slot_next != NULL) {
156  r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
157  if (unlikely(r == TM_ECODE_FAILED)) {
159 
163 
164  TmqhOutputPacketpool(tv, extra_p);
166  return TM_ECODE_FAILED;
167  }
168  }
169  tv->tmqh_out(tv, extra_p);
170  }
171  }
172 
173  return TM_ECODE_OK;
174 }
175 
176 #ifndef AFLFUZZ_PCAP_RUNMODE
177 
178 /** \internal
179  *
180  * \brief Process flow timeout packets
181  *
182  * Process flow timeout pseudo packets. During shutdown this loop
183  * is run until the flow engine kills the thread and the queue is
184  * empty.
185  */
186 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
187 {
188  TmSlot *fw_slot = NULL;
189  int r = TM_ECODE_OK;
190 
191  for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
192  if (slot->tm_id == TMM_FLOWWORKER) {
193  fw_slot = slot;
194  break;
195  }
196  }
197 
198  if (tv->stream_pq == NULL || fw_slot == NULL) {
199  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
200  return r;
201  }
202 
203  SCLogDebug("flow end loop starting");
204  while (1) {
206  uint32_t len = tv->stream_pq->len;
208  if (len > 0) {
209  while (len--) {
211  Packet *p = PacketDequeue(tv->stream_pq);
213  if (likely(p)) {
214  if ((r = TmThreadsSlotProcessPkt(tv, fw_slot, p) != TM_ECODE_OK)) {
215  if (r == TM_ECODE_FAILED)
216  break;
217  }
218  }
219  }
220  } else {
221  SleepUsec(1);
222  }
223 
224  if (tv->stream_pq->len == 0 && TmThreadsCheckFlag(tv, THV_KILL)) {
225  break;
226  }
227  }
228  SCLogDebug("flow end loop complete");
229 
230  return r;
231 }
232 
233 /*
234 
235  pcap/nfq
236 
237  pkt read
238  callback
239  process_pkt
240 
241  pfring
242 
243  pkt read
244  process_pkt
245 
246  slot:
247  setup
248 
249  pkt_ack_loop(tv, slot_data)
250 
251  deinit
252 
253  process_pkt:
254  while(s)
255  run s;
256  queue;
257 
258  */
259 
260 static void *TmThreadsSlotPktAcqLoop(void *td)
261 {
262  ThreadVars *tv = (ThreadVars *)td;
263  TmSlot *s = tv->tm_slots;
264  char run = 1;
265  TmEcode r = TM_ECODE_OK;
266  TmSlot *slot = NULL;
267 
268  /* Set the thread name */
269  if (SCSetThreadName(tv->name) < 0) {
270  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
271  }
272 
273  if (tv->thread_setup_flags != 0)
275 
276  /* Drop the capabilities for this thread */
277  SCDropCaps(tv);
278 
279  PacketPoolInit();
280 
281  /* check if we are setup properly */
282  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
283  SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
284  " PktAcqLoop=%p, tmqh_in=%p,"
285  " tmqh_out=%p",
286  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
288  pthread_exit((void *) -1);
289  return NULL;
290  }
291 
292  for (slot = s; slot != NULL; slot = slot->slot_next) {
293  if (slot->SlotThreadInit != NULL) {
294  void *slot_data = NULL;
295  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
296  if (r != TM_ECODE_OK) {
297  if (r == TM_ECODE_DONE) {
298  EngineDone();
300  goto error;
301  } else {
303  goto error;
304  }
305  }
306  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
307  }
308  memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue));
309  SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL);
310  memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
311  SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
312 
313  /* get the 'pre qeueue' from module before the stream module */
314  if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
315  SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
316  tv->stream_pq = &slot->slot_post_pq;
317  /* if the stream module is the first, get the threads input queue */
318  } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
319  tv->stream_pq = &trans_q[tv->inq->id];
320  SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
321  }
322  }
323 
324  StatsSetupPrivate(tv);
325 
327 
328  while(run) {
329  if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
333  }
334 
335  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
336 
337  if (r == TM_ECODE_FAILED) {
339  run = 0;
340  }
342  run = 0;
343  }
344  if (r == TM_ECODE_DONE) {
345  run = 0;
346  }
347  }
348  StatsSyncCounters(tv);
349 
351 
352  /* process all pseudo packets the flow timeout may throw at us */
353  TmThreadTimeoutLoop(tv, s);
354 
357 
359 
360  for (slot = s; slot != NULL; slot = slot->slot_next) {
361  if (slot->SlotThreadExitPrintStats != NULL) {
362  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
363  }
364 
365  if (slot->SlotThreadDeinit != NULL) {
366  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
367  if (r != TM_ECODE_OK) {
369  goto error;
370  }
371  }
372 
373  BUG_ON(slot->slot_pre_pq.len);
374  BUG_ON(slot->slot_post_pq.len);
375  }
376 
377  tv->stream_pq = NULL;
378  SCLogDebug("%s ending", tv->name);
380  pthread_exit((void *) 0);
381  return NULL;
382 
383 error:
384  tv->stream_pq = NULL;
385  pthread_exit((void *) -1);
386  return NULL;
387 }
388 
389 #endif /* NO AFLFUZZ_PCAP_RUNMODE */
390 
391 #ifdef AFLFUZZ_PCAP_RUNMODE
392 /** \brief simplified loop to speed up AFL
393  *
394  * The loop runs in the caller's thread. No separate thread.
395  */
396 static void *TmThreadsSlotPktAcqLoopAFL(void *td)
397 {
398  SCLogNotice("AFL mode starting");
399 
400  ThreadVars *tv = (ThreadVars *)td;
401  TmSlot *s = tv->tm_slots;
402  char run = 1;
403  TmEcode r = TM_ECODE_OK;
404  TmSlot *slot = NULL;
405 
406  PacketPoolInit();
407 
408  /* check if we are setup properly */
409  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
410  SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
411  " PktAcqLoop=%p, tmqh_in=%p,"
412  " tmqh_out=%p",
413  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
415  return NULL;
416  }
417 
418  for (slot = s; slot != NULL; slot = slot->slot_next) {
419  if (slot->SlotThreadInit != NULL) {
420  void *slot_data = NULL;
421  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
422  if (r != TM_ECODE_OK) {
423  if (r == TM_ECODE_DONE) {
424  EngineDone();
426  goto error;
427  } else {
429  goto error;
430  }
431  }
432  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
433  }
434  memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue));
435  SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL);
436  memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
437  SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
438 
439  /* get the 'pre qeueue' from module before the stream module */
440  if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
441  SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
442  tv->stream_pq = &slot->slot_post_pq;
443  /* if the stream module is the first, get the threads input queue */
444  } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
445  tv->stream_pq = &trans_q[tv->inq->id];
446  SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
447  }
448  }
449 
450  StatsSetupPrivate(tv);
451 
453 
454  while(run) {
455  /* run right away */
456 
457  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
458 
459  if (r == TM_ECODE_FAILED) {
461  run = 0;
462  }
464  run = 0;
465  }
466  if (r == TM_ECODE_DONE) {
467  run = 0;
468  }
469  }
470  StatsSyncCounters(tv);
471 
473 
475 
477 
478  for (slot = s; slot != NULL; slot = slot->slot_next) {
479  if (slot->SlotThreadExitPrintStats != NULL) {
480  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
481  }
482 
483  if (slot->SlotThreadDeinit != NULL) {
484  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
485  if (r != TM_ECODE_OK) {
487  goto error;
488  }
489  }
490 
491  BUG_ON(slot->slot_pre_pq.len);
492  BUG_ON(slot->slot_post_pq.len);
493  }
494 
495  tv->stream_pq = NULL;
496  SCLogDebug("%s ending", tv->name);
498  return NULL;
499 
500 error:
501  tv->stream_pq = NULL;
502  return NULL;
503 }
504 #endif
505 
506 /**
507  * \todo Only the first "slot" currently makes the "post_pq" available
508  * to the thread module.
509  */
510 static void *TmThreadsSlotVar(void *td)
511 {
512  ThreadVars *tv = (ThreadVars *)td;
513  TmSlot *s = (TmSlot *)tv->tm_slots;
514  Packet *p = NULL;
515  char run = 1;
516  TmEcode r = TM_ECODE_OK;
517 
519 
520  /* Set the thread name */
521  if (SCSetThreadName(tv->name) < 0) {
522  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
523  }
524 
525  if (tv->thread_setup_flags != 0)
527 
528  /* Drop the capabilities for this thread */
529  SCDropCaps(tv);
530 
531  /* check if we are setup properly */
532  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
534  pthread_exit((void *) -1);
535  return NULL;
536  }
537 
538  for (; s != NULL; s = s->slot_next) {
539  if (s->SlotThreadInit != NULL) {
540  void *slot_data = NULL;
541  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
542  if (r != TM_ECODE_OK) {
544  goto error;
545  }
546  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
547  }
548  memset(&s->slot_pre_pq, 0, sizeof(PacketQueue));
549  SCMutexInit(&s->slot_pre_pq.mutex_q, NULL);
550  memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
551  SCMutexInit(&s->slot_post_pq.mutex_q, NULL);
552 
553  /* special case: we need to access the stream queue
554  * from the flow timeout code */
555 
556  /* get the 'pre qeueue' from module before the stream module */
557  if (s->slot_next != NULL && (s->slot_next->tm_id == TMM_FLOWWORKER)) {
558  SCLogDebug("pre-stream packetqueue %p (preq)", &s->slot_pre_pq);
559  tv->stream_pq = &s->slot_pre_pq;
560  /* if the stream module is the first, get the threads input queue */
561  } else if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
562  tv->stream_pq = &trans_q[tv->inq->id];
563  SCLogDebug("pre-stream packetqueue %p (inq)", &s->slot_pre_pq);
564  }
565  }
566 
567  StatsSetupPrivate(tv);
568 
570 
571  s = (TmSlot *)tv->tm_slots;
572 
573  while (run) {
574  if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
578  }
579 
580  /* input a packet */
581  p = tv->tmqh_in(tv);
582 
583  if (p != NULL) {
584  /* run the thread module(s) */
585  r = TmThreadsSlotVarRun(tv, p, s);
586  if (r == TM_ECODE_FAILED) {
587  TmqhOutputPacketpool(tv, p);
589  break;
590  }
591 
592  /* output the packet */
593  tv->tmqh_out(tv, p);
594 
595  } /* if (p != NULL) */
596 
597  /* now handle the post_pq packets */
598  TmSlot *slot;
599  for (slot = s; slot != NULL; slot = slot->slot_next) {
600  if (slot->slot_post_pq.top != NULL) {
601  while (1) {
603  Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
605 
606  if (extra_p == NULL)
607  break;
608 
609  if (slot->slot_next != NULL) {
610  r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
611  if (r == TM_ECODE_FAILED) {
615 
616  TmqhOutputPacketpool(tv, extra_p);
618  break;
619  }
620  }
621  /* output the packet */
622  tv->tmqh_out(tv, extra_p);
623  } /* while */
624  } /* if */
625  } /* for */
626 
627  if (TmThreadsCheckFlag(tv, THV_KILL)) {
628  run = 0;
629  }
630  } /* while (run) */
631  StatsSyncCounters(tv);
632 
635 
637 
638  s = (TmSlot *)tv->tm_slots;
639 
640  for ( ; s != NULL; s = s->slot_next) {
641  if (s->SlotThreadExitPrintStats != NULL) {
642  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
643  }
644 
645  if (s->SlotThreadDeinit != NULL) {
646  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
647  if (r != TM_ECODE_OK) {
649  goto error;
650  }
651  }
652  BUG_ON(s->slot_pre_pq.len);
653  BUG_ON(s->slot_post_pq.len);
654  }
655 
656  SCLogDebug("%s ending", tv->name);
657  tv->stream_pq = NULL;
659  pthread_exit((void *) 0);
660  return NULL;
661 
662 error:
663  tv->stream_pq = NULL;
664  pthread_exit((void *) -1);
665  return NULL;
666 }
667 
668 static void *TmThreadsManagement(void *td)
669 {
670  ThreadVars *tv = (ThreadVars *)td;
671  TmSlot *s = (TmSlot *)tv->tm_slots;
672  TmEcode r = TM_ECODE_OK;
673 
674  BUG_ON(s == NULL);
675 
676  /* Set the thread name */
677  if (SCSetThreadName(tv->name) < 0) {
678  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
679  }
680 
681  if (tv->thread_setup_flags != 0)
683 
684  /* Drop the capabilities for this thread */
685  SCDropCaps(tv);
686 
687  SCLogDebug("%s starting", tv->name);
688 
689  if (s->SlotThreadInit != NULL) {
690  void *slot_data = NULL;
691  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
692  if (r != TM_ECODE_OK) {
694  pthread_exit((void *) -1);
695  return NULL;
696  }
697  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
698  }
699  memset(&s->slot_pre_pq, 0, sizeof(PacketQueue));
700  memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
701 
702  StatsSetupPrivate(tv);
703 
705 
706  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
707  /* handle error */
708  if (r == TM_ECODE_FAILED) {
710  }
711 
712  if (TmThreadsCheckFlag(tv, THV_KILL)) {
713  StatsSyncCounters(tv);
714  }
715 
718 
719  if (s->SlotThreadExitPrintStats != NULL) {
720  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
721  }
722 
723  if (s->SlotThreadDeinit != NULL) {
724  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
725  if (r != TM_ECODE_OK) {
727  pthread_exit((void *) -1);
728  return NULL;
729  }
730  }
731 
733  pthread_exit((void *) 0);
734  return NULL;
735 }
736 
737 /**
738  * \brief We set the slot functions.
739  *
740  * \param tv Pointer to the TV to set the slot function for.
741  * \param name Name of the slot variant.
742  * \param fn_p Pointer to a custom slot function. Used only if slot variant
743  * "name" is "custom".
744  *
745  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
746  */
747 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
748 {
749  if (name == NULL) {
750  if (fn_p == NULL) {
751  printf("Both slot name and function pointer can't be NULL inside "
752  "TmThreadSetSlots\n");
753  goto error;
754  } else {
755  name = "custom";
756  }
757  }
758 
759  if (strcmp(name, "varslot") == 0) {
760  tv->tm_func = TmThreadsSlotVar;
761  } else if (strcmp(name, "pktacqloop") == 0) {
762 #ifndef AFLFUZZ_PCAP_RUNMODE
763  tv->tm_func = TmThreadsSlotPktAcqLoop;
764 #else
765  tv->tm_func = TmThreadsSlotPktAcqLoopAFL;
766 #endif
767  } else if (strcmp(name, "management") == 0) {
768  tv->tm_func = TmThreadsManagement;
769  } else if (strcmp(name, "command") == 0) {
770  tv->tm_func = TmThreadsManagement;
771  } else if (strcmp(name, "custom") == 0) {
772  if (fn_p == NULL)
773  goto error;
774  tv->tm_func = fn_p;
775  } else {
776  printf("Error: Slot \"%s\" not supported\n", name);
777  goto error;
778  }
779 
780  return TM_ECODE_OK;
781 
782 error:
783  return TM_ECODE_FAILED;
784 }
785 
787 {
789  for (int i = 0; i < TVT_MAX; i++) {
790  ThreadVars *tv = tv_root[i];
791  while (tv) {
792  TmSlot *slots = tv->tm_slots;
793  while (slots != NULL) {
794  if (slots == tm_slot) {
796  return tv;
797  }
798  slots = slots->slot_next;
799  }
800  tv = tv->next;
801  }
802  }
804  return NULL;
805 }
806 
807 /**
808  * \brief Appends a new entry to the slots.
809  *
810  * \param tv TV the slot is attached to.
811  * \param tm TM to append.
812  * \param data Data to be passed on to the slot init function.
813  *
814  * \retval The allocated TmSlot or NULL if there is an error
815  */
816 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
817 {
818  TmSlot *slot = SCMalloc(sizeof(TmSlot));
819  if (unlikely(slot == NULL))
820  return;
821  memset(slot, 0, sizeof(TmSlot));
822  SC_ATOMIC_INIT(slot->slot_data);
823  slot->tv = tv;
824  slot->SlotThreadInit = tm->ThreadInit;
825  slot->slot_initdata = data;
826  SC_ATOMIC_INIT(slot->SlotFunc);
827  (void)SC_ATOMIC_SET(slot->SlotFunc, tm->Func);
828  slot->PktAcqLoop = tm->PktAcqLoop;
829  slot->Management = tm->Management;
831  slot->SlotThreadDeinit = tm->ThreadDeinit;
832  /* we don't have to check for the return value "-1". We wouldn't have
833  * received a TM as arg, if it didn't exist */
834  slot->tm_id = TmModuleGetIDForTM(tm);
835 
836  tv->tmm_flags |= tm->flags;
837  tv->cap_flags |= tm->cap_flags;
838 
839  if (tv->tm_slots == NULL) {
840  tv->tm_slots = slot;
841  slot->id = 0;
842  } else {
843  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
844 
845  /* get the last slot */
846  for ( ; a != NULL; a = a->slot_next) {
847  b = a;
848  }
849  /* append the new slot */
850  if (b != NULL) {
851  b->slot_next = slot;
852  slot->id = b->id + 1;
853  }
854  }
855  return;
856 }
857 
858 /**
859  * \brief Returns the slot holding a TM with the particular tm_id.
860  *
861  * \param tm_id TM id of the TM whose slot has to be returned.
862  *
863  * \retval slots Pointer to the slot.
864  */
866 {
868  for (int i = 0; i < TVT_MAX; i++) {
869  ThreadVars *tv = tv_root[i];
870  while (tv) {
871  TmSlot *slots = tv->tm_slots;
872  while (slots != NULL) {
873  if (slots->tm_id == tm_id) {
875  return slots;
876  }
877  slots = slots->slot_next;
878  }
879  tv = tv->next;
880  }
881  }
883  return NULL;
884 }
885 
886 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
887 static int SetCPUAffinitySet(cpu_set_t *cs)
888 {
889 #if defined OS_FREEBSD
890  int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
891  SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
892 #elif OS_DARWIN
893  int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
894  (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
895 #else
896  pid_t tid = syscall(SYS_gettid);
897  int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
898 #endif /* OS_FREEBSD */
899 
900  if (r != 0) {
901  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
902  strerror(errno));
903  return -1;
904  }
905 
906  return 0;
907 }
908 #endif
909 
910 
911 /**
912  * \brief Set the thread affinity on the calling thread.
913  *
914  * \param cpuid Id of the core/cpu to setup the affinity.
915  *
916  * \retval 0 If all goes well; -1 if something is wrong.
917  */
918 static int SetCPUAffinity(uint16_t cpuid)
919 {
920 #if defined __OpenBSD__ || defined sun
921  return 0;
922 #else
923  int cpu = (int)cpuid;
924 
925 #if defined OS_WIN32 || defined __CYGWIN__
926  DWORD cs = 1 << cpu;
927 
928  int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
929  if (r != 0) {
930  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
931  strerror(errno));
932  return -1;
933  }
934  SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
935  SCGetThreadIdLong(), cpu);
936 
937  return 0;
938 
939 #else
940  cpu_set_t cs;
941 
942  CPU_ZERO(&cs);
943  CPU_SET(cpu, &cs);
944  return SetCPUAffinitySet(&cs);
945 #endif /* windows */
946 #endif /* not supported */
947 }
948 
949 
950 /**
951  * \brief Set the thread options (thread priority).
952  *
953  * \param tv Pointer to the ThreadVars to setup the thread priority.
954  *
955  * \retval TM_ECODE_OK.
956  */
958 {
960  tv->thread_priority = prio;
961 
962  return TM_ECODE_OK;
963 }
964 
965 /**
966  * \brief Adjusting nice value for threads.
967  */
969 {
970  SCEnter();
971 #ifndef __CYGWIN__
972 #ifdef OS_WIN32
973  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
974  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for "
975  "thread %s: %s", tv->name, strerror(errno));
976  } else {
977  SCLogDebug("Priority set to %"PRId32" for thread %s",
978  tv->thread_priority, tv->name);
979  }
980 #else
981  int ret = nice(tv->thread_priority);
982  if (ret == -1) {
983  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value %d "
984  "for thread %s: %s", tv->thread_priority, tv->name,
985  strerror(errno));
986  } else {
987  SCLogDebug("Nice value set to %"PRId32" for thread %s",
988  tv->thread_priority, tv->name);
989  }
990 #endif /* OS_WIN32 */
991 #endif
992  SCReturn;
993 }
994 
995 
996 /**
997  * \brief Set the thread options (cpu affinity).
998  *
999  * \param tv pointer to the ThreadVars to setup the affinity.
1000  * \param cpu cpu on which affinity is set.
1001  *
1002  * \retval TM_ECODE_OK
1003  */
1005 {
1007  tv->cpu_affinity = cpu;
1008 
1009  return TM_ECODE_OK;
1010 }
1011 
1012 
1014 {
1016  return TM_ECODE_OK;
1017 
1018  if (type > MAX_CPU_SET) {
1019  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
1020  return TM_ECODE_FAILED;
1021  }
1022 
1024  tv->cpu_affinity = type;
1025 
1026  return TM_ECODE_OK;
1027 }
1028 
1030 {
1031  if (type >= MAX_CPU_SET) {
1032  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
1033  return 0;
1034  }
1035 
1037 }
1038 
1039 /**
1040  * \brief Set the thread options (cpu affinitythread).
1041  * Priority should be already set by pthread_create.
1042  *
1043  * \param tv pointer to the ThreadVars of the calling thread.
1044  */
1046 {
1048  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
1049  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
1050  SCGetThreadIdLong());
1051  SetCPUAffinity(tv->cpu_affinity);
1052  }
1053 
1054 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
1056  TmThreadSetPrio(tv);
1059  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
1060  int cpu = AffinityGetNextCPU(taf);
1061  SetCPUAffinity(cpu);
1062  /* If CPU is in a set overwrite the default thread prio */
1063  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
1064  tv->thread_priority = PRIO_LOW;
1065  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
1067  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
1068  tv->thread_priority = PRIO_HIGH;
1069  } else {
1070  tv->thread_priority = taf->prio;
1071  }
1072  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
1073  "%d, thread id %lu", tv->thread_priority,
1074  tv->name, cpu, SCGetThreadIdLong());
1075  } else {
1076  SetCPUAffinitySet(&taf->cpu_set);
1077  tv->thread_priority = taf->prio;
1078  SCLogPerf("Setting prio %d for thread \"%s\", "
1079  "thread id %lu", tv->thread_priority,
1080  tv->name, SCGetThreadIdLong());
1081  }
1082  TmThreadSetPrio(tv);
1083  }
1084 #endif
1085 
1086  return TM_ECODE_OK;
1087 }
1088 
1089 /**
1090  * \brief Creates and returns the TV instance for a new thread.
1091  *
1092  * \param name Name of this TV instance
1093  * \param inq_name Incoming queue name
1094  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1095  * \param outq_name Outgoing queue name
1096  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1097  * \param slots String representation for the slot function to be used
1098  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1099  * \param mucond Flag to indicate whether to initialize the condition
1100  * and the mutex variables for this newly created TV.
1101  *
1102  * \retval the newly created TV instance, or NULL on error
1103  */
1104 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
1105  const char *outq_name, const char *outqh_name, const char *slots,
1106  void * (*fn_p)(void *), int mucond)
1107 {
1108  ThreadVars *tv = NULL;
1109  Tmq *tmq = NULL;
1110  Tmqh *tmqh = NULL;
1111 
1112  SCLogDebug("creating thread \"%s\"...", name);
1113 
1114  /* XXX create separate function for this: allocate a thread container */
1115  tv = SCMalloc(sizeof(ThreadVars));
1116  if (unlikely(tv == NULL))
1117  goto error;
1118  memset(tv, 0, sizeof(ThreadVars));
1119 
1120  SC_ATOMIC_INIT(tv->flags);
1121  SCMutexInit(&tv->perf_public_ctx.m, NULL);
1122 
1123  strlcpy(tv->name, name, sizeof(tv->name));
1124 
1125  /* default state for every newly created thread */
1128 
1129  /* set the incoming queue */
1130  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
1131  SCLogDebug("inq_name \"%s\"", inq_name);
1132 
1133  tmq = TmqGetQueueByName(inq_name);
1134  if (tmq == NULL) {
1135  tmq = TmqCreateQueue(inq_name);
1136  if (tmq == NULL)
1137  goto error;
1138  }
1139  SCLogDebug("tmq %p", tmq);
1140 
1141  tv->inq = tmq;
1142  tv->inq->reader_cnt++;
1143  SCLogDebug("tv->inq %p", tv->inq);
1144  }
1145  if (inqh_name != NULL) {
1146  SCLogDebug("inqh_name \"%s\"", inqh_name);
1147 
1148  tmqh = TmqhGetQueueHandlerByName(inqh_name);
1149  if (tmqh == NULL)
1150  goto error;
1151 
1152  tv->tmqh_in = tmqh->InHandler;
1154  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
1155  }
1156 
1157  /* set the outgoing queue */
1158  if (outqh_name != NULL) {
1159  SCLogDebug("outqh_name \"%s\"", outqh_name);
1160 
1161  tmqh = TmqhGetQueueHandlerByName(outqh_name);
1162  if (tmqh == NULL)
1163  goto error;
1164 
1165  tv->tmqh_out = tmqh->OutHandler;
1166  tv->outqh_name = tmqh->name;
1167 
1168  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
1169  SCLogDebug("outq_name \"%s\"", outq_name);
1170 
1171  if (tmqh->OutHandlerCtxSetup != NULL) {
1172  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1173  if (tv->outctx == NULL)
1174  goto error;
1175  tv->outq = NULL;
1176  } else {
1177  tmq = TmqGetQueueByName(outq_name);
1178  if (tmq == NULL) {
1179  tmq = TmqCreateQueue(outq_name);
1180  if (tmq == NULL)
1181  goto error;
1182  }
1183  SCLogDebug("tmq %p", tmq);
1184 
1185  tv->outq = tmq;
1186  tv->outctx = NULL;
1187  tv->outq->writer_cnt++;
1188  }
1189  }
1190  }
1191 
1192  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1193  goto error;
1194  }
1195 
1196  if (mucond != 0)
1197  TmThreadInitMC(tv);
1198 
1199  return tv;
1200 
1201 error:
1202  SCLogError(SC_ERR_THREAD_CREATE, "failed to setup a thread");
1203 
1204  if (tv != NULL)
1205  SCFree(tv);
1206  return NULL;
1207 }
1208 
1209 /**
1210  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1211  * This function doesn't support custom slots, and hence shouldn't be
1212  * supplied \"custom\" as its slot type. All PPT threads are created
1213  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1214  * conditional variables are not used to kill the thread.
1215  *
1216  * \param name Name of this TV instance
1217  * \param inq_name Incoming queue name
1218  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1219  * \param outq_name Outgoing queue name
1220  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1221  * \param slots String representation for the slot function to be used
1222  *
1223  * \retval the newly created TV instance, or NULL on error
1224  */
1225 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1226  const char *inqh_name, const char *outq_name,
1227  const char *outqh_name, const char *slots)
1228 {
1229  ThreadVars *tv = NULL;
1230 
1231  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1232  slots, NULL, 0);
1233 
1234  if (tv != NULL) {
1235  tv->type = TVT_PPT;
1236  tv->id = TmThreadsRegisterThread(tv, tv->type);
1237  }
1238 
1239 
1240  return tv;
1241 }
1242 
1243 /**
1244  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1245  * This function supports only custom slot functions and hence a
1246  * function pointer should be sent as an argument.
1247  *
1248  * \param name Name of this TV instance
1249  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1250  * \param mucond Flag to indicate whether to initialize the condition
1251  * and the mutex variables for this newly created TV.
1252  *
1253  * \retval the newly created TV instance, or NULL on error
1254  */
1255 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1256  int mucond)
1257 {
1258  ThreadVars *tv = NULL;
1259 
1260  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1261 
1262  if (tv != NULL) {
1263  tv->type = TVT_MGMT;
1264  tv->id = TmThreadsRegisterThread(tv, tv->type);
1266  }
1267 
1268  return tv;
1269 }
1270 
1271 /**
1272  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1273  * This function supports only custom slot functions and hence a
1274  * function pointer should be sent as an argument.
1275  *
1276  * \param name Name of this TV instance
1277  * \param module Name of TmModule with MANAGEMENT flag set.
1278  * \param mucond Flag to indicate whether to initialize the condition
1279  * and the mutex variables for this newly created TV.
1280  *
1281  * \retval the newly created TV instance, or NULL on error
1282  */
1283 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1284  int mucond)
1285 {
1286  ThreadVars *tv = NULL;
1287 
1288  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1289 
1290  if (tv != NULL) {
1291  tv->type = TVT_MGMT;
1292  tv->id = TmThreadsRegisterThread(tv, tv->type);
1294 
1295  TmModule *m = TmModuleGetByName(module);
1296  if (m) {
1297  TmSlotSetFuncAppend(tv, m, NULL);
1298  }
1299  }
1300 
1301  return tv;
1302 }
1303 
1304 /**
1305  * \brief Creates and returns the TV instance for a Command thread (CMD).
1306  * This function supports only custom slot functions and hence a
1307  * function pointer should be sent as an argument.
1308  *
1309  * \param name Name of this TV instance
1310  * \param module Name of TmModule with COMMAND flag set.
1311  * \param mucond Flag to indicate whether to initialize the condition
1312  * and the mutex variables for this newly created TV.
1313  *
1314  * \retval the newly created TV instance, or NULL on error
1315  */
1316 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1317  int mucond)
1318 {
1319  ThreadVars *tv = NULL;
1320 
1321  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1322 
1323  if (tv != NULL) {
1324  tv->type = TVT_CMD;
1325  tv->id = TmThreadsRegisterThread(tv, tv->type);
1327 
1328  TmModule *m = TmModuleGetByName(module);
1329  if (m) {
1330  TmSlotSetFuncAppend(tv, m, NULL);
1331  }
1332  }
1333 
1334  return tv;
1335 }
1336 
1337 /**
1338  * \brief Appends this TV to tv_root based on its type
1339  *
1340  * \param type holds the type this TV belongs to.
1341  */
1343 {
1345 
1346  if (tv_root[type] == NULL) {
1347  tv_root[type] = tv;
1348  tv->next = NULL;
1349  tv->prev = NULL;
1350 
1352 
1353  return;
1354  }
1355 
1356  ThreadVars *t = tv_root[type];
1357 
1358  while (t) {
1359  if (t->next == NULL) {
1360  t->next = tv;
1361  tv->prev = t;
1362  tv->next = NULL;
1363  break;
1364  }
1365 
1366  t = t->next;
1367  }
1368 
1370 
1371  return;
1372 }
1373 
1374 /**
1375  * \brief Removes this TV from tv_root based on its type
1376  *
1377  * \param tv The tv instance to remove from the global tv list.
1378  * \param type Holds the type this TV belongs to.
1379  */
1381 {
1383 
1384  if (tv_root[type] == NULL) {
1386 
1387  return;
1388  }
1389 
1390  ThreadVars *t = tv_root[type];
1391  while (t != tv) {
1392  t = t->next;
1393  }
1394 
1395  if (t != NULL) {
1396  if (t->prev != NULL)
1397  t->prev->next = t->next;
1398  if (t->next != NULL)
1399  t->next->prev = t->prev;
1400 
1401  if (t == tv_root[type])
1402  tv_root[type] = t->next;;
1403  }
1404 
1406 
1407  return;
1408 }
1409 
1410 /**
1411  * \brief Kill a thread.
1412  *
1413  * \param tv A ThreadVars instance corresponding to the thread that has to be
1414  * killed.
1415  *
1416  * \retval r 1 killed succesfully
1417  * 0 not yet ready, needs another look
1418  */
1419 static int TmThreadKillThread(ThreadVars *tv)
1420 {
1421  int i = 0;
1422 
1423  BUG_ON(tv == NULL);
1424 
1425  /* kill only once :) */
1426  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1427  return 1;
1428  }
1429 
1430  if (tv->inq != NULL) {
1431  /* we wait till we dry out all the inq packets, before we
1432  * kill this thread. Do note that you should have disabled
1433  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1434  if (!(strlen(tv->inq->name) == strlen("packetpool") &&
1435  strcasecmp(tv->inq->name, "packetpool") == 0)) {
1436  PacketQueue *q = &trans_q[tv->inq->id];
1437  if (q->len != 0) {
1438  return 0;
1439  }
1440  }
1441  }
1442 
1443  /* set the thread flag informing the thread that it needs to be
1444  * terminated */
1447 
1448  /* to be sure, signal more */
1449  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1450  if (tv->InShutdownHandler != NULL) {
1451  tv->InShutdownHandler(tv);
1452  }
1453  if (tv->inq != NULL) {
1454  for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1455  SCCondSignal(&trans_q[tv->inq->id].cond_q);
1456  }
1457  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1458  }
1459 
1460  if (tv->ctrl_cond != NULL ) {
1461  pthread_cond_broadcast(tv->ctrl_cond);
1462  }
1463  return 0;
1464  }
1465 
1466  if (tv->outctx != NULL) {
1468  if (tmqh == NULL)
1469  BUG_ON(1);
1470 
1471  if (tmqh->OutHandlerCtxFree != NULL) {
1472  tmqh->OutHandlerCtxFree(tv->outctx);
1473  tv->outctx = NULL;
1474  }
1475  }
1476 
1477  /* join it and flag it as dead */
1478  pthread_join(tv->t, NULL);
1479  SCLogDebug("thread %s stopped", tv->name);
1481  return 1;
1482 }
1483 
1484 /** \internal
1485  *
1486  * \brief make sure that all packet threads are done processing their
1487  * in-flight packets
1488  */
1489 static void TmThreadDrainPacketThreads(void)
1490 {
1491  ThreadVars *tv = NULL;
1492  struct timeval start_ts;
1493  struct timeval cur_ts;
1494  gettimeofday(&start_ts, NULL);
1495 
1496 again:
1497  gettimeofday(&cur_ts, NULL);
1498  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1499  SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads "
1500  "to process their packets in time");
1501  return;
1502  }
1503 
1505 
1506  /* all receive threads are part of packet processing threads */
1507  tv = tv_root[TVT_PPT];
1508  while (tv) {
1509  if (tv->inq != NULL) {
1510  /* we wait till we dry out all the inq packets, before we
1511  * kill this thread. Do note that you should have disabled
1512  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1513  if (!(strlen(tv->inq->name) == strlen("packetpool") &&
1514  strcasecmp(tv->inq->name, "packetpool") == 0)) {
1515  PacketQueue *q = &trans_q[tv->inq->id];
1516  if (q->len != 0) {
1518 
1519  /* sleep outside lock */
1520  SleepMsec(1);
1521  goto again;
1522  }
1523  }
1524  }
1525 
1526  tv = tv->next;
1527  }
1528 
1530  return;
1531 }
1532 
1533 /**
1534  * \brief Disable all threads having the specified TMs.
1535  *
1536  * Breaks out of the packet acquisition loop, and bumps
1537  * into the 'flow loop', where it will process packets
1538  * from the flow engine's shutdown handling.
1539  */
1541 {
1542  ThreadVars *tv = NULL;
1543  struct timeval start_ts;
1544  struct timeval cur_ts;
1545  gettimeofday(&start_ts, NULL);
1546 
1547 again:
1548  gettimeofday(&cur_ts, NULL);
1549  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1550  FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1551  "thread - \"%s\". Killing engine", tv->name);
1552  }
1553 
1555 
1556  /* all receive threads are part of packet processing threads */
1557  tv = tv_root[TVT_PPT];
1558 
1559  /* we do have to keep in mind that TVs are arranged in the order
1560  * right from receive to log. The moment we fail to find a
1561  * receive TM amongst the slots in a tv, it indicates we are done
1562  * with all receive threads */
1563  while (tv) {
1564  int disable = 0;
1565  TmModule *tm = NULL;
1566  /* obtain the slots for this TV */
1567  TmSlot *slots = tv->tm_slots;
1568  while (slots != NULL) {
1569  tm = TmModuleGetById(slots->tm_id);
1570 
1571  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1572  disable = 1;
1573  break;
1574  }
1575 
1576  slots = slots->slot_next;
1577  continue;
1578  }
1579 
1580  if (disable) {
1581  if (tv->inq != NULL) {
1582  /* we wait till we dry out all the inq packets, before we
1583  * kill this thread. Do note that you should have disabled
1584  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1585  if (!(strlen(tv->inq->name) == strlen("packetpool") &&
1586  strcasecmp(tv->inq->name, "packetpool") == 0)) {
1587  PacketQueue *q = &trans_q[tv->inq->id];
1588  if (q->len != 0) {
1590  /* don't sleep while holding a lock */
1591  SleepMsec(1);
1592  goto again;
1593  }
1594  }
1595  }
1596 
1597  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1598  if (tm && tm->PktAcqBreakLoop != NULL) {
1599  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1600  }
1602 
1603  if (tv->inq != NULL) {
1604  int i;
1605  for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1606  SCCondSignal(&trans_q[tv->inq->id].cond_q);
1607  }
1608  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1609  }
1610 
1611  /* wait for it to enter the 'flow loop' stage */
1612  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1614 
1615  SleepMsec(1);
1616  goto again;
1617  }
1618  }
1619 
1620  tv = tv->next;
1621  }
1622 
1624 
1625  /* finally wait for all packet threads to have
1626  * processed all of their 'live' packets so we
1627  * don't process the last live packets together
1628  * with FFR packets */
1629  TmThreadDrainPacketThreads();
1630  return;
1631 }
1632 
1633 /**
1634  * \brief Disable all threads having the specified TMs.
1635  */
1637 {
1638  ThreadVars *tv = NULL;
1639  struct timeval start_ts;
1640  struct timeval cur_ts;
1641 
1642  /* first drain all packet threads of their packets */
1643  TmThreadDrainPacketThreads();
1644 
1645  gettimeofday(&start_ts, NULL);
1646 again:
1647  gettimeofday(&cur_ts, NULL);
1648  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1649  FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1650  "thread - \"%s\". Killing engine",
1651  tv ? tv->name : "<unknown>");
1652  }
1653 
1655 
1656  /* all receive threads are part of packet processing threads */
1657  tv = tv_root[TVT_PPT];
1658 
1659  /* we do have to keep in mind that TVs are arranged in the order
1660  * right from receive to log. The moment we fail to find a
1661  * receive TM amongst the slots in a tv, it indicates we are done
1662  * with all receive threads */
1663  while (tv) {
1664  if (tv->inq != NULL) {
1665  /* we wait till we dry out all the inq packets, before we
1666  * kill this thread. Do note that you should have disabled
1667  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1668  if (!(strlen(tv->inq->name) == strlen("packetpool") &&
1669  strcasecmp(tv->inq->name, "packetpool") == 0)) {
1670  PacketQueue *q = &trans_q[tv->inq->id];
1671  if (q->len != 0) {
1673  /* don't sleep while holding a lock */
1674  SleepMsec(1);
1675  goto again;
1676  }
1677  }
1678  }
1679 
1680  /* we found our receive TV. Send it a KILL signal. This is all
1681  * we need to do to kill receive threads */
1683 
1684  if (tv->inq != NULL) {
1685  int i;
1686  for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1687  SCCondSignal(&trans_q[tv->inq->id].cond_q);
1688  }
1689  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1690  }
1691 
1692  while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
1694 
1695  SleepMsec(1);
1696  goto again;
1697  }
1698 
1699  tv = tv->next;
1700  }
1701 
1703 
1704  return;
1705 }
1706 
1708 {
1709  ThreadVars *tv = NULL;
1710  TmSlot *slots = NULL;
1711 
1713 
1714  /* all receive threads are part of packet processing threads */
1715  tv = tv_root[TVT_PPT];
1716 
1717  while (tv) {
1718  slots = tv->tm_slots;
1719 
1720  while (slots != NULL) {
1721  TmModule *tm = TmModuleGetById(slots->tm_id);
1722 
1723  char *found = strstr(tm->name, tm_name);
1724  if (found != NULL)
1725  goto end;
1726 
1727  slots = slots->slot_next;
1728  }
1729 
1730  tv = tv->next;
1731  }
1732 
1733  end:
1735  return slots;
1736 }
1737 
1738 #define MIN_WAIT_TIME 100
1739 #define MAX_WAIT_TIME 999999
1741 {
1742  ThreadVars *tv = NULL;
1743  unsigned int sleep_usec = MIN_WAIT_TIME;
1744 
1745  BUG_ON((family < 0) || (family >= TVT_MAX));
1746 
1747 again:
1749  tv = tv_root[family];
1750 
1751  while (tv) {
1752  int r = TmThreadKillThread(tv);
1753  if (r == 0) {
1755  SleepUsec(sleep_usec);
1756  sleep_usec *= 2; /* slowly back off */
1757  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1758  goto again;
1759  }
1760  sleep_usec = MIN_WAIT_TIME; /* reset */
1761 
1762  tv = tv->next;
1763  }
1765 }
1766 #undef MIN_WAIT_TIME
1767 #undef MAX_WAIT_TIME
1768 
1770 {
1771  int i = 0;
1772 
1773  for (i = 0; i < TVT_MAX; i++) {
1775  }
1776 
1777  return;
1778 }
1779 
1780 static void TmThreadFree(ThreadVars *tv)
1781 {
1782  TmSlot *s;
1783  TmSlot *ps;
1784  if (tv == NULL)
1785  return;
1786 
1787  SCLogDebug("Freeing thread '%s'.", tv->name);
1788 
1789  StatsThreadCleanup(tv);
1790 
1791  TmThreadDeinitMC(tv);
1792 
1793  if (tv->thread_group_name) {
1795  }
1796 
1797  if (tv->printable_name) {
1798  SCFree(tv->printable_name);
1799  }
1800 
1801  s = (TmSlot *)tv->tm_slots;
1802  while (s) {
1803  ps = s;
1804  s = s->slot_next;
1805  SCFree(ps);
1806  }
1807 
1809  SCFree(tv);
1810 }
1811 
1812 void TmThreadSetGroupName(ThreadVars *tv, const char *name)
1813 {
1814  char *thread_group_name = NULL;
1815 
1816  if (name == NULL)
1817  return;
1818 
1819  if (tv == NULL)
1820  return;
1821 
1822  thread_group_name = SCStrdup(name);
1823  if (unlikely(thread_group_name == NULL)) {
1824  SCLogError(SC_ERR_RUNMODE, "error allocating memory");
1825  return;
1826  }
1827  tv->thread_group_name = thread_group_name;
1828 }
1829 
1831 {
1832  ThreadVars *tv = NULL;
1833  ThreadVars *ptv = NULL;
1834 
1835  if ((family < 0) || (family >= TVT_MAX))
1836  return;
1837 
1839  tv = tv_root[family];
1840 
1841  while (tv) {
1842  ptv = tv;
1843  tv = tv->next;
1844  TmThreadFree(ptv);
1845  }
1846  tv_root[family] = NULL;
1848 }
1849 
1850 /**
1851  * \brief Spawns a thread associated with the ThreadVars instance tv
1852  *
1853  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1854  */
1856 {
1857  pthread_attr_t attr;
1858  if (tv->tm_func == NULL) {
1859  printf("ERROR: no thread function set\n");
1860  return TM_ECODE_FAILED;
1861  }
1862 
1863  /* Initialize and set thread detached attribute */
1864  pthread_attr_init(&attr);
1865 
1866  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1867 
1868  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1869  if (rc) {
1870  printf("ERROR; return code from pthread_create() is %" PRId32 "\n", rc);
1871  return TM_ECODE_FAILED;
1872  }
1873 
1875 
1876  TmThreadAppend(tv, tv->type);
1877  return TM_ECODE_OK;
1878 }
1879 
1880 /**
1881  * \brief Sets the thread flags for a thread instance(tv)
1882  *
1883  * \param tv Pointer to the thread instance for which the flag has to be set
1884  * \param flags Holds the thread state this thread instance has to be set to
1885  */
1886 #if 0
1887 void TmThreadSetFlags(ThreadVars *tv, uint8_t flags)
1888 {
1889  if (tv != NULL)
1890  tv->flags = flags;
1891 
1892  return;
1893 }
1894 #endif
1895 
1896 /**
1897  * \brief Initializes the mutex and condition variables for this TV
1898  *
1899  * It can be used by a thread to control a wait loop that can also be
1900  * influenced by other threads.
1901  *
1902  * \param tv Pointer to a TV instance
1903  */
1905 {
1906  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1907  SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. "
1908  "Exiting...");
1909  exit(EXIT_FAILURE);
1910  }
1911 
1912  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1913  printf("Error initializing the tv->m mutex\n");
1914  exit(EXIT_FAILURE);
1915  }
1916 
1917  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1918  SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. "
1919  "Exiting...");
1920  exit(EXIT_FAILURE);
1921  }
1922 
1923  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1924  SCLogError(SC_ERR_FATAL, "Error initializing the tv->cond condition "
1925  "variable");
1926  exit(EXIT_FAILURE);
1927  }
1928 
1929  return;
1930 }
1931 
1932 static void TmThreadDeinitMC(ThreadVars *tv)
1933 {
1934  if (tv->ctrl_mutex) {
1936  SCFree(tv->ctrl_mutex);
1937  }
1938  if (tv->ctrl_cond) {
1940  SCFree(tv->ctrl_cond);
1941  }
1942  return;
1943 }
1944 
1945 /**
1946  * \brief Tests if the thread represented in the arg has been unpaused or not.
1947  *
1948  * The function would return if the thread tv has been unpaused or if the
1949  * kill flag for the thread has been set.
1950  *
1951  * \param tv Pointer to the TV instance.
1952  */
1954 {
1955  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1956  SleepUsec(100);
1957 
1958  if (TmThreadsCheckFlag(tv, THV_KILL))
1959  break;
1960  }
1961 
1962  return;
1963 }
1964 
1965 /**
1966  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1967  * the kill flag has been set or not on the thread.
1968  *
1969  * \param tv Pointer to the TV instance.
1970  */
1971 void TmThreadWaitForFlag(ThreadVars *tv, uint16_t flags)
1972 {
1973  while (!TmThreadsCheckFlag(tv, flags)) {
1974  SleepUsec(100);
1975  }
1976 
1977  return;
1978 }
1979 
1980 /**
1981  * \brief Unpauses a thread
1982  *
1983  * \param tv Pointer to a TV instance that has to be unpaused
1984  */
1986 {
1988 
1989  return;
1990 }
1991 
1992 /**
1993  * \brief Unpauses all threads present in tv_root
1994  */
1996 {
1997  ThreadVars *tv = NULL;
1998  int i = 0;
1999 
2001  for (i = 0; i < TVT_MAX; i++) {
2002  tv = tv_root[i];
2003  while (tv != NULL) {
2004  TmThreadContinue(tv);
2005  tv = tv->next;
2006  }
2007  }
2009 
2010  return;
2011 }
2012 
2013 /**
2014  * \brief Pauses a thread
2015  *
2016  * \param tv Pointer to a TV instance that has to be paused
2017  */
2019 {
2021 
2022  return;
2023 }
2024 
2025 /**
2026  * \brief Pauses all threads present in tv_root
2027  */
2029 {
2030  ThreadVars *tv = NULL;
2031  int i = 0;
2032 
2034 
2036  for (i = 0; i < TVT_MAX; i++) {
2037  tv = tv_root[i];
2038  while (tv != NULL) {
2039  TmThreadPause(tv);
2040  tv = tv->next;
2041  }
2042  }
2044 
2045  return;
2046 }
2047 
2048 /**
2049  * \brief Used to check the thread for certain conditions of failure.
2050  */
2052 {
2053  ThreadVars *tv = NULL;
2054  int i = 0;
2055 
2057  for (i = 0; i < TVT_MAX; i++) {
2058  tv = tv_root[i];
2059 
2060  while (tv) {
2061  if (TmThreadsCheckFlag(tv, THV_FAILED)) {
2062  FatalError(SC_ERR_FATAL, "thread %s failed", tv->name);
2063  }
2064  tv = tv->next;
2065  }
2066  }
2068  return;
2069 }
2070 
2071 /**
2072  * \brief Used to check if all threads have finished their initialization. On
2073  * finding an un-initialized thread, it waits till that thread completes
2074  * its initialization, before proceeding to the next thread.
2075  *
2076  * \retval TM_ECODE_OK all initialized properly
2077  * \retval TM_ECODE_FAILED failure
2078  */
2080 {
2081  ThreadVars *tv = NULL;
2082  int i = 0;
2083  uint16_t mgt_num = 0;
2084  uint16_t ppt_num = 0;
2085 
2086  struct timeval start_ts;
2087  struct timeval cur_ts;
2088  gettimeofday(&start_ts, NULL);
2089 
2090 again:
2092  for (i = 0; i < TVT_MAX; i++) {
2093  tv = tv_root[i];
2094  while (tv != NULL) {
2095  if (TmThreadsCheckFlag(tv, (THV_CLOSED|THV_DEAD))) {
2097 
2098  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
2099  "initialize: flags %04x", tv->name,
2100  SC_ATOMIC_GET(tv->flags));
2101  return TM_ECODE_FAILED;
2102  }
2103 
2104  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
2106 
2107  gettimeofday(&cur_ts, NULL);
2108  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
2109  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
2110  "initialize in time: flags %04x", tv->name,
2111  SC_ATOMIC_GET(tv->flags));
2112  return TM_ECODE_FAILED;
2113  }
2114 
2115  /* sleep a little to give the thread some
2116  * time to finish initialization */
2117  SleepUsec(100);
2118  goto again;
2119  }
2120 
2121  if (TmThreadsCheckFlag(tv, THV_FAILED)) {
2123  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
2124  "initialize.", tv->name);
2125  return TM_ECODE_FAILED;
2126  }
2127  if (TmThreadsCheckFlag(tv, THV_CLOSED)) {
2129  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" closed on "
2130  "initialization.", tv->name);
2131  return TM_ECODE_FAILED;
2132  }
2133 
2134  if (i == TVT_MGMT)
2135  mgt_num++;
2136  else if (i == TVT_PPT)
2137  ppt_num++;
2138 
2139  tv = tv->next;
2140  }
2141  }
2143 
2144  SCLogNotice("all %"PRIu16" packet processing threads, %"PRIu16" management "
2145  "threads initialized, engine started.", ppt_num, mgt_num);
2146 
2147  return TM_ECODE_OK;
2148 }
2149 
2150 /**
2151  * \brief Returns the TV for the calling thread.
2152  *
2153  * \retval tv Pointer to the ThreadVars instance for the calling thread;
2154  * NULL on no match
2155  */
2157 {
2158  pthread_t self = pthread_self();
2159  ThreadVars *tv = NULL;
2160  int i = 0;
2161 
2163 
2164  for (i = 0; i < TVT_MAX; i++) {
2165  tv = tv_root[i];
2166  while (tv) {
2167  if (pthread_equal(self, tv->t)) {
2169  return tv;
2170  }
2171  tv = tv->next;
2172  }
2173  }
2174 
2176 
2177  return NULL;
2178 }
2179 
2180 /**
2181  * \brief returns a count of all the threads that match the flag
2182  */
2183 uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
2184 {
2185  ThreadVars *tv = NULL;
2186  int i = 0;
2187  uint32_t cnt = 0;
2188 
2190  for (i = 0; i < TVT_MAX; i++) {
2191  tv = tv_root[i];
2192  while (tv != NULL) {
2193  if ((tv->tmm_flags & flags) == flags)
2194  cnt++;
2195 
2196  tv = tv->next;
2197  }
2198  }
2200  return cnt;
2201 }
2202 
2203 typedef struct Thread_ {
2204  ThreadVars *tv; /**< threadvars structure */
2205  const char *name;
2206  int type;
2207  int in_use; /**< bool to indicate this is in use */
2208 
2209  struct timeval ts; /**< current time of this thread (offline mode) */
2210 } Thread;
2211 
2212 typedef struct Threads_ {
2213  Thread *threads;
2216 } Threads;
2217 
2218 static Threads thread_store = { NULL, 0, 0 };
2219 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2220 
2222 {
2223  Thread *t;
2224  size_t s;
2225 
2226  SCMutexLock(&thread_store_lock);
2227 
2228  for (s = 0; s < thread_store.threads_size; s++) {
2229  t = &thread_store.threads[s];
2230  if (t == NULL || t->in_use == 0)
2231  continue;
2232  SCLogInfo("Thread %"PRIuMAX", %s type %d, tv %p", (uintmax_t)s+1, t->name, t->type, t->tv);
2233  }
2234 
2235  SCMutexUnlock(&thread_store_lock);
2236 }
2237 
2238 #define STEP 32
2239 /**
2240  * \retval id thread id, or 0 if not found
2241  */
2243 {
2244  SCMutexLock(&thread_store_lock);
2245  if (thread_store.threads == NULL) {
2246  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2247  BUG_ON(thread_store.threads == NULL);
2248  thread_store.threads_size = STEP;
2249  }
2250 
2251  size_t s;
2252  for (s = 0; s < thread_store.threads_size; s++) {
2253  if (thread_store.threads[s].in_use == 0) {
2254  Thread *t = &thread_store.threads[s];
2255  t->name = tv->name;
2256  t->type = type;
2257  t->tv = tv;
2258  t->in_use = 1;
2259 
2260  SCMutexUnlock(&thread_store_lock);
2261  return (int)(s+1);
2262  }
2263  }
2264 
2265  /* if we get here the array is completely filled */
2266  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2267  BUG_ON(newmem == NULL);
2268  thread_store.threads = newmem;
2269  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2270 
2271  Thread *t = &thread_store.threads[thread_store.threads_size];
2272  t->name = tv->name;
2273  t->type = type;
2274  t->tv = tv;
2275  t->in_use = 1;
2276 
2277  s = thread_store.threads_size;
2278  thread_store.threads_size += STEP;
2279 
2280  SCMutexUnlock(&thread_store_lock);
2281  return (int)(s+1);
2282 }
2283 #undef STEP
2284 
2285 void TmThreadsUnregisterThread(const int id)
2286 {
2287  SCMutexLock(&thread_store_lock);
2288  if (id <= 0 || id > (int)thread_store.threads_size) {
2289  SCMutexUnlock(&thread_store_lock);
2290  return;
2291  }
2292 
2293  /* id is one higher than index */
2294  int idx = id - 1;
2295 
2296  /* reset thread_id, which serves as clearing the record */
2297  thread_store.threads[idx].in_use = 0;
2298 
2299  /* check if we have at least one registered thread left */
2300  size_t s;
2301  for (s = 0; s < thread_store.threads_size; s++) {
2302  Thread *t = &thread_store.threads[s];
2303  if (t->in_use == 1) {
2304  goto end;
2305  }
2306  }
2307 
2308  /* if we get here no threads are registered */
2309  SCFree(thread_store.threads);
2310  thread_store.threads = NULL;
2311  thread_store.threads_size = 0;
2312  thread_store.threads_cnt = 0;
2313 
2314 end:
2315  SCMutexUnlock(&thread_store_lock);
2316 }
2317 
2318 void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
2319 {
2320  SCMutexLock(&thread_store_lock);
2321  if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2322  SCMutexUnlock(&thread_store_lock);
2323  return;
2324  }
2325 
2326  int idx = id - 1;
2327  Thread *t = &thread_store.threads[idx];
2328  t->ts.tv_sec = ts->tv_sec;
2329  t->ts.tv_usec = ts->tv_usec;
2330  SCMutexUnlock(&thread_store_lock);
2331 }
2332 
2333 #define COPY_TIMESTAMP(src,dst) ((dst)->tv_sec = (src)->tv_sec, (dst)->tv_usec = (src)->tv_usec) // XXX unify with flow-util.h
2334 void TmreadsGetMinimalTimestamp(struct timeval *ts)
2335 {
2336  struct timeval local, nullts;
2337  memset(&local, 0, sizeof(local));
2338  memset(&nullts, 0, sizeof(nullts));
2339  int set = 0;
2340  size_t s;
2341 
2342  SCMutexLock(&thread_store_lock);
2343  for (s = 0; s < thread_store.threads_size; s++) {
2344  Thread *t = &thread_store.threads[s];
2345  if (t == NULL || t->in_use == 0)
2346  continue;
2347  if (!(timercmp(&t->ts, &nullts, ==))) {
2348  if (!set) {
2349  local.tv_sec = t->ts.tv_sec;
2350  local.tv_usec = t->ts.tv_usec;
2351  set = 1;
2352  } else {
2353  if (timercmp(&t->ts, &local, <)) {
2354  COPY_TIMESTAMP(&t->ts, &local);
2355  }
2356  }
2357  }
2358  }
2359  SCMutexUnlock(&thread_store_lock);
2360  COPY_TIMESTAMP(&local, ts);
2361  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2362 }
2363 #undef COPY_TIMESTAMP
2364 
2365 /**
2366  * \retval r 1 if packet was accepted, 0 otherwise
2367  * \note if packet was not accepted, it's still the responsibility
2368  * of the caller.
2369  */
2370 int TmThreadsInjectPacketsById(Packet **packets, const int id)
2371 {
2372  if (id <= 0 || id > (int)thread_store.threads_size)
2373  return 0;
2374 
2375  int idx = id - 1;
2376 
2377  Thread *t = &thread_store.threads[idx];
2378  ThreadVars *tv = t->tv;
2379 
2380  if (tv == NULL || tv->stream_pq == NULL)
2381  return 0;
2382 
2383  SCMutexLock(&tv->stream_pq->mutex_q);
2384  while (*packets != NULL) {
2385  PacketEnqueue(tv->stream_pq, *packets);
2386  packets++;
2387  }
2389 
2390  /* wake up listening thread(s) if necessary */
2391  if (tv->inq != NULL) {
2392  SCCondSignal(&trans_q[tv->inq->id].cond_q);
2393  }
2394  return 1;
2395 }
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value from our atomic variable.
Definition: util-atomic.h:154
Packet *(* InHandler)(ThreadVars *)
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:88
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:816
#define THREAD_SET_PRIORITY
Definition: threadvars.h:117
const char * name
#define SCMutex
#define SCDropCaps(...)
Definition: util-privs.h:37
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:56
uint16_t flags
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:437
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:1013
#define SCCtrlMutexDestroy
#define SCLogDebug(...)
Definition: util-debug.h:335
void TmThreadSetFlags(ThreadVars *, uint8_t)
uint8_t type
Definition: threadvars.h:92
int AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
__thread uint64_t rww_lock_contention
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:1029
uint8_t cap_flags
Definition: tm-modules.h:67
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:2051
void TmThreadPause(ThreadVars *tv)
Pauses a thread.
Definition: tm-threads.c:2018
SCCondT cond_q
Definition: decode.h:628
uint16_t writer_cnt
Definition: tm-queues.h:31
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:64
int thread_priority
Definition: threadvars.h:96
#define BUG_ON(x)
#define THV_KILL_PKTACQ
Definition: threadvars.h:47
uint8_t flags
Definition: tm-modules.h:70
#define SCSetThreadName(n)
Definition: threads.h:299
#define THV_RUNNING_DONE
Definition: threadvars.h:45
#define STEP
Definition: tm-threads.c:2238
struct ThreadVars_ * prev
Definition: threadvars.h:112
void TmThreadDisablePacketThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1636
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1255
int TmThreadsInjectPacketsById(Packet **packets, const int id)
Definition: tm-threads.c:2370
#define unlikely(expr)
Definition: util-optimize.h:35
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1285
TmEcode(* Func)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *)
Definition: tm-modules.h:52
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:957
pthread_t t
Definition: threadvars.h:58
TmSlot * TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name)
Definition: tm-threads.c:1707
#define THV_DEAD
Definition: threadvars.h:54
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:60
uint16_t id
Definition: tm-queues.h:29
#define SCCtrlCondInit
#define MIN(x, y)
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:107
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn&#39;t support custo...
Definition: tm-threads.c:1225
int type
Definition: tm-threads.c:2206
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1812
#define THV_PAUSED
Definition: threadvars.h:38
#define COPY_TIMESTAMP(src, dst)
Definition: tm-threads.c:2333
__thread uint64_t spin_lock_wait_ticks
struct Thread_ Thread
#define THV_FAILED
Definition: threadvars.h:40
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:201
void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
Definition: tm-threads.c:2318
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value from our atomic variable.
Definition: util-atomic.h:141
__thread uint64_t spin_lock_contention
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1316
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1830
#define THREAD_SET_AFFINITY
Definition: threadvars.h:116
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:1985
void(* InShutdownHandler)(struct ThreadVars_ *)
Definition: threadvars.h:79
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
#define SCMutexLock(mut)
__thread uint64_t rwr_lock_wait_ticks
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread...
Definition: tm-threads.c:2079
ThreadVars * TmThreadsGetCallingThread(void)
Returns the TV for the calling thread.
Definition: tm-threads.c:2156
char * thread_group_name
Definition: threadvars.h:61
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:106
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:31
uint8_t thread_setup_flags
Definition: threadvars.h:89
void TmThreadsListThreads(void)
Definition: tm-threads.c:2221
void(* OutHandlerCtxFree)(void *)
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
void TmThreadsSetFlag(ThreadVars *tv, uint16_t flag)
Set a thread flag.
Definition: tm-threads.c:98
#define SC_ATOMIC_INIT(name)
Initialize the previously declared atomic variable and it&#39;s lock.
Definition: util-atomic.h:81
#define SCCalloc(nm, a)
Definition: util-mem.h:253
struct TmSlot_ * tm_slots
Definition: threadvars.h:84
#define SCMutexUnlock(mut)
void TmThreadWaitForFlag(ThreadVars *tv, uint16_t flags)
Waits till the specified flag(s) is(are) set. We don&#39;t bother if the kill flag has been set or not on...
Definition: tm-threads.c:1971
#define THV_INIT_DONE
Definition: threadvars.h:36
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tm-queues.h:27
void TmThreadRemove(ThreadVars *tv, int type)
Removes this TV from tv_root based on its type.
Definition: tm-threads.c:1380
#define SCCtrlCondDestroy
#define THV_CLOSED
Definition: threadvars.h:41
SCMutex tv_root_lock
Definition: tm-threads.c:82
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
#define SCMUTEX_INITIALIZER
uint32_t len
Definition: decode.h:623
uint8_t type
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1740
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:294
#define PACKET_PROFILING_TMM_END(p, id)
struct ThreadVars_ * next
Definition: threadvars.h:111
#define SCGetThreadIdLong(...)
Definition: threads.h:253
const void * slot_initdata
Definition: tm-threads.h:67
#define SleepMsec(msec)
Definition: tm-threads.h:44
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:80
__thread uint64_t mutex_lock_wait_ticks
size_t threads_size
Definition: tm-threads.c:2214
PacketQueue slot_post_pq
Definition: tm-threads.h:78
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
#define SCEnter(...)
Definition: util-debug.h:337
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:90
#define PACKET_PROFILING_TMM_START(p, id)
Packet * top
Definition: decode.h:621
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1283
int tm_id
Definition: tm-threads.h:81
struct TmSlot_ * slot_next
Definition: tm-threads.h:87
#define THV_USE
Definition: threadvars.h:35
void TmThreadPauseThreads()
Pauses all threads present in tv_root.
Definition: tm-threads.c:2028
void TmThreadContinueThreads()
Unpauses all threads present in tv_root.
Definition: tm-threads.c:1995
void PacketPoolInitEmpty(void)
void TmThreadsUnsetFlag(ThreadVars *tv, uint16_t flag)
Unset a thread flag.
Definition: tm-threads.c:106
PacketQueue slot_pre_pq
Definition: tm-threads.h:73
Tmq * outq
Definition: threadvars.h:73
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1342
uint8_t tmm_flags
Definition: threadvars.h:66
#define SCMutexInit(mut, mutattrs)
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:116
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2242
void PacketPoolDestroy(void)
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:62
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
#define THV_PAUSE
Definition: threadvars.h:37
void(* OutHandler)(ThreadVars *, Packet *)
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:63
struct Threads_ Threads
#define THV_KILL
Definition: threadvars.h:39
#define SCLogWarning(err_code,...)
Macro used to log WARNING messages.
Definition: util-debug.h:281
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:118
__thread uint64_t mutex_lock_contention
__thread uint64_t rwr_lock_cnt
#define SCRealloc(x, a)
Definition: util-mem.h:238
uint16_t cpu_affinity
Definition: threadvars.h:94
#define THV_FLOW_LOOP
Definition: threadvars.h:48
TmSlot * TmSlotGetSlotForTM(int tm_id)
Returns the slot holding a TM with the particular tm_id.
Definition: tm-threads.c:865
void PacketPoolInit(void)
void TmThreadKillThreads(void)
Definition: tm-threads.c:1769
uint16_t reader_cnt
Definition: tm-queues.h:30
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:34
const char * name
Definition: tm-modules.h:44
Tmqh * TmqhGetQueueHandlerByName(const char *name)
#define MAX_WAIT_TIME
Definition: tm-threads.c:1739
void TmreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2334
#define SCMalloc(a)
Definition: util-mem.h:222
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:207
ThreadVars * tv
Definition: tm-threads.h:55
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2285
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:254
ThreadVars * tv
Definition: tm-threads.c:2204
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:101
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:1045
#define SCFree(a)
Definition: util-mem.h:322
void *(* tm_func)(void *)
Definition: threadvars.h:83
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:269
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1540
#define SCCondSignal
const char * outqh_name
Definition: threadvars.h:75
char * printable_name
Definition: threadvars.h:60
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2183
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:968
#define SleepUsec(usec)
Definition: tm-threads.h:43
#define MIN_WAIT_TIME
Definition: tm-threads.c:1738
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:107
PacketQueue trans_q[256]
Definition: suricata.h:132
#define SCLogPerf(...)
Definition: util-debug.h:261
#define FatalError(x,...)
Definition: util-debug.h:539
ThreadVars * TmThreadsGetTVContainingSlot(TmSlot *tm_slot)
Definition: tm-threads.c:786
__thread uint64_t rwr_lock_contention
__thread uint64_t rww_lock_cnt
void(* InShutdownHandler)(ThreadVars *)
TmEcode(* TmSlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *)
Definition: tm-threads.h:50
SCMutex m
Definition: flow-hash.h:105
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1186
__thread uint64_t rww_lock_wait_ticks
void * outctx
Definition: threadvars.h:74
struct timeval ts
Definition: tm-threads.c:2209
int TmThreadsCheckFlag(ThreadVars *tv, uint16_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:90
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:192
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:1004
#define StatsSyncCounters(tv)
Definition: counters.h:133
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:167
int in_use
Definition: tm-threads.c:2207
void TmThreadInitMC(ThreadVars *tv)
Sets the thread flags for a thread instance(tv)
Definition: tm-threads.c:1904
const char * name
Definition: tm-threads.c:2205
#define SCStrdup(a)
Definition: util-mem.h:268
char name[16]
Definition: threadvars.h:59
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:51
Thread * threads
Definition: tm-threads.c:2213
struct PacketQueue_ * stream_pq
Definition: threadvars.h:87
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:79
#define SCReturn
Definition: util-debug.h:339
#define THV_DEINIT
Definition: threadvars.h:44
uint8_t len
Per thread variable structure.
Definition: threadvars.h:57
__thread uint64_t mutex_lock_cnt
#define SCCtrlMutexInit(mut, mutattr)
char * name
Definition: tm-queues.h:28
void *(* OutHandlerCtxSetup)(const char *)
Tmq * inq
Definition: threadvars.h:72
#define likely(expr)
Definition: util-optimize.h:32
uint8_t cap_flags
Definition: threadvars.h:109
int id
Definition: tm-threads.h:84
int threading_set_cpu_affinity
Definition: runmodes.h:112
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:78
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1855
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:1104
Tmq * TmqCreateQueue(const char *name)
Definition: tm-queues.c:36
int threads_cnt
Definition: tm-threads.c:2215
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:139
__thread uint64_t spin_lock_cnt
SCMutex mutex_q
Definition: decode.h:627
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1953
void TmqhReleasePacketsToPacketPool(PacketQueue *pq)
Release all the packets in the queue back to the packetpool. Mainly used by threads that have failed...