suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2017 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-debug.h"
39 #include "util-privs.h"
40 #include "util-cpu.h"
41 #include "util-optimize.h"
42 #include "util-profiling.h"
43 #include "util-signal.h"
44 #include "queue.h"
45 
46 #ifdef PROFILE_LOCKING
47 __thread uint64_t mutex_lock_contention;
48 __thread uint64_t mutex_lock_wait_ticks;
49 __thread uint64_t mutex_lock_cnt;
50 
51 __thread uint64_t spin_lock_contention;
52 __thread uint64_t spin_lock_wait_ticks;
53 __thread uint64_t spin_lock_cnt;
54 
55 __thread uint64_t rww_lock_contention;
56 __thread uint64_t rww_lock_wait_ticks;
57 __thread uint64_t rww_lock_cnt;
58 
59 __thread uint64_t rwr_lock_contention;
60 __thread uint64_t rwr_lock_wait_ticks;
61 __thread uint64_t rwr_lock_cnt;
62 #endif
63 
64 #ifdef OS_FREEBSD
65 #include <sched.h>
66 #include <sys/param.h>
67 #include <sys/resource.h>
68 #include <sys/cpuset.h>
69 #include <sys/thr.h>
70 #define cpu_set_t cpuset_t
71 #endif /* OS_FREEBSD */
72 
73 /* prototypes */
74 static int SetCPUAffinity(uint16_t cpu);
75 static void TmThreadDeinitMC(ThreadVars *tv);
76 
77 /* root of the threadvars list */
78 ThreadVars *tv_root[TVT_MAX] = { NULL };
79 
80 /* lock to protect tv_root */
82 
83 /**
84  * \brief Check if a thread flag is set.
85  *
86  * \retval 1 flag is set.
87  * \retval 0 flag is not set.
88  */
89 int TmThreadsCheckFlag(ThreadVars *tv, uint16_t flag)
90 {
91  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
92 }
93 
94 /**
95  * \brief Set a thread flag.
 *
 * ORs \a flag into tv->flags using SC_ATOMIC_OR; bits that are already
 * set are left as they are.
 *
 * \param tv   Thread whose flags are updated.
 * \param flag Bit(s) to set.
96  */
97 void TmThreadsSetFlag(ThreadVars *tv, uint16_t flag)
98 {
99  SC_ATOMIC_OR(tv->flags, flag);
100 }
101 
102 /**
103  * \brief Unset a thread flag.
 *
 * ANDs tv->flags with the complement of \a flag using SC_ATOMIC_AND, so
 * only the bits named in \a flag are cleared.
 *
 * \param tv   Thread whose flags are updated.
 * \param flag Bit(s) to clear.
104  */
105 void TmThreadsUnsetFlag(ThreadVars *tv, uint16_t flag)
106 {
107  SC_ATOMIC_AND(tv->flags, ~flag); /* ~flag is evaluated after integer promotion of the uint16_t */
108 }
109 
110 /**
111  * \brief Separate run function so we can call it recursively.
112  *
113  * \todo Deal with post_pq for slots beyond the first.
114  */
116  TmSlot *slot)
117 {
118  TmEcode r;
119  TmSlot *s;
120  Packet *extra_p;
121 
122  for (s = slot; s != NULL; s = s->slot_next) {
123  TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
125 
126  if (unlikely(s->id == 0)) {
127  r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq);
128  } else {
129  r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, NULL);
130  }
131 
133 
134  /* handle error */
135  if (unlikely(r == TM_ECODE_FAILED)) {
136  /* Encountered error. Return packets to packetpool and return */
138 
142 
144  return TM_ECODE_FAILED;
145  }
146 
147  /* handle new packets */
148  while (s->slot_pre_pq.top != NULL) {
149  extra_p = PacketDequeue(&s->slot_pre_pq);
150  if (unlikely(extra_p == NULL))
151  continue;
152 
153  /* see if we need to process the packet */
154  if (s->slot_next != NULL) {
155  r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
156  if (unlikely(r == TM_ECODE_FAILED)) {
158 
162 
163  TmqhOutputPacketpool(tv, extra_p);
165  return TM_ECODE_FAILED;
166  }
167  }
168  tv->tmqh_out(tv, extra_p);
169  }
170  }
171 
172  return TM_ECODE_OK;
173 }
174 
175 #ifndef AFLFUZZ_PCAP_RUNMODE
176 
177 /** \internal
178  *
179  * \brief Process flow timeout packets
180  *
181  * Process flow timeout pseudo packets. During shutdown this loop
182  * is run until the flow engine kills the thread and the queue is
183  * empty.
 *
 * The first slot in the chain starting at \a s whose tm_id is
 * TMM_FLOWWORKER is used to process every packet dequeued from
 * tv->stream_pq. When the queue is empty the loop ends once THV_KILL
 * is set on the thread; otherwise it sleeps briefly and polls again.
 *
 * \param tv thread whose stream_pq is drained
 * \param s  head of the slot chain searched for the flow worker slot
 *
 * \retval r TM_ECODE_OK when the loop is not run (no stream_pq or no
 *           flow worker slot) or no packet was processed; otherwise the
 *           value last stored in r by the check inside the loop.
 *
 * NOTE(review): the embedded source numbering of this listing skips
 * lines 204, 206, 209 and 211, so statements may be missing from the
 * loop body shown here.
184  */
185 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
186 {
187  TmSlot *fw_slot = NULL;
188  int r = TM_ECODE_OK;
189 
190  for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) { /* find the flow worker slot */
191  if (slot->tm_id == TMM_FLOWWORKER) {
192  fw_slot = slot;
193  break;
194  }
195  }
196 
197  if (tv->stream_pq == NULL || fw_slot == NULL) { /* nothing to drain, or nothing to drain it with */
198  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, fw_slot);
199  return r;
200  }
201 
202  SCLogDebug("flow end loop starting");
203  while (1) {
205  uint32_t len = tv->stream_pq->len; /* snapshot: at most this many packets are dequeued in this round */
207  if (len > 0) {
208  while (len--) {
210  Packet *p = PacketDequeue(tv->stream_pq);
212  if (likely(p)) {
 /* NOTE(review): '!=' binds tighter than '=', so r receives the 0/1
  * result of the comparison, not the code returned by
  * TmThreadsSlotProcessPkt(). Presumably
  * "(r = TmThreadsSlotProcessPkt(...)) != TM_ECODE_OK" was intended,
  * confirm against the TmEcode values. */
213  if ((r = TmThreadsSlotProcessPkt(tv, fw_slot, p) != TM_ECODE_OK)) {
214  if (r == TM_ECODE_FAILED)
215  break; /* leaves only the inner while (len--) loop, not the outer while (1) */
216  }
217  }
218  }
219  } else {
220  if (TmThreadsCheckFlag(tv, THV_KILL)) { /* queue is empty: stop once the thread was told to die */
221  break;
222  }
223  SleepUsec(1);
224  }
225  }
226  SCLogDebug("flow end loop complete");
227  StatsSyncCounters(tv);
228 
229  return r;
230 }
231 
232 /** \internal
233  * \brief check 'slot' pre_pq and post_pq at thread cleanup
234  * and dump detailed info about the state of the packets
235  * and threads if in an unexpected state.
 *
 * If either queue reports a non-zero length, one notice is logged per
 * packet still linked in slot_pre_pq and slot_post_pq (slot id, tm_id,
 * queue length and packet source) and the process is terminated with
 * abort(). With both queues empty the function does nothing.
 *
 * \param slot slot whose queues are inspected
 *
 * NOTE(review): embedded source line 248 (just before abort()) is not
 * present in this listing.
236  */
237 static void CheckSlot(const TmSlot *slot)
238 {
239  if (slot->slot_pre_pq.len || slot->slot_post_pq.len) {
240  for (Packet *xp = slot->slot_pre_pq.top; xp != NULL; xp = xp->next) {
241  SCLogNotice("pre_pq: slot id %u slot tm_id %u pre_pq.len %u packet src %s",
242  slot->id, slot->tm_id, slot->slot_pre_pq.len, PktSrcToString(xp->pkt_src));
243  }
244  for (Packet *xp = slot->slot_post_pq.top; xp != NULL; xp = xp->next) {
245  SCLogNotice("post_pq: slot id %u slot tm_id %u post_pq.len %u packet src %s",
246  slot->id, slot->tm_id, slot->slot_post_pq.len, PktSrcToString(xp->pkt_src));
247  }
249  abort(); /* leftover packets at cleanup are treated as fatal */
250  }
251 }
252 
253 /*
254 
255  pcap/nfq
256 
257  pkt read
258  callback
259  process_pkt
260 
261  pfring
262 
263  pkt read
264  process_pkt
265 
266  slot:
267  setup
268 
269  pkt_ack_loop(tv, slot_data)
270 
271  deinit
272 
273  process_pkt:
274  while(s)
275  run s;
276  queue;
277 
278  */
279 
280 static void *TmThreadsSlotPktAcqLoop(void *td)
281 {
282  ThreadVars *tv = (ThreadVars *)td;
283  TmSlot *s = tv->tm_slots;
284  char run = 1;
285  TmEcode r = TM_ECODE_OK;
286  TmSlot *slot = NULL;
287 
288  /* Set the thread name */
289  if (SCSetThreadName(tv->name) < 0) {
290  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
291  }
292 
293  if (tv->thread_setup_flags != 0)
295 
296  /* Drop the capabilities for this thread */
297  SCDropCaps(tv);
298 
299  PacketPoolInit();
300 
301  /* check if we are setup properly */
302  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
303  SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
304  " PktAcqLoop=%p, tmqh_in=%p,"
305  " tmqh_out=%p",
306  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
308  pthread_exit((void *) -1);
309  return NULL;
310  }
311 
312  for (slot = s; slot != NULL; slot = slot->slot_next) {
313  if (slot->SlotThreadInit != NULL) {
314  void *slot_data = NULL;
315  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
316  if (r != TM_ECODE_OK) {
317  if (r == TM_ECODE_DONE) {
318  EngineDone();
320  goto error;
321  } else {
323  goto error;
324  }
325  }
326  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
327  }
328  memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue));
329  SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL);
330  memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
331  SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
332 
333  /* get the 'pre qeueue' from module before the stream module */
334  if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
335  SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
336  tv->stream_pq = &slot->slot_post_pq;
337  /* if the stream module is the first, get the threads input queue */
338  } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
339  tv->stream_pq = &trans_q[tv->inq->id];
340  SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
341  }
342  }
343 
344  StatsSetupPrivate(tv);
345 
347 
348  while(run) {
349  if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
353  }
354 
355  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
356 
357  if (r == TM_ECODE_FAILED) {
359  run = 0;
360  }
362  run = 0;
363  }
364  if (r == TM_ECODE_DONE) {
365  run = 0;
366  }
367  }
368  StatsSyncCounters(tv);
369 
371 
372  /* process all pseudo packets the flow timeout may throw at us */
373  TmThreadTimeoutLoop(tv, s);
374 
377 
379 
380  for (slot = s; slot != NULL; slot = slot->slot_next) {
381  if (slot->SlotThreadExitPrintStats != NULL) {
382  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
383  }
384 
385  if (slot->SlotThreadDeinit != NULL) {
386  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
387  if (r != TM_ECODE_OK) {
389  goto error;
390  }
391  }
392  CheckSlot(slot);
393  }
394 
395  tv->stream_pq = NULL;
396  SCLogDebug("%s ending", tv->name);
398  pthread_exit((void *) 0);
399  return NULL;
400 
401 error:
402  tv->stream_pq = NULL;
403  pthread_exit((void *) -1);
404  return NULL;
405 }
406 
407 #endif /* NO AFLFUZZ_PCAP_RUNMODE */
408 
409 #ifdef AFLFUZZ_PCAP_RUNMODE
410 /** \brief simplified loop to speed up AFL
411  *
412  * The loop runs in the caller's thread. No separate thread.
413  */
414 static void *TmThreadsSlotPktAcqLoopAFL(void *td)
415 {
416  SCLogNotice("AFL mode starting");
417 
418  ThreadVars *tv = (ThreadVars *)td;
419  TmSlot *s = tv->tm_slots;
420  char run = 1;
421  TmEcode r = TM_ECODE_OK;
422  TmSlot *slot = NULL;
423 
424  PacketPoolInit();
425 
426  /* check if we are setup properly */
427  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
428  SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
429  " PktAcqLoop=%p, tmqh_in=%p,"
430  " tmqh_out=%p",
431  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
433  return NULL;
434  }
435 
436  for (slot = s; slot != NULL; slot = slot->slot_next) {
437  if (slot->SlotThreadInit != NULL) {
438  void *slot_data = NULL;
439  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
440  if (r != TM_ECODE_OK) {
441  if (r == TM_ECODE_DONE) {
442  EngineDone();
444  goto error;
445  } else {
447  goto error;
448  }
449  }
450  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
451  }
452  memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue));
453  SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL);
454  memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
455  SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
456 
457  /* get the 'pre qeueue' from module before the stream module */
458  if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
459  SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
460  tv->stream_pq = &slot->slot_post_pq;
461  /* if the stream module is the first, get the threads input queue */
462  } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
463  tv->stream_pq = &trans_q[tv->inq->id];
464  SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
465  }
466  }
467 
468  StatsSetupPrivate(tv);
469 
471 
472  while(run) {
473  /* run right away */
474 
475  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
476 
477  if (r == TM_ECODE_FAILED) {
479  run = 0;
480  }
482  run = 0;
483  }
484  if (r == TM_ECODE_DONE) {
485  run = 0;
486  }
487  }
488  StatsSyncCounters(tv);
489 
491 
493 
495 
496  for (slot = s; slot != NULL; slot = slot->slot_next) {
497  if (slot->SlotThreadExitPrintStats != NULL) {
498  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
499  }
500 
501  if (slot->SlotThreadDeinit != NULL) {
502  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
503  if (r != TM_ECODE_OK) {
505  goto error;
506  }
507  }
508 
509  CheckSlot(slot);
510  }
511 
512  tv->stream_pq = NULL;
513  SCLogDebug("%s ending", tv->name);
515  return NULL;
516 
517 error:
518  tv->stream_pq = NULL;
519  return NULL;
520 }
521 #endif
522 
523 /**
524  * \todo Only the first "slot" currently makes the "post_pq" available
525  * to the thread module.
526  */
527 static void *TmThreadsSlotVar(void *td)
528 {
529  ThreadVars *tv = (ThreadVars *)td;
530  TmSlot *s = (TmSlot *)tv->tm_slots;
531  Packet *p = NULL;
532  char run = 1;
533  TmEcode r = TM_ECODE_OK;
534 
536 
537  /* Set the thread name */
538  if (SCSetThreadName(tv->name) < 0) {
539  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
540  }
541 
542  if (tv->thread_setup_flags != 0)
544 
545  /* Drop the capabilities for this thread */
546  SCDropCaps(tv);
547 
548  /* check if we are setup properly */
549  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
551  pthread_exit((void *) -1);
552  return NULL;
553  }
554 
555  for (; s != NULL; s = s->slot_next) {
556  if (s->SlotThreadInit != NULL) {
557  void *slot_data = NULL;
558  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
559  if (r != TM_ECODE_OK) {
561  goto error;
562  }
563  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
564  }
565  memset(&s->slot_pre_pq, 0, sizeof(PacketQueue));
566  SCMutexInit(&s->slot_pre_pq.mutex_q, NULL);
567  memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
568  SCMutexInit(&s->slot_post_pq.mutex_q, NULL);
569 
570  /* special case: we need to access the stream queue
571  * from the flow timeout code */
572 
573  /* get the 'pre qeueue' from module before the stream module */
574  if (s->slot_next != NULL && (s->slot_next->tm_id == TMM_FLOWWORKER)) {
575  SCLogDebug("pre-stream packetqueue %p (preq)", &s->slot_pre_pq);
576  tv->stream_pq = &s->slot_pre_pq;
577  /* if the stream module is the first, get the threads input queue */
578  } else if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
579  tv->stream_pq = &trans_q[tv->inq->id];
580  SCLogDebug("pre-stream packetqueue %p (inq)", &s->slot_pre_pq);
581  }
582  }
583 
584  StatsSetupPrivate(tv);
585 
587 
588  s = (TmSlot *)tv->tm_slots;
589 
590  while (run) {
591  if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
595  }
596 
597  /* input a packet */
598  p = tv->tmqh_in(tv);
599 
600  if (p != NULL) {
601  /* run the thread module(s) */
602  r = TmThreadsSlotVarRun(tv, p, s);
603  if (r == TM_ECODE_FAILED) {
604  TmqhOutputPacketpool(tv, p);
606  break;
607  }
608 
609  /* output the packet */
610  tv->tmqh_out(tv, p);
611 
612  } /* if (p != NULL) */
613 
614  /* now handle the post_pq packets */
615  TmSlot *slot;
616  for (slot = s; slot != NULL; slot = slot->slot_next) {
617  if (slot->slot_post_pq.top != NULL) {
618  while (1) {
620  Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
622 
623  if (extra_p == NULL)
624  break;
625 
626  if (slot->slot_next != NULL) {
627  r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
628  if (r == TM_ECODE_FAILED) {
632 
633  TmqhOutputPacketpool(tv, extra_p);
635  break;
636  }
637  }
638  /* output the packet */
639  tv->tmqh_out(tv, extra_p);
640  } /* while */
641  } /* if */
642  } /* for */
643 
644  if (TmThreadsCheckFlag(tv, THV_KILL)) {
645  run = 0;
646  }
647  } /* while (run) */
648  StatsSyncCounters(tv);
649 
652 
654 
655  s = (TmSlot *)tv->tm_slots;
656 
657  for ( ; s != NULL; s = s->slot_next) {
658  if (s->SlotThreadExitPrintStats != NULL) {
659  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
660  }
661 
662  if (s->SlotThreadDeinit != NULL) {
663  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
664  if (r != TM_ECODE_OK) {
666  goto error;
667  }
668  }
669  CheckSlot(s);
670  }
671 
672  SCLogDebug("%s ending", tv->name);
673  tv->stream_pq = NULL;
675  pthread_exit((void *) 0);
676  return NULL;
677 
678 error:
679  tv->stream_pq = NULL;
680  pthread_exit((void *) -1);
681  return NULL;
682 }
683 
684 static void *TmThreadsManagement(void *td)
685 {
686  ThreadVars *tv = (ThreadVars *)td;
687  TmSlot *s = (TmSlot *)tv->tm_slots;
688  TmEcode r = TM_ECODE_OK;
689 
690  BUG_ON(s == NULL);
691 
692  /* Set the thread name */
693  if (SCSetThreadName(tv->name) < 0) {
694  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
695  }
696 
697  if (tv->thread_setup_flags != 0)
699 
700  /* Drop the capabilities for this thread */
701  SCDropCaps(tv);
702 
703  SCLogDebug("%s starting", tv->name);
704 
705  if (s->SlotThreadInit != NULL) {
706  void *slot_data = NULL;
707  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
708  if (r != TM_ECODE_OK) {
710  pthread_exit((void *) -1);
711  return NULL;
712  }
713  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
714  }
715  memset(&s->slot_pre_pq, 0, sizeof(PacketQueue));
716  memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
717 
718  StatsSetupPrivate(tv);
719 
721 
722  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
723  /* handle error */
724  if (r == TM_ECODE_FAILED) {
726  }
727 
728  if (TmThreadsCheckFlag(tv, THV_KILL)) {
729  StatsSyncCounters(tv);
730  }
731 
734 
735  if (s->SlotThreadExitPrintStats != NULL) {
736  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
737  }
738 
739  if (s->SlotThreadDeinit != NULL) {
740  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
741  if (r != TM_ECODE_OK) {
743  pthread_exit((void *) -1);
744  return NULL;
745  }
746  }
747 
749  pthread_exit((void *) 0);
750  return NULL;
751 }
752 
753 /**
754  * \brief We set the slot functions.
755  *
756  * \param tv Pointer to the TV to set the slot function for.
757  * \param name Name of the slot variant.
758  * \param fn_p Pointer to a custom slot function. Used only if slot variant
759  * "name" is "custom".
760  *
761  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
762  */
763 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
764 {
765  if (name == NULL) {
766  if (fn_p == NULL) {
767  printf("Both slot name and function pointer can't be NULL inside "
768  "TmThreadSetSlots\n");
769  goto error;
770  } else {
771  name = "custom";
772  }
773  }
774 
775  if (strcmp(name, "varslot") == 0) {
776  tv->tm_func = TmThreadsSlotVar;
777  } else if (strcmp(name, "pktacqloop") == 0) {
778 #ifndef AFLFUZZ_PCAP_RUNMODE
779  tv->tm_func = TmThreadsSlotPktAcqLoop;
780 #else
781  tv->tm_func = TmThreadsSlotPktAcqLoopAFL;
782 #endif
783  } else if (strcmp(name, "management") == 0) {
784  tv->tm_func = TmThreadsManagement;
785  } else if (strcmp(name, "command") == 0) {
786  tv->tm_func = TmThreadsManagement;
787  } else if (strcmp(name, "custom") == 0) {
788  if (fn_p == NULL)
789  goto error;
790  tv->tm_func = fn_p;
791  } else {
792  printf("Error: Slot \"%s\" not supported\n", name);
793  goto error;
794  }
795 
796  return TM_ECODE_OK;
797 
798 error:
799  return TM_ECODE_FAILED;
800 }
801 
803 {
805  for (int i = 0; i < TVT_MAX; i++) {
806  ThreadVars *tv = tv_root[i];
807  while (tv) {
808  TmSlot *slots = tv->tm_slots;
809  while (slots != NULL) {
810  if (slots == tm_slot) {
812  return tv;
813  }
814  slots = slots->slot_next;
815  }
816  tv = tv->next;
817  }
818  }
820  return NULL;
821 }
822 
823 /**
824  * \brief Appends a new entry to the slots.
825  *
 * Allocates a zeroed TmSlot, copies the callbacks of \a tm into it, ORs
 * the module's flags and cap_flags into \a tv and links the slot at the
 * end of tv->tm_slots. The first slot gets id 0, every further slot the
 * id of its predecessor plus one.
 *
826  * \param tv TV the slot is attached to.
827  * \param tm TM to append.
828  * \param data Data to be passed on to the slot init function.
829  *
830  * The function returns nothing: if the slot cannot be allocated it
 * returns silently and \a tv is left unchanged.
 *
 * NOTE(review): embedded source line 846 (between the Management and
 * SlotThreadDeinit assignments) is not present in this listing.
831  */
832 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
833 {
834  TmSlot *slot = SCMalloc(sizeof(TmSlot));
835  if (unlikely(slot == NULL))
836  return;
837  memset(slot, 0, sizeof(TmSlot));
838  SC_ATOMIC_INIT(slot->slot_data);
839  slot->tv = tv;
840  slot->SlotThreadInit = tm->ThreadInit;
841  slot->slot_initdata = data;
842  SC_ATOMIC_INIT(slot->SlotFunc);
843  (void)SC_ATOMIC_SET(slot->SlotFunc, tm->Func);
844  slot->PktAcqLoop = tm->PktAcqLoop;
845  slot->Management = tm->Management;
847  slot->SlotThreadDeinit = tm->ThreadDeinit;
848  /* we don't have to check for the return value "-1". We wouldn't have
849  * received a TM as arg, if it didn't exist */
850  slot->tm_id = TmModuleGetIDForTM(tm);
851 
852  tv->tmm_flags |= tm->flags; /* the thread accumulates the flags of all its modules */
853  tv->cap_flags |= tm->cap_flags;
854 
855  if (tv->tm_slots == NULL) {
856  tv->tm_slots = slot;
857  slot->id = 0;
858  } else {
859  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
860 
861  /* get the last slot */
862  for ( ; a != NULL; a = a->slot_next) {
863  b = a;
864  }
865  /* append the new slot */
866  if (b != NULL) {
867  b->slot_next = slot;
868  slot->id = b->id + 1;
869  }
870  }
871  return;
872 }
873 
874 /**
875  * \brief Returns the slot holding a TM with the particular tm_id.
876  *
877  * \param tm_id TM id of the TM whose slot has to be returned.
878  *
879  * \retval slots Pointer to the slot.
880  */
882 {
884  for (int i = 0; i < TVT_MAX; i++) {
885  ThreadVars *tv = tv_root[i];
886  while (tv) {
887  TmSlot *slots = tv->tm_slots;
888  while (slots != NULL) {
889  if (slots->tm_id == tm_id) {
891  return slots;
892  }
893  slots = slots->slot_next;
894  }
895  tv = tv->next;
896  }
897  }
899  return NULL;
900 }
901 
#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
/**
 * \brief Apply a cpu set as the affinity of the calling thread.
 *
 * Uses cpuset_setaffinity() on FreeBSD, thread_policy_set() on Darwin and
 * sched_setaffinity() on the calling thread's tid everywhere else.
 *
 * \param cs cpu set to apply.
 *
 * \retval 0 on success; -1 if the platform call failed (a warning is
 *         printed to stdout).
 */
