suricata
flow-worker.c
Go to the documentation of this file.
1 /* Copyright (C) 2016 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  *
23  * Flow Workers are single thread modules taking care of (almost)
24  * everything related to packets with flows:
25  *
26  * - Lookup/creation
27  * - Stream tracking, reassembly
28  * - Applayer update
29  * - Detection
30  *
31  * This all while holding the flow lock.
32  */
33 
34 #include "suricata-common.h"
35 #include "suricata.h"
36 
37 #include "decode.h"
38 #include "stream-tcp.h"
39 #include "app-layer.h"
40 #include "detect-engine.h"
41 #include "output.h"
42 #include "app-layer-parser.h"
43 
44 #include "util-validate.h"
45 
46 #include "flow-util.h"
47 
49 
/**
 * Per-thread state of the flow worker module. An instance is allocated by
 * FlowWorkerThreadInit() and freed by FlowWorkerThreadDeinit().
 *
 * NOTE(review): this listing has dropped several member lines (see the gaps
 * in the embedded listing numbers) and the end of the typedef. Code in this
 * file also accesses fw->dtv, fw->stream_thread, fw->stream_thread_ptr,
 * fw->detect_thread (an atomic), fw->pq, fw->local_bypass_pkts,
 * fw->local_bypass_bytes and fw->both_bypass_bytes, so those members must be
 * declared on the missing lines.
 */
50 typedef struct FlowWorkerThreadData_ {
52 
 /* The members of this union are on dropped listing lines; presumably the
  * stream thread context as typed pointer and as void pointer
  * (stream_thread / stream_thread_ptr) -- confirm against the real source. */
53  union {
56  };
57 
59 
 /* Filled in by OutputLoggerThreadInit() and passed to OutputLoggerLog(). */
60  void *output_thread; /* Output thread data. */
61 
 /* Counter id returned by StatsRegisterCounter() for
  * "flow_bypassed.local_capture_pkts". */
64  uint16_t both_bypass_pkts;
66 
68 
70 
71 /** \brief handle flow for packet
72  *
73  * Handle flow creation/lookup
 *
 * In the code visible here the function reads the flow's atomic flow_state
 * and, for the bypassed states handled in the switch, bumps the matching
 * bypass packet counter.
 *
 * \param tv thread vars, passed to StatsAddUI64()
 * \param fw flow worker thread data holding the counter ids
 * \param p  packet; p->flow is dereferenced unconditionally and must be set
 *
 * \retval TM_ECODE_DONE flow is in a bypassed state; FlowWorker() reacts by
 *                       unlocking the flow and returning early
 * \retval TM_ECODE_OK   any other state
 *
 * NOTE(review): listing lines 77, 84, 87 and 89 are absent from this
 * listing. The case label guarding the local_bypass_pkts update was
 * presumably on line 87 -- confirm against the real source.
74  */
75 static inline TmEcode FlowUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
76 {
78 
79  int state = SC_ATOMIC_GET(p->flow->flow_state);
80  switch (state) {
 /* The capture-bypass case only exists when built with CAPTURE_OFFLOAD. */
81 #ifdef CAPTURE_OFFLOAD
82  case FLOW_STATE_CAPTURE_BYPASSED:
83  StatsAddUI64(tv, fw->both_bypass_pkts, 1);
85  return TM_ECODE_DONE;
86 #endif
88  StatsAddUI64(tv, fw->local_bypass_pkts, 1);
90  return TM_ECODE_DONE;
91  default:
92  return TM_ECODE_OK;
93  }
94 }
95 
96 static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data);
97 
/**
 * \brief Thread init callback of the flow worker module.
 *
 * Allocates a zeroed FlowWorkerThreadData, registers the four
 * "flow_bypassed.*" counters and then sets up, in this order: the decode
 * thread vars, the TCP stream thread context, the detect thread context
 * (only if DetectEngineEnabled()), the output logger thread data and the
 * internal packet queue fw->pq.
 *
 * \param tv       thread vars of the calling thread
 * \param initdata passed through to OutputLoggerThreadInit()
 * \param data     out: set to the new FlowWorkerThreadData, on success only
 *
 * \retval TM_ECODE_OK     everything was set up
 * \retval TM_ECODE_FAILED the allocation or one of the sub inits failed;
 *                         for the sub inits FlowWorkerThreadDeinit() is
 *                         called to unwind and free fw
 *
 * NOTE(review): fw->pq.mutex_q is initialised only at the very end, but each
 * failure path above it calls FlowWorkerThreadDeinit(), which runs
 * SCMutexDestroy(&fw->pq.mutex_q) on the calloc-zeroed, never initialised
 * mutex. Moving the pq setup in front of the first failure path would avoid
 * that. The failure paths also reach the decode/stream deinit calls with
 * possibly NULL pointers; whether those tolerate NULL is not visible here.
 *
 * NOTE(review): listing lines 140-141 are absent from this listing.
 */
