suricata
flow-worker.c
Go to the documentation of this file.
1 /* Copyright (C) 2016-2024 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  *
23  * Flow Workers are single thread modules taking care of (almost)
24  * everything related to packets with flows:
25  *
26  * - Lookup/creation
27  * - Stream tracking, reassembly
28  * - Applayer update
29  * - Detection
30  *
31  * This all while holding the flow lock.
32  */
33 
34 #include "suricata-common.h"
35 #include "suricata.h"
36 
37 #include "action-globals.h"
38 #include "packet.h"
39 #include "decode.h"
40 #include "detect.h"
41 #include "stream-tcp.h"
42 #include "app-layer.h"
43 #include "detect-engine.h"
44 #include "output.h"
45 #include "app-layer-parser.h"
46 #include "app-layer-frames.h"
47 
48 #include "util-profiling.h"
49 #include "util-validate.h"
50 #include "util-time.h"
51 #include "tmqh-packetpool.h"
52 
53 #include "flow-util.h"
54 #include "flow-manager.h"
55 #include "flow-timeout.h"
56 #include "flow-spare-pool.h"
57 #include "flow-worker.h"
58 
60 
61 typedef struct FlowTimeoutCounters {
65 
66 typedef struct FlowWorkerThreadData_ {
68 
69  union {
72  };
73 
75 
76  SC_ATOMIC_DECLARE(bool, flush_ack);
77 
78  void *output_thread; /* Output thread data. */
79  void *output_thread_flow; /* Output thread data. */
80 
85  /** Queue to put pseudo packets that have been created by the stream (RST response) and by the
86  * flush logic following a protocol change. */
89 
90  struct {
96  } cnt;
98 
100 
101 static void FlowWorkerFlowTimeout(
103 
104 /**
105  * \internal
106  * \brief Forces reassembly for flow if it needs it.
107  *
108  * The function requires flow to be locked beforehand.
109  *
110  * \param f Pointer to the flow.
111  *
112  * \retval cnt number of packets injected
113  */
114 static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *detect_thread)
115 {
116  const int server = f->ffr_tc;
117  const int client = f->ffr_ts;
118  int cnt = 0;
119 
120  /* Get the tcp session for the flow */
121  const TcpSession *ssn = (TcpSession *)f->protoctx;
122 
123  /* insert a pseudo packet in the toserver direction */
125  Packet *p = FlowPseudoPacketGet(0, f, ssn);
126  if (p != NULL) {
128  if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NONE) {
130  }
131  FlowWorkerFlowTimeout(tv, p, fw, detect_thread);
133  cnt++;
134  }
135  }
136 
137  /* handle toclient */
139  Packet *p = FlowPseudoPacketGet(1, f, ssn);
140  if (p != NULL) {
143  FlowWorkerFlowTimeout(tv, p, fw, detect_thread);
146  cnt++;
147  }
148  }
149 
150  if (cnt > 0) {
152  }
153  return cnt;
154 }
155 
156 /** \brief Drain flows from a private queue: finish, log, clear and recycle each one.
 *
 * Each flow taken from \a fq is write-locked and flagged with
 * FLOW_END_FLAG_TIMEOUT. TCP flows that pass the reassembly checks are run
 * through FlowFinish(). The flow is then handed to the flow logger (if one is
 * configured), its end counters are updated, its memory is cleared and it is
 * unlocked before being recycled.
 *
 * \param tv thread vars of the calling worker
 * \param fw flow worker thread data
 * \param counters receives the number of flows that needed extra work and
 *                 the number of pseudo packets injected for them
 * \param fq private queue to take flows from
 * \param[in] max_work Max flows to process. 0 if unlimited.
 *
 * NOTE(review): `i` is only incremented when max_work != 0 (see the loop
 * exit check), yet it is the value added to the flows_removed counter.
 * With max_work == 0 the counter therefore appears to receive 0 no matter
 * how many flows were processed -- confirm and fix.
 */
157 static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, FlowTimeoutCounters *counters,
158  FlowQueuePrivate *fq, const uint32_t max_work)
159 {
 /* flows collected here are handed back via FlowSparePoolReturnFlows() */
160  FlowQueuePrivate ret_queue = { NULL, NULL, 0 };
161  uint32_t i = 0;
162  Flow *f;
163  while ((f = FlowQueuePrivateGetFromTop(fq)) != NULL) {
164  FLOWLOCK_WRLOCK(f);
165  f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT; //TODO emerg
166 
167  if (f->proto == IPPROTO_TCP) {
169  !FlowIsBypassed(f) && FlowNeedsReassembly(f) && f->ffr != 0) {
170  /* read detect thread in case we're doing a reload */
171  void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
 /* FlowFinish() returns the number of pseudo packets it injected */
172  int cnt = FlowFinish(tv, f, fw, detect_thread);
173  counters->flows_aside_pkt_inject += cnt;
174  counters->flows_aside_needs_work++;
175  }
176  }
177 
178  /* no one is referring to this flow, removed from hash
179  * so we can unlock it and pass it to the flow recycler */
180 
 /* flow logging is optional: only run it when a flow output thread exists */
181  if (fw->output_thread_flow != NULL)
182  (void)OutputFlowLog(tv, fw->output_thread_flow, f);
183 
184  FlowEndCountersUpdate(tv, &fw->fec, f);
185  if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
187  }
189 
190  FlowClearMemory (f, f->protomap);
191  FLOWLOCK_UNLOCK(f);
192 
 /* local spare queue already holds two blocks or more: collect the flow
  * in ret_queue and return a full block as soon as one is gathered */
193  if (fw->fls.spare_queue.len >= (FLOW_SPARE_POOL_BLOCK_SIZE * 2)) {
194  FlowQueuePrivatePrependFlow(&ret_queue, f);
195  if (ret_queue.len == FLOW_SPARE_POOL_BLOCK_SIZE) {
196  FlowSparePoolReturnFlows(&ret_queue);
197  }
198  } else {
200  }
201 
 /* NOTE(review): when max_work == 0 the && short-circuits, so i is never
  * incremented in unlimited mode */
202  if (max_work != 0 && ++i == max_work)
203  break;
204  }
 /* return whatever partial block is left over */