static int SetCPUAffinitySet(cpu_set_t *cs)
{
    int status;

#if defined OS_FREEBSD
    status = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
                                SCGetThreadIdLong(), sizeof(cpu_set_t), cs);
#elif OS_DARWIN
    status = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
                               (void *)cs, THREAD_AFFINITY_POLICY_COUNT);
#else
    pid_t self_tid = syscall(SYS_gettid);
    status = sched_setaffinity(self_tid, sizeof(cpu_set_t), cs);
#endif /* OS_FREEBSD */

    if (status == 0) {
        return 0;
    }

    printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", status,
           strerror(errno));
    return -1;
}
#endif
925 
926 
/**
 * \brief Set the thread affinity on the calling thread.
 *
 * On OpenBSD and Solaris this is a no-op. On Windows/Cygwin the affinity
 * mask is set with SetThreadAffinityMask(); elsewhere a cpu set holding
 * only \a cpuid is built and applied through SetCPUAffinitySet().
 *
 * The cpu id is validated before it is used: shifting by more bits than
 * the mask type holds is undefined behaviour, and CPU_SET() with an index
 * of CPU_SETSIZE or more is not guaranteed to stay inside the cpu set.
 *
 * \param cpuid Id of the core/cpu to setup the affinity.
 *
 * \retval 0 If all goes well; -1 if something is wrong (including a cpu
 *         id that does not fit the platform's affinity mask / cpu set).
 */