98 static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void **data)
99 {
100  FlowWorkerThreadData *fw = SCCalloc(1, sizeof(*fw));
101  if (fw == NULL)
102  return TM_ECODE_FAILED;
103 
 /* Start with no detect context; it is set below if detection is enabled
  * and can be swapped later through FlowWorkerReplaceDetectCtx(). */
104  SC_ATOMIC_INIT(fw->detect_thread);
105  SC_ATOMIC_SET(fw->detect_thread, NULL);
106 
107  fw->local_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_pkts", tv);
108  fw->local_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_bytes", tv);
109  fw->both_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_capture_pkts", tv);
110  fw->both_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_capture_bytes", tv);
111 
112  fw->dtv = DecodeThreadVarsAlloc(tv);
113  if (fw->dtv == NULL) {
114  FlowWorkerThreadDeinit(tv, fw);
115  return TM_ECODE_FAILED;
116  }
117 
118  /* setup TCP */
119  if (StreamTcpThreadInit(tv, NULL, &fw->stream_thread_ptr) != TM_ECODE_OK) {
120  FlowWorkerThreadDeinit(tv, fw);
121  return TM_ECODE_FAILED;
122  }
123 
124  if (DetectEngineEnabled()) {
125  /* setup DETECT */
126  void *detect_thread = NULL;
127  if (DetectEngineThreadCtxInit(tv, NULL, &detect_thread) != TM_ECODE_OK) {
128  FlowWorkerThreadDeinit(tv, fw);
129  return TM_ECODE_FAILED;
130  }
131  SC_ATOMIC_SET(fw->detect_thread, detect_thread);
132  }
133 
134  /* Setup outputs for this thread. */
135  if (OutputLoggerThreadInit(tv, initdata, &fw->output_thread) != TM_ECODE_OK) {
136  FlowWorkerThreadDeinit(tv, fw);
137  return TM_ECODE_FAILED;
138  }
139 
142 
143  /* setup pq for stream end pkts */
144  memset(&fw->pq, 0, sizeof(PacketQueue));
145  SCMutexInit(&fw->pq.mutex_q, NULL);
146 
147  *data = fw;
148  return TM_ECODE_OK;
149 }
150 
/**
 * \brief Thread deinit callback of the flow worker module.
 *
 * Releases what FlowWorkerThreadInit() set up: decode thread vars, the TCP
 * stream thread context, the detect thread context (if one is set), the
 * packet queue mutex, the detect_thread atomic and finally fw itself.
 * FlowWorkerThreadInit() also calls this on its failure paths, i.e. with a
 * partially initialised fw.
 *
 * \param tv   thread vars
 * \param data the FlowWorkerThreadData to tear down; freed on return
 *
 * \retval TM_ECODE_OK always
 *
 * NOTE(review): listing line 168 is absent from this listing, so the
 * statement that belongs to the "Free output." comment is not visible;
 * presumably a call to OutputLoggerThreadDeinit() -- confirm against the
 * real source.
 */