205  if (ret_queue.len > 0) {
206  FlowSparePoolReturnFlows(&ret_queue);
207  }
208 
209  StatsCounterAddI64(&tv->stats, fw->cnt.flows_removed, (int64_t)i);
210 }
211 
212 /** \brief handle flow for packet
213  *
214  * Handle flow creation/lookup
215  */
216 static inline TmEcode FlowUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
217 {
218  FlowHandlePacketUpdate(p->flow, p, tv, fw->dtv);
219 
220  int state = p->flow->flow_state;
221  switch (state) {
222 #ifdef CAPTURE_OFFLOAD
223  case FLOW_STATE_CAPTURE_BYPASSED: {
226  Flow *f = p->flow;
227  FlowDeReference(&p->flow);
228  FLOWLOCK_UNLOCK(f);
229  return TM_ECODE_DONE;
230  }
231 #endif
235  Flow *f = p->flow;
236  FlowDeReference(&p->flow);
237  FLOWLOCK_UNLOCK(f);
238  return TM_ECODE_DONE;
239  }
240  default:
241  return TM_ECODE_OK;
242  }
243 }
244 
245 static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data);
246 
247 static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void **data)
248 {
249  FlowWorkerThreadData *fw = SCCalloc(1, sizeof(*fw));
250  if (fw == NULL)
251  return TM_ECODE_FAILED;
252 
253  SC_ATOMIC_INITPTR(fw->detect_thread);
254  SC_ATOMIC_SET(fw->detect_thread, NULL);
255 
256  fw->local_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_pkts", &tv->stats);
257  fw->local_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_bytes", &tv->stats);
258  fw->both_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_capture_pkts", &tv->stats);
259  fw->both_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_capture_bytes", &tv->stats);
260 
262  StatsRegisterCounter("flow.wrk.flows_evicted_needs_work", &tv->stats);
264  StatsRegisterCounter("flow.wrk.flows_evicted_pkt_inject", &tv->stats);
265  fw->cnt.flows_removed = StatsRegisterCounter("flow.wrk.flows_evicted", &tv->stats);
266  fw->cnt.flows_injected = StatsRegisterCounter("flow.wrk.flows_injected", &tv->stats);
267  fw->cnt.flows_injected_max = StatsRegisterMaxCounter("flow.wrk.flows_injected_max", &tv->stats);
268 
269  fw->fls.dtv = fw->dtv = DecodeThreadVarsAlloc(tv);
270  if (fw->dtv == NULL) {
271  FlowWorkerThreadDeinit(tv, fw);
272  return TM_ECODE_FAILED;
273  }
274 
275  /* setup TCP */
276  if (StreamTcpThreadInit(tv, NULL, &fw->stream_thread_ptr) != TM_ECODE_OK) {
277  FlowWorkerThreadDeinit(tv, fw);
278  return TM_ECODE_FAILED;
279  }
280 
281  if (DetectEngineEnabled()) {
282  /* setup DETECT */
283  void *detect_thread = NULL;
284  if (DetectEngineThreadCtxInit(tv, NULL, &detect_thread) != TM_ECODE_OK) {
285  FlowWorkerThreadDeinit(tv, fw);
286  return TM_ECODE_FAILED;
287  }
288  SC_ATOMIC_SET(fw->detect_thread, detect_thread);
289  }
290 
291  /* Setup outputs for this thread. */
292  if (OutputLoggerThreadInit(tv, initdata, &fw->output_thread) != TM_ECODE_OK) {
293  FlowWorkerThreadDeinit(tv, fw);
294  return TM_ECODE_FAILED;
295  }
297  SCLogError("initializing flow log API for thread failed");
298  FlowWorkerThreadDeinit(tv, fw);
299  return TM_ECODE_FAILED;
300  }
301 
305 
306  /* setup pq for stream end pkts */
307  memset(&fw->pq, 0, sizeof(PacketQueueNoLock));
308  *data = fw;
309  return TM_ECODE_OK;
310 }
311 
/** \brief Tear down the per-thread flow worker state.
 *
 * Releases the stream thread context, the detect thread context (if any),
 * every flow left in the local spare queue and finally the
 * FlowWorkerThreadData itself.
 *
 * FlowWorkerThreadInit() also calls this on its failure paths, so \a data
 * may be only partially initialized.
 *
 * \param tv thread vars of the worker being shut down
 * \param data the FlowWorkerThreadData pointer
 *
 * \retval TM_ECODE_OK always
 */