static int SetCPUAffinity(uint16_t cpuid)
{
#if defined __OpenBSD__ || defined sun
    (void)cpuid; /* affinity is not supported here */
    return 0;
#else
    int cpu = (int)cpuid;

#if defined OS_WIN32 || defined __CYGWIN__
    /* the mask is a DWORD_PTR: one bit per cpu, pointer-sized */
    if (cpu >= (int)(sizeof(DWORD_PTR) * 8)) {
        printf("Warning: cpu id %d does not fit the thread affinity mask\n",
               cpu);
        return -1;
    }
    DWORD_PTR cs = (DWORD_PTR)1 << cpu;

    int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
    if (r != 0) {
        printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
               strerror(errno));
        return -1;
    }
    SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
               SCGetThreadIdLong(), cpu);

    return 0;

#else
#ifdef CPU_SETSIZE
    /* cpu_set_t only has room for CPU_SETSIZE cpus */
    if (cpu >= CPU_SETSIZE) {
        printf("Warning: cpu id %d exceeds CPU_SETSIZE\n", cpu);
        return -1;
    }
#endif
    cpu_set_t cs;

    CPU_ZERO(&cs);
    CPU_SET(cpu, &cs);
    return SetCPUAffinitySet(&cs);
#endif /* windows */
#endif /* not supported */
}
964 
965 
966 /**
967  * \brief Set the thread options (thread priority).
968  *
969  * \param tv Pointer to the ThreadVars to setup the thread priority.
970  *
971  * \retval TM_ECODE_OK.
972  */
974 {
976  tv->thread_priority = prio;
977 
978  return TM_ECODE_OK;
979 }
980 
981 /**
982  * \brief Adjusting nice value for threads.
983  */
985 {
986  SCEnter();
987 #ifndef __CYGWIN__
988 #ifdef OS_WIN32
989  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
990  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for "
991  "thread %s: %s", tv->name, strerror(errno));
992  } else {
993  SCLogDebug("Priority set to %"PRId32" for thread %s",
994  tv->thread_priority, tv->name);
995  }
996 #else
997  int ret = nice(tv->thread_priority);
998  if (ret == -1) {
999  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value %d "
1000  "for thread %s: %s", tv->thread_priority, tv->name,
1001  strerror(errno));
1002  } else {
1003  SCLogDebug("Nice value set to %"PRId32" for thread %s",
1004  tv->thread_priority, tv->name);
1005  }
1006 #endif /* OS_WIN32 */
1007 #endif
1008  SCReturn;
1009 }
1010 
1011 
1012 /**
1013  * \brief Set the thread options (cpu affinity).
1014  *
1015  * \param tv pointer to the ThreadVars to setup the affinity.
1016  * \param cpu cpu on which affinity is set.
1017  *
1018  * \retval TM_ECODE_OK
1019  */
1021 {
1023  tv->cpu_affinity = cpu;
1024 
1025  return TM_ECODE_OK;
1026 }
1027 
1028 
1030 {
1032  return TM_ECODE_OK;
1033 
1034  if (type > MAX_CPU_SET) {
1035  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
1036  return TM_ECODE_FAILED;
1037  }
1038 
1040  tv->cpu_affinity = type;
1041 
1042  return TM_ECODE_OK;
1043 }
1044 
1046 {
1047  if (type >= MAX_CPU_SET) {
1048  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
1049  return 0;
1050  }
1051 
1053 }
1054 
1055 /**
1056  * \brief Set the thread options (cpu affinity and thread priority).
1057  * Priority should be already set by pthread_create.
1058  *
1059  * \param tv pointer to the ThreadVars of the calling thread.
1060  */
1062 {
1064  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
1065  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
1066  SCGetThreadIdLong());
1067  SetCPUAffinity(tv->cpu_affinity);
1068  }
1069 
1070 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
1072  TmThreadSetPrio(tv);
1075  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
1076  int cpu = AffinityGetNextCPU(taf);
1077  SetCPUAffinity(cpu);
1078  /* If CPU is in a set overwrite the default thread prio */
1079  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
1080  tv->thread_priority = PRIO_LOW;
1081  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
1083  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
1084  tv->thread_priority = PRIO_HIGH;
1085  } else {
1086  tv->thread_priority = taf->prio;
1087  }
1088  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
1089  "%d, thread id %lu", tv->thread_priority,
1090  tv->name, cpu, SCGetThreadIdLong());
1091  } else {
1092  SetCPUAffinitySet(&taf->cpu_set);
1093  tv->thread_priority = taf->prio;
1094  SCLogPerf("Setting prio %d for thread \"%s\", "
1095  "thread id %lu", tv->thread_priority,
1096  tv->name, SCGetThreadIdLong());
1097  }
1098  TmThreadSetPrio(tv);
1099  }
1100 #endif
1101 
1102  return TM_ECODE_OK;
1103 }
1104 
1105 /**
1106  * \brief Creates and returns the TV instance for a new thread.
1107  *
 * Allocates and zeroes a ThreadVars, copies \a name into it, resolves the
 * input/output queues and queue handlers by name (queues are created when
 * they do not exist yet), selects the thread function through
 * TmThreadSetSlots() and optionally calls TmThreadInitMC(). An
 * \a inq_name / \a outq_name of "packetpool" (or NULL) means no queue is
 * attached on that side.
 *
1108  * \param name Name of this TV instance
1109  * \param inq_name Incoming queue name
1110  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1111  * \param outq_name Outgoing queue name
1112  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1113  * \param slots String representation for the slot function to be used
1114  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1115  * \param mucond Flag to indicate whether to initialize the condition
1116  * and the mutex variables for this newly created TV.
1117  *
1118  * \retval the newly created TV instance, or NULL on error
 *
 * NOTE(review): the error path only frees the ThreadVars itself. The
 * reader_cnt/writer_cnt increments, the outctx returned by
 * OutHandlerCtxSetup() and the mutex initialised on perf_public_ctx.m are
 * not undone there -- confirm whether that matters to callers.
 *
 * NOTE(review): embedded source lines 1142-1143 and 1169 are not present
 * in this listing.
1119  */
1120 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
1121  const char *outq_name, const char *outqh_name, const char *slots,
1122  void * (*fn_p)(void *), int mucond)
1123 {
1124  ThreadVars *tv = NULL;
1125  Tmq *tmq = NULL;
1126  Tmqh *tmqh = NULL;
1127 
1128  SCLogDebug("creating thread \"%s\"...", name);
1129 
1130  /* XXX create separate function for this: allocate a thread container */
1131  tv = SCMalloc(sizeof(ThreadVars));
1132  if (unlikely(tv == NULL))
1133  goto error;
1134  memset(tv, 0, sizeof(ThreadVars));
1135 
1136  SC_ATOMIC_INIT(tv->flags);
1137  SCMutexInit(&tv->perf_public_ctx.m, NULL);
1138 
1139  strlcpy(tv->name, name, sizeof(tv->name)); /* bounded by the size of tv->name */
1140 
1141  /* default state for every newly created thread */
1144 
1145  /* set the incoming queue */
1146  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
1147  SCLogDebug("inq_name \"%s\"", inq_name);
1148 
1149  tmq = TmqGetQueueByName(inq_name); /* reuse an existing queue of that name ... */
1150  if (tmq == NULL) {
1151  tmq = TmqCreateQueue(inq_name); /* ... or create it on first use */
1152  if (tmq == NULL)
1153  goto error;
1154  }
1155  SCLogDebug("tmq %p", tmq);
1156 
1157  tv->inq = tmq;
1158  tv->inq->reader_cnt++; /* this thread is one more reader of the queue */
1159  SCLogDebug("tv->inq %p", tv->inq);
1160  }
1161  if (inqh_name != NULL) {
1162  SCLogDebug("inqh_name \"%s\"", inqh_name);
1163 
1164  tmqh = TmqhGetQueueHandlerByName(inqh_name);
1165  if (tmqh == NULL)
1166  goto error;
1167 
1168  tv->tmqh_in = tmqh->InHandler;
1170  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
1171  }
1172 
1173  /* set the outgoing queue */
1174  if (outqh_name != NULL) {
1175  SCLogDebug("outqh_name \"%s\"", outqh_name);
1176 
1177  tmqh = TmqhGetQueueHandlerByName(outqh_name);
1178  if (tmqh == NULL)
1179  goto error;
1180 
1181  tv->tmqh_out = tmqh->OutHandler;
1182  tv->outqh_name = tmqh->name;
1183 
1184  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
1185  SCLogDebug("outq_name \"%s\"", outq_name);
1186 
1187  if (tmqh->OutHandlerCtxSetup != NULL) { /* handler keeps its own output context instead of a single queue */
1188  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1189  if (tv->outctx == NULL)
1190  goto error;
1191  tv->outq = NULL;
1192  } else {
1193  tmq = TmqGetQueueByName(outq_name);
1194  if (tmq == NULL) {
1195  tmq = TmqCreateQueue(outq_name);
1196  if (tmq == NULL)
1197  goto error;
1198  }
1199  SCLogDebug("tmq %p", tmq);
1200 
1201  tv->outq = tmq;
1202  tv->outctx = NULL;
1203  tv->outq->writer_cnt++; /* this thread is one more writer of the queue */
1204  }
1205  }
1206  }
1207 
1208  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1209  goto error;
1210  }
1211 
1212  if (mucond != 0)
1213  TmThreadInitMC(tv);
1214 
1215  return tv;
1216 
1217 error:
1218  SCLogError(SC_ERR_THREAD_CREATE, "failed to setup a thread");
1219 
1220  if (tv != NULL)
1221  SCFree(tv);
1222  return NULL;
1223 }
1224 
1225 /**
1226  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1227  * This function doesn't support custom slots, and hence shouldn't be
1228  * supplied \"custom\" as its slot type. All PPT threads are created
1229  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1230  * conditional variables are not used to kill the thread.
1231  *
1232  * \param name Name of this TV instance
1233  * \param inq_name Incoming queue name
1234  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1235  * \param outq_name Outgoing queue name
1236  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1237  * \param slots String representation for the slot function to be used
1238  *
1239  * \retval the newly created TV instance, or NULL on error
1240  */
1241 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1242  const char *inqh_name, const char *outq_name,
1243  const char *outqh_name, const char *slots)
1244 {
1245  ThreadVars *tv = NULL;
1246 
1247  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1248  slots, NULL, 0);
1249 
1250  if (tv != NULL) {
1251  tv->type = TVT_PPT;
1252  tv->id = TmThreadsRegisterThread(tv, tv->type);
1253  }
1254 
1255 
1256  return tv;
1257 }
1258 
1259 /**
1260  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1261  * This function supports only custom slot functions and hence a
1262  * function pointer should be sent as an argument.
1263  *
 * The TV is created through TmThreadCreate() with the "custom" slot
 * variant and without queues or queue handlers. On success it is typed
 * as TVT_MGMT and registered through TmThreadsRegisterThread(), whose
 * return value becomes the TV's id.
 *
1264  * \param name Name of this TV instance
1265  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1266  * \param mucond Flag to indicate whether to initialize the condition
1267  * and the mutex variables for this newly created TV.
1268  *
1269  * \retval the newly created TV instance, or NULL on error
 *
 * NOTE(review): embedded source line 1281 (after the registration call)
 * is not present in this listing.
1270  */
1271 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1272  int mucond)
1273 {
1274  ThreadVars *tv = NULL;
1275 
1276  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1277 
1278  if (tv != NULL) {
1279  tv->type = TVT_MGMT;
1280  tv->id = TmThreadsRegisterThread(tv, tv->type);
1282  }
1283 
1284  return tv;
1285 }
1286 
1287 /**
1288  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1289  * This function supports only custom slot functions and hence a
1290  * function pointer should be sent as an argument.
1291  *
1292  * \param name Name of this TV instance
1293  * \param module Name of TmModule with MANAGEMENT flag set.
1294  * \param mucond Flag to indicate whether to initialize the condition
1295  * and the mutex variables for this newly created TV.
1296  *
1297  * \retval the newly created TV instance, or NULL on error
1298  */
1299 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1300  int mucond)
1301 {
1302  ThreadVars *tv = NULL;
1303 
1304  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1305 
1306  if (tv != NULL) {
1307  tv->type = TVT_MGMT;
1308  tv->id = TmThreadsRegisterThread(tv, tv->type);
1310 
1311  TmModule *m = TmModuleGetByName(module);
1312  if (m) {
1313  TmSlotSetFuncAppend(tv, m, NULL);
1314  }
1315  }
1316 
1317  return tv;
1318 }
1319 
1320 /**
1321  * \brief Creates and returns the TV instance for a Command thread (CMD).
1322  * This function supports only custom slot functions and hence a
1323  * function pointer should be sent as an argument.
1324  *
1325  * \param name Name of this TV instance
1326  * \param module Name of TmModule with COMMAND flag set.
1327  * \param mucond Flag to indicate whether to initialize the condition
1328  * and the mutex variables for this newly created TV.
1329  *
1330  * \retval the newly created TV instance, or NULL on error
1331  */
1332 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1333  int mucond)
1334 {
1335  ThreadVars *tv = NULL;
1336 
1337  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1338 
1339  if (tv != NULL) {
1340  tv->type = TVT_CMD;
1341  tv->id = TmThreadsRegisterThread(tv, tv->type);
1343 
1344  TmModule *m = TmModuleGetByName(module);
1345  if (m) {
1346  TmSlotSetFuncAppend(tv, m, NULL);
1347  }
1348  }
1349 
1350  return tv;
1351 }
1352 
1353 /**
1354  * \brief Appends this TV to tv_root based on its type
1355  *
1356  * \param type holds the type this TV belongs to.
1357  */
1359 {
1361 
1362  if (tv_root[type] == NULL) {
1363  tv_root[type] = tv;
1364  tv->next = NULL;
1365  tv->prev = NULL;
1366 
1368 
1369  return;
1370  }
1371 
1372  ThreadVars *t = tv_root[type];
1373 
1374  while (t) {
1375  if (t->next == NULL) {
1376  t->next = tv;
1377  tv->prev = t;
1378  tv->next = NULL;
1379  break;
1380  }
1381 
1382  t = t->next;
1383  }
1384 
1386 
1387  return;
1388 }
1389 
1390 /**
1391  * \brief Removes this TV from tv_root based on its type
1392  *
1393  * \param tv The tv instance to remove from the global tv list.
1394  * \param type Holds the type this TV belongs to.
1395  */
1397 {
1399 
1400  if (tv_root[type] == NULL) {
1402 
1403  return;
1404  }
1405 
1406  ThreadVars *t = tv_root[type];
1407  while (t != tv) {
1408  t = t->next;
1409  }
1410 
1411  if (t != NULL) {
1412  if (t->prev != NULL)
1413  t->prev->next = t->next;
1414  if (t->next != NULL)
1415  t->next->prev = t->prev;
1416 
1417  if (t == tv_root[type])
1418  tv_root[type] = t->next;;
1419  }
1420 
1422 
1423  return;
1424 }
1425 
1426 static bool ThreadStillHasPackets(ThreadVars *tv)
1427 {
1428  if (tv->inq != NULL) {
1429  /* we wait till we dry out all the inq packets, before we
1430  * kill this thread. Do note that you should have disabled
1431  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1432  if (!(strlen(tv->inq->name) == strlen("packetpool") &&
1433  strcasecmp(tv->inq->name, "packetpool") == 0)) {
1434  PacketQueue *q = &trans_q[tv->inq->id];
1435  SCMutexLock(&q->mutex_q);
1436  uint32_t len = q->len;
1437  SCMutexUnlock(&q->mutex_q);
1438  if (len != 0) {
1439  return true;
1440  }
1441  }
1442  }
1443 
1444  if (tv->stream_pq != NULL) {
1445  SCMutexLock(&tv->stream_pq->mutex_q);
1446  uint32_t len = tv->stream_pq->len;
1448 
1449  if (len != 0) {
1450  return true;
1451  }
1452  }
1453  return false;
1454 }
1455 
1456 /**
1457  * \brief Kill a thread.
1458  *
1459  * \param tv A ThreadVars instance corresponding to the thread that has to be
1460  * killed.
1461  *
1462  * \retval r 1 killed succesfully
1463  * 0 not yet ready, needs another look
1464  */
1465 static int TmThreadKillThread(ThreadVars *tv)
1466 {
1467  int i = 0;
1468 
1469  BUG_ON(tv == NULL);
1470 
1471  /* kill only once :) */
1472  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1473  return 1;
1474  }
1475 
1476  /* set the thread flag informing the thread that it needs to be
1477  * terminated */
1480 
1481  /* to be sure, signal more */
1482  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1483  if (tv->InShutdownHandler != NULL) {
1484  tv->InShutdownHandler(tv);
1485  }
1486  if (tv->inq != NULL) {
1487  for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1488  SCCondSignal(&trans_q[tv->inq->id].cond_q);
1489  }
1490  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1491  }
1492 
1493  if (tv->ctrl_cond != NULL ) {
1494  pthread_cond_broadcast(tv->ctrl_cond);
1495  }
1496  return 0;
1497  }
1498 
1499  if (tv->outctx != NULL) {
1501  if (tmqh == NULL)
1502  BUG_ON(1);
1503 
1504  if (tmqh->OutHandlerCtxFree != NULL) {
1505  tmqh->OutHandlerCtxFree(tv->outctx);
1506  tv->outctx = NULL;
1507  }
1508  }
1509 
1510  /* join it and flag it as dead */
1511  pthread_join(tv->t, NULL);
1512  SCLogDebug("thread %s stopped", tv->name);
1514  return 1;
1515 }
1516 
1517 /** \internal
1518  *
1519  * \brief make sure that all packet threads are done processing their
1520  * in-flight packets, including 'injected' flow packets.
1521  */
1522 static void TmThreadDrainPacketThreads(void)
1523 {
1524  ThreadVars *tv = NULL;
1525  struct timeval start_ts;
1526  struct timeval cur_ts;
1527  gettimeofday(&start_ts, NULL);
1528 
1529 again:
1530  gettimeofday(&cur_ts, NULL);
1531  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1532  SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads "
1533  "to process their packets in time");
1534  return;
1535  }
1536 
1538 
1539  /* all receive threads are part of packet processing threads */
1540  tv = tv_root[TVT_PPT];
1541  while (tv) {
1542  if (ThreadStillHasPackets(tv)) {
1543  /* we wait till we dry out all the inq packets, before we
1544  * kill this thread. Do note that you should have disabled
1545  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1547 
1548  /* sleep outside lock */
1549  SleepMsec(1);
1550  goto again;
1551  }
1552  tv = tv->next;
1553  }
1554 
1556  return;
1557 }
1558 
1559 /**
1560  * \brief Disable all threads having the specified TMs.
1561  *
1562  * Breaks out of the packet acquisition loop, and bumps
1563  * into the 'flow loop', where it will process packets
1564  * from the flow engine's shutdown handling.
1565  */
1567 {
1568  ThreadVars *tv = NULL;
1569  struct timeval start_ts;
1570  struct timeval cur_ts;
1571  gettimeofday(&start_ts, NULL);
1572 
1573 again:
1574  gettimeofday(&cur_ts, NULL);
1575  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1576  FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1577  "thread - \"%s\". Killing engine", tv->name);
1578  }
1579 
1581 
1582  /* all receive threads are part of packet processing threads */
1583  tv = tv_root[TVT_PPT];
1584 
1585  /* we do have to keep in mind that TVs are arranged in the order
1586  * right from receive to log. The moment we fail to find a
1587  * receive TM amongst the slots in a tv, it indicates we are done
1588  * with all receive threads */
1589  while (tv) {
1590  int disable = 0;
1591  TmModule *tm = NULL;
1592  /* obtain the slots for this TV */
1593  TmSlot *slots = tv->tm_slots;
1594  while (slots != NULL) {
1595  tm = TmModuleGetById(slots->tm_id);
1596 
1597  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1598  disable = 1;
1599  break;
1600  }
1601 
1602  slots = slots->slot_next;
1603  continue;
1604  }
1605 
1606  if (disable) {
1607  if (ThreadStillHasPackets(tv)) {
1608  /* we wait till we dry out all the inq packets, before we
1609  * kill this thread. Do note that you should have disabled
1610  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1612  /* don't sleep while holding a lock */
1613  SleepMsec(1);
1614  goto again;
1615  }
1616 
1617  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1618  if (tm && tm->PktAcqBreakLoop != NULL) {
1619  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1620  }
1622 
1623  if (tv->inq != NULL) {
1624  int i;
1625  for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1626  SCCondSignal(&trans_q[tv->inq->id].cond_q);
1627  }
1628  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1629  }
1630 
1631  /* wait for it to enter the 'flow loop' stage */
1632  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1634 
1635  SleepMsec(1);
1636  goto again;
1637  }
1638  }
1639 
1640  tv = tv->next;
1641  }
1642 
1644 
1645  /* finally wait for all packet threads to have
1646  * processed all of their 'live' packets so we
1647  * don't process the last live packets together
1648  * with FFR packets */
1649  TmThreadDrainPacketThreads();
1650  return;
1651 }
1652 
/**
 * \brief Debug-only check that no packet processing thread has packets left.
 *
 *        Compiles to nothing unless DEBUG_VALIDATION is defined. With it,
 *        the process is aborted as soon as one packet thread still reports
 *        queued packets.
 */
