suricata
tmqh-packetpool.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2014 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  *
23  * Packetpool queue handlers. Packet pool is implemented as a stack.
24  */
25 
26 #include "suricata.h"
27 #include "packet-queue.h"
28 #include "decode.h"
29 #include "detect.h"
30 #include "detect-uricontent.h"
31 #include "threads.h"
32 #include "threadvars.h"
33 #include "flow.h"
34 #include "flow-util.h"
35 #include "host.h"
36 
37 #include "stream.h"
38 #include "stream-tcp-reassemble.h"
39 
40 #include "tm-queuehandlers.h"
41 #include "tm-threads.h"
42 #include "tm-modules.h"
43 
44 #include "pkt-var.h"
45 
46 #include "tmqh-packetpool.h"
47 
48 #include "util-debug.h"
49 #include "util-error.h"
50 #include "util-profiling.h"
51 #include "util-device.h"
52 
53 /* Number of freed packets to hold back for one pool before returning them to it. */
54 #define MAX_PENDING_RETURN_PACKETS 32
55 static uint32_t max_pending_return_packets = MAX_PENDING_RETURN_PACKETS;
56 
57 #ifdef TLS
58 __thread PktPool thread_pkt_pool;
59 
60 static inline PktPool *GetThreadPacketPool(void)
61 {
62  return &thread_pkt_pool;
63 }
64 #else
65 /* __thread not supported. */
66 static pthread_key_t pkt_pool_thread_key;
67 static SCMutex pkt_pool_thread_key_mutex = SCMUTEX_INITIALIZER;
68 static int pkt_pool_thread_key_initialized = 0;
69 
/**
 * \brief Destructor handed to pthread_key_create() for the per-thread pool.
 *
 * \param buf value stored under the key; released with SCFreeAligned()
 */
