suricata
tm-threads.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2017 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Anoop Saldanha <anoopsaldanha@gmail.com>
23  * \author Eric Leblond <eric@regit.org>
24  *
25  * Thread management functions.
26  */
27 
28 #include "suricata-common.h"
29 #include "suricata.h"
30 #include "stream.h"
31 #include "runmodes.h"
32 #include "threadvars.h"
33 #include "tm-queues.h"
34 #include "tm-queuehandlers.h"
35 #include "tm-threads.h"
36 #include "tmqh-packetpool.h"
37 #include "threads.h"
38 #include "util-debug.h"
39 #include "util-privs.h"
40 #include "util-cpu.h"
41 #include "util-optimize.h"
42 #include "util-profiling.h"
43 #include "util-signal.h"
44 #include "queue.h"
45 
46 #ifdef PROFILE_LOCKING
47 __thread uint64_t mutex_lock_contention;
48 __thread uint64_t mutex_lock_wait_ticks;
49 __thread uint64_t mutex_lock_cnt;
50 
51 __thread uint64_t spin_lock_contention;
52 __thread uint64_t spin_lock_wait_ticks;
53 __thread uint64_t spin_lock_cnt;
54 
55 __thread uint64_t rww_lock_contention;
56 __thread uint64_t rww_lock_wait_ticks;
57 __thread uint64_t rww_lock_cnt;
58 
59 __thread uint64_t rwr_lock_contention;
60 __thread uint64_t rwr_lock_wait_ticks;
61 __thread uint64_t rwr_lock_cnt;
62 #endif
63 
64 #ifdef OS_FREEBSD
65 #include <sched.h>
66 #include <sys/param.h>
67 #include <sys/resource.h>
68 #include <sys/cpuset.h>
69 #include <sys/thr.h>
70 #define cpu_set_t cpuset_t
71 #endif /* OS_FREEBSD */
72 
73 #ifdef OS_WIN32
/* Windows stand-in for usleep(): Sleep() takes milliseconds, so the
 * microsecond value is scaled down; anything up to 1000 usec sleeps 1 msec. */