static void TmThreadDebugValidateNoMorePackets(void)
{
#ifdef DEBUG_VALIDATION
    /* NOTE(review): the listing had lines missing around this loop; the
     * tv_root walk is locked here like the other walkers - confirm, and
     * check whether a diagnostic dump before abort() was lost as well. */
    SCMutexLock(&tv_root_lock);
    for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
        if (ThreadStillHasPackets(tv)) {
            SCMutexUnlock(&tv_root_lock);
            abort();
        }
    }
    SCMutexUnlock(&tv_root_lock);
#endif
}
1667 
1668 /**
1669  * \brief Disable all threads having the specified TMs.
1670  */
1672 {
1673  ThreadVars *tv = NULL;
1674  struct timeval start_ts;
1675  struct timeval cur_ts;
1676 
1677  /* first drain all packet threads of their packets */
1678  TmThreadDrainPacketThreads();
1679  TmThreadDebugValidateNoMorePackets();
1680 
1681  gettimeofday(&start_ts, NULL);
1682 again:
1683  gettimeofday(&cur_ts, NULL);
1684  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1685  FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1686  "thread - \"%s\". Killing engine",
1687  tv ? tv->name : "<unknown>");
1688  }
1689 
1691 
1692  /* all receive threads are part of packet processing threads */
1693  tv = tv_root[TVT_PPT];
1694 
1695  /* we do have to keep in mind that TVs are arranged in the order
1696  * right from receive to log. The moment we fail to find a
1697  * receive TM amongst the slots in a tv, it indicates we are done
1698  * with all receive threads */
1699  while (tv) {
1700  /* we found our receive TV. Send it a KILL signal. This is all
1701  * we need to do to kill receive threads */
1703 
1704  if (tv->inq != NULL) {
1705  int i;
1706  for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1707  SCCondSignal(&trans_q[tv->inq->id].cond_q);
1708  }
1709  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1710  }
1711 
1712  while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
1714 
1715  SleepMsec(1);
1716  goto again;
1717  }
1718 
1719  tv = tv->next;
1720  }
1721 
1723 
1724  return;
1725 }
1726 
1728 {
1729  ThreadVars *tv = NULL;
1730  TmSlot *slots = NULL;
1731 
1733 
1734  /* all receive threads are part of packet processing threads */
1735  tv = tv_root[TVT_PPT];
1736 
1737  while (tv) {
1738  slots = tv->tm_slots;
1739 
1740  while (slots != NULL) {
1741  TmModule *tm = TmModuleGetById(slots->tm_id);
1742 
1743  char *found = strstr(tm->name, tm_name);
1744  if (found != NULL)
1745  goto end;
1746 
1747  slots = slots->slot_next;
1748  }
1749 
1750  tv = tv->next;
1751  }
1752 
1753  end:
1755  return slots;
1756 }
1757 
1758 #define MIN_WAIT_TIME 100
1759 #define MAX_WAIT_TIME 999999
1761 {
1762  ThreadVars *tv = NULL;
1763  unsigned int sleep_usec = MIN_WAIT_TIME;
1764 
1765  BUG_ON((family < 0) || (family >= TVT_MAX));
1766 
1767 again:
1769  tv = tv_root[family];
1770 
1771  while (tv) {
1772  int r = TmThreadKillThread(tv);
1773  if (r == 0) {
1775  SleepUsec(sleep_usec);
1776  sleep_usec *= 2; /* slowly back off */
1777  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1778  goto again;
1779  }
1780  sleep_usec = MIN_WAIT_TIME; /* reset */
1781 
1782  tv = tv->next;
1783  }
1785 }
1786 #undef MIN_WAIT_TIME
1787 #undef MAX_WAIT_TIME
1788 
1790 {
1791  int i = 0;
1792 
1793  for (i = 0; i < TVT_MAX; i++) {
1795  }
1796 
1797  return;
1798 }
1799 
1800 static void TmThreadFree(ThreadVars *tv)
1801 {
1802  TmSlot *s;
1803  TmSlot *ps;
1804  if (tv == NULL)
1805  return;
1806 
1807  SCLogDebug("Freeing thread '%s'.", tv->name);
1808 
1809  StatsThreadCleanup(tv);
1810 
1811  TmThreadDeinitMC(tv);
1812 
1813  if (tv->thread_group_name) {
1815  }
1816 
1817  if (tv->printable_name) {
1818  SCFree(tv->printable_name);
1819  }
1820 
1821  s = (TmSlot *)tv->tm_slots;
1822  while (s) {
1823  ps = s;
1824  s = s->slot_next;
1825  SCFree(ps);
1826  }
1827 
1829  SCFree(tv);
1830 }
1831 
1832 void TmThreadSetGroupName(ThreadVars *tv, const char *name)
1833 {
1834  char *thread_group_name = NULL;
1835 
1836  if (name == NULL)
1837  return;
1838 
1839  if (tv == NULL)
1840  return;
1841 
1842  thread_group_name = SCStrdup(name);
1843  if (unlikely(thread_group_name == NULL)) {
1844  SCLogError(SC_ERR_RUNMODE, "error allocating memory");
1845  return;
1846  }
1847  tv->thread_group_name = thread_group_name;
1848 }
1849 
1851 {
1852  ThreadVars *tv = NULL;
1853  ThreadVars *ptv = NULL;
1854 
1855  if ((family < 0) || (family >= TVT_MAX))
1856  return;
1857 
1859  tv = tv_root[family];
1860 
1861  while (tv) {
1862  ptv = tv;
1863  tv = tv->next;
1864  TmThreadFree(ptv);
1865  }
1866  tv_root[family] = NULL;
1868 }
1869 
1870 /**
1871  * \brief Spawns a thread associated with the ThreadVars instance tv
1872  *
1873  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1874  */
1876 {
1877  pthread_attr_t attr;
1878  if (tv->tm_func == NULL) {
1879  printf("ERROR: no thread function set\n");
1880  return TM_ECODE_FAILED;
1881  }
1882 
1883  /* Initialize and set thread detached attribute */
1884  pthread_attr_init(&attr);
1885 
1886  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1887 
1888  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1889  if (rc) {
1890  printf("ERROR; return code from pthread_create() is %" PRId32 "\n", rc);
1891  return TM_ECODE_FAILED;
1892  }
1893 
1895 
1896  TmThreadAppend(tv, tv->type);
1897  return TM_ECODE_OK;
1898 }
1899 
/**
 * \brief Sets the thread flags for a thread instance(tv)
 *
 *        Currently compiled out (#if 0). The flags are overwritten with
 *        the given value, not OR-ed in.
 *
 * \param tv    Pointer to the thread instance for which the flag has to be
 *              set; nothing happens when it is NULL
 * \param flags Holds the thread state this thread instance has to be set to
 */