static void PktPoolThreadDestroy(void * buf)
{
    void *pool_mem = buf;

    SCFreeAligned(pool_mem);
}
74 
75 static void TmqhPacketPoolInit(void)
76 {
77  SCMutexLock(&pkt_pool_thread_key_mutex);
78  if (pkt_pool_thread_key_initialized) {
79  /* Key has already been created. */
80  SCMutexUnlock(&pkt_pool_thread_key_mutex);
81  return;
82  }
83 
84  /* Create the pthread Key that is used to look up thread specific
85  * data buffer. Needs to be created only once.
86  */
87  int r = pthread_key_create(&pkt_pool_thread_key, PktPoolThreadDestroy);
88  if (r != 0) {
89  SCLogError(SC_ERR_MEM_ALLOC, "pthread_key_create failed with %d", r);
90  exit(EXIT_FAILURE);
91  }
92 
93  pkt_pool_thread_key_initialized = 1;
94  SCMutexUnlock(&pkt_pool_thread_key_mutex);
95 }
96 
97 static PktPool *ThreadPacketPoolCreate(void)
98 {
99  TmqhPacketPoolInit();
100 
101  /* Create a new pool for this thread. */
102  PktPool* pool = (PktPool*)SCMallocAligned(sizeof(PktPool), CLS);
103  if (pool == NULL) {
104  SCLogError(SC_ERR_MEM_ALLOC, "malloc failed");
105  exit(EXIT_FAILURE);
106  }
107  memset(pool,0x0,sizeof(*pool));
108 
109  int r = pthread_setspecific(pkt_pool_thread_key, pool);
110  if (r != 0) {
111  SCLogError(SC_ERR_MEM_ALLOC, "pthread_setspecific failed with %d", r);
112  exit(EXIT_FAILURE);
113  }
114 
115  return pool;
116 }
117 
118 static inline PktPool *GetThreadPacketPool(void)
119 {
120  PktPool* pool = (PktPool*)pthread_getspecific(pkt_pool_thread_key);
121  if (pool == NULL)
122  pool = ThreadPacketPoolCreate();
123 
124  return pool;
125 }
126 #endif
127 
128 /**
129  * \brief TmqhPacketpoolRegister
130  * \initonly
131  */
133 {
/* Registers this queue handler in tmqh_table under the name "packetpool".
 * NOTE(review): listing lines 135-136 are absent from this extract, so any
 * further assignments made here are not shown. */
134  tmqh_table[TMQH_PACKETPOOL].name = "packetpool";
137 }
138 
139 static int PacketPoolIsEmpty(PktPool *pool)
140 {
141  /* Check local stack first. */
142  if (pool->head || pool->return_stack.head)
143  return 0;
144 
145  return 1;
146 }
147 
148 void PacketPoolWait(void)
149 {
150  PktPool *my_pool = GetThreadPacketPool();
151 
152  if (PacketPoolIsEmpty(my_pool)) {
153  SCMutexLock(&my_pool->return_stack.mutex);
154  SC_ATOMIC_ADD(my_pool->return_stack.sync_now, 1);
155  SCCondWait(&my_pool->return_stack.cond, &my_pool->return_stack.mutex);
156  SCMutexUnlock(&my_pool->return_stack.mutex);
157  }
158 
159  while(PacketPoolIsEmpty(my_pool))
160  cc_barrier();
161 }
162 
163 /** \brief Wait until we have the requested amount of packets in the pool
164  *
165  * In some cases waiting for packets is undesirable. Especially when
166  * a wait would happen under a lock of some kind, other parts of the
167  * engine could have to wait.
168  *
169  * This function only returns when at least N packets are in our pool.
170  *
171  * \param n number of packets needed
172  */
174 {
175  PktPool *my_pool = GetThreadPacketPool();
176  Packet *p = NULL;
177 
178  while (1) {
179  int i = 0;
180  PacketPoolWait();
181 
182  /* count packets in our stack */
183  p = my_pool->head;
184  while (p != NULL) {
185  if (++i == n)
186  return;
187 
188  p = p->next;
189  }
190 
191  /* continue counting in the return stack */
192  if (my_pool->return_stack.head != NULL) {
193  SCMutexLock(&my_pool->return_stack.mutex);
194  p = my_pool->return_stack.head;
195  while (p != NULL) {
196  if (++i == n) {
197  SCMutexUnlock(&my_pool->return_stack.mutex);
198  return;
199  }
200  p = p->next;
201  }
202  SCMutexUnlock(&my_pool->return_stack.mutex);
203 
204  /* or signal that we need packets and wait */
205  } else {
206  SCMutexLock(&my_pool->return_stack.mutex);
207  SC_ATOMIC_ADD(my_pool->return_stack.sync_now, 1);
208  SCCondWait(&my_pool->return_stack.cond, &my_pool->return_stack.mutex);
209  SCMutexUnlock(&my_pool->return_stack.mutex);
210  }
211  }
212 }
213 
214 /** \brief Store an initialized packet
215  *
 * \param p packet to store; its PKT_ALLOC flag is cleared and its pool is
 *          set to the calling thread's pool
 *
216  * \warning Use *only* at init, not at packet runtime
217  */
218 static void PacketPoolStorePacket(Packet *p)
219 {
220  /* Clear the PKT_ALLOC flag, since that indicates to push back
221  * onto the ring buffer. */
222  p->flags &= ~PKT_ALLOC;
223  p->pool = GetThreadPacketPool();
/* NOTE(review): listing lines 224-225 are absent from this extract, so the
 * remaining steps of this function are not shown. */
226 }
227 
228 static void PacketPoolGetReturnedPackets(PktPool *pool)
229 {
230  SCMutexLock(&pool->return_stack.mutex);
231  /* Move all the packets from the locked return stack to the local stack. */
232  pool->head = pool->return_stack.head;
233  pool->return_stack.head = NULL;
234  SCMutexUnlock(&pool->return_stack.mutex);
235 }
236 
237 /** \brief Get a new packet from the packet pool
238  *
239  * Only takes packets from the thread's local stack; it never mallocs new ones.
240  * If the local stack is empty, first move all the return stack packets to
241  * the local stack.
242  * \retval Packet pointer, or NULL on failure.
243  */
245 {
246  PktPool *pool = GetThreadPacketPool();
247 #ifdef DEBUG_VALIDATION
248  BUG_ON(pool->initialized == 0);
249  BUG_ON(pool->destroyed == 1);
250 #endif /* DEBUG_VALIDATION */
251  if (pool->head) {
252  /* Stack is not empty. */
253  Packet *p = pool->head;
254  pool->head = p->next;
255  p->pool = pool;
256  PACKET_REINIT(p);
257  return p;
258  }
259 
260  /* Local Stack is empty, so check the return stack, which requires
261  * locking. */
262  PacketPoolGetReturnedPackets(pool);
263 
264  /* Try to allocate again. Need to check for not empty again, since the
265  * return stack might have been empty too.
266  */
267  if (pool->head) {
268  /* Stack is not empty. */
269  Packet *p = pool->head;
270  pool->head = p->next;
271  p->pool = pool;
272  PACKET_REINIT(p);
273  return p;
274  }
275 
276  /* Failed to allocate a packet, so return NULL. */
277  /* Optionally, could allocate a new packet here. */
278  return NULL;
279 }
280 
281 /** \brief Return packet to Packet pool
282  *
283  */
285 {
/* Hands packet p back to the pool recorded in p->pool. Three cases:
 *  - p has no pool: it is freed with PacketFree();
 *  - p belongs to the calling thread's pool: pushed on the local stack;
 *  - p belongs to another pool: batched on the caller's pending list, or
 *    pushed directly on that pool's locked return stack.
 * NOTE(review): listing line 288 is absent from this extract. */
286  PktPool *my_pool = GetThreadPacketPool();
287 
289 
290  PktPool *pool = p->pool;
291  if (pool == NULL) {
292  PacketFree(p);
293  return;
294  }
295 #ifdef DEBUG_VALIDATION
296  BUG_ON(pool->initialized == 0);
297  BUG_ON(pool->destroyed == 1);
298  BUG_ON(my_pool->initialized == 0);
299  BUG_ON(my_pool->destroyed == 1);
300 #endif /* DEBUG_VALIDATION */
301 
302  if (pool == my_pool) {
303  /* Push back onto this thread's own stack, so no locking. */
304  p->next = my_pool->head;
305  my_pool->head = p;
306  } else {
307  PktPool *pending_pool = my_pool->pending_pool;
308  if (pending_pool == NULL) {
309  /* No pending packet, so store the current packet. */
310  p->next = NULL;
311  my_pool->pending_pool = pool;
312  my_pool->pending_head = p;
313  my_pool->pending_tail = p;
314  my_pool->pending_count = 1;
315  } else if (pending_pool == pool) {
316  /* Another packet for the pending pool list. */
317  p->next = my_pool->pending_head;
318  my_pool->pending_head = p;
319  my_pool->pending_count++;
/* Flush the batch when the owning pool has raised sync_now, or when the
 * batch has grown past max_pending_return_packets. */
320  if (SC_ATOMIC_GET(pool->return_stack.sync_now) || my_pool->pending_count > max_pending_return_packets) {
321  /* Return the entire list of pending packets. */
322  SCMutexLock(&pool->return_stack.mutex);
/* Splice the pending list in front of the owner's return stack. */
323  my_pool->pending_tail->next = pool->return_stack.head;
324  pool->return_stack.head = my_pool->pending_head;
325  SC_ATOMIC_RESET(pool->return_stack.sync_now);
326  SCMutexUnlock(&pool->return_stack.mutex);
327  SCCondSignal(&pool->return_stack.cond);
328  /* Clear the list of pending packets to return. */
329  my_pool->pending_pool = NULL;
330  my_pool->pending_head = NULL;
331  my_pool->pending_tail = NULL;
332  my_pool->pending_count = 0;
333  }
334  } else {
/* A batch for a different pool is already pending, so p bypasses the
 * pending list and goes straight to its owner's return stack. */
335  /* Push onto return stack for this pool */
336  SCMutexLock(&pool->return_stack.mutex);
337  p->next = pool->return_stack.head;
338  pool->return_stack.head = p;
339  SC_ATOMIC_RESET(pool->return_stack.sync_now);
340  SCMutexUnlock(&pool->return_stack.mutex);
341  SCCondSignal(&pool->return_stack.cond);
342  }
343  }
344 }
345 
347 {
348 #ifndef TLS
349  TmqhPacketPoolInit();
350 #endif
351 
352  PktPool *my_pool = GetThreadPacketPool();
353 
354 #ifdef DEBUG_VALIDATION
355  BUG_ON(my_pool->initialized);
356  my_pool->initialized = 1;
357  my_pool->destroyed = 0;
358 #endif /* DEBUG_VALIDATION */
359 
360  SCMutexInit(&my_pool->return_stack.mutex, NULL);
361  SCCondInit(&my_pool->return_stack.cond, NULL);
362  SC_ATOMIC_INIT(my_pool->return_stack.sync_now);
363 }
364 
365 void PacketPoolInit(void)
366 {
367  extern intmax_t max_pending_packets;
368 
369 #ifndef TLS
370  TmqhPacketPoolInit();
371 #endif
372 
373  PktPool *my_pool = GetThreadPacketPool();
374 
375 #ifdef DEBUG_VALIDATION
376  BUG_ON(my_pool->initialized);
377  my_pool->initialized = 1;
378  my_pool->destroyed = 0;
379 #endif /* DEBUG_VALIDATION */
380 
381  SCMutexInit(&my_pool->return_stack.mutex, NULL);
382  SCCondInit(&my_pool->return_stack.cond, NULL);
383  SC_ATOMIC_INIT(my_pool->return_stack.sync_now);
384 
385  /* pre allocate packets */
386  SCLogDebug("preallocating packets... packet size %" PRIuMAX "",
387  (uintmax_t)SIZE_OF_PACKET);
388  int i = 0;
389  for (i = 0; i < max_pending_packets; i++) {
390  Packet *p = PacketGetFromAlloc();
391  if (unlikely(p == NULL)) {
392  SCLogError(SC_ERR_FATAL, "Fatal error encountered while allocating a packet. Exiting...");
393  exit(EXIT_FAILURE);
394  }
395  PacketPoolStorePacket(p);
396  }
397 
398  //SCLogInfo("preallocated %"PRIiMAX" packets. Total memory %"PRIuMAX"",
399  // max_pending_packets, (uintmax_t)(max_pending_packets*SIZE_OF_PACKET));
400 }
401 
403 {
404  Packet *p = NULL;
405  PktPool *my_pool = GetThreadPacketPool();
406 
407 #ifdef DEBUG_VALIDATION
408  BUG_ON(my_pool->destroyed);
409 #endif /* DEBUG_VALIDATION */
410 
411  if (my_pool && my_pool->pending_pool != NULL) {
412  p = my_pool->pending_head;
413  while (p) {
414  Packet *next_p = p->next;
415  PacketFree(p);
416  p = next_p;
417  my_pool->pending_count--;
418  }
419 #ifdef DEBUG_VALIDATION
420  BUG_ON(my_pool->pending_count);
421 #endif /* DEBUG_VALIDATION */
422  my_pool->pending_pool = NULL;
423  my_pool->pending_head = NULL;
424  my_pool->pending_tail = NULL;
425  }
426 
427  while ((p = PacketPoolGetPacket()) != NULL) {
428  PacketFree(p);
429  }
430 
431  SC_ATOMIC_DESTROY(my_pool->return_stack.sync_now);
432 
433 #ifdef DEBUG_VALIDATION
434  my_pool->initialized = 0;
435  my_pool->destroyed = 1;
436 #endif /* DEBUG_VALIDATION */
437 }
438 
440 {
441  return PacketPoolGetPacket();
442 }
443 
445 {
/* Releases packet p through its ReleasePacket callback, with extra handling
 * for tunnel packets: a tunnel root that still has outstanding tunnel
 * packets is not released here, and a tunnel packet can trigger the release
 * of its root as well (proot).
 * NOTE(review): listing lines 478, 480, 487, 494, 526, 531 and 533 are
 * absent from this extract, so some statements of this function (including
 * part of the condition that sets proot) are not shown. */
446  bool proot = false;
447 
448  SCEnter();
449  SCLogDebug("Packet %p, p->root %p, alloced %s", p, p->root, p->flags & PKT_ALLOC ? "true" : "false");
450 
451  if (IS_TUNNEL_PKT(p)) {
452  SCLogDebug("Packet %p is a tunnel packet: %s",
453  p,p->root ? "upper layer" : "tunnel root");
454 
455  /* get a lock to access root packet fields */
456  SCMutex *m = p->root ? &p->root->tunnel_mutex : &p->tunnel_mutex;
457  SCMutexLock(m);
458 
459  if (IS_TUNNEL_ROOT_PKT(p)) {
460  SCLogDebug("IS_TUNNEL_ROOT_PKT == TRUE");
461  const uint16_t outstanding = TUNNEL_PKT_TPR(p) - TUNNEL_PKT_RTV(p);
462  SCLogDebug("root pkt: outstanding %u", outstanding);
463  if (outstanding == 0) {
464  SCLogDebug("no tunnel packets outstanding, no more tunnel "
465  "packet(s) depending on this root");
466  /* if this packet is the root and there are no
467  * more tunnel packets to consider
468  *
469  * return it to the pool */
470  } else {
471  SCLogDebug("tunnel root Packet %p: outstanding > 0, so "
472  "packets are still depending on this root, setting "
473  "SET_TUNNEL_PKT_VERDICTED", p);
474  /* if this is the root and there are more tunnel
475  * packets, do NOT return this to the pool yet. It's still
476  * referenced by the tunnel packets, and we will return it
477  * when we handle them */
479 
481  SCMutexUnlock(m);
482  SCReturn;
483  }
484  } else {
485  SCLogDebug("NOT IS_TUNNEL_ROOT_PKT, so tunnel pkt");
486 
488  const uint16_t outstanding = TUNNEL_PKT_TPR(p) - TUNNEL_PKT_RTV(p);
489  SCLogDebug("tunnel pkt: outstanding %u", outstanding);
490  /* all tunnel packets are processed except us. Root already
491  * processed. So return tunnel pkt and root packet to the
492  * pool. */
493  if (outstanding == 0 &&
495  {
496  SCLogDebug("root verdicted == true && no outstanding");
497 
498  /* handle freeing the root as well */
499  SCLogDebug("setting proot = 1 for root pkt, p->root %p "
500  "(tunnel packet %p)", p->root, p);
501  proot = true;
502 
503  /* fall through */
504 
505  } else {
506  /* root not ready yet, or not the last tunnel packet,
507  * so get rid of the tunnel pkt only */
508 
509  SCLogDebug("NOT IS_TUNNEL_PKT_VERDICTED (%s) || "
510  "outstanding > 0 (%u)",
511  (p->root && IS_TUNNEL_PKT_VERDICTED(p->root)) ? "true" : "false",
512  outstanding);
513 
514  /* fall through */
515  }
516  }
517  SCMutexUnlock(m);
518 
519  SCLogDebug("tunnel stuff done, move on (proot %d)", proot);
520  }
521 
522  /* we're done with the tunnel root now as well */
523  if (proot == true) {
524  SCLogDebug("getting rid of root pkt... alloc'd %s", p->root->flags & PKT_ALLOC ? "true" : "false");
525 
527  p->root->ReleasePacket(p->root);
528  p->root = NULL;
529  }
530 
532 
/* Finally release p itself through its own release callback. */
534  p->ReleasePacket(p);
535 
536  SCReturn;
537 }
538 
539 /**
540  * \brief Release all the packets in the queue back to the packetpool. Mainly
541  * used by threads that have failed and want to return the packets back
542  * to the packetpool.
543  *
544  * \param pq Pointer to the packetqueue from which the packets have to be
545  * returned back to the packetpool
546  *
547  * \warning this function assumes that the pq does not use locking
548  */
550 {
551  Packet *p = NULL;
552 
553  if (pq == NULL)
554  return;
555 
556  while ( (p = PacketDequeue(pq)) != NULL)
557  TmqhOutputPacketpool(NULL, p);
558 
559  return;
560 }
561 
562 /**
563  * \brief Set the max_pending_return_packets value
564  *
565  * Set it to the max pending packets value, divided by the number
566  * of detect threads. Normally, in autofp these are the stream/detect/log
567  * worker threads.
568  *
 * The value is only ever lowered, and it is left untouched when there are
 * no such threads or more threads than max pending packets.
 *
569  * The max_pending_return_packets value needs to stay below the packet
570  * pool size of the 'producers' (normally pkt capture threads but also
571  * flow timeout injection ) to avoid a deadlock where all the 'workers'
572  * keep packets in their return pools, while the capture thread can't
573  * continue because its pool is empty.
574  */
576 {
/* NOTE(review): listing lines 575 and 579 (the function header and the
 * definition of `threads`) are absent from this extract. */
577  extern intmax_t max_pending_packets;
578 
580  if (threads == 0)
581  return;
582  if (threads > max_pending_packets)
583  return;
584 
/* per-thread share of the pending packets, minus one */
585  uint32_t packets = (max_pending_packets / threads) - 1;
586  if (packets < max_pending_return_packets)
587  max_pending_return_packets = packets;
588 
/* the second value logged is the per-thread packet share, not the thread
 * count */
589  SCLogDebug("detect threads %u, max packets %u, max_pending_return_packets %u",
590  threads, packets, max_pending_return_packets);
591 }
Packet *(* InHandler)(ThreadVars *)
const char * name
#define SCMutex
Tmqh tmqh_table[TMQH_SIZE]
#define SCLogDebug(...)
Definition: util-debug.h:335
#define PACKET_PROFILING_END(p)
#define IS_TUNNEL_ROOT_PKT(p)
Definition: decode.h:887
#define BUG_ON(x)
#define unlikely(expr)
Definition: util-optimize.h:35
void PacketPoolWaitForN(int n)
Wait until we have the requested amount of packets in the pool.
struct Packet_ * next
Definition: decode.h:570
#define PKT_ALLOC
Definition: decode.h:1087
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:107
#define SC_ATOMIC_RESET(name)
Initialize the previously declared atomic variable and it&#39;s lock.
Definition: util-atomic.h:90
#define IS_TUNNEL_PKT(p)
Definition: decode.h:884
void PacketPoolReturnPacket(Packet *p)
Return packet to Packet pool.
void PacketFree(Packet *p)
Return a malloced packet.
Definition: decode.c:101
int max_pending_packets
Definition: suricata.c:213
Packet * pending_tail
#define CLS
#define SCCondWait
#define SCCondInit
void(* ReleasePacket)(struct Packet_ *)
Definition: decode.h:482
#define SCMutexLock(mut)
Packet * PacketPoolGetPacket(void)
Get a new packet from the packet pool.
uint32_t pending_count
#define SC_ATOMIC_DESTROY(name)
Destroy the lock used to protect this variable.
Definition: util-atomic.h:97
struct PktPool_ * pool
Definition: decode.h:599
#define IS_TUNNEL_PKT_VERDICTED(p)
Definition: decode.h:889
void PacketPoolPostRunmodes(void)
Set the max_pending_return_packets value.
#define SC_ATOMIC_INIT(name)
Initialize the previously declared atomic variable and it&#39;s lock.
Definition: util-atomic.h:81
#define SIZE_OF_PACKET
Definition: decode.h:618
#define SCMutexUnlock(mut)
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
#define SCMUTEX_INITIALIZER
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:294
#define SET_TUNNEL_PKT_VERDICTED(p)
Definition: decode.h:890
#define SCEnter(...)
Definition: util-debug.h:337
void TmqhPacketpoolRegister(void)
TmqhPacketpoolRegister .
Packet * pending_head
#define TUNNEL_PKT_TPR(p)
Definition: decode.h:882
Packet * TmqhInputPacketpool(ThreadVars *tv)
void PacketPoolInitEmpty(void)
#define SCMutexInit(mut, mutattrs)
void PacketPoolDestroy(void)
void(* OutHandler)(ThreadVars *, Packet *)
#define MAX_PENDING_RETURN_PACKETS
#define TUNNEL_PKT_RTV(p)
Definition: decode.h:881
void PacketPoolInit(void)
void PacketPoolWait(void)
SCMutex tunnel_mutex
Definition: decode.h:587
PktPoolLockedStack return_stack
#define TUNNEL_INCR_PKT_RTV_NOLOCK(p)
Definition: decode.h:871
#define SCMallocAligned(a, b)
wrapper for allocing aligned mem
Definition: util-mem.h:269
#define SCCondSignal
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2195
SCMutex m
Definition: flow-hash.h:105
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:192
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:167
#define cc_barrier()
Definition: util-optimize.h:43
#define PACKET_REINIT(p)
Recycle a packet structure for reuse.
Definition: decode.h:747
Packet * head
#define TM_FLAG_DETECT_TM
Definition: tm-modules.h:34
#define SCReturn
Definition: util-debug.h:339
Per thread variable structure.
Definition: threadvars.h:57
uint32_t flags
Definition: decode.h:441
#define PACKET_RELEASE_REFS(p)
Definition: decode.h:738
struct PktPool_ * pending_pool
Packet * PacketGetFromAlloc(void)
Get a malloced packet.
Definition: decode.c:140
#define SCFreeAligned(a)
Free aligned memory.
Definition: util-mem.h:294
struct Packet_ * root
Definition: decode.h:577
void TmqhReleasePacketsToPacketPool(PacketQueue *pq)
Release all the packets in the queue back to the packetpool. Mainly used by threads that have failed...