151 static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data)
152 {
153  FlowWorkerThreadData *fw = data;
154 
155  DecodeThreadVarsFree(tv, fw->dtv);
156 
157  /* free TCP */
158  StreamTcpThreadDeinit(tv, (void *)fw->stream_thread);
159 
160  /* free DETECT */
161  void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
162  if (detect_thread != NULL) {
163  DetectEngineThreadCtxDeinit(tv, detect_thread);
164  SC_ATOMIC_SET(fw->detect_thread, NULL);
165  }
166 
167  /* Free output. */
169 
170  /* free pq */
 /* The queue must be empty at this point; a non-zero length trips BUG_ON. */
171  BUG_ON(fw->pq.len);
172  SCMutexDestroy(&fw->pq.mutex_q);
173 
174  SC_ATOMIC_DESTROY(fw->detect_thread);
175  SCFree(fw);
176  return TM_ECODE_OK;
177 }
178 
179 TmEcode Detect(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq);
181 
/**
 * \brief Prune the file container of the packet's flow.
 *
 * Does nothing unless the packet has a flow and that flow has app-layer
 * state (alstate). Otherwise FilePrune() is called on the container fc when
 * it is non-NULL.
 *
 * \param p packet whose flow is inspected
 *
 * NOTE(review): listing lines 186-187, which declare and assign fc, are
 * absent from this listing; presumably fc comes from
 * AppLayerParserGetFiles() -- confirm against the real source.
 */
182 static void FlowPruneFiles(Packet *p)
183 {
184  if (p->flow && p->flow->alstate) {
185  Flow *f = p->flow;
188  if (fc != NULL) {
189  FilePrune(fc);
190  }
191  }
192 }
193 
/**
 * \brief Packet processing callback of the flow worker module.
 *
 * Steps visible in this listing for packet p:
 *  - for non-pseudo packets, set the thread's time from p->ts;
 *  - if PKT_WANTS_FLOW is set: FlowHandlePacket(), then FlowUpdate() when a
 *    flow was assigned; if FlowUpdate() returns TM_ECODE_DONE the flow is
 *    unlocked and the function returns early;
 *  - else if PKT_HAS_FLOW is set: take the flow write lock;
 *  - TCP packet with a flow: StreamTcp(), an optional
 *    StreamTcpDetectLogFlush() when FlowChangeProto() is true, then every
 *    packet dequeued from fw->pq goes through Detect() (if a detect context
 *    is set) and OutputLoggerLog() before being enqueued on preq;
 *  - Detect() (if a detect context is set) and OutputLoggerLog() on p,
 *    followed by FlowPruneFiles(p);
 *  - if the packet has a flow, unlock it.
 *
 * \param tv     thread vars
 * \param p      packet to process
 * \param data   this thread's FlowWorkerThreadData
 * \param preq   queue that receives the extra packets taken from fw->pq
 * \param unused not referenced
 *
 * \retval TM_ECODE_OK on every visible return path
 *
 * NOTE(review): many listing lines are absent here (see the gaps in the
 * embedded listing numbers), among them the rest of the condition started
 * on line 237 and the bodies of the UDP branch, the tcp-prune branch and
 * the tx cleanup. Do not treat this listing as the complete function.
 */