#if 0
void TmThreadSetFlags(ThreadVars *tv, uint8_t flags)
{
    if (tv == NULL)
        return;

    tv->flags = flags;
}
#endif
1915 
1916 /**
1917  * \brief Initializes the mutex and condition variables for this TV
1918  *
1919  * It can be used by a thread to control a wait loop that can also be
1920  * influenced by other threads.
1921  *
1922  * \param tv Pointer to a TV instance
1923  */
1925 {
1926  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1927  SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. "
1928  "Exiting...");
1929  exit(EXIT_FAILURE);
1930  }
1931 
1932  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1933  printf("Error initializing the tv->m mutex\n");
1934  exit(EXIT_FAILURE);
1935  }
1936 
1937  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1938  SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. "
1939  "Exiting...");
1940  exit(EXIT_FAILURE);
1941  }
1942 
1943  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1944  SCLogError(SC_ERR_FATAL, "Error initializing the tv->cond condition "
1945  "variable");
1946  exit(EXIT_FAILURE);
1947  }
1948 
1949  return;
1950 }
1951 
1952 static void TmThreadDeinitMC(ThreadVars *tv)
1953 {
1954  if (tv->ctrl_mutex) {
1956  SCFree(tv->ctrl_mutex);
1957  }
1958  if (tv->ctrl_cond) {
1960  SCFree(tv->ctrl_cond);
1961  }
1962  return;
1963 }
1964 
1965 /**
1966  * \brief Tests if the thread represented in the arg has been unpaused or not.
1967  *
1968  * The function would return if the thread tv has been unpaused or if the
1969  * kill flag for the thread has been set.
1970  *
1971  * \param tv Pointer to the TV instance.
1972  */
1974 {
1975  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1976  SleepUsec(100);
1977 
1978  if (TmThreadsCheckFlag(tv, THV_KILL))
1979  break;
1980  }
1981 
1982  return;
1983 }
1984 
1985 /**
1986  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1987  * the kill flag has been set or not on the thread.
1988  *
1989  * \param tv Pointer to the TV instance.
1990  */
1991 void TmThreadWaitForFlag(ThreadVars *tv, uint16_t flags)
1992 {
1993  while (!TmThreadsCheckFlag(tv, flags)) {
1994  SleepUsec(100);
1995  }
1996 
1997  return;
1998 }
1999 
2000 /**
2001  * \brief Unpauses a thread
2002  *
2003  * \param tv Pointer to a TV instance that has to be unpaused
2004  */
2006 {
2008 
2009  return;
2010 }
2011 
2012 /**
2013  * \brief Unpauses all threads present in tv_root
2014  */
2016 {
2017  ThreadVars *tv = NULL;
2018  int i = 0;
2019 
2021  for (i = 0; i < TVT_MAX; i++) {
2022  tv = tv_root[i];
2023  while (tv != NULL) {
2024  TmThreadContinue(tv);
2025  tv = tv->next;
2026  }
2027  }
2029 
2030  return;
2031 }
2032 
2033 /**
2034  * \brief Pauses a thread
2035  *
2036  * \param tv Pointer to a TV instance that has to be paused
2037  */
2039 {
2041 
2042  return;
2043 }
2044 
2045 /**
2046  * \brief Pauses all threads present in tv_root
2047  */
2049 {
2050  ThreadVars *tv = NULL;
2051  int i = 0;
2052 
2054 
2056  for (i = 0; i < TVT_MAX; i++) {
2057  tv = tv_root[i];
2058  while (tv != NULL) {
2059  TmThreadPause(tv);
2060  tv = tv->next;
2061  }
2062  }
2064 
2065  return;
2066 }
2067 
2068 /**
2069  * \brief Used to check the thread for certain conditions of failure.
2070  */
2072 {
2073  ThreadVars *tv = NULL;
2074  int i = 0;
2075 
2077  for (i = 0; i < TVT_MAX; i++) {
2078  tv = tv_root[i];
2079 
2080  while (tv) {
2081  if (TmThreadsCheckFlag(tv, THV_FAILED)) {
2082  FatalError(SC_ERR_FATAL, "thread %s failed", tv->name);
2083  }
2084  tv = tv->next;
2085  }
2086  }
2088  return;
2089 }
2090 
2091 /**
2092  * \brief Used to check if all threads have finished their initialization. On
2093  * finding an un-initialized thread, it waits till that thread completes
2094  * its initialization, before proceeding to the next thread.
2095  *
2096  * \retval TM_ECODE_OK all initialized properly
2097  * \retval TM_ECODE_FAILED failure
2098  */
2100 {
2101  ThreadVars *tv = NULL;
2102  int i = 0;
2103  uint16_t mgt_num = 0;
2104  uint16_t ppt_num = 0;
2105 
2106  struct timeval start_ts;
2107  struct timeval cur_ts;
2108  gettimeofday(&start_ts, NULL);
2109 
2110 again:
2112  for (i = 0; i < TVT_MAX; i++) {
2113  tv = tv_root[i];
2114  while (tv != NULL) {
2115  if (TmThreadsCheckFlag(tv, (THV_CLOSED|THV_DEAD))) {
2117 
2118  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
2119  "initialize: flags %04x", tv->name,
2120  SC_ATOMIC_GET(tv->flags));
2121  return TM_ECODE_FAILED;
2122  }
2123 
2124  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
2126 
2127  gettimeofday(&cur_ts, NULL);
2128  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
2129  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
2130  "initialize in time: flags %04x", tv->name,
2131  SC_ATOMIC_GET(tv->flags));
2132  return TM_ECODE_FAILED;
2133  }
2134 
2135  /* sleep a little to give the thread some
2136  * time to finish initialization */
2137  SleepUsec(100);
2138  goto again;
2139  }
2140 
2141  if (TmThreadsCheckFlag(tv, THV_FAILED)) {
2143  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
2144  "initialize.", tv->name);
2145  return TM_ECODE_FAILED;
2146  }
2147  if (TmThreadsCheckFlag(tv, THV_CLOSED)) {
2149  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" closed on "
2150  "initialization.", tv->name);
2151  return TM_ECODE_FAILED;
2152  }
2153 
2154  if (i == TVT_MGMT)
2155  mgt_num++;
2156  else if (i == TVT_PPT)
2157  ppt_num++;
2158 
2159  tv = tv->next;
2160  }
2161  }
2163 
2164  SCLogNotice("all %"PRIu16" packet processing threads, %"PRIu16" management "
2165  "threads initialized, engine started.", ppt_num, mgt_num);
2166 
2167  return TM_ECODE_OK;
2168 }
2169 
2170 /**
2171  * \brief Returns the TV for the calling thread.
2172  *
2173  * \retval tv Pointer to the ThreadVars instance for the calling thread;
2174  * NULL on no match
2175  */
2177 {
2178  pthread_t self = pthread_self();
2179  ThreadVars *tv = NULL;
2180  int i = 0;
2181 
2183 
2184  for (i = 0; i < TVT_MAX; i++) {
2185  tv = tv_root[i];
2186  while (tv) {
2187  if (pthread_equal(self, tv->t)) {
2189  return tv;
2190  }
2191  tv = tv->next;
2192  }
2193  }
2194 
2196 
2197  return NULL;
2198 }
2199 
2200 /**
2201  * \brief returns a count of all the threads that match the flag
2202  */
2203 uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
2204 {
2205  ThreadVars *tv = NULL;
2206  int i = 0;
2207  uint32_t cnt = 0;
2208 
2210  for (i = 0; i < TVT_MAX; i++) {
2211  tv = tv_root[i];
2212  while (tv != NULL) {
2213  if ((tv->tmm_flags & flags) == flags)
2214  cnt++;
2215 
2216  tv = tv->next;
2217  }
2218  }
2220  return cnt;
2221 }
2222 
2223 static void TmThreadDoDumpSlots(const ThreadVars *tv)
2224 {
2225  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
2227  SCLogNotice("tv %p: -> slot %p id %d tm_id %d name %s %s",
2228  tv, s, s->id, s->tm_id, m->name, (tv->type == 0 && tv->stream_pq == &s->slot_post_pq) ? "<==== stream_pq" : "");
2229  if (tv->type == 0 && tv->stream_pq == &s->slot_pre_pq) {
2230  SCLogNotice("tv %p: -> slot %p/%d holds stream_pq %p IN PRE_PQ SUPER WEIRD", tv, s, s->id, tv->stream_pq);
2231  }
2232  for (Packet *xp = s->slot_pre_pq.top; xp != NULL; xp = xp->next) {
2233  SCLogNotice("tv %p: ==> pre_pq: slot id %u slot tm_id %u pre_pq.len %u packet src %s",
2234  tv, s->id, s->tm_id, s->slot_pre_pq.len, PktSrcToString(xp->pkt_src));
2235  }
2236  for (Packet *xp = s->slot_post_pq.top; xp != NULL; xp = xp->next) {
2237  SCLogNotice("tv %p: ==> post_pq: slot id %u slot tm_id %u post_pq.len %u packet src %s",
2238  tv, s->id, s->tm_id, s->slot_post_pq.len, PktSrcToString(xp->pkt_src));
2239  }
2240  }
2241 }
2242 
2244 {
2246  for (int i = 0; i < TVT_MAX; i++) {
2247  ThreadVars *tv = tv_root[i];
2248  while (tv != NULL) {
2249  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2250  SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
2251  tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
2252  if (tv->inq && tv->stream_pq == &trans_q[tv->inq->id]) {
2253  SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
2254  }
2255  TmThreadDoDumpSlots(tv);
2256  tv = tv->next;
2257  }
2258  }
2261 }
2262 
2263 typedef struct Thread_ {
2264  ThreadVars *tv; /**< threadvars structure */
2265  const char *name;
2266  int type;
2267  int in_use; /**< bool to indicate this is in use */
2268 
2269  struct timeval ts; /**< current time of this thread (offline mode) */
2270 } Thread;
2271 
2272 typedef struct Threads_ {
2273  Thread *threads;
2276 } Threads;
2277 
2278 static Threads thread_store = { NULL, 0, 0 };
2279 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2280 
2282 {
2283  Thread *t;
2284  size_t s;
2285 
2286  SCMutexLock(&thread_store_lock);
2287 
2288  for (s = 0; s < thread_store.threads_size; s++) {
2289  t = &thread_store.threads[s];
2290  if (t == NULL || t->in_use == 0)
2291  continue;
2292 
2293  SCLogNotice("Thread %"PRIuMAX", %s type %d, tv %p in_use %d",
2294  (uintmax_t)s+1, t->name, t->type, t->tv, t->in_use);
2295  if (t->tv) {
2296  ThreadVars *tv = t->tv;
2297  const uint32_t flags = SC_ATOMIC_GET(tv->flags);
2298  SCLogNotice("tv %p type %u name %s tmm_flags %02X flags %X",
2299  tv, tv->type, tv->name, tv->tmm_flags, flags);
2300  }
2301  }
2302 
2303  SCMutexUnlock(&thread_store_lock);
2304 }
2305 
2306 #define STEP 32
2307 /**
2308  * \retval id thread id, or 0 if not found
2309  */
2311 {
2312  SCMutexLock(&thread_store_lock);
2313  if (thread_store.threads == NULL) {
2314  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2315  BUG_ON(thread_store.threads == NULL);
2316  thread_store.threads_size = STEP;
2317  }
2318 
2319  size_t s;
2320  for (s = 0; s < thread_store.threads_size; s++) {
2321  if (thread_store.threads[s].in_use == 0) {
2322  Thread *t = &thread_store.threads[s];
2323  t->name = tv->name;
2324  t->type = type;
2325  t->tv = tv;
2326  t->in_use = 1;
2327 
2328  SCMutexUnlock(&thread_store_lock);
2329  return (int)(s+1);
2330  }
2331  }
2332 
2333  /* if we get here the array is completely filled */
2334  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2335  BUG_ON(newmem == NULL);
2336  thread_store.threads = newmem;
2337  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2338 
2339  Thread *t = &thread_store.threads[thread_store.threads_size];
2340  t->name = tv->name;
2341  t->type = type;
2342  t->tv = tv;
2343  t->in_use = 1;
2344 
2345  s = thread_store.threads_size;
2346  thread_store.threads_size += STEP;
2347 
2348  SCMutexUnlock(&thread_store_lock);
2349  return (int)(s+1);
2350 }
2351 #undef STEP
2352 
2353 void TmThreadsUnregisterThread(const int id)
2354 {
2355  SCMutexLock(&thread_store_lock);
2356  if (id <= 0 || id > (int)thread_store.threads_size) {
2357  SCMutexUnlock(&thread_store_lock);
2358  return;
2359  }
2360 
2361  /* id is one higher than index */
2362  int idx = id - 1;
2363 
2364  /* reset thread_id, which serves as clearing the record */
2365  thread_store.threads[idx].in_use = 0;
2366 
2367  /* check if we have at least one registered thread left */
2368  size_t s;
2369  for (s = 0; s < thread_store.threads_size; s++) {
2370  Thread *t = &thread_store.threads[s];
2371  if (t->in_use == 1) {
2372  goto end;
2373  }
2374  }
2375 
2376  /* if we get here no threads are registered */
2377  SCFree(thread_store.threads);
2378  thread_store.threads = NULL;
2379  thread_store.threads_size = 0;
2380  thread_store.threads_cnt = 0;
2381 
2382 end:
2383  SCMutexUnlock(&thread_store_lock);
2384 }
2385 
2386 void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
2387 {
2388  SCMutexLock(&thread_store_lock);
2389  if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2390  SCMutexUnlock(&thread_store_lock);
2391  return;
2392  }
2393 
2394  int idx = id - 1;
2395  Thread *t = &thread_store.threads[idx];
2396  t->ts.tv_sec = ts->tv_sec;
2397  t->ts.tv_usec = ts->tv_usec;
2398  SCMutexUnlock(&thread_store_lock);
2399 }
2400 
2401 #define COPY_TIMESTAMP(src,dst) ((dst)->tv_sec = (src)->tv_sec, (dst)->tv_usec = (src)->tv_usec) // XXX unify with flow-util.h
2402 void TmreadsGetMinimalTimestamp(struct timeval *ts)
2403 {
2404  struct timeval local, nullts;
2405  memset(&local, 0, sizeof(local));
2406  memset(&nullts, 0, sizeof(nullts));
2407  int set = 0;
2408  size_t s;
2409 
2410  SCMutexLock(&thread_store_lock);
2411  for (s = 0; s < thread_store.threads_size; s++) {
2412  Thread *t = &thread_store.threads[s];
2413  if (t == NULL || t->in_use == 0)
2414  continue;
2415  if (!(timercmp(&t->ts, &nullts, ==))) {
2416  if (!set) {
2417  local.tv_sec = t->ts.tv_sec;
2418  local.tv_usec = t->ts.tv_usec;
2419  set = 1;
2420  } else {
2421  if (timercmp(&t->ts, &local, <)) {
2422  COPY_TIMESTAMP(&t->ts, &local);
2423  }
2424  }
2425  }
2426  }
2427  SCMutexUnlock(&thread_store_lock);
2428  COPY_TIMESTAMP(&local, ts);
2429  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2430 }
2431 #undef COPY_TIMESTAMP
2432 
2433 /**
2434  * \retval r 1 if packet was accepted, 0 otherwise
2435  * \note if packet was not accepted, it's still the responsibility
2436  * of the caller.
2437  */
2438 int TmThreadsInjectPacketsById(Packet **packets, const int id)
2439 {
2440  if (id <= 0 || id > (int)thread_store.threads_size)
2441  return 0;
2442 
2443  int idx = id - 1;
2444 
2445  Thread *t = &thread_store.threads[idx];
2446  ThreadVars *tv = t->tv;
2447 
2448  if (tv == NULL || tv->stream_pq == NULL)
2449  return 0;
2450 
2451  SCMutexLock(&tv->stream_pq->mutex_q);
2452  while (*packets != NULL) {
2453  PacketEnqueue(tv->stream_pq, *packets);
2454  packets++;
2455  }
2457 
2458  /* wake up listening thread(s) if necessary */
2459  if (tv->inq != NULL) {
2460  SCCondSignal(&trans_q[tv->inq->id].cond_q);
2461  }
2462  return 1;
2463 }
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value from our atomic variable.
Definition: util-atomic.h:154
Packet *(* InHandler)(ThreadVars *)
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:88
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:832
#define THREAD_SET_PRIORITY
Definition: threadvars.h:117
const char * name
#define SCMutex
#define SCDropCaps(...)
Definition: util-privs.h:37
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:56
uint16_t flags
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:442
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:1029
#define SCCtrlMutexDestroy
#define SCLogDebug(...)
Definition: util-debug.h:335
void TmThreadSetFlags(ThreadVars *, uint8_t)
uint8_t type
Definition: threadvars.h:92
int AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
__thread uint64_t rww_lock_contention
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:1045
uint8_t cap_flags
Definition: tm-modules.h:67
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:2071
void TmThreadPause(ThreadVars *tv)
Pauses a thread.
Definition: tm-threads.c:2038
SCCondT cond_q
Definition: decode.h:629
uint16_t writer_cnt
Definition: tm-queues.h:31
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:64
int thread_priority
Definition: threadvars.h:96
#define BUG_ON(x)
#define THV_KILL_PKTACQ
Definition: threadvars.h:47
uint8_t flags
Definition: tm-modules.h:70
#define SCSetThreadName(n)
Definition: threads.h:299
#define THV_RUNNING_DONE
Definition: threadvars.h:45
#define STEP
Definition: tm-threads.c:2306
struct ThreadVars_ * prev
Definition: threadvars.h:112
void TmThreadDisablePacketThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1671
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1271
int TmThreadsInjectPacketsById(Packet **packets, const int id)
Definition: tm-threads.c:2438
#define unlikely(expr)
Definition: util-optimize.h:35
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1297
TmEcode(* Func)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *)
Definition: tm-modules.h:52
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:973
pthread_t t
Definition: threadvars.h:58
struct Packet_ * next
Definition: decode.h:571
TmSlot * TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name)
Definition: tm-threads.c:1727
#define THV_DEAD
Definition: threadvars.h:54
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:60
uint16_t id
Definition: tm-queues.h:29
#define SCCtrlCondInit
#define MIN(x, y)
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:107
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1241
int type
Definition: tm-threads.c:2266
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1832
#define THV_PAUSED
Definition: threadvars.h:38
#define COPY_TIMESTAMP(src, dst)
Definition: tm-threads.c:2401
__thread uint64_t spin_lock_wait_ticks
struct Thread_ Thread
#define THV_FAILED
Definition: threadvars.h:40
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:201
void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
Definition: tm-threads.c:2386
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value from our atomic variable.
Definition: util-atomic.h:141
__thread uint64_t spin_lock_contention
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1332
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1850
#define THREAD_SET_AFFINITY
Definition: threadvars.h:116
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:2005
void(* InShutdownHandler)(struct ThreadVars_ *)
Definition: threadvars.h:79
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
#define SCMutexLock(mut)
__thread uint64_t rwr_lock_wait_ticks
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread...
Definition: tm-threads.c:2099
ThreadVars * TmThreadsGetCallingThread(void)
Returns the TV for the calling thread.
Definition: tm-threads.c:2176
char * thread_group_name
Definition: threadvars.h:61
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:106
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:31
uint8_t thread_setup_flags
Definition: threadvars.h:89
void TmThreadsListThreads(void)
Definition: tm-threads.c:2281
void(* OutHandlerCtxFree)(void *)
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
void TmThreadsSetFlag(ThreadVars *tv, uint16_t flag)
Set a thread flag.
Definition: tm-threads.c:97
#define SC_ATOMIC_INIT(name)
Initialize the previously declared atomic variable and its lock.
Definition: util-atomic.h:81
#define SCCalloc(nm, a)
Definition: util-mem.h:253
struct TmSlot_ * tm_slots
Definition: threadvars.h:84
#define SCMutexUnlock(mut)
void TmThreadWaitForFlag(ThreadVars *tv, uint16_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1991
#define THV_INIT_DONE
Definition: threadvars.h:36
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tm-queues.h:27
void TmThreadRemove(ThreadVars *tv, int type)
Removes this TV from tv_root based on its type.
Definition: tm-threads.c:1396
#define SCCtrlCondDestroy
#define THV_CLOSED
Definition: threadvars.h:41
SCMutex tv_root_lock
Definition: tm-threads.c:81
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
#define SCMUTEX_INITIALIZER
uint32_t len
Definition: decode.h:624
uint8_t type
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1760
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:294
#define PACKET_PROFILING_TMM_END(p, id)
struct ThreadVars_ * next
Definition: threadvars.h:111
#define SCGetThreadIdLong(...)
Definition: threads.h:253
const void * slot_initdata
Definition: tm-threads.h:67
#define SleepMsec(msec)
Definition: tm-threads.h:44
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:80
__thread uint64_t mutex_lock_wait_ticks
size_t threads_size
Definition: tm-threads.c:2274
PacketQueue slot_post_pq
Definition: tm-threads.h:78
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
#define SCEnter(...)
Definition: util-debug.h:337
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:90
#define PACKET_PROFILING_TMM_START(p, id)
Packet * top
Definition: decode.h:622
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1299
int tm_id
Definition: tm-threads.h:81
struct TmSlot_ * slot_next
Definition: tm-threads.h:87
#define THV_USE
Definition: threadvars.h:35
void TmThreadPauseThreads()
Pauses all threads present in tv_root.
Definition: tm-threads.c:2048
void TmThreadContinueThreads()
Unpauses all threads present in tv_root.
Definition: tm-threads.c:2015
void PacketPoolInitEmpty(void)
void TmThreadsUnsetFlag(ThreadVars *tv, uint16_t flag)
Unset a thread flag.
Definition: tm-threads.c:105
PacketQueue slot_pre_pq
Definition: tm-threads.h:73
Tmq * outq
Definition: threadvars.h:73
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1358
uint8_t tmm_flags
Definition: threadvars.h:66
#define SCMutexInit(mut, mutattrs)
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:115
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2310
void PacketPoolDestroy(void)
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:62
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
#define THV_PAUSE
Definition: threadvars.h:37
void(* OutHandler)(ThreadVars *, Packet *)
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:63
struct Threads_ Threads
#define THV_KILL
Definition: threadvars.h:39
#define SCLogWarning(err_code,...)
Macro used to log WARNING messages.
Definition: util-debug.h:281
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:118
__thread uint64_t mutex_lock_contention
__thread uint64_t rwr_lock_cnt
#define SCRealloc(x, a)
Definition: util-mem.h:238
uint16_t cpu_affinity
Definition: threadvars.h:94
#define THV_FLOW_LOOP
Definition: threadvars.h:48
TmSlot * TmSlotGetSlotForTM(int tm_id)
Returns the slot holding a TM with the particular tm_id.
Definition: tm-threads.c:881
void PacketPoolInit(void)
void TmThreadKillThreads(void)
Definition: tm-threads.c:1789
uint16_t reader_cnt
Definition: tm-queues.h:30
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:34
const char * name
Definition: tm-modules.h:44
Tmqh * TmqhGetQueueHandlerByName(const char *name)
#define MAX_WAIT_TIME
Definition: tm-threads.c:1759
void TmreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2402
#define SCMalloc(a)
Definition: util-mem.h:222
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:207
ThreadVars * tv
Definition: tm-threads.h:55
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2353
ThreadVars * tv
Definition: tm-threads.c:2264
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:101
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (CPU affinity). Priority should already be set by pthread_create.
Definition: tm-threads.c:1061
#define SCFree(a)
Definition: util-mem.h:322
void *(* tm_func)(void *)
Definition: threadvars.h:83
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:269
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1566
#define SCCondSignal
const char * outqh_name
Definition: threadvars.h:75
char * printable_name
Definition: threadvars.h:60
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2203
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:984
#define SleepUsec(usec)
Definition: tm-threads.h:43
#define MIN_WAIT_TIME
Definition: tm-threads.c:1758
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:107
PacketQueue trans_q[256]
Definition: suricata.h:132
#define SCLogPerf(...)
Definition: util-debug.h:261
#define FatalError(x,...)
Definition: util-debug.h:559
ThreadVars * TmThreadsGetTVContainingSlot(TmSlot *tm_slot)
Definition: tm-threads.c:802
__thread uint64_t rwr_lock_contention
__thread uint64_t rww_lock_cnt
const char * PktSrcToString(enum PktSrcEnum pkt_src)
Definition: decode.c:665
void(* InShutdownHandler)(ThreadVars *)
TmEcode(* TmSlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *)
Definition: tm-threads.h:50
SCMutex m
Definition: flow-hash.h:105
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1198
__thread uint64_t rww_lock_wait_ticks
void * outctx
Definition: threadvars.h:74
struct timeval ts
Definition: tm-threads.c:2269
int TmThreadsCheckFlag(ThreadVars *tv, uint16_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:89
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:192
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:1020
#define StatsSyncCounters(tv)
Definition: counters.h:134
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:167
int in_use
Definition: tm-threads.c:2267
void TmThreadInitMC(ThreadVars *tv)
Sets the thread flags for a thread instance(tv)
Definition: tm-threads.c:1924
const char * name
Definition: tm-threads.c:2265
#define SCStrdup(a)
Definition: util-mem.h:268
char name[16]
Definition: threadvars.h:59
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:51
Thread * threads
Definition: tm-threads.c:2273
struct PacketQueue_ * stream_pq
Definition: threadvars.h:87
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:78
#define SCReturn
Definition: util-debug.h:339
#define THV_DEINIT
Definition: threadvars.h:44
uint8_t len
Per thread variable structure.
Definition: threadvars.h:57
__thread uint64_t mutex_lock_cnt
#define SCCtrlMutexInit(mut, mutattr)
char * name
Definition: tm-queues.h:28
void *(* OutHandlerCtxSetup)(const char *)
Tmq * inq
Definition: threadvars.h:72
#define likely(expr)
Definition: util-optimize.h:32
uint8_t cap_flags
Definition: threadvars.h:109
int id
Definition: tm-threads.h:84
int threading_set_cpu_affinity
Definition: runmodes.h:112
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:78
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1875
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:1120
Tmq * TmqCreateQueue(const char *name)
Definition: tm-queues.c:36
int threads_cnt
Definition: tm-threads.c:2275
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:139
void TmThreadDumpThreads(void)
Definition: tm-threads.c:2243
__thread uint64_t spin_lock_cnt
SCMutex mutex_q
Definition: decode.h:628
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1973
void TmqhReleasePacketsToPacketPool(PacketQueue *pq)
Release all the packets in the queue back to the packetpool. Mainly used by threads that have failed...