312 static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data)
313 {
314  FlowWorkerThreadData *fw = data;
315 
317 
318  /* free TCP */
319  StreamTcpThreadDeinit(tv, (void *)fw->stream_thread);
320 
321  /* free DETECT */
322  void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
323  if (detect_thread != NULL) {
324  DetectEngineThreadCtxDeinit(tv, detect_thread);
 /* clear the pointer so the freed context is not left behind in fw */
325  SC_ATOMIC_SET(fw->detect_thread, NULL);
326  }
327 
328  /* Free output. */
331 
332  /* pq holds nothing to free: it is a bug if pseudo packets are still queued */
333  BUG_ON(fw->pq.len);
334 
 /* free every flow still parked in this thread's local spare queue */
335  Flow *f;
336  while ((f = FlowQueuePrivateGetFromTop(&fw->fls.spare_queue)) != NULL) {
337  FlowFree(f);
338  }
339 
340  SCFree(fw);
341  return TM_ECODE_OK;
342 }
343 
344 TmEcode Detect(ThreadVars *tv, Packet *p, void *data);
346 
347 static inline void UpdateCounters(ThreadVars *tv,
348  FlowWorkerThreadData *fw, const FlowTimeoutCounters *counters)
349 {
350  if (counters->flows_aside_needs_work) {
352  (int64_t)counters->flows_aside_needs_work);
353  }
354  if (counters->flows_aside_pkt_inject) {
356  (int64_t)counters->flows_aside_pkt_inject);
357  }
358 }
359 
360 /** \brief update stream engine
361  *
362  * We can be called from both the flow timeout path as well as from the
363  * "real" traffic path. If in the timeout path any additional packets we
364  * forge for flushing pipelines should not leave our scope. If the original
365  * packet is real (or related to a real packet) we need to push the packets
366  * on, so IPS logic stays valid.
367  */
368 static inline void FlowWorkerStreamTCPUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p,
369  DetectEngineThreadCtx *det_ctx, const bool timeout)
370 {
371  if (det_ctx != NULL && det_ctx->de_ctx->PreStreamHook != NULL) {
372  const uint8_t action = det_ctx->de_ctx->PreStreamHook(tv, det_ctx, p);
373  if (action & ACTION_DROP) {
375  return;
376  }
377  }
378 
380  StreamTcp(tv, p, fw->stream_thread, &fw->pq);
382 
383  // this is the first packet that sets no payload inspection
384  bool setting_nopayload =
385  p->flow->alparser &&
388  if (FlowChangeProto(p->flow) || setting_nopayload) {
389  StreamTcpDetectLogFlush(tv, fw->stream_thread, p->flow, p, &fw->pq);
390  if (setting_nopayload) {
391  FlowSetNoPayloadInspectionFlag(p->flow);
392  }
395  }
396 
397  /* Packets here can safely access p->flow as it's locked */
398  SCLogDebug("packet %" PRIu64 ": extra packets %u", PcapPacketCntGet(p), fw->pq.len);
399  Packet *x;
400  while ((x = PacketDequeueNoLock(&fw->pq))) {
401  SCLogDebug("packet %" PRIu64 " extra packet %p", PcapPacketCntGet(p), x);
402 
403  if (det_ctx != NULL) {
405  Detect(tv, x, det_ctx);
407  }
408 
410 
411  FramesPrune(x->flow, x);
412  /* Release tcp segments. Done here after alerting can use them. */
415  x->flow, x->flowflags & FLOW_PKT_TOSERVER ? STREAM_TOSERVER : STREAM_TOCLIENT);
417 
418  /* no need to keep a flow ref beyond this point */
419  FlowDeReference(&x->flow);
420 
421  /* no further work to do for this pseudo packet, so we can return
422  * it to the pool immediately. */
423  if (timeout) {
425  } else {
426  /* to support IPS verdict logic, in the non-timeout case we need to do a bit more */
428  }
429  }
430  if (FlowChangeProto(p->flow) && p->flow->flags & FLOW_ACTION_DROP) {
431  // in case f->flags & FLOW_ACTION_DROP was set by one of the dequeued packets
433  }
434 }
435 
436 static void FlowWorkerFlowTimeout(
438 {
440 
441  SCLogDebug("packet %" PRIu64 " is TCP. Direction %s", PcapPacketCntGet(p),
442  PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
443  DEBUG_VALIDATE_BUG_ON(!(p->flow && PacketIsTCP(p)));
445 
446  /* handle TCP and app layer */
447  FlowWorkerStreamTCPUpdate(tv, fw, p, det_ctx, true);
448 
450 
451  /* handle Detect */
452  SCLogDebug("packet %" PRIu64 " calling Detect", PcapPacketCntGet(p));
453  if (det_ctx != NULL) {
455  Detect(tv, p, det_ctx);
457  }
458 
459  // Outputs.
461 
462  FramesPrune(p->flow, p);
463 
464  /* Release tcp segments. Done here after alerting can use them. */
467  STREAM_TOSERVER : STREAM_TOCLIENT);
469 
470  /* run tx cleanup last */
472 
473  FlowDeReference(&p->flow);
474  /* flow is unlocked later in FlowFinish() */
475 }
476 
477 /** \internal
478  * \brief process flows injected into our queue by other threads
479  */
480 static inline void FlowWorkerProcessInjectedFlows(
482 {
483  /* take injected flows and append to our work queue */
485  FlowQueuePrivate injected = { NULL, NULL, 0 };
486  if (SC_ATOMIC_GET(tv->flow_queue->non_empty))
488  if (injected.len > 0) {
489  StatsCounterAddI64(&tv->stats, fw->cnt.flows_injected, (int64_t)injected.len);
490  if (p->pkt_src == PKT_SRC_WIRE)
491  StatsCounterMaxUpdateI64(&tv->stats, fw->cnt.flows_injected_max, (int64_t)injected.len);
492 
493  /* move to local queue so we can process over the course of multiple packets */
495  }
497 }
498 
499 /** \internal
500  * \brief process flows set aside locally during flow lookup
501  */
502 static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
503 {
504  uint32_t max_work = 2;
506  max_work = 0;
507 
509  if (fw->fls.work_queue.len) {
510  FlowTimeoutCounters counters = { 0, 0, };
511  CheckWorkQueue(tv, fw, &counters, &fw->fls.work_queue, max_work);
512  UpdateCounters(tv, fw, &counters);
513  }
515 }
516 
517 /** \internal
518  * \brief apply Packet::app_update_direction to the flow flags
519  */
520 static void PacketAppUpdate2FlowFlags(Packet *p)
521 {
522  switch ((enum StreamUpdateDir)p->app_update_direction) {
523  case UPDATE_DIR_NONE: // NONE implies pseudo packet
524  SCLogDebug("pcap_cnt %" PRIu64 ", UPDATE_DIR_NONE", PcapPacketCntGet(p));
525  break;
526  case UPDATE_DIR_PACKET:
527  if (PKT_IS_TOSERVER(p)) {
529  SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", PcapPacketCntGet(p));
530  } else {
532  SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", PcapPacketCntGet(p));
533  }
534  break;
535  case UPDATE_DIR_BOTH:
536  if (PKT_IS_TOSERVER(p)) {
538  SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED|FLOW_TC_APP_UPDATE_NEXT set",
539  PcapPacketCntGet(p));
540  } else {
542  SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED|FLOW_TS_APP_UPDATE_NEXT set",
543  PcapPacketCntGet(p));
544  }
545  /* fall through */
546  case UPDATE_DIR_OPPOSING:
547  if (PKT_IS_TOSERVER(p)) {
549  SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED|FLOW_TS_APP_UPDATE_NEXT set",
550  PcapPacketCntGet(p));
551  } else {
553  SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED|FLOW_TC_APP_UPDATE_NEXT set",
554  PcapPacketCntGet(p));
555  }
556  break;
557  }
558 }
559 
560 static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
561 {
562  FlowWorkerThreadData *fw = data;
563  DetectEngineThreadCtx *det_ctx = SC_ATOMIC_GET(fw->detect_thread);
564 
565  DEBUG_VALIDATE_BUG_ON(p == NULL);
567 
568  SCLogDebug("packet %" PRIu64, PcapPacketCntGet(p));
569 
570  if ((PKT_IS_FLUSHPKT(p))) {
571  SCLogDebug("thread %s flushing", tv->printable_name);
573  /* Ack if a flush was requested */
574  bool notset = false;
575  SC_ATOMIC_CAS(&fw->flush_ack, notset, true);
576  return TM_ECODE_OK;
577  }
578 
579  /* handle Flow */
580  if (det_ctx != NULL && det_ctx->de_ctx->PreFlowHook != NULL) {
581  const uint8_t action = det_ctx->de_ctx->PreFlowHook(tv, det_ctx, p);
582  if (action & ACTION_DROP) {
584  goto pre_flow_drop;
585  }
586  }
587 
588  if (p->flags & PKT_WANTS_FLOW) {
590 
591  FlowHandlePacket(tv, &fw->fls, p);
592  if (likely(p->flow != NULL)) {
594  if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) {
595  /* update time */
596  if (!(PKT_IS_PSEUDOPKT(p))) {
597  TimeSetByThread(tv->id, p->ts);
598  }
599  goto housekeeping;
600  }
601  }
602  /* Flow is now LOCKED */
603 
605 
606  /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
607  * pseudo packet created by the flow manager. */
608  } else if (p->flags & PKT_HAS_FLOW) {
609  FLOWLOCK_WRLOCK(p->flow);
611  }
612 
613  /* update time */
614  if (!(PKT_IS_PSEUDOPKT(p))) {
615  TimeSetByThread(tv->id, p->ts);
616  }
617 
618  SCLogDebug("packet %" PRIu64 " has flow? %s", PcapPacketCntGet(p), p->flow ? "yes" : "no");
619 
620  /* handle TCP and app layer */
621  if (p->flow) {
622  SCLogDebug("packet %" PRIu64
623  ": direction %s FLOW_TS_APP_UPDATE_NEXT %s FLOW_TC_APP_UPDATE_NEXT %s",
624  PcapPacketCntGet(p), PKT_IS_TOSERVER(p) ? "toserver" : "toclient",
627  /* see if need to consider flags set by prev packets */
631  SCLogDebug("FLOW_TS_APP_UPDATED");
632  } else if (PKT_IS_TOCLIENT(p) && (p->flow->flags & FLOW_TC_APP_UPDATE_NEXT)) {
635  SCLogDebug("FLOW_TC_APP_UPDATED");
636  }
637 
638  if (PacketIsTCP(p)) {
639  SCLogDebug("packet %" PRIu64 " is TCP. Direction %s", PcapPacketCntGet(p),
640  PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
642 
643  /* if detect is disabled, we need to apply file flags to the flow
644  * here on the first packet. */
645  if (det_ctx == NULL &&
649  }
650 
651  FlowWorkerStreamTCPUpdate(tv, fw, p, det_ctx, false);
652  PacketAppUpdate2FlowFlags(p);
653 
654  /* handle the app layer part of the UDP packet payload */
655  } else if (p->proto == IPPROTO_UDP && !PacketCheckAction(p, ACTION_DROP)) {
659  PacketAppUpdate2FlowFlags(p);
660  }
661  }
662 
664 
665  /* handle Detect */
667  SCLogDebug("packet %" PRIu64 " calling Detect", PcapPacketCntGet(p));
668  if (det_ctx != NULL) {
670  Detect(tv, p, det_ctx);
672  }
673 
674 pre_flow_drop:
675  // Outputs.
677 
678  /* Release tcp segments. Done here after alerting can use them. */
679  if (p->flow != NULL) {
681 
682  if (FlowIsBypassed(p->flow)) {
684  if (p->proto == IPPROTO_TCP) {
686  }
687  } else if (p->proto == IPPROTO_TCP && p->flow->protoctx && p->flags & PKT_STREAM_EST) {
688  if ((p->flow->flags & FLOW_TS_APP_UPDATED) && PKT_IS_TOSERVER(p)) {
689  FramesPrune(p->flow, p);
690  } else if ((p->flow->flags & FLOW_TC_APP_UPDATED) && PKT_IS_TOCLIENT(p)) {
691  FramesPrune(p->flow, p);
692  }
695  STREAM_TOSERVER : STREAM_TOCLIENT);
697  } else if (p->proto == IPPROTO_UDP) {
698  FramesPrune(p->flow, p);
699  }
700 
701  if ((PKT_IS_PSEUDOPKT(p)) ||
703  if ((p->flags & PKT_STREAM_EST) || p->proto != IPPROTO_TCP) {
704  if (PKT_IS_TOSERVER(p)) {
705  if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TS_APP_UPDATED))) {
706  AppLayerParserTransactionsCleanup(p->flow, STREAM_TOSERVER);
708  SCLogDebug("~FLOW_TS_APP_UPDATED");
709  }
710  } else {
711  if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TC_APP_UPDATED))) {
712  AppLayerParserTransactionsCleanup(p->flow, STREAM_TOCLIENT);
714  SCLogDebug("~FLOW_TC_APP_UPDATED");
715  }
716  }
717  }
718  } else {
719  SCLogDebug("not pseudo, no app update: skip");
720  }
721 
722  if (p->flow->flags & FLOW_ACTION_DROP) {
723  SCLogDebug("flow drop in place: remove app update flags");
725  }
726 
727  Flow *f = p->flow;
728  FlowDeReference(&p->flow);
729  FLOWLOCK_UNLOCK(f);
730  }
731 
732 housekeeping:
733 
734  /* take injected flows and add them to our local queue */
735  FlowWorkerProcessInjectedFlows(tv, fw, p);
736 
737  /* process local work queue */
738  FlowWorkerProcessLocalFlows(tv, fw, p);
739 
740  return TM_ECODE_OK;
741 }
742 
743 void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx)
744 {
745  FlowWorkerThreadData *fw = flow_worker;
746 
747  SC_ATOMIC_SET(fw->detect_thread, detect_ctx);
748 }
749 
750 void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
751 {
752  FlowWorkerThreadData *fw = flow_worker;
753 
754  return SC_ATOMIC_GET(fw->detect_thread);
755 }
756 
757 void *FlowWorkerGetThreadData(void *flow_worker)
758 {
759  return (FlowWorkerThreadData *)flow_worker;
760 }
761 
762 bool FlowWorkerGetFlushAck(void *flow_worker)
763 {
764  FlowWorkerThreadData *fw = flow_worker;
765  return SC_ATOMIC_GET(fw->flush_ack) == true;
766 }
767 
768 void FlowWorkerSetFlushAck(void *flow_worker)
769 {
770  FlowWorkerThreadData *fw = flow_worker;
771  SC_ATOMIC_SET(fw->flush_ack, false);
772 }
773 
775 {
776  switch (fwi) {
778  return "flow";
780  return "stream";
782  return "app-layer";
784  return "detect";
786  return "tcp-prune";
788  return "flow-inject";
790  return "flow-evict";
792  return "size";
793  }
794  return "error";
795 }
796 
/** \internal
 * \brief Tell whether this flow worker still has work pending.
 *
 * Three sources of pending work are checked in order: queued pseudo packets
 * (fw->pq), flows set aside in the local work queue (fw->fls.work_queue)
 * and, when the thread has one, flows waiting in tv->flow_queue.
 *
 * \param tv thread vars of the worker
 * \param flow_worker the worker's FlowWorkerThreadData, passed as void *
 *
 * \retval true at least one of the queues is non-empty
 * \retval false nothing is pending
 */