static inline void SleepUsec(uint64_t usec)
{
    const uint64_t msec = (usec > 1000) ? (usec / 1000) : 1;

    Sleep(msec);
}
82 #define SleepMsec(msec) Sleep((msec))
83 #else
84 #define SleepUsec(usec) usleep((usec))
85 #define SleepMsec(msec) usleep((msec) * 1000)
86 #endif
87 
88 /* prototypes */
89 static int SetCPUAffinity(uint16_t cpu);
90 
91 static void TmThreadDeinitMC(ThreadVars *tv);
92 
93 /* root of the threadvars list */
94 ThreadVars *tv_root[TVT_MAX] = { NULL };
95 
96 /* lock to protect tv_root */
98 
99 /**
100  * \brief Check if a thread flag is set.
101  *
102  * \retval 1 flag is set.
103  * \retval 0 flag is not set.
104  */
105 int TmThreadsCheckFlag(ThreadVars *tv, uint16_t flag)
106 {
107  return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
108 }
109 
110 /**
111  * \brief Set a thread flag.
112  */
113 void TmThreadsSetFlag(ThreadVars *tv, uint16_t flag)
114 {
115  SC_ATOMIC_OR(tv->flags, flag);
116 }
117 
118 /**
119  * \brief Unset a thread flag.
120  */
121 void TmThreadsUnsetFlag(ThreadVars *tv, uint16_t flag)
122 {
123  SC_ATOMIC_AND(tv->flags, ~flag);
124 }
125 
126 /**
127  * \brief Separate run function so we can call it recursively.
128  *
129  * \todo Deal with post_pq for slots beyond the first.
130  */
132  TmSlot *slot)
133 {
134  TmEcode r;
135  TmSlot *s;
136  Packet *extra_p;
137 
138  for (s = slot; s != NULL; s = s->slot_next) {
139  TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
141 
142  if (unlikely(s->id == 0)) {
143  r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq);
144  } else {
145  r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, NULL);
146  }
147 
149 
150  /* handle error */
151  if (unlikely(r == TM_ECODE_FAILED)) {
152  /* Encountered error. Return packets to packetpool and return */
154 
158 
160  return TM_ECODE_FAILED;
161  }
162 
163  /* handle new packets */
164  while (s->slot_pre_pq.top != NULL) {
165  extra_p = PacketDequeue(&s->slot_pre_pq);
166  if (unlikely(extra_p == NULL))
167  continue;
168 
169  /* see if we need to process the packet */
170  if (s->slot_next != NULL) {
171  r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
172  if (unlikely(r == TM_ECODE_FAILED)) {
174 
178 
179  TmqhOutputPacketpool(tv, extra_p);
181  return TM_ECODE_FAILED;
182  }
183  }
184  tv->tmqh_out(tv, extra_p);
185  }
186  }
187 
188  return TM_ECODE_OK;
189 }
190 
191 #ifndef AFLFUZZ_PCAP_RUNMODE
192 
193 /** \internal
194  *
195  * \brief Process flow timeout packets
196  *
197  * Process flow timeout pseudo packets. During shutdown this loop
198  * is run until the flow engine kills the thread and the queue is
199  * empty.
200  */
201 static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
202 {
203  TmSlot *stream_slot = NULL, *slot = NULL;
204  int run = 1;
205  int r = TM_ECODE_OK;
206 
207  for (slot = s; slot != NULL; slot = slot->slot_next) {
208  if (slot->tm_id == TMM_FLOWWORKER)
209  {
210  stream_slot = slot;
211  break;
212  }
213  }
214 
215  if (tv->stream_pq == NULL || stream_slot == NULL) {
216  SCLogDebug("not running TmThreadTimeoutLoop %p/%p", tv->stream_pq, stream_slot);
217  return r;
218  }
219 
220  SCLogDebug("flow end loop starting");
221  while(run) {
222  Packet *p;
223  if (tv->stream_pq->len != 0) {
225  p = PacketDequeue(tv->stream_pq);
227  BUG_ON(p == NULL);
228 
229  if ((r = TmThreadsSlotProcessPkt(tv, stream_slot, p) != TM_ECODE_OK)) {
230  if (r == TM_ECODE_FAILED)
231  run = 0;
232  }
233  } else {
234  SleepUsec(1);
235  }
236 
237  if (tv->stream_pq->len == 0 && TmThreadsCheckFlag(tv, THV_KILL)) {
238  run = 0;
239  }
240  }
241  SCLogDebug("flow end loop complete");
242 
243  return r;
244 }
245 
246 /*
247 
248  pcap/nfq
249 
250  pkt read
251  callback
252  process_pkt
253 
254  pfring
255 
256  pkt read
257  process_pkt
258 
259  slot:
260  setup
261 
262  pkt_ack_loop(tv, slot_data)
263 
264  deinit
265 
266  process_pkt:
267  while(s)
268  run s;
269  queue;
270 
271  */
272 
273 static void *TmThreadsSlotPktAcqLoop(void *td)
274 {
275  ThreadVars *tv = (ThreadVars *)td;
276  TmSlot *s = tv->tm_slots;
277  char run = 1;
278  TmEcode r = TM_ECODE_OK;
279  TmSlot *slot = NULL;
280 
281  /* Set the thread name */
282  if (SCSetThreadName(tv->name) < 0) {
283  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
284  }
285 
286  if (tv->thread_setup_flags != 0)
288 
289  /* Drop the capabilities for this thread */
290  SCDropCaps(tv);
291 
292  PacketPoolInit();
293 
294  /* check if we are setup properly */
295  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
296  SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
297  " PktAcqLoop=%p, tmqh_in=%p,"
298  " tmqh_out=%p",
299  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
301  pthread_exit((void *) -1);
302  return NULL;
303  }
304 
305  for (slot = s; slot != NULL; slot = slot->slot_next) {
306  if (slot->SlotThreadInit != NULL) {
307  void *slot_data = NULL;
308  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
309  if (r != TM_ECODE_OK) {
310  if (r == TM_ECODE_DONE) {
311  EngineDone();
313  goto error;
314  } else {
316  goto error;
317  }
318  }
319  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
320  }
321  memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue));
322  SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL);
323  memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
324  SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
325 
326  /* get the 'pre queue' from module before the stream module */
327  if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
328  SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
329  tv->stream_pq = &slot->slot_post_pq;
330  /* if the stream module is the first, get the threads input queue */
331  } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
332  tv->stream_pq = &trans_q[tv->inq->id];
333  SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
334  }
335  }
336 
337  StatsSetupPrivate(tv);
338 
340 
341  while(run) {
342  if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
346  }
347 
348  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
349 
350  if (r == TM_ECODE_FAILED) {
352  run = 0;
353  }
355  run = 0;
356  }
357  if (r == TM_ECODE_DONE) {
358  run = 0;
359  }
360  }
361  StatsSyncCounters(tv);
362 
364 
365  /* process all pseudo packets the flow timeout may throw at us */
366  TmThreadTimeoutLoop(tv, s);
367 
370 
372 
373  for (slot = s; slot != NULL; slot = slot->slot_next) {
374  if (slot->SlotThreadExitPrintStats != NULL) {
375  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
376  }
377 
378  if (slot->SlotThreadDeinit != NULL) {
379  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
380  if (r != TM_ECODE_OK) {
382  goto error;
383  }
384  }
385 
386  BUG_ON(slot->slot_pre_pq.len);
387  BUG_ON(slot->slot_post_pq.len);
388  }
389 
390  tv->stream_pq = NULL;
391  SCLogDebug("%s ending", tv->name);
393  pthread_exit((void *) 0);
394  return NULL;
395 
396 error:
397  tv->stream_pq = NULL;
398  pthread_exit((void *) -1);
399  return NULL;
400 }
401 
402 #endif /* NO AFLFUZZ_PCAP_RUNMODE */
403 
404 #ifdef AFLFUZZ_PCAP_RUNMODE
405 /** \brief simplified loop to speed up AFL
406  *
407  * The loop runs in the caller's thread. No separate thread.
408  */
409 static void *TmThreadsSlotPktAcqLoopAFL(void *td)
410 {
411  SCLogNotice("AFL mode starting");
412 
413  ThreadVars *tv = (ThreadVars *)td;
414  TmSlot *s = tv->tm_slots;
415  char run = 1;
416  TmEcode r = TM_ECODE_OK;
417  TmSlot *slot = NULL;
418 
419  PacketPoolInit();
420 
421  /* check if we are setup properly */
422  if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
423  SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p,"
424  " PktAcqLoop=%p, tmqh_in=%p,"
425  " tmqh_out=%p",
426  s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
428  return NULL;
429  }
430 
431  for (slot = s; slot != NULL; slot = slot->slot_next) {
432  if (slot->SlotThreadInit != NULL) {
433  void *slot_data = NULL;
434  r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
435  if (r != TM_ECODE_OK) {
436  if (r == TM_ECODE_DONE) {
437  EngineDone();
439  goto error;
440  } else {
442  goto error;
443  }
444  }
445  (void)SC_ATOMIC_SET(slot->slot_data, slot_data);
446  }
447  memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue));
448  SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL);
449  memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
450  SCMutexInit(&slot->slot_post_pq.mutex_q, NULL);
451 
452  /* get the 'pre queue' from module before the stream module */
453  if (slot->slot_next != NULL && (slot->slot_next->tm_id == TMM_FLOWWORKER)) {
454  SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq);
455  tv->stream_pq = &slot->slot_post_pq;
456  /* if the stream module is the first, get the threads input queue */
457  } else if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
458  tv->stream_pq = &trans_q[tv->inq->id];
459  SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq);
460  }
461  }
462 
463  StatsSetupPrivate(tv);
464 
466 
467  while(run) {
468  /* run right away */
469 
470  r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
471 
472  if (r == TM_ECODE_FAILED) {
474  run = 0;
475  }
477  run = 0;
478  }
479  if (r == TM_ECODE_DONE) {
480  run = 0;
481  }
482  }
483  StatsSyncCounters(tv);
484 
486 
488 
490 
491  for (slot = s; slot != NULL; slot = slot->slot_next) {
492  if (slot->SlotThreadExitPrintStats != NULL) {
493  slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
494  }
495 
496  if (slot->SlotThreadDeinit != NULL) {
497  r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
498  if (r != TM_ECODE_OK) {
500  goto error;
501  }
502  }
503 
504  BUG_ON(slot->slot_pre_pq.len);
505  BUG_ON(slot->slot_post_pq.len);
506  }
507 
508  tv->stream_pq = NULL;
509  SCLogDebug("%s ending", tv->name);
511  return NULL;
512 
513 error:
514  tv->stream_pq = NULL;
515  return NULL;
516 }
517 #endif
518 
519 /**
520  * \todo Only the first "slot" currently makes the "post_pq" available
521  * to the thread module.
522  */
523 static void *TmThreadsSlotVar(void *td)
524 {
525  ThreadVars *tv = (ThreadVars *)td;
526  TmSlot *s = (TmSlot *)tv->tm_slots;
527  Packet *p = NULL;
528  char run = 1;
529  TmEcode r = TM_ECODE_OK;
530 
532 
533  /* Set the thread name */
534  if (SCSetThreadName(tv->name) < 0) {
535  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
536  }
537 
538  if (tv->thread_setup_flags != 0)
540 
541  /* Drop the capabilities for this thread */
542  SCDropCaps(tv);
543 
544  /* check if we are setup properly */
545  if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
547  pthread_exit((void *) -1);
548  return NULL;
549  }
550 
551  for (; s != NULL; s = s->slot_next) {
552  if (s->SlotThreadInit != NULL) {
553  void *slot_data = NULL;
554  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
555  if (r != TM_ECODE_OK) {
557  goto error;
558  }
559  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
560  }
561  memset(&s->slot_pre_pq, 0, sizeof(PacketQueue));
562  SCMutexInit(&s->slot_pre_pq.mutex_q, NULL);
563  memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
564  SCMutexInit(&s->slot_post_pq.mutex_q, NULL);
565 
566  /* special case: we need to access the stream queue
567  * from the flow timeout code */
568 
569  /* get the 'pre queue' from module before the stream module */
570  if (s->slot_next != NULL && (s->slot_next->tm_id == TMM_FLOWWORKER)) {
571  SCLogDebug("pre-stream packetqueue %p (preq)", &s->slot_pre_pq);
572  tv->stream_pq = &s->slot_pre_pq;
573  /* if the stream module is the first, get the threads input queue */
574  } else if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
575  tv->stream_pq = &trans_q[tv->inq->id];
576  SCLogDebug("pre-stream packetqueue %p (inq)", &s->slot_pre_pq);
577  }
578  }
579 
580  StatsSetupPrivate(tv);
581 
583 
584  s = (TmSlot *)tv->tm_slots;
585 
586  while (run) {
587  if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
591  }
592 
593  /* input a packet */
594  p = tv->tmqh_in(tv);
595 
596  if (p != NULL) {
597  /* run the thread module(s) */
598  r = TmThreadsSlotVarRun(tv, p, s);
599  if (r == TM_ECODE_FAILED) {
600  TmqhOutputPacketpool(tv, p);
602  break;
603  }
604 
605  /* output the packet */
606  tv->tmqh_out(tv, p);
607 
608  } /* if (p != NULL) */
609 
610  /* now handle the post_pq packets */
611  TmSlot *slot;
612  for (slot = s; slot != NULL; slot = slot->slot_next) {
613  if (slot->slot_post_pq.top != NULL) {
614  while (1) {
616  Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
618 
619  if (extra_p == NULL)
620  break;
621 
622  if (slot->slot_next != NULL) {
623  r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
624  if (r == TM_ECODE_FAILED) {
628 
629  TmqhOutputPacketpool(tv, extra_p);
631  break;
632  }
633  }
634  /* output the packet */
635  tv->tmqh_out(tv, extra_p);
636  } /* while */
637  } /* if */
638  } /* for */
639 
640  if (TmThreadsCheckFlag(tv, THV_KILL)) {
641  run = 0;
642  }
643  } /* while (run) */
644  StatsSyncCounters(tv);
645 
648 
650 
651  s = (TmSlot *)tv->tm_slots;
652 
653  for ( ; s != NULL; s = s->slot_next) {
654  if (s->SlotThreadExitPrintStats != NULL) {
655  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
656  }
657 
658  if (s->SlotThreadDeinit != NULL) {
659  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
660  if (r != TM_ECODE_OK) {
662  goto error;
663  }
664  }
665  BUG_ON(s->slot_pre_pq.len);
666  BUG_ON(s->slot_post_pq.len);
667  }
668 
669  SCLogDebug("%s ending", tv->name);
670  tv->stream_pq = NULL;
672  pthread_exit((void *) 0);
673  return NULL;
674 
675 error:
676  tv->stream_pq = NULL;
677  pthread_exit((void *) -1);
678  return NULL;
679 }
680 
681 static void *TmThreadsManagement(void *td)
682 {
683  ThreadVars *tv = (ThreadVars *)td;
684  TmSlot *s = (TmSlot *)tv->tm_slots;
685  TmEcode r = TM_ECODE_OK;
686 
687  BUG_ON(s == NULL);
688 
689  /* Set the thread name */
690  if (SCSetThreadName(tv->name) < 0) {
691  SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
692  }
693 
694  if (tv->thread_setup_flags != 0)
696 
697  /* Drop the capabilities for this thread */
698  SCDropCaps(tv);
699 
700  SCLogDebug("%s starting", tv->name);
701 
702  if (s->SlotThreadInit != NULL) {
703  void *slot_data = NULL;
704  r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
705  if (r != TM_ECODE_OK) {
707  pthread_exit((void *) -1);
708  return NULL;
709  }
710  (void)SC_ATOMIC_SET(s->slot_data, slot_data);
711  }
712  memset(&s->slot_pre_pq, 0, sizeof(PacketQueue));
713  memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
714 
715  StatsSetupPrivate(tv);
716 
718 
719  r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
720  /* handle error */
721  if (r == TM_ECODE_FAILED) {
723  }
724 
725  if (TmThreadsCheckFlag(tv, THV_KILL)) {
726  StatsSyncCounters(tv);
727  }
728 
731 
732  if (s->SlotThreadExitPrintStats != NULL) {
733  s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
734  }
735 
736  if (s->SlotThreadDeinit != NULL) {
737  r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
738  if (r != TM_ECODE_OK) {
740  pthread_exit((void *) -1);
741  return NULL;
742  }
743  }
744 
746  pthread_exit((void *) 0);
747  return NULL;
748 }
749 
750 /**
751  * \brief We set the slot functions.
752  *
753  * \param tv Pointer to the TV to set the slot function for.
754  * \param name Name of the slot variant.
755  * \param fn_p Pointer to a custom slot function. Used only if slot variant
756  * "name" is "custom".
757  *
758  * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure.
759  */
760 static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(void *))
761 {
762  if (name == NULL) {
763  if (fn_p == NULL) {
764  printf("Both slot name and function pointer can't be NULL inside "
765  "TmThreadSetSlots\n");
766  goto error;
767  } else {
768  name = "custom";
769  }
770  }
771 
772  if (strcmp(name, "varslot") == 0) {
773  tv->tm_func = TmThreadsSlotVar;
774  } else if (strcmp(name, "pktacqloop") == 0) {
775 #ifndef AFLFUZZ_PCAP_RUNMODE
776  tv->tm_func = TmThreadsSlotPktAcqLoop;
777 #else
778  tv->tm_func = TmThreadsSlotPktAcqLoopAFL;
779 #endif
780  } else if (strcmp(name, "management") == 0) {
781  tv->tm_func = TmThreadsManagement;
782  } else if (strcmp(name, "command") == 0) {
783  tv->tm_func = TmThreadsManagement;
784  } else if (strcmp(name, "custom") == 0) {
785  if (fn_p == NULL)
786  goto error;
787  tv->tm_func = fn_p;
788  } else {
789  printf("Error: Slot \"%s\" not supported\n", name);
790  goto error;
791  }
792 
793  return TM_ECODE_OK;
794 
795 error:
796  return TM_ECODE_FAILED;
797 }
798 
800 {
801  ThreadVars *tv;
802  int i;
803 
804  SCMutexLock(&tv_root_lock);
805 
806  for (i = 0; i < TVT_MAX; i++) {
807  tv = tv_root[i];
808 
809  while (tv) {
810  TmSlot *slots = tv->tm_slots;
811  while (slots != NULL) {
812  if (slots == tm_slot) {
813  SCMutexUnlock(&tv_root_lock);
814  return tv;
815  }
816  slots = slots->slot_next;
817  }
818  tv = tv->next;
819  }
820  }
821 
822  SCMutexUnlock(&tv_root_lock);
823 
824  return NULL;
825 }
826 
827 /**
828  * \brief Appends a new entry to the slots.
829  *
830  * \param tv TV the slot is attached to.
831  * \param tm TM to append.
832  * \param data Data to be passed on to the slot init function.
833  *
834  * \note Returns nothing; if the slot allocation fails the function returns without appending a slot.
835  */
836 void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
837 {
838  TmSlot *slot = SCMalloc(sizeof(TmSlot));
839  if (unlikely(slot == NULL))
840  return;
841  memset(slot, 0, sizeof(TmSlot));
842  SC_ATOMIC_INIT(slot->slot_data);
843  slot->tv = tv;
844  slot->SlotThreadInit = tm->ThreadInit;
845  slot->slot_initdata = data;
846  SC_ATOMIC_INIT(slot->SlotFunc);
847  (void)SC_ATOMIC_SET(slot->SlotFunc, tm->Func);
848  slot->PktAcqLoop = tm->PktAcqLoop;
849  slot->Management = tm->Management;
851  slot->SlotThreadDeinit = tm->ThreadDeinit;
852  /* we don't have to check for the return value "-1". We wouldn't have
853  * received a TM as arg, if it didn't exist */
854  slot->tm_id = TmModuleGetIDForTM(tm);
855 
856  tv->tmm_flags |= tm->flags;
857  tv->cap_flags |= tm->cap_flags;
858 
859  if (tv->tm_slots == NULL) {
860  tv->tm_slots = slot;
861  slot->id = 0;
862  } else {
863  TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL;
864 
865  /* get the last slot */
866  for ( ; a != NULL; a = a->slot_next) {
867  b = a;
868  }
869  /* append the new slot */
870  if (b != NULL) {
871  b->slot_next = slot;
872  slot->id = b->id + 1;
873  }
874  }
875  return;
876 }
877 
878 /**
879  * \brief Returns the slot holding a TM with the particular tm_id.
880  *
881  * \param tm_id TM id of the TM whose slot has to be returned.
882  *
883  * \retval slots Pointer to the slot.
884  */
886 {
887  ThreadVars *tv = NULL;
888  TmSlot *slots;
889  int i;
890 
891  SCMutexLock(&tv_root_lock);
892 
893  for (i = 0; i < TVT_MAX; i++) {
894  tv = tv_root[i];
895  while (tv) {
896  slots = tv->tm_slots;
897  while (slots != NULL) {
898  if (slots->tm_id == tm_id) {
899  SCMutexUnlock(&tv_root_lock);
900  return slots;
901  }
902  slots = slots->slot_next;
903  }
904  tv = tv->next;
905  }
906  }
907 
908  SCMutexUnlock(&tv_root_lock);
909 
910  return NULL;
911 }
912 
913 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
914 static int SetCPUAffinitySet(cpu_set_t *cs)
915 {
916 #if defined OS_FREEBSD
917  int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID,
918  SCGetThreadIdLong(), sizeof(cpu_set_t),cs);
919 #elif OS_DARWIN
920  int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
921  (void*)cs, THREAD_AFFINITY_POLICY_COUNT);
922 #else
923  pid_t tid = syscall(SYS_gettid);
924  int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs);
925 #endif /* OS_FREEBSD */
926 
927  if (r != 0) {
928  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
929  strerror(errno));
930  return -1;
931  }
932 
933  return 0;
934 }
935 #endif
936 
937 
938 /**
939  * \brief Set the thread affinity on the calling thread.
940  *
941  * \param cpuid Id of the core/cpu to setup the affinity.
942  *
943  * \retval 0 If all goes well; -1 if something is wrong.
944  */
945 static int SetCPUAffinity(uint16_t cpuid)
946 {
947 #if defined __OpenBSD__ || defined sun
948  return 0;
949 #else
950  int cpu = (int)cpuid;
951 
952 #if defined OS_WIN32 || defined __CYGWIN__
953  DWORD cs = 1 << cpu;
954 
955  int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs));
956  if (r != 0) {
957  printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r,
958  strerror(errno));
959  return -1;
960  }
961  SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32,
962  SCGetThreadIdLong(), cpu);
963 
964  return 0;
965 
966 #else
967  cpu_set_t cs;
968 
969  CPU_ZERO(&cs);
970  CPU_SET(cpu, &cs);
971  return SetCPUAffinitySet(&cs);
972 #endif /* windows */
973 #endif /* not supported */
974 }
975 
976 
977 /**
978  * \brief Set the thread options (thread priority).
979  *
980  * \param tv Pointer to the ThreadVars to setup the thread priority.
981  *
982  * \retval TM_ECODE_OK.
983  */
985 {
987  tv->thread_priority = prio;
988 
989  return TM_ECODE_OK;
990 }
991 
992 /**
993  * \brief Adjusting nice value for threads.
994  */
996 {
997  SCEnter();
998 #ifndef __CYGWIN__
999 #ifdef OS_WIN32
1000  if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) {
1001  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for "
1002  "thread %s: %s", tv->name, strerror(errno));
1003  } else {
1004  SCLogDebug("Priority set to %"PRId32" for thread %s",
1005  tv->thread_priority, tv->name);
1006  }
1007 #else
1008  int ret = nice(tv->thread_priority);
1009  if (ret == -1) {
1010  SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value %d "
1011  "for thread %s: %s", tv->thread_priority, tv->name,
1012  strerror(errno));
1013  } else {
1014  SCLogDebug("Nice value set to %"PRId32" for thread %s",
1015  tv->thread_priority, tv->name);
1016  }
1017 #endif /* OS_WIN32 */
1018 #endif
1019  SCReturn;
1020 }
1021 
1022 
1023 /**
1024  * \brief Set the thread options (cpu affinity).
1025  *
1026  * \param tv pointer to the ThreadVars to setup the affinity.
1027  * \param cpu cpu on which affinity is set.
1028  *
1029  * \retval TM_ECODE_OK
1030  */
1032 {
1034  tv->cpu_affinity = cpu;
1035 
1036  return TM_ECODE_OK;
1037 }
1038 
1039 
1041 {
1043  return TM_ECODE_OK;
1044 
1045  if (type > MAX_CPU_SET) {
1046  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
1047  return TM_ECODE_FAILED;
1048  }
1049 
1051  tv->cpu_affinity = type;
1052 
1053  return TM_ECODE_OK;
1054 }
1055 
1057 {
1058  if (type >= MAX_CPU_SET) {
1059  SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family");
1060  return 0;
1061  }
1062 
1064 }
1065 
1066 /**
1067  * \brief Set the thread options (cpu affinity).
1068  * Priority should be already set by pthread_create.
1069  *
1070  * \param tv pointer to the ThreadVars of the calling thread.
1071  */
1073 {
1075  SCLogPerf("Setting affinity for thread \"%s\"to cpu/core "
1076  "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity,
1077  SCGetThreadIdLong());
1078  SetCPUAffinity(tv->cpu_affinity);
1079  }
1080 
1081 #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ && !defined sun
1083  TmThreadSetPrio(tv);
1086  if (taf->mode_flag == EXCLUSIVE_AFFINITY) {
1087  int cpu = AffinityGetNextCPU(taf);
1088  SetCPUAffinity(cpu);
1089  /* If CPU is in a set overwrite the default thread prio */
1090  if (CPU_ISSET(cpu, &taf->lowprio_cpu)) {
1091  tv->thread_priority = PRIO_LOW;
1092  } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) {
1094  } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) {
1095  tv->thread_priority = PRIO_HIGH;
1096  } else {
1097  tv->thread_priority = taf->prio;
1098  }
1099  SCLogPerf("Setting prio %d for thread \"%s\" to cpu/core "
1100  "%d, thread id %lu", tv->thread_priority,
1101  tv->name, cpu, SCGetThreadIdLong());
1102  } else {
1103  SetCPUAffinitySet(&taf->cpu_set);
1104  tv->thread_priority = taf->prio;
1105  SCLogPerf("Setting prio %d for thread \"%s\", "
1106  "thread id %lu", tv->thread_priority,
1107  tv->name, SCGetThreadIdLong());
1108  }
1109  TmThreadSetPrio(tv);
1110  }
1111 #endif
1112 
1113  return TM_ECODE_OK;
1114 }
1115 
1116 /**
1117  * \brief Creates and returns the TV instance for a new thread.
1118  *
1119  * \param name Name of this TV instance
1120  * \param inq_name Incoming queue name
1121  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1122  * \param outq_name Outgoing queue name
1123  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1124  * \param slots String representation for the slot function to be used
1125  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1126  * \param mucond Flag to indicate whether to initialize the condition
1127  * and the mutex variables for this newly created TV.
1128  *
1129  * \retval the newly created TV instance, or NULL on error
1130  */
1131 ThreadVars *TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name,
1132  const char *outq_name, const char *outqh_name, const char *slots,
1133  void * (*fn_p)(void *), int mucond)
1134 {
1135  ThreadVars *tv = NULL;
1136  Tmq *tmq = NULL;
1137  Tmqh *tmqh = NULL;
1138 
1139  SCLogDebug("creating thread \"%s\"...", name);
1140 
1141  /* XXX create separate function for this: allocate a thread container */
1142  tv = SCMalloc(sizeof(ThreadVars));
1143  if (unlikely(tv == NULL))
1144  goto error;
1145  memset(tv, 0, sizeof(ThreadVars));
1146 
1147  SC_ATOMIC_INIT(tv->flags);
1148  SCMutexInit(&tv->perf_public_ctx.m, NULL);
1149 
1150  strlcpy(tv->name, name, sizeof(tv->name));
1151 
1152  /* default state for every newly created thread */
1155 
1156  /* set the incoming queue */
1157  if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) {
1158  SCLogDebug("inq_name \"%s\"", inq_name);
1159 
1160  tmq = TmqGetQueueByName(inq_name);
1161  if (tmq == NULL) {
1162  tmq = TmqCreateQueue(inq_name);
1163  if (tmq == NULL)
1164  goto error;
1165  }
1166  SCLogDebug("tmq %p", tmq);
1167 
1168  tv->inq = tmq;
1169  tv->inq->reader_cnt++;
1170  SCLogDebug("tv->inq %p", tv->inq);
1171  }
1172  if (inqh_name != NULL) {
1173  SCLogDebug("inqh_name \"%s\"", inqh_name);
1174 
1175  tmqh = TmqhGetQueueHandlerByName(inqh_name);
1176  if (tmqh == NULL)
1177  goto error;
1178 
1179  tv->tmqh_in = tmqh->InHandler;
1181  SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
1182  }
1183 
1184  /* set the outgoing queue */
1185  if (outqh_name != NULL) {
1186  SCLogDebug("outqh_name \"%s\"", outqh_name);
1187 
1188  tmqh = TmqhGetQueueHandlerByName(outqh_name);
1189  if (tmqh == NULL)
1190  goto error;
1191 
1192  tv->tmqh_out = tmqh->OutHandler;
1193  tv->outqh_name = tmqh->name;
1194 
1195  if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) {
1196  SCLogDebug("outq_name \"%s\"", outq_name);
1197 
1198  if (tmqh->OutHandlerCtxSetup != NULL) {
1199  tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
1200  if (tv->outctx == NULL)
1201  goto error;
1202  tv->outq = NULL;
1203  } else {
1204  tmq = TmqGetQueueByName(outq_name);
1205  if (tmq == NULL) {
1206  tmq = TmqCreateQueue(outq_name);
1207  if (tmq == NULL)
1208  goto error;
1209  }
1210  SCLogDebug("tmq %p", tmq);
1211 
1212  tv->outq = tmq;
1213  tv->outctx = NULL;
1214  tv->outq->writer_cnt++;
1215  }
1216  }
1217  }
1218 
1219  if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
1220  goto error;
1221  }
1222 
1223  if (mucond != 0)
1224  TmThreadInitMC(tv);
1225 
1226  return tv;
1227 
1228 error:
1229  SCLogError(SC_ERR_THREAD_CREATE, "failed to setup a thread");
1230 
1231  if (tv != NULL)
1232  SCFree(tv);
1233  return NULL;
1234 }
1235 
1236 /**
1237  * \brief Creates and returns a TV instance for a Packet Processing Thread.
1238  * This function doesn't support custom slots, and hence shouldn't be
1239  * supplied \"custom\" as its slot type. All PPT threads are created
1240  * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv
1241  * conditional variables are not used to kill the thread.
1242  *
1243  * \param name Name of this TV instance
1244  * \param inq_name Incoming queue name
1245  * \param inqh_name Incoming queue handler name as set by TmqhSetup()
1246  * \param outq_name Outgoing queue name
1247  * \param outqh_name Outgoing queue handler as set by TmqhSetup()
1248  * \param slots String representation for the slot function to be used
1249  *
1250  * \retval the newly created TV instance, or NULL on error
1251  */
1252 ThreadVars *TmThreadCreatePacketHandler(const char *name, const char *inq_name,
1253  const char *inqh_name, const char *outq_name,
1254  const char *outqh_name, const char *slots)
1255 {
1256  ThreadVars *tv = NULL;
1257 
1258  tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name,
1259  slots, NULL, 0);
1260 
1261  if (tv != NULL) {
1262  tv->type = TVT_PPT;
1263  tv->id = TmThreadsRegisterThread(tv, tv->type);
1264  }
1265 
1266 
1267  return tv;
1268 }
1269 
1270 /**
1271  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1272  * This function supports only custom slot functions and hence a
1273  * function pointer should be sent as an argument.
1274  *
1275  * \param name Name of this TV instance
1276  * \param fn_p Pointer to function when \"slots\" is of type \"custom\"
1277  * \param mucond Flag to indicate whether to initialize the condition
1278  * and the mutex variables for this newly created TV.
1279  *
1280  * \retval the newly created TV instance, or NULL on error
1281  */
1282 ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *),
1283  int mucond)
1284 {
1285  ThreadVars *tv = NULL;
1286 
1287  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond);
1288 
1289  if (tv != NULL) {
1290  tv->type = TVT_MGMT;
1291  tv->id = TmThreadsRegisterThread(tv, tv->type);
1293  }
1294 
1295  return tv;
1296 }
1297 
1298 /**
1299  * \brief Creates and returns the TV instance for a Management thread(MGMT).
1300  * The thread runs the TmModule registered under \a module, which is
1301  * appended as its slot function; no function pointer is passed.
1302  *
1303  * \param name Name of this TV instance
1304  * \param module Name of TmModule with MANAGEMENT flag set.
1305  * \param mucond Flag to indicate whether to initialize the condition
1306  * and the mutex variables for this newly created TV.
1307  *
1308  * \retval the newly created TV instance, or NULL on error
1309  */
1310 ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
1311  int mucond)
1312 {
1313  ThreadVars *tv = NULL;
1314 
1315  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
1316 
1317  if (tv != NULL) {
1318  tv->type = TVT_MGMT;
1319  tv->id = TmThreadsRegisterThread(tv, tv->type);
1321 
1322  TmModule *m = TmModuleGetByName(module);
1323  if (m) {
1324  TmSlotSetFuncAppend(tv, m, NULL);
1325  }
1326  }
1327 
1328  return tv;
1329 }
1330 
1331 /**
1332  * \brief Creates and returns the TV instance for a Command thread (CMD).
1333  * The thread runs the TmModule registered under \a module, which is
1334  * appended as its slot function; no function pointer is passed.
1335  *
1336  * \param name Name of this TV instance
1337  * \param module Name of TmModule with COMMAND flag set.
1338  * \param mucond Flag to indicate whether to initialize the condition
1339  * and the mutex variables for this newly created TV.
1340  *
1341  * \retval the newly created TV instance, or NULL on error
1342  */
1343 ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
1344  int mucond)
1345 {
1346  ThreadVars *tv = NULL;
1347 
1348  tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond);
1349 
1350  if (tv != NULL) {
1351  tv->type = TVT_CMD;
1352  tv->id = TmThreadsRegisterThread(tv, tv->type);
1354 
1355  TmModule *m = TmModuleGetByName(module);
1356  if (m) {
1357  TmSlotSetFuncAppend(tv, m, NULL);
1358  }
1359  }
1360 
1361  return tv;
1362 }
1363 
1364 /**
1365  * \brief Appends this TV to tv_root based on its type
1366  *
1367  * \param type holds the type this TV belongs to.
1368  */
1370 {
1371  SCMutexLock(&tv_root_lock);
1372 
1373  if (tv_root[type] == NULL) {
1374  tv_root[type] = tv;
1375  tv->next = NULL;
1376  tv->prev = NULL;
1377 
1378  SCMutexUnlock(&tv_root_lock);
1379 
1380  return;
1381  }
1382 
1383  ThreadVars *t = tv_root[type];
1384 
1385  while (t) {
1386  if (t->next == NULL) {
1387  t->next = tv;
1388  tv->prev = t;
1389  tv->next = NULL;
1390  break;
1391  }
1392 
1393  t = t->next;
1394  }
1395 
1396  SCMutexUnlock(&tv_root_lock);
1397 
1398  return;
1399 }
1400 
1401 /**
1402  * \brief Removes this TV from tv_root based on its type
1403  *
1404  * \param tv The tv instance to remove from the global tv list.
1405  * \param type Holds the type this TV belongs to.
1406  */
1408 {
1409  SCMutexLock(&tv_root_lock);
1410 
1411  if (tv_root[type] == NULL) {
1412  SCMutexUnlock(&tv_root_lock);
1413 
1414  return;
1415  }
1416 
1417  ThreadVars *t = tv_root[type];
1418  while (t != tv) {
1419  t = t->next;
1420  }
1421 
1422  if (t != NULL) {
1423  if (t->prev != NULL)
1424  t->prev->next = t->next;
1425  if (t->next != NULL)
1426  t->next->prev = t->prev;
1427 
1428  if (t == tv_root[type])
1429  tv_root[type] = t->next;;
1430  }
1431 
1432  SCMutexUnlock(&tv_root_lock);
1433 
1434  return;
1435 }
1436 
1437 /**
1438  * \brief Kill a thread.
1439  *
1440  * \param tv A ThreadVars instance corresponding to the thread that has to be
1441  * killed.
1442  *
1443  * \retval r 1 killed successfully
1444  * 0 not yet ready, needs another look
1445  */
1446 static int TmThreadKillThread(ThreadVars *tv)
1447 {
1448  int i = 0;
1449 
1450  BUG_ON(tv == NULL);
1451 
1452  /* kill only once :) */
1453  if (TmThreadsCheckFlag(tv, THV_DEAD)) {
1454  return 1;
1455  }
1456 
1457  if (tv->inq != NULL) {
1458  /* we wait till we dry out all the inq packets, before we
1459  * kill this thread. Do note that you should have disabled
1460  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1461  if (!(strlen(tv->inq->name) == strlen("packetpool") &&
1462  strcasecmp(tv->inq->name, "packetpool") == 0)) {
1463  PacketQueue *q = &trans_q[tv->inq->id];
1464  if (q->len != 0) {
1465  return 0;
1466  }
1467  }
1468  }
1469 
1470  /* set the thread flag informing the thread that it needs to be
1471  * terminated */
1474 
1475  /* to be sure, signal more */
1476  if (!(TmThreadsCheckFlag(tv, THV_CLOSED))) {
1477  if (tv->InShutdownHandler != NULL) {
1478  tv->InShutdownHandler(tv);
1479  }
1480  if (tv->inq != NULL) {
1481  for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1482  SCCondSignal(&trans_q[tv->inq->id].cond_q);
1483  }
1484  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1485  }
1486 
1487  if (tv->ctrl_cond != NULL ) {
1488  pthread_cond_broadcast(tv->ctrl_cond);
1489  }
1490  return 0;
1491  }
1492 
1493  if (tv->outctx != NULL) {
1495  if (tmqh == NULL)
1496  BUG_ON(1);
1497 
1498  if (tmqh->OutHandlerCtxFree != NULL) {
1499  tmqh->OutHandlerCtxFree(tv->outctx);
1500  tv->outctx = NULL;
1501  }
1502  }
1503 
1504  /* join it and flag it as dead */
1505  pthread_join(tv->t, NULL);
1506  SCLogDebug("thread %s stopped", tv->name);
1508  return 1;
1509 }
1510 
1511 /** \internal
1512  *
1513  * \brief make sure that all packet threads are done processing their
1514  * in-flight packets
1515  */
1516 static void TmThreadDrainPacketThreads(void)
1517 {
1518  ThreadVars *tv = NULL;
1519  struct timeval start_ts;
1520  struct timeval cur_ts;
1521  gettimeofday(&start_ts, NULL);
1522 
1523 again:
1524  gettimeofday(&cur_ts, NULL);
1525  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1526  SCLogWarning(SC_ERR_SHUTDOWN, "unable to get all packet threads "
1527  "to process their packets in time");
1528  return;
1529  }
1530 
1531  SCMutexLock(&tv_root_lock);
1532 
1533  /* all receive threads are part of packet processing threads */
1534  tv = tv_root[TVT_PPT];
1535  while (tv) {
1536  if (tv->inq != NULL) {
1537  /* we wait till we dry out all the inq packets, before we
1538  * kill this thread. Do note that you should have disabled
1539  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1540  if (!(strlen(tv->inq->name) == strlen("packetpool") &&
1541  strcasecmp(tv->inq->name, "packetpool") == 0)) {
1542  PacketQueue *q = &trans_q[tv->inq->id];
1543  if (q->len != 0) {
1544  SCMutexUnlock(&tv_root_lock);
1545 
1546  /* sleep outside lock */
1547  SleepMsec(1);
1548  goto again;
1549  }
1550  }
1551  }
1552 
1553  tv = tv->next;
1554  }
1555 
1556  SCMutexUnlock(&tv_root_lock);
1557  return;
1558 }
1559 
1560 /**
1561  * \brief Disable all threads having the specified TMs.
1562  *
1563  * Breaks out of the packet acquisition loop, and bumps
1564  * into the 'flow loop', where it will process packets
1565  * from the flow engine's shutdown handling.
1566  */
1568 {
1569  ThreadVars *tv = NULL;
1570  struct timeval start_ts;
1571  struct timeval cur_ts;
1572  gettimeofday(&start_ts, NULL);
1573 
1574 again:
1575  gettimeofday(&cur_ts, NULL);
1576  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1577  FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1578  "thread - \"%s\". Killing engine", tv->name);
1579  }
1580 
1581  SCMutexLock(&tv_root_lock);
1582 
1583  /* all receive threads are part of packet processing threads */
1584  tv = tv_root[TVT_PPT];
1585 
1586  /* we do have to keep in mind that TVs are arranged in the order
1587  * right from receive to log. The moment we fail to find a
1588  * receive TM amongst the slots in a tv, it indicates we are done
1589  * with all receive threads */
1590  while (tv) {
1591  int disable = 0;
1592  TmModule *tm = NULL;
1593  /* obtain the slots for this TV */
1594  TmSlot *slots = tv->tm_slots;
1595  while (slots != NULL) {
1596  tm = TmModuleGetById(slots->tm_id);
1597 
1598  if (tm->flags & TM_FLAG_RECEIVE_TM) {
1599  disable = 1;
1600  break;
1601  }
1602 
1603  slots = slots->slot_next;
1604  continue;
1605  }
1606 
1607  if (disable) {
1608  if (tv->inq != NULL) {
1609  /* we wait till we dry out all the inq packets, before we
1610  * kill this thread. Do note that you should have disabled
1611  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1612  if (!(strlen(tv->inq->name) == strlen("packetpool") &&
1613  strcasecmp(tv->inq->name, "packetpool") == 0)) {
1614  PacketQueue *q = &trans_q[tv->inq->id];
1615  if (q->len != 0) {
1616  SCMutexUnlock(&tv_root_lock);
1617  /* don't sleep while holding a lock */
1618  SleepMsec(1);
1619  goto again;
1620  }
1621  }
1622  }
1623 
1624  /* we found a receive TV. Send it a KILL_PKTACQ signal. */
1625  if (tm && tm->PktAcqBreakLoop != NULL) {
1626  tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(slots->slot_data));
1627  }
1629 
1630  if (tv->inq != NULL) {
1631  int i;
1632  for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1633  SCCondSignal(&trans_q[tv->inq->id].cond_q);
1634  }
1635  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1636  }
1637 
1638  /* wait for it to enter the 'flow loop' stage */
1639  while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
1640  SCMutexUnlock(&tv_root_lock);
1641 
1642  SleepMsec(1);
1643  goto again;
1644  }
1645  }
1646 
1647  tv = tv->next;
1648  }
1649 
1650  SCMutexUnlock(&tv_root_lock);
1651 
1652  /* finally wait for all packet threads to have
1653  * processed all of their 'live' packets so we
1654  * don't process the last live packets together
1655  * with FFR packets */
1656  TmThreadDrainPacketThreads();
1657  return;
1658 }
1659 
1660 /**
1661  * \brief Disable all threads having the specified TMs.
1662  */
1664 {
1665  ThreadVars *tv = NULL;
1666  struct timeval start_ts;
1667  struct timeval cur_ts;
1668 
1669  /* first drain all packet threads of their packets */
1670  TmThreadDrainPacketThreads();
1671 
1672  gettimeofday(&start_ts, NULL);
1673 again:
1674  gettimeofday(&cur_ts, NULL);
1675  if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
1676  FatalError(SC_ERR_FATAL, "Engine unable to disable detect "
1677  "thread - \"%s\". Killing engine",
1678  tv ? tv->name : "<unknown>");
1679  }
1680 
1681  SCMutexLock(&tv_root_lock);
1682 
1683  /* all receive threads are part of packet processing threads */
1684  tv = tv_root[TVT_PPT];
1685 
1686  /* we do have to keep in mind that TVs are arranged in the order
1687  * right from receive to log. The moment we fail to find a
1688  * receive TM amongst the slots in a tv, it indicates we are done
1689  * with all receive threads */
1690  while (tv) {
1691  if (tv->inq != NULL) {
1692  /* we wait till we dry out all the inq packets, before we
1693  * kill this thread. Do note that you should have disabled
1694  * packet acquire by now using TmThreadDisableReceiveThreads()*/
1695  if (!(strlen(tv->inq->name) == strlen("packetpool") &&
1696  strcasecmp(tv->inq->name, "packetpool") == 0)) {
1697  PacketQueue *q = &trans_q[tv->inq->id];
1698  if (q->len != 0) {
1699  SCMutexUnlock(&tv_root_lock);
1700  /* don't sleep while holding a lock */
1701  SleepMsec(1);
1702  goto again;
1703  }
1704  }
1705  }
1706 
1707  /* we found our receive TV. Send it a KILL signal. This is all
1708  * we need to do to kill receive threads */
1710 
1711  if (tv->inq != NULL) {
1712  int i;
1713  for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
1714  SCCondSignal(&trans_q[tv->inq->id].cond_q);
1715  }
1716  SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
1717  }
1718 
1719  while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
1720  SCMutexUnlock(&tv_root_lock);
1721 
1722  SleepMsec(1);
1723  goto again;
1724  }
1725 
1726  tv = tv->next;
1727  }
1728 
1729  SCMutexUnlock(&tv_root_lock);
1730 
1731  return;
1732 }
1733 
1735 {
1736  ThreadVars *tv = NULL;
1737  TmSlot *slots = NULL;
1738 
1739  SCMutexLock(&tv_root_lock);
1740 
1741  /* all receive threads are part of packet processing threads */
1742  tv = tv_root[TVT_PPT];
1743 
1744  while (tv) {
1745  slots = tv->tm_slots;
1746 
1747  while (slots != NULL) {
1748  TmModule *tm = TmModuleGetById(slots->tm_id);
1749 
1750  char *found = strstr(tm->name, tm_name);
1751  if (found != NULL)
1752  goto end;
1753 
1754  slots = slots->slot_next;
1755  }
1756 
1757  tv = tv->next;
1758  }
1759 
1760  end:
1761  SCMutexUnlock(&tv_root_lock);
1762  return slots;
1763 }
1764 
1765 #define MIN_WAIT_TIME 100
1766 #define MAX_WAIT_TIME 999999
1768 {
1769  ThreadVars *tv = NULL;
1770  unsigned int sleep_usec = MIN_WAIT_TIME;
1771 
1772  BUG_ON((family < 0) || (family >= TVT_MAX));
1773 
1774 again:
1775  SCMutexLock(&tv_root_lock);
1776  tv = tv_root[family];
1777 
1778  while (tv) {
1779  int r = TmThreadKillThread(tv);
1780  if (r == 0) {
1781  SCMutexUnlock(&tv_root_lock);
1782  SleepUsec(sleep_usec);
1783  sleep_usec *= 2; /* slowly back off */
1784  sleep_usec = MIN(sleep_usec, MAX_WAIT_TIME);
1785  goto again;
1786  }
1787  sleep_usec = MIN_WAIT_TIME; /* reset */
1788 
1789  tv = tv->next;
1790  }
1791  SCMutexUnlock(&tv_root_lock);
1792 }
1793 #undef MIN_WAIT_TIME
1794 #undef MAX_WAIT_TIME
1795 
1797 {
1798  int i = 0;
1799 
1800  for (i = 0; i < TVT_MAX; i++) {
1802  }
1803 
1804  return;
1805 }
1806 
1807 static void TmThreadFree(ThreadVars *tv)
1808 {
1809  TmSlot *s;
1810  TmSlot *ps;
1811  if (tv == NULL)
1812  return;
1813 
1814  SCLogDebug("Freeing thread '%s'.", tv->name);
1815 
1816  StatsThreadCleanup(tv);
1817 
1818  TmThreadDeinitMC(tv);
1819 
1820  if (tv->thread_group_name) {
1822  }
1823 
1824  if (tv->printable_name) {
1825  SCFree(tv->printable_name);
1826  }
1827 
1828  s = (TmSlot *)tv->tm_slots;
1829  while (s) {
1830  ps = s;
1831  s = s->slot_next;
1832  SCFree(ps);
1833  }
1834 
1836  SCFree(tv);
1837 }
1838 
1839 void TmThreadSetGroupName(ThreadVars *tv, const char *name)
1840 {
1841  char *thread_group_name = NULL;
1842 
1843  if (name == NULL)
1844  return;
1845 
1846  if (tv == NULL)
1847  return;
1848 
1849  thread_group_name = SCStrdup(name);
1850  if (unlikely(thread_group_name == NULL)) {
1851  SCLogError(SC_ERR_RUNMODE, "error allocating memory");
1852  return;
1853  }
1854  tv->thread_group_name = thread_group_name;
1855 }
1856 
1858 {
1859  ThreadVars *tv = NULL;
1860  ThreadVars *ptv = NULL;
1861 
1862  if ((family < 0) || (family >= TVT_MAX))
1863  return;
1864 
1865  SCMutexLock(&tv_root_lock);
1866  tv = tv_root[family];
1867 
1868  while (tv) {
1869  ptv = tv;
1870  tv = tv->next;
1871  TmThreadFree(ptv);
1872  }
1873  tv_root[family] = NULL;
1874  SCMutexUnlock(&tv_root_lock);
1875 }
1876 
1877 /**
1878  * \brief Spawns a thread associated with the ThreadVars instance tv
1879  *
1880  * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
1881  */
1883 {
1884  pthread_attr_t attr;
1885  if (tv->tm_func == NULL) {
1886  printf("ERROR: no thread function set\n");
1887  return TM_ECODE_FAILED;
1888  }
1889 
1890  /* Initialize and set thread detached attribute */
1891  pthread_attr_init(&attr);
1892 
1893  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
1894 
1895  int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
1896  if (rc) {
1897  printf("ERROR; return code from pthread_create() is %" PRId32 "\n", rc);
1898  return TM_ECODE_FAILED;
1899  }
1900 
1902 
1903  TmThreadAppend(tv, tv->type);
1904  return TM_ECODE_OK;
1905 }
1906 
/**
 * \brief Overwrites the thread flags of a thread instance (tv)
 *
 * \note This definition sits inside an "#if 0" section and is therefore
 *       not compiled.
 *
 * \param tv    Pointer to the thread instance for which the flag has to be set
 * \param flags Holds the thread state this thread instance has to be set to
 */