194 static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, PacketQueue *unused)
195 {
196  FlowWorkerThreadData *fw = data;
 /* Read the detect context once; NULL means no detection for this packet. */
197  void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
198 
199  SCLogDebug("packet %"PRIu64, p->pcap_cnt);
200 
201  /* update time */
202  if (!(PKT_IS_PSEUDOPKT(p))) {
203  TimeSetByThread(tv->id, &p->ts);
204  }
205 
206  /* handle Flow */
207  if (p->flags & PKT_WANTS_FLOW) {
209 
210  FlowHandlePacket(tv, fw->dtv, p);
211  if (likely(p->flow != NULL)) {
 /* TM_ECODE_DONE: nothing more to do for this packet, release the flow. */
213  if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) {
214  FLOWLOCK_UNLOCK(p->flow);
215  return TM_ECODE_OK;
216  }
217  }
218  /* Flow is now LOCKED */
219 
221 
222  /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
223  * pseudo packet created by the flow manager. */
224  } else if (p->flags & PKT_HAS_FLOW) {
225  FLOWLOCK_WRLOCK(p->flow);
226  }
227 
228  SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no");
229 
230  /* handle TCP and app layer */
231  if (p->flow && PKT_IS_TCP(p)) {
232  SCLogDebug("packet %"PRIu64" is TCP. Direction %s", p->pcap_cnt, PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
234 
235  /* if detect is disabled, we need to apply file flags to the flow
236  * here on the first packet. */
237  if (detect_thread == NULL &&
240  {
242  }
243 
245  StreamTcp(tv, p, fw->stream_thread, &fw->pq, NULL);
247 
248  if (FlowChangeProto(p->flow)) {
249  StreamTcpDetectLogFlush(tv, fw->stream_thread, p->flow, p, &fw->pq);
250  }
251 
252  /* Packets here can safely access p->flow as it's locked */
253  SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len);
254  Packet *x;
 /* Drain fw->pq, which was handed to the stream calls above. */
255  while ((x = PacketDequeue(&fw->pq))) {
256  SCLogDebug("packet %"PRIu64" extra packet %p", p->pcap_cnt, x);
257 
258  // TODO do we need to call StreamTcp on these pseudo packets or not?
259  //StreamTcp(tv, x, fw->stream_thread, &fw->pq, NULL);
260  if (detect_thread != NULL) {
262  Detect(tv, x, detect_thread, NULL, NULL);
264  }
265 
266  // Outputs
267  OutputLoggerLog(tv, x, fw->output_thread);
268 
269  /* put these packets in the preq queue so that they are
270  * handled by the other thread modules before packet 'p'. */
271  PacketEnqueue(preq, x);
272  }
273 
274  /* handle the app layer part of the UDP packet payload */
275  } else if (p->flow && p->proto == IPPROTO_UDP) {
279  }
280 
282 
283  /* handle Detect */
285  SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
286 
287  if (detect_thread != NULL) {
289  Detect(tv, p, detect_thread, NULL, NULL);
291  }
292 
293  // Outputs.
294  OutputLoggerLog(tv, p, fw->output_thread);
295 
296  /* Prune any stored files. */
297  FlowPruneFiles(p);
298 
299  /* Release tcp segments. Done here after alerting can use them. */
300  if (p->flow != NULL && p->proto == IPPROTO_TCP) {
305  }
306 
307  if (p->flow) {
309 
310  /* run tx cleanup last */
312  FLOWLOCK_UNLOCK(p->flow);
313  }
314 
315  return TM_ECODE_OK;
316 }
317 
/**
 * \brief Install a new detect thread context in a flow worker.
 *
 * Stores detect_ctx in fw->detect_thread with SC_ATOMIC_SET. The previous
 * value is neither returned nor freed here. FlowWorker() reads the value
 * once per packet with SC_ATOMIC_GET.
 *
 * \param flow_worker the FlowWorkerThreadData of the target thread
 * \param detect_ctx  new detect thread context to store
 */
318 void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx)
319 {
320  FlowWorkerThreadData *fw = flow_worker;
321 
322  SC_ATOMIC_SET(fw->detect_thread, detect_ctx);
323 }
324 
/**
 * \brief Get the detect thread context currently set in a flow worker.
 *
 * \param flow_worker the FlowWorkerThreadData of the target thread
 *
 * \retval the value of fw->detect_thread as read with SC_ATOMIC_GET; this is
 *         NULL when no detect context has been set
 */
325 void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
326 {
327  FlowWorkerThreadData *fw = flow_worker;
328 
329  return SC_ATOMIC_GET(fw->detect_thread);
330 }
331 
/* Body of ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi): maps a
 * flow worker profiling id to a short name and returns "error" when no case
 * matches.
 *
 * NOTE(review): the signature (listing line 332) and every case label
 * (listing lines 335, 337, ...) are absent from this listing, so which enum
 * value maps to which string cannot be read off here. */
333 {
334  switch (fwi) {
336  return "flow";
338  return "stream";
340  return "app-layer";
342  return "detect";
344  return "tcp-prune";
346  return "size";
347  }
348  return "error";
349 }
350 
/**
 * \brief Exit-time stats callback of the flow worker module.
 *
 * \param tv   thread vars
 * \param data this thread's FlowWorkerThreadData
 *
 * NOTE(review): listing line 354, the only statement of this function, is
 * absent from this listing; presumably it forwards to
 * OutputLoggerExitPrintStats() -- confirm against the real source.
 */