797 static bool FlowWorkerIsBusy(ThreadVars *tv, void *flow_worker)
798 {
799  FlowWorkerThreadData *fw = flow_worker;
 /* pseudo packets still waiting to be processed */
800  if (fw->pq.len)
801  return true;
 /* locally set-aside flows not yet worked off */
802  if (fw->fls.work_queue.len)
803  return true;
804 
 /* not every thread has a flow queue, hence the NULL check */
805  if (tv->flow_queue) {
807  bool fq_done = (tv->flow_queue->qlen == 0);
809  if (!fq_done) {
810  return true;
811  }
812  }
813 
814  return false;
815 }
816 
818 {
819  tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
820  tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
821  tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
822  tmm_modules[TMM_FLOWWORKER].ThreadBusy = FlowWorkerIsBusy;
823  tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
826 }
PKT_IS_TOCLIENT
#define PKT_IS_TOCLIENT(p)
Definition: decode.h:239
TmModule_::cap_flags
uint8_t cap_flags
Definition: tm-modules.h:77
PacketCheckAction
bool PacketCheckAction(const Packet *p, const uint8_t a)
Definition: packet.c:50
UPDATE_DIR_BOTH
@ UPDATE_DIR_BOTH
Definition: stream-tcp-reassemble.h:58
Flow_::ffr_tc
uint8_t ffr_tc
Definition: flow.h:379
FlowLookupStruct_::work_queue
FlowQueuePrivate work_queue
Definition: flow.h:537
UPDATE_DIR_PACKET
@ UPDATE_DIR_PACKET
Definition: stream-tcp-reassemble.h:56
ThreadVars_::flow_queue
struct FlowQueue_ * flow_queue
Definition: threadvars.h:131
OutputFlowLog
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
Definition: output-flow.c:87
FLOW_TC_APP_UPDATE_NEXT
#define FLOW_TC_APP_UPDATE_NEXT
Definition: flow.h:56
PKT_DROP_REASON_STREAM_PRE_HOOK
@ PKT_DROP_REASON_STREAM_PRE_HOOK
Definition: decode.h:400
Packet_::proto
uint8_t proto
Definition: decode.h:523
StatsCounterMaxUpdateI64
void StatsCounterMaxUpdateI64(StatsThreadContext *stats, StatsCounterMaxId id, int64_t x)
update the value of the localmax counter
Definition: counters.c:225
SCAppLayerParserStateIssetFlag
uint16_t SCAppLayerParserStateIssetFlag(AppLayerParserState *pstate, uint16_t flag)
Definition: app-layer-parser.c:1840
FlowCleanupAppLayer
void FlowCleanupAppLayer(Flow *f)
Definition: flow.c:140
FlowTimeoutCounters
Definition: flow-worker.c:61
detect-engine.h
Flow_::ffr_ts
uint8_t ffr_ts
Definition: flow.h:378
PROFILE_FLOWWORKER_DETECT
@ PROFILE_FLOWWORKER_DETECT
Definition: flow-worker.h:25
OutputFlowLogThreadInit
TmEcode OutputFlowLogThreadInit(ThreadVars *tv, void **data)
thread init for the flow logger This will run the thread init functions for the individual registered...
Definition: output-flow.c:123
PKT_HAS_FLOW
#define PKT_HAS_FLOW
Definition: decode.h:1267
PROFILE_FLOWWORKER_FLOW_INJECTED
@ PROFILE_FLOWWORKER_FLOW_INJECTED
Definition: flow-worker.h:27
flow-util.h
FlowWorkerThreadData
struct FlowWorkerThreadData_ FlowWorkerThreadData
FlowLookupStruct_::dtv
DecodeThreadVars * dtv
Definition: flow.h:536
PKT_IS_PSEUDOPKT
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition: decode.h:1322
STREAM_HAS_UNPROCESSED_SEGMENTS_NONE
@ STREAM_HAS_UNPROCESSED_SEGMENTS_NONE
Definition: stream-tcp.h:189
stream-tcp.h
PacketPoolReturnPacket
void PacketPoolReturnPacket(Packet *p)
Return packet to Packet pool.
Definition: tmqh-packetpool.c:168
FLOW_PKT_LAST_PSEUDO
#define FLOW_PKT_LAST_PSEUDO
Definition: flow.h:231
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
AppLayerParserTransactionsCleanup
void AppLayerParserTransactionsCleanup(Flow *f, const uint8_t pkt_dir)
remove obsolete (inspected and logged) transactions
Definition: app-layer-parser.c:896
PROFILE_FLOWWORKER_FLOW_EVICTED
@ PROFILE_FLOWWORKER_FLOW_EVICTED
Definition: flow-worker.h:28
ProfileFlowWorkerIdToString
const char * ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
Definition: flow-worker.c:774
DetectEngineCtx_::PreFlowHook
DetectPacketHookFunc PreFlowHook
Definition: detect.h:1159
PcapPacketCntGet
uint64_t PcapPacketCntGet(const Packet *p)
Definition: decode.c:1104
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:282
PKT_SRC_SHUTDOWN_FLUSH
@ PKT_SRC_SHUTDOWN_FLUSH
Definition: decode.h:64
StatsRegisterCounter
StatsCounterId StatsRegisterCounter(const char *name, StatsThreadContext *stats)
Registers a normal, unqualified counter.
Definition: counters.c:1039
StreamTcpThread_
Definition: stream-tcp.h:92
PROFILE_FLOWWORKER_TCPPRUNE
@ PROFILE_FLOWWORKER_TCPPRUNE
Definition: flow-worker.h:26
Flow_::proto
uint8_t proto
Definition: flow.h:369
TM_ECODE_DONE
@ TM_ECODE_DONE
Definition: tm-threads-common.h:83
PKT_SRC_CAPTURE_TIMEOUT
@ PKT_SRC_CAPTURE_TIMEOUT
Definition: decode.h:62
action-globals.h
TcpReassemblyThreadCtx_::app_tctx
void * app_tctx
Definition: stream-tcp-reassemble.h:62
Packet_::flags
uint32_t flags
Definition: decode.h:544
DetectEngineCtx_::PreStreamHook
DetectPacketHookFunc PreStreamHook
Definition: detect.h:1154
SCAppLayerParserStateSetFlag
void SCAppLayerParserStateSetFlag(AppLayerParserState *pstate, uint16_t flag)
Definition: app-layer-parser.c:1832
Flow_
Flow data structure.
Definition: flow.h:347
Flow_::protomap
uint8_t protomap
Definition: flow.h:436
FLOW_TC_APP_UPDATED
#define FLOW_TC_APP_UPDATED
Definition: flow.h:119
PKT_WANTS_FLOW
#define PKT_WANTS_FLOW
Definition: decode.h:1297
PKT_DROP_REASON_FLOW_PRE_HOOK
@ PKT_DROP_REASON_FLOW_PRE_HOOK
Definition: decode.h:401
FlowTimeoutCounters::flows_aside_pkt_inject
uint32_t flows_aside_pkt_inject
Definition: flow-worker.c:63
AppLayerRegisterThreadCounters
void AppLayerRegisterThreadCounters(ThreadVars *tv)
Registers per flow counters for all protocols.
Definition: app-layer.c:1312
FlowLookupStruct_
Definition: flow.h:533
FlowWorkerThreadData_::local_bypass_pkts
StatsCounterId local_bypass_pkts
Definition: flow-worker.c:81
FlowWorkerThreadData_::dtv
DecodeThreadVars * dtv
Definition: flow-worker.c:67
StreamTcpThreadInit
TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
Definition: stream-tcp.c:6147
SC_ATOMIC_CAS
#define SC_ATOMIC_CAS(name, cmpval, newval)
atomic Compare and Switch
Definition: util-atomic.h:367
FLOW_PKT_TOSERVER
#define FLOW_PKT_TOSERVER
Definition: flow.h:224
FlowPseudoPacketGet
Packet * FlowPseudoPacketGet(int direction, Flow *f, const TcpSession *ssn)
Definition: flow-timeout.c:265
FlowHandlePacket
void FlowHandlePacket(ThreadVars *tv, FlowLookupStruct *fls, Packet *p)
Entry point for packet flow handling.
Definition: flow.c:557
FLOW_ACTION_DROP
#define FLOW_ACTION_DROP
Definition: flow.h:69
StatsCounterId
Definition: counters.h:30
FlowQueuePrivatePrependFlow
void FlowQueuePrivatePrependFlow(FlowQueuePrivate *fqc, Flow *f)
Definition: flow-queue.c:78
PKT_NOPAYLOAD_INSPECTION
#define PKT_NOPAYLOAD_INSPECTION
Definition: decode.h:1253
FlowWorkerThreadData_::stream_thread
StreamTcpThread * stream_thread
Definition: flow-worker.c:70
TmModule_::ThreadBusy
bool(* ThreadBusy)(ThreadVars *tv, void *thread_data)
Definition: tm-modules.h:67
PKT_IS_FLUSHPKT
#define PKT_IS_FLUSHPKT(p)
Definition: decode.h:1324
FlowWorkerThreadData_::flows_removed
StatsCounterId flows_removed
Definition: flow-worker.c:93
Packet_::flowflags
uint8_t flowflags
Definition: decode.h:532
TmqhOutputPacketpool
void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
Definition: tmqh-packetpool.c:305
PROFILE_FLOWWORKER_STREAM
@ PROFILE_FLOWWORKER_STREAM
Definition: flow-worker.h:23
APP_LAYER_PARSER_EOF_TS
#define APP_LAYER_PARSER_EOF_TS
Definition: app-layer-parser.h:53
TM_ECODE_FAILED
@ TM_ECODE_FAILED
Definition: tm-threads-common.h:82
FlowQueuePrivate_::len
uint32_t len
Definition: flow-queue.h:43
Flow_::protoctx
void * protoctx
Definition: flow.h:432
OutputLoggerThreadDeinit
TmEcode OutputLoggerThreadDeinit(ThreadVars *tv, void *thread_data)
Definition: output.c:847
DisableDetectFlowFileFlags
void DisableDetectFlowFileFlags(Flow *f)
disable file features we don't need Called if we have no detection engine.
Definition: detect.c:2410
PacketQueueNoLock_
simple fifo queue for packets
Definition: packet-queue.h:34
tmqh-packetpool.h
StatsCounterDecr
void StatsCounterDecr(StatsThreadContext *stats, StatsCounterId id)
Decrements the local counter.
Definition: counters.c:185
FLOW_TIMEOUT_REASSEMBLY_DONE
#define FLOW_TIMEOUT_REASSEMBLY_DONE
Definition: flow.h:96
FLOWWORKER_PROFILING_START
#define FLOWWORKER_PROFILING_START(p, id)
Definition: util-profiling.h:147
FlowSparePoolReturnFlows
void FlowSparePoolReturnFlows(FlowQueuePrivate *fqp)
Definition: flow-spare-pool.c:120
FLOWLOCK_UNLOCK
#define FLOWLOCK_UNLOCK(fb)
Definition: flow.h:264
TM_ECODE_OK
@ TM_ECODE_OK
Definition: tm-threads-common.h:81
Flow_::flow_state
FlowStateType flow_state
Definition: flow.h:403
FQLOCK_LOCK
#define FQLOCK_LOCK(q)
Definition: flow-queue.h:72
FlowWorkerThreadData_::both_bypass_pkts
StatsCounterId both_bypass_pkts
Definition: flow-worker.c:83
TmModule_::ThreadDeinit
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:53
PKT_SET_SRC
#define PKT_SET_SRC(p, src_val)
Definition: decode.h:1326
flow-spare-pool.h
Flow_::alparser
AppLayerParserState * alparser
Definition: flow.h:469
DecodeRegisterPerfCounters
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition: decode.c:632
DetectEngineThreadCtxPtr
DetectEngineThreadCtx * DetectEngineThreadCtxPtr
Definition: flow-worker.c:59
STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION
@ STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION
Definition: stream-tcp.h:192
FLOW_STATE_LOCAL_BYPASSED
@ FLOW_STATE_LOCAL_BYPASSED
Definition: flow.h:498
decode.h
PKT_SRC_WIRE
@ PKT_SRC_WIRE
Definition: decode.h:52
FlowWorkerThreadData_::flows_aside_needs_work
StatsCounterId flows_aside_needs_work
Definition: flow-worker.c:94
PKT_IS_TOSERVER
#define PKT_IS_TOSERVER(p)
Definition: decode.h:238
DetectEngineThreadCtx_
Definition: detect.h:1245
Packet_::ts
SCTime_t ts
Definition: decode.h:555
StatsCounterMaxId
Definition: counters.h:38
PacketDequeueNoLock
Packet * PacketDequeueNoLock(PacketQueueNoLock *qnl)
Definition: packet-queue.c:208
ProfileFlowWorkerId
ProfileFlowWorkerId
Definition: flow-worker.h:21
FlowTimeoutCounters
struct FlowTimeoutCounters FlowTimeoutCounters
FlowHandlePacketUpdate
void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars *dtv)
Update Packet and Flow.
Definition: flow.c:424
flow-worker.h
BOOL2STR
#define BOOL2STR(b)
Definition: util-debug.h:542
FLOWLOCK_WRLOCK
#define FLOWLOCK_WRLOCK(fb)
Definition: flow.h:261
STREAM_FLAGS_FOR_PACKET
#define STREAM_FLAGS_FOR_PACKET(p)
Definition: stream.h:30
FlowWorkerThreadData_::flows_injected_max
StatsCounterMaxId flows_injected_max
Definition: flow-worker.c:92
StreamTcp
TmEcode StreamTcp(ThreadVars *, Packet *, void *, PacketQueueNoLock *pq)
Definition: stream-tcp.c:6106
detect.h
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
DetectEngineEnabled
int DetectEngineEnabled(void)
Check if detection is enabled.
Definition: detect-engine.c:3855
DecodeThreadVars_::counter_flow_active
StatsCounterId counter_flow_active
Definition: decode.h:1032
Flow_::flow_end_flags
uint8_t flow_end_flags
Definition: flow.h:438
DetectEngineThreadCtxInit
TmEcode DetectEngineThreadCtxInit(ThreadVars *tv, void *initdata, void **data)
initialize thread specific detection engine context
Definition: detect-engine.c:3414
TmModule_::Func
TmEcode(* Func)(ThreadVars *, Packet *, void *)
Definition: tm-modules.h:56
FLOW_PKT_TOCLIENT_FIRST
#define FLOW_PKT_TOCLIENT_FIRST
Definition: flow.h:228
PROFILE_FLOWWORKER_APPLAYERUDP
@ PROFILE_FLOWWORKER_APPLAYERUDP
Definition: flow-worker.h:24
util-time.h
FlowQueuePrivateGetFromTop
Flow * FlowQueuePrivateGetFromTop(FlowQueuePrivate *fqc)
Definition: flow-queue.c:151
FLOWWORKER_PROFILING_END
#define FLOWWORKER_PROFILING_END(p, id)
Definition: util-profiling.h:154
StreamTcpPruneSession
void StreamTcpPruneSession(Flow *f, uint8_t flags)
Remove idle TcpSegments from TcpSession.
Definition: stream-tcp-list.c:886
OutputLoggerThreadInit
TmEcode OutputLoggerThreadInit(ThreadVars *tv, const void *initdata, void **data)
Definition: output.c:816
app-layer-parser.h
ThreadVars_::id
int id
Definition: threadvars.h:86
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:317
util-profiling.h
Packet_
Definition: decode.h:501
FlowWorkerGetFlushAck
bool FlowWorkerGetFlushAck(void *flow_worker)
Definition: flow-worker.c:762
tmm_modules
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.c:29
GET_PKT_LEN
#define GET_PKT_LEN(p)
Definition: decode.h:208
APP_LAYER_PARSER_EOF_TC
#define APP_LAYER_PARSER_EOF_TC
Definition: app-layer-parser.h:54
AppLayerHandleUdp
int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow *f)
Handle an app layer UDP message.
Definition: app-layer.c:875
DEBUG_ASSERT_FLOW_LOCKED
#define DEBUG_ASSERT_FLOW_LOCKED(f)
Definition: util-validate.h:99
TmEcode
TmEcode
Definition: tm-threads-common.h:80
FlowClearMemory
int FlowClearMemory(Flow *f, uint8_t proto_map)
Clear the flow memory before queueing the flow to the spare flow queue.
Definition: flow.c:1121
TmModuleFlowWorkerRegister
void TmModuleFlowWorkerRegister(void)
Definition: flow-worker.c:817
FlowQueuePrivateAppendPrivate
void FlowQueuePrivateAppendPrivate(FlowQueuePrivate *dest, FlowQueuePrivate *src)
Definition: flow-queue.c:88
flow-timeout.h
TimeSetByThread
void TimeSetByThread(const int thread_id, SCTime_t tv)
Definition: util-time.c:116
FlowWorkerThreadData_::both_bypass_bytes
StatsCounterId both_bypass_bytes
Definition: flow-worker.c:84
TmModule_::name
const char * name
Definition: tm-modules.h:48
APP_LAYER_PARSER_NO_INSPECTION
#define APP_LAYER_PARSER_NO_INSPECTION
Definition: app-layer-parser.h:49
UPDATE_DIR_OPPOSING
@ UPDATE_DIR_OPPOSING
Definition: stream-tcp-reassemble.h:57
FlowLookupStruct_::spare_queue
FlowQueuePrivate spare_queue
Definition: flow.h:535
TMM_FLOWWORKER
@ TMM_FLOWWORKER
Definition: tm-threads-common.h:34
Flow_::ffr
uint8_t ffr
Definition: flow.h:381
FlowWorkerGetDetectCtxPtr
void * FlowWorkerGetDetectCtxPtr(void *flow_worker)
Definition: flow-worker.c:750
FlowWorkerThreadData_::stream_thread_ptr
void * stream_thread_ptr
Definition: flow-worker.c:71
FlowQueueExtractPrivate
FlowQueuePrivate FlowQueueExtractPrivate(FlowQueue *fq)
Definition: flow-queue.c:140
FlowWorkerThreadData_::pq
PacketQueueNoLock pq
Definition: flow-worker.c:87
cnt
uint32_t cnt
Definition: tmqh-packetpool.h:7
app-layer-frames.h
Packet_::flow
struct Flow_ * flow
Definition: decode.h:546
DecodeThreadVarsFree
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition: decode.c:840
flow-manager.h
suricata-common.h
PKT_SRC_FFR
@ PKT_SRC_FFR
Definition: decode.h:58
FlowFree
void FlowFree(Flow *f)
Clean up and free the memory of a flow.
Definition: flow-util.c:84
FlowWorkerGetThreadData
void * FlowWorkerGetThreadData(void *flow_worker)
Definition: flow-worker.c:757
PROFILE_FLOWWORKER_SIZE
@ PROFILE_FLOWWORKER_SIZE
Definition: flow-worker.h:29
packet.h
ACTION_DROP
#define ACTION_DROP
Definition: action-globals.h:30
DetectEngineThreadCtxDeinit
TmEcode DetectEngineThreadCtxDeinit(ThreadVars *tv, void *data)
Definition: detect-engine.c:3651
Packet_::app_update_direction
uint8_t app_update_direction
Definition: decode.h:535
FlowWorkerSetFlushAck
void FlowWorkerSetFlushAck(void *flow_worker)
Definition: flow-worker.c:768
FLOW_TS_APP_UPDATED
#define FLOW_TS_APP_UPDATED
Definition: flow.h:118
TmModule_::ThreadInit
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:51
PacketUpdateEngineEventCounters
void PacketUpdateEngineEventCounters(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Definition: decode.c:243
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:66
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:33
Detect
TmEcode Detect(ThreadVars *tv, Packet *p, void *data)
Detection engine thread wrapper.
Definition: detect.c:2341
util-validate.h
FlowQueuePrivate_
Definition: flow-queue.h:40
StreamTcpDetectLogFlush
void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueueNoLock *pq)
create packets in both directions to flush out logging and detection before switching protocols....
Definition: stream-tcp.c:7094
PacketQueueNoLock_::len
uint32_t len
Definition: packet-queue.h:37
FlowWorkerThreadData_::output_thread_flow
void * output_thread_flow
Definition: flow-worker.c:79
FlowWorkerThreadData_::flows_aside_pkt_inject
StatsCounterId flows_aside_pkt_inject
Definition: flow-worker.c:95
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:274
OutputFlowLogThreadDeinit
TmEcode OutputFlowLogThreadDeinit(ThreadVars *tv, void *thread_data)
Definition: output-flow.c:163
FramesPrune
void FramesPrune(Flow *f, Packet *p)
Definition: app-layer-frames.c:836
OutputLoggerLog
TmEcode OutputLoggerLog(ThreadVars *tv, Packet *p, void *thread_data)
Definition: output.c:802
SCFree
#define SCFree(p)
Definition: util-mem.h:61
Packet_::pkt_src
uint8_t pkt_src
Definition: decode.h:611
DecodeThreadVars_
Structure to hold thread specific data for all decode modules.
Definition: decode.h:959
OutputLoggerFlush
TmEcode OutputLoggerFlush(ThreadVars *tv, Packet *p, void *thread_data)
Definition: output.c:787
Flow_::flags
uint32_t flags
Definition: flow.h:412
StreamTcpSessionCleanup
void StreamTcpSessionCleanup(TcpSession *ssn)
Session cleanup function. Does not free the ssn.
Definition: stream-tcp.c:335
FlowWorkerThreadData_::fls
FlowLookupStruct fls
Definition: flow-worker.c:88
StatsRegisterMaxCounter
StatsCounterMaxId StatsRegisterMaxCounter(const char *name, StatsThreadContext *stats)
Registers a counter, whose value holds the maximum of all the values assigned to it.
Definition: counters.c:1077
SC_ATOMIC_INITPTR
#define SC_ATOMIC_INITPTR(name)
Definition: util-atomic.h:317
DecodeThreadVars_::counter_tcp_active_sessions
StatsCounterId counter_tcp_active_sessions
Definition: decode.h:1030
UPDATE_DIR_NONE
@ UPDATE_DIR_NONE
Definition: stream-tcp-reassemble.h:55
DecodeThreadVarsAlloc
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition: decode.c:822
FlowWorkerThreadData_::cnt
struct FlowWorkerThreadData_::@119 cnt
PROFILE_FLOWWORKER_FLOW
@ PROFILE_FLOWWORKER_FLOW
Definition: flow-worker.h:22
StreamTcpThread_::ra_ctx
TcpReassemblyThreadCtx * ra_ctx
Definition: stream-tcp.h:117
PacketDrop
void PacketDrop(Packet *p, const uint8_t action, enum PacketDropReason r)
issue drop action
Definition: packet.c:34
DetectEngineThreadCtx_::de_ctx
DetectEngineCtx * de_ctx
Definition: detect.h:1363
suricata.h
StreamUpdateDir
StreamUpdateDir
Definition: stream-tcp-reassemble.h:54
FlowWorkerThreadData_::flows_injected
StatsCounterId flows_injected
Definition: flow-worker.c:91
FLOW_END_FLAG_TIMEOUT
#define FLOW_END_FLAG_TIMEOUT
Definition: flow.h:234
FlowWorkerThreadData_::fec
FlowEndCounters fec
Definition: flow-worker.c:97
likely
#define likely(expr)
Definition: util-optimize.h:32
FlowNeedsReassembly
bool FlowNeedsReassembly(Flow *f)
Check if a flow needs forced reassembly, or any other processing.
Definition: flow-timeout.c:286
FlowWorkerThreadData_::output_thread
void * output_thread
Definition: flow-worker.c:78
FlowChangeProto
int FlowChangeProto(Flow *f)
Check if change proto flag is set for flow.
Definition: flow.c:197
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
TcpSession_
Definition: stream-tcp-private.h:283
FlowEndCountersRegister
void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec)
Definition: flow-util.c:246
FlowWorkerThreadData_::SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread)
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
ThreadVars_::stats
StatsThreadContext stats
Definition: threadvars.h:121
FlowEndCounters_
Definition: flow-util.h:146
FLOW_SPARE_POOL_BLOCK_SIZE
#define FLOW_SPARE_POOL_BLOCK_SIZE
Definition: flow-spare-pool.h:30
StatsCounterAddI64
void StatsCounterAddI64(StatsThreadContext *stats, StatsCounterId id, int64_t x)
Adds a value of type int64_t to the local counter.
Definition: counters.c:147
FLOW_PKT_TOSERVER_FIRST
#define FLOW_PKT_TOSERVER_FIRST
Definition: flow.h:227
FlowWorkerThreadData_::local_bypass_bytes
StatsCounterId local_bypass_bytes
Definition: flow-worker.c:82
DEBUG_VALIDATE_BUG_ON
#define DEBUG_VALIDATE_BUG_ON(exp)
Definition: util-validate.h:102
PKT_DROP_REASON_FLOW_DROP
@ PKT_DROP_REASON_FLOW_DROP
Definition: decode.h:386
FlowWorkerThreadData_::SC_ATOMIC_DECLARE
SC_ATOMIC_DECLARE(bool, flush_ack)
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:80
FlowWorkerThreadData_
Definition: flow-worker.c:66
FLOW_TS_APP_UPDATE_NEXT
#define FLOW_TS_APP_UPDATE_NEXT
Definition: flow.h:122
StreamTcpThreadDeinit
TmEcode StreamTcpThreadDeinit(ThreadVars *tv, void *data)
Definition: stream-tcp.c:6240
output.h
PKT_STREAM_EST
#define PKT_STREAM_EST
Definition: decode.h:1263
TM_FLAG_FLOWWORKER_TM
#define TM_FLAG_FLOWWORKER_TM
Definition: tm-modules.h:34
FlowWorkerReplaceDetectCtx
void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx)
Definition: flow-worker.c:743
app-layer.h
FlowTimeoutCounters::flows_aside_needs_work
uint32_t flows_aside_needs_work
Definition: flow-worker.c:62
FQLOCK_UNLOCK
#define FQLOCK_UNLOCK(q)
Definition: flow-queue.h:74