#if 0
void TmThreadSetFlags(ThreadVars *tv, uint8_t flags)
{
    if (tv == NULL)
        return;

    tv->flags = flags;
}
#endif
1922 
1923 /**
1924  * \brief Initializes the mutex and condition variables for this TV
1925  *
1926  * It can be used by a thread to control a wait loop that can also be
1927  * influenced by other threads.
1928  *
1929  * \param tv Pointer to a TV instance
1930  */
1932 {
1933  if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) {
1934  SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. "
1935  "Exiting...");
1936  exit(EXIT_FAILURE);
1937  }
1938 
1939  if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) {
1940  printf("Error initializing the tv->m mutex\n");
1941  exit(EXIT_FAILURE);
1942  }
1943 
1944  if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) {
1945  SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. "
1946  "Exiting...");
1947  exit(EXIT_FAILURE);
1948  }
1949 
1950  if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) {
1951  SCLogError(SC_ERR_FATAL, "Error initializing the tv->cond condition "
1952  "variable");
1953  exit(EXIT_FAILURE);
1954  }
1955 
1956  return;
1957 }
1958 
1959 static void TmThreadDeinitMC(ThreadVars *tv)
1960 {
1961  if (tv->ctrl_mutex) {
1963  SCFree(tv->ctrl_mutex);
1964  }
1965  if (tv->ctrl_cond) {
1967  SCFree(tv->ctrl_cond);
1968  }
1969  return;
1970 }
1971 
1972 /**
1973  * \brief Tests if the thread represented in the arg has been unpaused or not.
1974  *
1975  * The function would return if the thread tv has been unpaused or if the
1976  * kill flag for the thread has been set.
1977  *
1978  * \param tv Pointer to the TV instance.
1979  */
1981 {
1982  while (TmThreadsCheckFlag(tv, THV_PAUSE)) {
1983  SleepUsec(100);
1984 
1985  if (TmThreadsCheckFlag(tv, THV_KILL))
1986  break;
1987  }
1988 
1989  return;
1990 }
1991 
1992 /**
1993  * \brief Waits till the specified flag(s) is(are) set. We don't bother if
1994  * the kill flag has been set or not on the thread.
1995  *
1996  * \param tv Pointer to the TV instance.
1997  */
1998 void TmThreadWaitForFlag(ThreadVars *tv, uint16_t flags)
1999 {
2000  while (!TmThreadsCheckFlag(tv, flags)) {
2001  SleepUsec(100);
2002  }
2003 
2004  return;
2005 }
2006 
2007 /**
2008  * \brief Unpauses a thread
2009  *
2010  * \param tv Pointer to a TV instance that has to be unpaused
2011  */
2013 {
2015 
2016  return;
2017 }
2018 
2019 /**
2020  * \brief Unpauses all threads present in tv_root
2021  */
2023 {
2024  ThreadVars *tv = NULL;
2025  int i = 0;
2026 
2027  SCMutexLock(&tv_root_lock);
2028  for (i = 0; i < TVT_MAX; i++) {
2029  tv = tv_root[i];
2030  while (tv != NULL) {
2031  TmThreadContinue(tv);
2032  tv = tv->next;
2033  }
2034  }
2035  SCMutexUnlock(&tv_root_lock);
2036 
2037  return;
2038 }
2039 
2040 /**
2041  * \brief Pauses a thread
2042  *
2043  * \param tv Pointer to a TV instance that has to be paused
2044  */
2046 {
2048 
2049  return;
2050 }
2051 
2052 /**
2053  * \brief Pauses all threads present in tv_root
2054  */
2056 {
2057  ThreadVars *tv = NULL;
2058  int i = 0;
2059 
2061 
2062  SCMutexLock(&tv_root_lock);
2063  for (i = 0; i < TVT_MAX; i++) {
2064  tv = tv_root[i];
2065  while (tv != NULL) {
2066  TmThreadPause(tv);
2067  tv = tv->next;
2068  }
2069  }
2070  SCMutexUnlock(&tv_root_lock);
2071 
2072  return;
2073 }
2074 
2075 /**
2076  * \brief Used to check the thread for certain conditions of failure.
2077  */
2079 {
2080  ThreadVars *tv = NULL;
2081  int i = 0;
2082 
2083  SCMutexLock(&tv_root_lock);
2084  for (i = 0; i < TVT_MAX; i++) {
2085  tv = tv_root[i];
2086 
2087  while (tv) {
2088  if (TmThreadsCheckFlag(tv, THV_FAILED)) {
2089  FatalError(SC_ERR_FATAL, "thread %s failed", tv->name);
2090  }
2091  tv = tv->next;
2092  }
2093  }
2094  SCMutexUnlock(&tv_root_lock);
2095  return;
2096 }
2097 
2098 /**
2099  * \brief Used to check if all threads have finished their initialization. On
2100  * finding an un-initialized thread, it waits till that thread completes
2101  * its initialization, before proceeding to the next thread.
2102  *
2103  * \retval TM_ECODE_OK all initialized properly
2104  * \retval TM_ECODE_FAILED failure
2105  */
2107 {
2108  ThreadVars *tv = NULL;
2109  int i = 0;
2110  uint16_t mgt_num = 0;
2111  uint16_t ppt_num = 0;
2112 
2113  struct timeval start_ts;
2114  struct timeval cur_ts;
2115  gettimeofday(&start_ts, NULL);
2116 
2117 again:
2118  SCMutexLock(&tv_root_lock);
2119  for (i = 0; i < TVT_MAX; i++) {
2120  tv = tv_root[i];
2121  while (tv != NULL) {
2122  if (TmThreadsCheckFlag(tv, (THV_CLOSED|THV_DEAD))) {
2123  SCMutexUnlock(&tv_root_lock);
2124 
2125  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
2126  "initialize: flags %04x", tv->name,
2127  SC_ATOMIC_GET(tv->flags));
2128  return TM_ECODE_FAILED;
2129  }
2130 
2131  if (!(TmThreadsCheckFlag(tv, THV_INIT_DONE))) {
2132  SCMutexUnlock(&tv_root_lock);
2133 
2134  gettimeofday(&cur_ts, NULL);
2135  if ((cur_ts.tv_sec - start_ts.tv_sec) > 120) {
2136  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
2137  "initialize in time: flags %04x", tv->name,
2138  SC_ATOMIC_GET(tv->flags));
2139  return TM_ECODE_FAILED;
2140  }
2141 
2142  /* sleep a little to give the thread some
2143  * time to finish initialization */
2144  SleepUsec(100);
2145  goto again;
2146  }
2147 
2148  if (TmThreadsCheckFlag(tv, THV_FAILED)) {
2149  SCMutexUnlock(&tv_root_lock);
2150  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to "
2151  "initialize.", tv->name);
2152  return TM_ECODE_FAILED;
2153  }
2154  if (TmThreadsCheckFlag(tv, THV_CLOSED)) {
2155  SCMutexUnlock(&tv_root_lock);
2156  SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" closed on "
2157  "initialization.", tv->name);
2158  return TM_ECODE_FAILED;
2159  }
2160 
2161  if (i == TVT_MGMT)
2162  mgt_num++;
2163  else if (i == TVT_PPT)
2164  ppt_num++;
2165 
2166  tv = tv->next;
2167  }
2168  }
2169  SCMutexUnlock(&tv_root_lock);
2170 
2171  SCLogNotice("all %"PRIu16" packet processing threads, %"PRIu16" management "
2172  "threads initialized, engine started.", ppt_num, mgt_num);
2173 
2174  return TM_ECODE_OK;
2175 }
2176 
2177 /**
2178  * \brief Returns the TV for the calling thread.
2179  *
2180  * \retval tv Pointer to the ThreadVars instance for the calling thread;
2181  * NULL on no match
2182  */
2184 {
2185  pthread_t self = pthread_self();
2186  ThreadVars *tv = NULL;
2187  int i = 0;
2188 
2189  SCMutexLock(&tv_root_lock);
2190 
2191  for (i = 0; i < TVT_MAX; i++) {
2192  tv = tv_root[i];
2193  while (tv) {
2194  if (pthread_equal(self, tv->t)) {
2195  SCMutexUnlock(&tv_root_lock);
2196  return tv;
2197  }
2198  tv = tv->next;
2199  }
2200  }
2201 
2202  SCMutexUnlock(&tv_root_lock);
2203 
2204  return NULL;
2205 }
2206 
2207 /**
2208  * \brief returns a count of all the threads that match the flag
2209  */
2210 uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
2211 {
2212  ThreadVars *tv = NULL;
2213  int i = 0;
2214  uint32_t cnt = 0;
2215 
2216  SCMutexLock(&tv_root_lock);
2217  for (i = 0; i < TVT_MAX; i++) {
2218  tv = tv_root[i];
2219  while (tv != NULL) {
2220  if ((tv->tmm_flags & flags) == flags)
2221  cnt++;
2222 
2223  tv = tv->next;
2224  }
2225  }
2226  SCMutexUnlock(&tv_root_lock);
2227  return cnt;
2228 }
2229 
2230 typedef struct Thread_ {
2231  ThreadVars *tv; /**< threadvars structure */
2232  const char *name;
2233  int type;
2234  int in_use; /**< bool to indicate this is in use */
2235 
2236  struct timeval ts; /**< current time of this thread (offline mode) */
2237 } Thread;
2238 
2239 typedef struct Threads_ {
2240  Thread *threads;
2243 } Threads;
2244 
2245 static Threads thread_store = { NULL, 0, 0 };
2246 static SCMutex thread_store_lock = SCMUTEX_INITIALIZER;
2247 
2249 {
2250  Thread *t;
2251  size_t s;
2252 
2253  SCMutexLock(&thread_store_lock);
2254 
2255  for (s = 0; s < thread_store.threads_size; s++) {
2256  t = &thread_store.threads[s];
2257  if (t == NULL || t->in_use == 0)
2258  continue;
2259  SCLogInfo("Thread %"PRIuMAX", %s type %d, tv %p", (uintmax_t)s+1, t->name, t->type, t->tv);
2260  }
2261 
2262  SCMutexUnlock(&thread_store_lock);
2263 }
2264 
2265 #define STEP 32
2266 /**
2267  * \retval id thread id, or 0 if not found
2268  */
2270 {
2271  SCMutexLock(&thread_store_lock);
2272  if (thread_store.threads == NULL) {
2273  thread_store.threads = SCCalloc(STEP, sizeof(Thread));
2274  BUG_ON(thread_store.threads == NULL);
2275  thread_store.threads_size = STEP;
2276  }
2277 
2278  size_t s;
2279  for (s = 0; s < thread_store.threads_size; s++) {
2280  if (thread_store.threads[s].in_use == 0) {
2281  Thread *t = &thread_store.threads[s];
2282  t->name = tv->name;
2283  t->type = type;
2284  t->tv = tv;
2285  t->in_use = 1;
2286 
2287  SCMutexUnlock(&thread_store_lock);
2288  return (int)(s+1);
2289  }
2290  }
2291 
2292  /* if we get here the array is completely filled */
2293  void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread)));
2294  BUG_ON(newmem == NULL);
2295  thread_store.threads = newmem;
2296  memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP * sizeof(Thread));
2297 
2298  Thread *t = &thread_store.threads[thread_store.threads_size];
2299  t->name = tv->name;
2300  t->type = type;
2301  t->tv = tv;
2302  t->in_use = 1;
2303 
2304  s = thread_store.threads_size;
2305  thread_store.threads_size += STEP;
2306 
2307  SCMutexUnlock(&thread_store_lock);
2308  return (int)(s+1);
2309 }
2310 #undef STEP
2311 
2312 void TmThreadsUnregisterThread(const int id)
2313 {
2314  SCMutexLock(&thread_store_lock);
2315  if (id <= 0 || id > (int)thread_store.threads_size) {
2316  SCMutexUnlock(&thread_store_lock);
2317  return;
2318  }
2319 
2320  /* id is one higher than index */
2321  int idx = id - 1;
2322 
2323  /* reset thread_id, which serves as clearing the record */
2324  thread_store.threads[idx].in_use = 0;
2325 
2326  /* check if we have at least one registered thread left */
2327  size_t s;
2328  for (s = 0; s < thread_store.threads_size; s++) {
2329  Thread *t = &thread_store.threads[s];
2330  if (t->in_use == 1) {
2331  goto end;
2332  }
2333  }
2334 
2335  /* if we get here no threads are registered */
2336  SCFree(thread_store.threads);
2337  thread_store.threads = NULL;
2338  thread_store.threads_size = 0;
2339  thread_store.threads_cnt = 0;
2340 
2341 end:
2342  SCMutexUnlock(&thread_store_lock);
2343 }
2344 
2345 void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
2346 {
2347  SCMutexLock(&thread_store_lock);
2348  if (unlikely(id <= 0 || id > (int)thread_store.threads_size)) {
2349  SCMutexUnlock(&thread_store_lock);
2350  return;
2351  }
2352 
2353  int idx = id - 1;
2354  Thread *t = &thread_store.threads[idx];
2355  t->ts.tv_sec = ts->tv_sec;
2356  t->ts.tv_usec = ts->tv_usec;
2357  SCMutexUnlock(&thread_store_lock);
2358 }
2359 
2360 #define COPY_TIMESTAMP(src,dst) ((dst)->tv_sec = (src)->tv_sec, (dst)->tv_usec = (src)->tv_usec) // XXX unify with flow-util.h
2361 void TmreadsGetMinimalTimestamp(struct timeval *ts)
2362 {
2363  struct timeval local, nullts;
2364  memset(&local, 0, sizeof(local));
2365  memset(&nullts, 0, sizeof(nullts));
2366  int set = 0;
2367  size_t s;
2368 
2369  SCMutexLock(&thread_store_lock);
2370  for (s = 0; s < thread_store.threads_size; s++) {
2371  Thread *t = &thread_store.threads[s];
2372  if (t == NULL || t->in_use == 0)
2373  continue;
2374  if (!(timercmp(&t->ts, &nullts, ==))) {
2375  if (!set) {
2376  local.tv_sec = t->ts.tv_sec;
2377  local.tv_usec = t->ts.tv_usec;
2378  set = 1;
2379  } else {
2380  if (timercmp(&t->ts, &local, <)) {
2381  COPY_TIMESTAMP(&t->ts, &local);
2382  }
2383  }
2384  }
2385  }
2386  SCMutexUnlock(&thread_store_lock);
2387  COPY_TIMESTAMP(&local, ts);
2388  SCLogDebug("ts->tv_sec %"PRIuMAX, (uintmax_t)ts->tv_sec);
2389 }
2390 #undef COPY_TIMESTAMP
2391 
2392 /**
2393  * \retval r 1 if packet was accepted, 0 otherwise
2394  * \note if packet was not accepted, it's still the responsibility
2395  * of the caller.
2396  */
2397 int TmThreadsInjectPacketsById(Packet **packets, const int id)
2398 {
2399  if (id <= 0 || id > (int)thread_store.threads_size)
2400  return 0;
2401 
2402  int idx = id - 1;
2403 
2404  Thread *t = &thread_store.threads[idx];
2405  ThreadVars *tv = t->tv;
2406 
2407  if (tv == NULL || tv->stream_pq == NULL)
2408  return 0;
2409 
2410  SCMutexLock(&tv->stream_pq->mutex_q);
2411  while (*packets != NULL) {
2412  PacketEnqueue(tv->stream_pq, *packets);
2413  packets++;
2414  }
2416 
2417  /* wake up listening thread(s) if necessary */
2418  if (tv->inq != NULL) {
2419  SCCondSignal(&trans_q[tv->inq->id].cond_q);
2420  }
2421  return 1;
2422 }
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value from our atomic variable.
Definition: util-atomic.h:155
Packet *(* InHandler)(ThreadVars *)
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:88
#define SCCondSignal(x)
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
Appends a new entry to the slots.
Definition: tm-threads.c:836
#define THREAD_SET_PRIORITY
Definition: threadvars.h:117
const char * name
#define SCDropCaps(...)
Definition: util-privs.h:37
Tmq * TmqGetQueueByName(const char *name)
Definition: tm-queues.c:56
uint16_t flags
void EngineDone(void)
Used to indicate that the current task is done.
Definition: suricata.c:427
TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type)
Definition: tm-threads.c:1040
#define SCLogDebug(...)
Definition: util-debug.h:335
void TmThreadSetFlags(ThreadVars *, uint8_t)
uint8_t type
Definition: threadvars.h:92
int AffinityGetNextCPU(ThreadsAffinityType *taf)
Return next cpu to use for a given thread family.
__thread uint64_t rww_lock_contention
int TmThreadGetNbThreads(uint8_t type)
Definition: tm-threads.c:1056
uint8_t cap_flags
Definition: tm-modules.h:67
void TmThreadCheckThreadState(void)
Used to check the thread for certain conditions of failure.
Definition: tm-threads.c:2078
void TmThreadPause(ThreadVars *tv)
Pauses a thread.
Definition: tm-threads.c:2045
SCCondT cond_q
Definition: decode.h:638
uint16_t writer_cnt
Definition: tm-queues.h:31
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TmEcode(* SlotThreadDeinit)(ThreadVars *, void *)
Definition: tm-threads.h:49
int thread_priority
Definition: threadvars.h:96
#define BUG_ON(x)
#define THV_KILL_PKTACQ
Definition: threadvars.h:47
uint8_t flags
Definition: tm-modules.h:70
#define SCSetThreadName(n)
Definition: threads.h:301
#define THV_RUNNING_DONE
Definition: threadvars.h:45
#define STEP
Definition: tm-threads.c:2265
struct ThreadVars_ * prev
Definition: threadvars.h:112
void TmThreadDisablePacketThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1663
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1282
int TmThreadsInjectPacketsById(Packet **packets, const int id)
Definition: tm-threads.c:2397
#define unlikely(expr)
Definition: util-optimize.h:35
void StatsThreadCleanup(ThreadVars *tv)
Definition: counters.c:1310
TmEcode(* Func)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *)
Definition: tm-modules.h:52
TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio)
Set the thread options (thread priority).
Definition: tm-threads.c:984
pthread_t t
Definition: threadvars.h:58
TmSlot * TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name)
Definition: tm-threads.c:1734
#define THV_DEAD
Definition: threadvars.h:54
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-threads.h:45
uint16_t id
Definition: tm-queues.h:29
#define MIN(x, y)
SCCtrlCondT * ctrl_cond
Definition: threadvars.h:107
ThreadVars * TmThreadCreatePacketHandler(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots)
Creates and returns a TV instance for a Packet Processing Thread. This function doesn't support custo...
Definition: tm-threads.c:1252
int type
Definition: tm-threads.c:2233
void TmThreadSetGroupName(ThreadVars *tv, const char *name)
Definition: tm-threads.c:1839
#define THV_PAUSED
Definition: threadvars.h:38
#define COPY_TIMESTAMP(src, dst)
Definition: tm-threads.c:2360
__thread uint64_t spin_lock_wait_ticks
struct Thread_ Thread
#define THV_FAILED
Definition: threadvars.h:40
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:201
void TmThreadsSetThreadTimestamp(const int id, const struct timeval *ts)
Definition: tm-threads.c:2345
#define SC_ATOMIC_AND(name, val)
Bitwise AND a value with our atomic variable.
Definition: util-atomic.h:142
__thread uint64_t spin_lock_contention
#define SCCtrlCondDestroy
ThreadVars * TmThreadCreateCmdThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Command thread (CMD). This function supports only custom sl...
Definition: tm-threads.c:1343
void TmThreadClearThreadsFamily(int family)
Definition: tm-threads.c:1857
#define THREAD_SET_AFFINITY
Definition: threadvars.h:116
void TmThreadContinue(ThreadVars *tv)
Unpauses a thread.
Definition: tm-threads.c:2012
void(* InShutdownHandler)(struct ThreadVars_ *)
Definition: threadvars.h:79
#define SCCtrlMutexDestroy
TmEcode(* PktAcqLoop)(ThreadVars *, void *, void *)
Definition: tm-modules.h:54
__thread uint64_t rwr_lock_wait_ticks
TmEcode TmThreadWaitOnThreadInit(void)
Used to check if all threads have finished their initialization. On finding an un-initialized thread...
Definition: tm-threads.c:2106
ThreadVars * TmThreadsGetCallingThread(void)
Returns the TV for the calling thread.
Definition: tm-threads.c:2183
char * thread_group_name
Definition: threadvars.h:61
SCCtrlMutex * ctrl_mutex
Definition: threadvars.h:106
#define SCCtrlCondInit
#define SleepUsec(usec)
Definition: tm-threads.c:84
#define TM_FLAG_RECEIVE_TM
Definition: tm-modules.h:31
#define SleepMsec(msec)
Definition: tm-threads.c:85
uint8_t thread_setup_flags
Definition: threadvars.h:89
void TmThreadsListThreads(void)
Definition: tm-threads.c:2248
void(* OutHandlerCtxFree)(void *)
TmEcode(* PktAcqBreakLoop)(ThreadVars *, void *)
Definition: tm-modules.h:57
void TmThreadsSetFlag(ThreadVars *tv, uint16_t flag)
Set a thread flag.
Definition: tm-threads.c:113
#define SC_ATOMIC_INIT(name)
Initialize the previously declared atomic variable and its lock.
Definition: util-atomic.h:82
#define SCCalloc(nm, a)
Definition: util-mem.h:205
struct TmSlot_ * tm_slots
Definition: threadvars.h:84
void TmThreadWaitForFlag(ThreadVars *tv, uint16_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1998
#define SCMutexUnlock(mut)
#define THV_INIT_DONE
Definition: threadvars.h:36
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tm-queues.h:27
void TmThreadRemove(ThreadVars *tv, int type)
Removes this TV from tv_root based on its type.
Definition: tm-threads.c:1407
uint16_t type
#define THV_CLOSED
Definition: threadvars.h:41
SCMutex tv_root_lock
Definition: tm-threads.c:97
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-modules.h:59
#define SCMutexInit(mut, mutattr)
uint32_t len
Definition: decode.h:633
void TmThreadKillThreadsFamily(int family)
Definition: tm-threads.c:1767
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:294
#define PACKET_PROFILING_TMM_END(p, id)
struct ThreadVars_ * next
Definition: threadvars.h:111
#define SCGetThreadIdLong(...)
Definition: threads.h:255
const void * slot_initdata
Definition: tm-threads.h:52
void(* tmqh_out)(struct ThreadVars_ *, struct Packet_ *)
Definition: threadvars.h:80
__thread uint64_t mutex_lock_wait_ticks
size_t threads_size
Definition: tm-threads.c:2241
PacketQueue slot_post_pq
Definition: tm-threads.h:63
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
#define SCEnter(...)
Definition: util-debug.h:337
TmEcode(* Management)(ThreadVars *, void *)
Definition: tm-threads.h:75
#define PACKET_PROFILING_TMM_START(p, id)
Packet * top
Definition: decode.h:631
ThreadVars * TmThreadCreateMgmtThreadByName(const char *name, const char *module, int mucond)
Creates and returns the TV instance for a Management thread (MGMT). This function supports only custom...
Definition: tm-threads.c:1310
int tm_id
Definition: tm-threads.h:66
struct TmSlot_ * slot_next
Definition: tm-threads.h:72
#define THV_USE
Definition: threadvars.h:35
void TmThreadPauseThreads()
Pauses all threads present in tv_root.
Definition: tm-threads.c:2055
void TmThreadContinueThreads()
Unpauses all threads present in tv_root.
Definition: tm-threads.c:2022
void PacketPoolInitEmpty(void)
void TmThreadsUnsetFlag(ThreadVars *tv, uint16_t flag)
Unset a thread flag.
Definition: tm-threads.c:121
PacketQueue slot_pre_pq
Definition: tm-threads.h:58
Tmq * outq
Definition: threadvars.h:73
void TmThreadAppend(ThreadVars *tv, int type)
Appends this TV to tv_root based on its type.
Definition: tm-threads.c:1369
uint8_t tmm_flags
Definition: threadvars.h:66
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
Separate run function so we can call it recursively.
Definition: tm-threads.c:131
#define SCCtrlMutexInit(mut, mutattr)
int TmThreadsRegisterThread(ThreadVars *tv, const int type)
Definition: tm-threads.c:2269
void PacketPoolDestroy(void)
TmEcode(* SlotThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-threads.h:47
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
#define THV_PAUSE
Definition: threadvars.h:37
void(* OutHandler)(ThreadVars *, Packet *)
void(* SlotThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-threads.h:48
struct Threads_ Threads
#define THV_KILL
Definition: threadvars.h:39
#define SCLogWarning(err_code,...)
Macro used to log WARNING messages.
Definition: util-debug.h:281
#define THREAD_SET_AFFTYPE
Definition: threadvars.h:118
__thread uint64_t mutex_lock_contention
__thread uint64_t rwr_lock_cnt
#define SCRealloc(x, a)
Definition: util-mem.h:190
uint16_t cpu_affinity
Definition: threadvars.h:94
#define THV_FLOW_LOOP
Definition: threadvars.h:48
TmSlot * TmSlotGetSlotForTM(int tm_id)
Returns the slot holding a TM with the particular tm_id.
Definition: tm-threads.c:885
void PacketPoolInit(void)
void TmThreadKillThreads(void)
Definition: tm-threads.c:1796
uint16_t reader_cnt
Definition: tm-queues.h:30
ThreadsAffinityType thread_affinity[MAX_CPU_SET]
Definition: util-affinity.c:34
const char * name
Definition: tm-modules.h:44
Tmqh * TmqhGetQueueHandlerByName(const char *name)
#define MAX_WAIT_TIME
Definition: tm-threads.c:1766
void TmreadsGetMinimalTimestamp(struct timeval *ts)
Definition: tm-threads.c:2361
#define SCMalloc(a)
Definition: util-mem.h:174
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:208
ThreadVars * tv
Definition: tm-threads.h:40
void TmThreadsUnregisterThread(const int id)
Definition: tm-threads.c:2312
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:254
ThreadVars * tv
Definition: tm-threads.c:2231
StatsPublicThreadContext perf_public_ctx
Definition: threadvars.h:101
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (CPU affinity). Priority should already be set by pthread_create.
Definition: tm-threads.c:1072
#define SCFree(a)
Definition: util-mem.h:236
void *(* tm_func)(void *)
Definition: threadvars.h:83
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:269
void TmThreadDisableReceiveThreads(void)
Disable all threads having the specified TMs.
Definition: tm-threads.c:1567
const char * outqh_name
Definition: threadvars.h:75
char * printable_name
Definition: threadvars.h:60
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2210
void TmThreadSetPrio(ThreadVars *tv)
Adjusting nice value for threads.
Definition: tm-threads.c:995
#define MIN_WAIT_TIME
Definition: tm-threads.c:1765
int TmModuleGetIDForTM(TmModule *tm)
Given a TM Module, returns its id.
Definition: tm-modules.c:107
#define SCMutex
PacketQueue trans_q[256]
Definition: suricata.h:132
#define SCLogPerf(...)
Definition: util-debug.h:261
#define FatalError(x,...)
Definition: util-debug.h:539
ThreadVars * TmThreadsGetTVContainingSlot(TmSlot *tm_slot)
Definition: tm-threads.c:799
__thread uint64_t rwr_lock_contention
__thread uint64_t rww_lock_cnt
#define SCMutexLock(mut)
void(* InShutdownHandler)(ThreadVars *)
TmEcode(* TmSlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *)
Definition: tm-threads.h:35
SCMutex m
Definition: flow-hash.h:105
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1201
__thread uint64_t rww_lock_wait_ticks
void * outctx
Definition: threadvars.h:74
struct timeval ts
Definition: tm-threads.c:2236
int TmThreadsCheckFlag(ThreadVars *tv, uint16_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:105
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:193
TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu)
Set the thread options (cpu affinity).
Definition: tm-threads.c:1031
#define StatsSyncCounters(tv)
Definition: counters.h:133
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:167
int in_use
Definition: tm-threads.c:2234
void TmThreadInitMC(ThreadVars *tv)
Sets the thread flags for a thread instance(tv)
Definition: tm-threads.c:1931
const char * name
Definition: tm-threads.c:2232
#define SCStrdup(a)
Definition: util-mem.h:220
char name[16]
Definition: threadvars.h:59
TmModule * TmModuleGetByName(const char *name)
get a tm module ptr by name
Definition: tm-modules.c:51
Thread * threads
Definition: tm-threads.c:2240
struct PacketQueue_ * stream_pq
Definition: threadvars.h:87
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:94
#define SCReturn
Definition: util-debug.h:339
#define THV_DEINIT
Definition: threadvars.h:44
Per thread variable structure.
Definition: threadvars.h:57
__thread uint64_t mutex_lock_cnt
char * name
Definition: tm-queues.h:28
void *(* OutHandlerCtxSetup)(const char *)
Tmq * inq
Definition: threadvars.h:72
uint8_t cap_flags
Definition: threadvars.h:109
int id
Definition: tm-threads.h:69
int threading_set_cpu_affinity
Definition: runmodes.h:112
struct Packet_ *(* tmqh_in)(struct ThreadVars_ *)
Definition: threadvars.h:78
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1882
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:1131
Tmq * TmqCreateQueue(const char *name)
Definition: tm-queues.c:36
int threads_cnt
Definition: tm-threads.c:2242
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:139
__thread uint64_t spin_lock_cnt
SCMutex mutex_q
Definition: decode.h:637
void TmThreadTestThreadUnPaused(ThreadVars *tv)
Tests if the thread represented in the arg has been unpaused or not.
Definition: tm-threads.c:1980
void TmqhReleasePacketsToPacketPool(PacketQueue *pq)
Release all the packets in the queue back to the packetpool. Mainly used by threads that have failed...
#define SCMUTEX_INITIALIZER