351 static void FlowWorkerExitPrintStats(ThreadVars *tv, void *data)
352 {
353  FlowWorkerThreadData *fw = data;
355 }
356 
/* Body of TmModuleFlowWorkerRegister(void): fills the TMM_FLOWWORKER slot of
 * tmm_modules with the module name and the init, packet, deinit and
 * exit-stats callbacks defined in this file.
 *
 * NOTE(review): the signature (listing line 357) and listing lines 364-365
 * are absent from this listing; the latter presumably set the cap_flags and
 * flags fields of the slot -- confirm against the real source. */
358 {
359  tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
360  tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
361  tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
362  tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
363  tmm_modules[TMM_FLOWWORKER].ThreadExitPrintStats = FlowWorkerExitPrintStats;
366 }
TmEcode OutputLoggerLog(ThreadVars *tv, Packet *p, void *thread_data)
Definition: output.c:917
DecodeThreadVars * DecodeThreadVarsAlloc(ThreadVars *tv)
Alloc and setup DecodeThreadVars.
Definition: decode.c:614
#define SCLogDebug(...)
Definition: util-debug.h:335
TmEcode StreamTcp(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *)
Definition: stream-tcp.c:5142
struct Flow_ * flow
Definition: decode.h:446
uint8_t cap_flags
Definition: tm-modules.h:67
void FlowHandlePacket(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Entry point for packet flow handling.
Definition: flow.c:499
#define BUG_ON(x)
uint8_t flags
Definition: tm-modules.h:70
DetectEngineThreadCtx * DetectEngineThreadCtxPtr
Definition: flow-worker.c:48
void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueue *pq)
create packets in both directions to flush out logging and detection before switching protocols...
Definition: stream-tcp.c:6127
#define PKT_IS_TOCLIENT(p)
Definition: decode.h:259
StreamTcpThread * stream_thread
Definition: flow-worker.c:54
#define FLOWLOCK_UNLOCK(fb)
Definition: flow.h:243
TmEcode(* Func)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *)
Definition: tm-modules.h:52
void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx)
Definition: flow-worker.c:318
void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
Definition: decode.c:474
ProfileFlowWorkerId
Definition: flow-worker.h:21
void TimeSetByThread(const int thread_id, const struct timeval *tv)
Definition: util-time.c:107
void * FlowWorkerGetDetectCtxPtr(void *flow_worker)
Definition: flow-worker.c:325
void OutputLoggerExitPrintStats(ThreadVars *tv, void *thread_data)
Definition: output.c:990
#define FLOW_PKT_TOSERVER_FIRST
Definition: flow.h:206
uint16_t both_bypass_pkts
Definition: flow-worker.c:64
#define TM_FLAG_STREAM_TM
Definition: tm-modules.h:33
#define FLOWLOCK_WRLOCK(fb)
Definition: flow.h:240
uint64_t pcap_cnt
Definition: decode.h:562
TmEcode DetectEngineThreadCtxInit(ThreadVars *, void *, void **)
initialize thread specific detection engine context
TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
Definition: stream-tcp.c:5178
uint16_t local_bypass_bytes
Definition: flow-worker.c:63
#define SC_ATOMIC_DESTROY(name)
Destroy the lock used to protect this variable.
Definition: util-atomic.h:97
TmEcode DetectEngineThreadCtxDeinit(ThreadVars *, void *)
TcpReassemblyThreadCtx * ra_ctx
Definition: stream-tcp.h:103
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:943
void * alstate
Definition: flow.h:438
#define SC_ATOMIC_INIT(name)
Initialize the previously declared atomic variable and its lock.
Definition: util-atomic.h:81
#define SCCalloc(nm, a)
Definition: util-mem.h:253
#define FLOWWORKER_PROFILING_START(p, id)
uint8_t proto
Definition: decode.h:431
TmEcode OutputLoggerThreadInit(ThreadVars *tv, const void *initdata, void **data)
Definition: output.c:933
uint32_t len
Definition: decode.h:624
Structure to hold thread specific data for all decode modules.
Definition: decode.h:633
#define DEBUG_ASSERT_FLOW_LOCKED(f)
TmEcode(* ThreadDeinit)(ThreadVars *, void *)
Definition: tm-modules.h:49
DecodeThreadVars * dtv
Definition: flow-worker.c:51
uint8_t flowflags
Definition: decode.h:440
int FlowChangeProto(Flow *f)
Check if change proto flag is set for flow.
Definition: flow.c:240
int DetectEngineEnabled(void)
Check if detection is enabled.
#define STREAM_TOCLIENT
Definition: stream.h:32
#define PKT_IS_TOSERVER(p)
Definition: decode.h:258
#define FLOW_PKT_TOSERVER
Definition: flow.h:201
void TmModuleFlowWorkerRegister(void)
Definition: flow-worker.c:357
TmEcode OutputLoggerThreadDeinit(ThreadVars *tv, void *thread_data)
Definition: output.c:964
void PacketUpdateEngineEventCounters(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
Definition: decode.c:122
#define SCMutexInit(mut, mutattrs)
void(* ThreadExitPrintStats)(ThreadVars *, void *)
Definition: tm-modules.h:48
void FilePrune(FileContainer *ffc)
Definition: util-file.c:342
TmEcode Detect(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
Detection engine thread wrapper.
Definition: detect.c:1602
#define FLOW_PKT_TOCLIENT_FIRST
Definition: flow.h:207
struct FlowWorkerThreadData_ FlowWorkerThreadData
const char * name
Definition: tm-modules.h:44
uint16_t local_bypass_pkts
Definition: flow-worker.c:62
void DisableDetectFlowFileFlags(Flow *f)
disable file features we don't need. Called if we have no detection engine.
Definition: detect.c:1660
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:207
void AppLayerParserTransactionsCleanup(Flow *f)
remove obsolete (inspected and logged) transactions
#define SCFree(a)
Definition: util-mem.h:322
#define PKT_IS_TCP(p)
Definition: decode.h:254
TmModule tmm_modules[TMM_SIZE]
Definition: tm-modules.h:73
TmEcode StreamTcpThreadDeinit(ThreadVars *tv, void *data)
Definition: stream-tcp.c:5245
#define FLOWWORKER_PROFILING_END(p, id)
SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread)
#define STREAM_TOSERVER
Definition: stream.h:31
TmEcode(* ThreadInit)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:47
void FlowHandlePacketUpdate(Flow *f, Packet *p)
Update Packet and Flow.
Definition: flow.c:398
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:192
#define PKT_HAS_FLOW
Definition: decode.h:1094
Packet * PacketDequeue(PacketQueue *q)
Definition: packet-queue.c:167
#define PKT_WANTS_FLOW
Definition: decode.h:1115
#define PKT_IS_PSEUDOPKT(p)
return 1 if the packet is a pseudo packet
Definition: decode.h:1133
const char * ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
Definition: flow-worker.c:332
uint16_t both_bypass_bytes
Definition: flow-worker.c:65
#define TM_FLAG_DETECT_TM
Definition: tm-modules.h:34
int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow *f)
Handle an app layer UDP message.
Definition: app-layer.c:688
Per thread variable structure.
Definition: threadvars.h:57
struct timeval ts
Definition: decode.h:452
AppProto alproto
application level protocol
Definition: flow.h:409
#define GET_PKT_LEN(p)
Definition: decode.h:225
uint32_t flags
Definition: decode.h:444
FileContainer * AppLayerParserGetFiles(uint8_t ipproto, AppProto alproto, void *alstate, uint8_t direction)
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition: counters.c:147
#define likely(expr)
Definition: util-optimize.h:32
void DecodeThreadVarsFree(ThreadVars *tv, DecodeThreadVars *dtv)
Definition: decode.c:633
Flow data structure.
Definition: flow.h:325
void PacketEnqueue(PacketQueue *q, Packet *p)
Definition: packet-queue.c:139
void AppLayerRegisterThreadCounters(ThreadVars *tv)
Registers per flow counters for all protocols.
Definition: app-layer.c:926
SCMutex mutex_q
Definition: decode.h:628
#define SCMutexDestroy
void StreamTcpPruneSession(Flow *f, uint8_t flags)
Remove idle TcpSegments from TcpSession.