suricata
output-streaming.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2014 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  *
23  * Logger for streaming data
24  */
25 
26 #include "suricata-common.h"
27 #include "tm-modules.h"
28 #include "output.h"
29 #include "output-streaming.h"
30 #include "app-layer.h"
31 #include "app-layer-parser.h"
32 #include "app-layer-htp.h"
33 #include "util-print.h"
34 #include "conf.h"
35 #include "util-profiling.h"
36 #include "stream-tcp.h"
37 #include "stream-tcp-inline.h"
38 #include "stream-tcp-reassemble.h"
39 
40 typedef struct OutputLoggerThreadStore_ {
41  void *thread_data;
44 
45 /** per thread data for this module, contains a list of per thread
46  * data for the streaming loggers. */
47 typedef struct OutputLoggerThreadData_ {
49  uint32_t loggers;
51 
52 /* logger instance, a module + an output ctx,
53  * it's perfectly valid to have multiple instances of the same
54  * log module (e.g. http.log) with different output contexts. */
55 typedef struct OutputStreamingLogger_ {
59  const char *name;
66 
/* File-scope list of registered streaming loggers. Registration appends to
 * its tail, the per thread init/deinit/log functions walk it from the head,
 * and shutdown frees every node and resets it to NULL. */
67 static OutputStreamingLogger *list = NULL;
68 
74 {
75  OutputStreamingLogger *op = SCMalloc(sizeof(*op));
76  if (op == NULL)
77  return -1;
78  memset(op, 0x00, sizeof(*op));
79 
80  op->LogFunc = LogFunc;
81  op->output_ctx = output_ctx;
82  op->name = name;
83  op->logger_id = id;
84  op->type = type;
85  op->ThreadInit = ThreadInit;
88 
89  if (list == NULL)
90  list = op;
91  else {
92  OutputStreamingLogger *t = list;
93  while (t->next)
94  t = t->next;
95  t->next = op;
96  }
97 
98  if (op->type == STREAMING_TCP_DATA) {
100  }
101 
102  SCLogDebug("OutputRegisterStreamingLogger happy");
103  return 0;
104 }
105 
106 typedef struct StreamerCallbackData_ {
113 
/** \brief Hand one chunk of data to every registered logger of one type
 *
 *  The logger list and the per thread store list from the callback data are
 *  walked in lock-step. Each logger whose type equals the type stored in the
 *  callback data has its LogFunc invoked with the thread data of the store
 *  node at the same position.
 *
 *  \param cbdata StreamerCallbackData (logger list, store list, thread vars,
 *                streaming type); must not be NULL
 *  \param f flow, passed to the loggers as const
 *  \param data data to log; callers in this file pass NULL with a length of
 *              0 when only a close has to be signalled
 *  \param data_len length of data
 *  \param tx_id transaction id passed to the loggers
 *  \param flags flags passed to the loggers unchanged
 *
 *  \retval 0 always
 *
 *  NOTE(review): source lines 132 and 134 are not visible in this listing.
 */
114 static int Streamer(void *cbdata, Flow *f, const uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags)
115 {
116  StreamerCallbackData *streamer_cbdata = (StreamerCallbackData *)cbdata;
117  BUG_ON(streamer_cbdata == NULL);
118  OutputStreamingLogger *logger = streamer_cbdata->logger;
119  OutputLoggerThreadStore *store = streamer_cbdata->store;
120  ThreadVars *tv = streamer_cbdata->tv;
121 #ifdef PROFILING
122  Packet *p = streamer_cbdata->p;
123 #endif
124  BUG_ON(logger == NULL);
125  BUG_ON(store == NULL);
126 
127  while (logger && store) {
128  BUG_ON(logger->LogFunc == NULL);
129 
 /* only loggers registered for the type currently being logged run */
130  if (logger->type == streamer_cbdata->type) {
131  SCLogDebug("logger %p", logger);
133  logger->LogFunc(tv, store->thread_data, (const Flow *)f, data, data_len, tx_id, flags);
135  }
136 
137  logger = logger->next;
138  store = store->next;
139 
 /* both lists are expected to run out at the same position */
140  BUG_ON(logger == NULL && store != NULL);
141  BUG_ON(logger != NULL && store == NULL);
142  }
143 
144  return 0;
145 }
146 
147 /** \brief Http Body Iterator for logging
148  *
149  * Global logic:
150  *
151  * - For each tx
152  * - For each body chunk
153  * - Invoke Streamer
154  */
155 
/* Parameters:
 *  f      flow; f->alstate is used as the HtpState
 *  close  non-zero when the caller wants the loggers to close up
 *  cbdata opaque callback data, passed through to Streamer()
 *  iflags OUTPUT_STREAMING_FLAG_TOSERVER selects the request body,
 *         otherwise OUTPUT_STREAMING_FLAG_TOCLIENT selects the response body
 * Returns 0 in all cases.
 *
 * NOTE(review): source lines 166-167, 169-170, 225, 229 and 249 are not
 * visible in this listing, so the statements on them are not described. */
156 static int HttpBodyIterator(Flow *f, int close, void *cbdata, uint8_t iflags)
157 {
158  SCLogDebug("called with %p, %d, %p, %02x", f, close, cbdata, iflags);
159 
160  HtpState *s = f->alstate;
161  if (s == NULL || s->conn == NULL) {
162  return 0;
163  }
164 
165  const int tx_progress_done_value_ts =
168  const int tx_progress_done_value_tc =
171  const uint64_t total_txs = AppLayerParserGetTxCnt(f, f->alstate);
172 
173  uint64_t tx_id = 0;
174  for (tx_id = 0; tx_id < total_txs; tx_id++) { // TODO optimization store log tx
175  htp_tx_t *tx = AppLayerParserGetTx(f->proto, f->alproto, f->alstate, tx_id);
176  if (tx == NULL) {
177  continue;
178  }
179 
 /* a tx counts as done once its progress has reached the 'done' value
  * in the to-server direction and then also in the to-client direction */
180  int tx_done = 0;
181  int tx_logged = 0;
182  int tx_progress_ts = AppLayerParserGetStateProgress(
183  IPPROTO_TCP, ALPROTO_HTTP, tx, FlowGetDisruptionFlags(f, STREAM_TOSERVER));
184  if (tx_progress_ts >= tx_progress_done_value_ts) {
185  int tx_progress_tc = AppLayerParserGetStateProgress(
186  IPPROTO_TCP, ALPROTO_HTTP, tx, FlowGetDisruptionFlags(f, STREAM_TOCLIENT));
187  if (tx_progress_tc >= tx_progress_done_value_tc) {
188  tx_done = 1;
189  }
190  }
191 
192  SCLogDebug("tx %p", tx);
193  HtpTxUserData *htud = (HtpTxUserData *) htp_tx_get_user_data(tx);
194  if (htud != NULL) {
195  SCLogDebug("htud %p", htud);
 /* pick the body matching the direction requested in iflags */
196  HtpBody *body = NULL;
197  if (iflags & OUTPUT_STREAMING_FLAG_TOSERVER)
198  body = &htud->request_body;
199  else if (iflags & OUTPUT_STREAMING_FLAG_TOCLIENT)
200  body = &htud->response_body;
201 
202  if (body == NULL) {
203  SCLogDebug("no body");
204  goto next;
205  }
206  if (body->first == NULL) {
207  SCLogDebug("no body chunks");
208  goto next;
209  }
 /* the loop below marks chunks as logged in list order, so a logged
  * last chunk means there is nothing new in this body */
210  if (body->last->logged == 1) {
211  SCLogDebug("all logged already");
212  goto next;
213  }
214 
215  // for each chunk
216  HtpBodyChunk *chunk = body->first;
217  for ( ; chunk != NULL; chunk = chunk->next) {
218  if (chunk->logged) {
219  SCLogDebug("logged %d", chunk->logged);
220  continue;
221  }
222 
223  uint8_t flags = iflags | OUTPUT_STREAMING_FLAG_TRANSACTION;
224  if (chunk->sbseg.stream_offset == 0)
226  /* if we need to close and we're at the last segment in the list
227  * we add the 'close' flag so the logger can close up. */
228  if ((tx_done || close) && chunk->next == NULL) {
230  }
231 
232  const uint8_t *data = NULL;
233  uint32_t data_len = 0;
234  StreamingBufferSegmentGetData(body->sb, &chunk->sbseg, &data, &data_len);
235 
236  // invoke Streamer
237  Streamer(cbdata, f, data, data_len, tx_id, flags);
238  //PrintRawDataFp(stdout, data, data_len);
 /* remember that this chunk was handed out so it is skipped next time */
239  chunk->logged = 1;
240  tx_logged = 1;
241  }
242 
243  next:
244  /* if we need to close we need to invoke the Streamer for sure. If we
245  * logged no chunks, we call the Streamer with NULL data so it can
246  * close up. */
247  if (tx_logged == 0 && (close||tx_done)) {
248  Streamer(cbdata, f, NULL, 0, tx_id,
250  }
251  }
252  }
253  return 0;
254 }
255 
257  uint8_t flags;
259  Flow *f;
260 };
261 
/** \brief Callback handed to StreamReassembleLog(): forwards one chunk of
 *         stream data to Streamer()
 *
 *  \param cb_data struct StreamLogData as set up by TcpDataLogger()
 *  \param data stream data to log
 *  \param data_len length of data
 *
 *  \retval 0 always
 *
 *  Streamer() is called with a tx_id of 0 and the flags stored in the
 *  callback data.
 *
 *  NOTE(review): source line 269 (the statement described by the "hack"
 *  comment below) is not visible in this listing.
 */
262 static int StreamLogFunc(void *cb_data, const uint8_t *data, const uint32_t data_len)
263 {
264  struct StreamLogData *log = cb_data;
265 
266  Streamer(log->streamer_cbdata, log->f, data, data_len, 0, log->flags);
267 
268  /* hack: unset open flag after first run */
270 
271  return 0;
272 }
273 
/** \brief Log reassembled TCP data of one stream through the streaming loggers
 *
 *  Starting at the stream's current log progress, StreamReassembleLog() is
 *  run with StreamLogFunc() as callback. If the progress it reports back is
 *  beyond the current log progress, the stream's relative log progress is
 *  advanced by the difference. On eof Streamer() is called once more with
 *  NULL data and OUTPUT_STREAMING_FLAG_CLOSE added to the flags.
 *
 *  \param f flow the stream belongs to
 *  \param ssn TCP session
 *  \param stream stream (direction) to log
 *  \param eof true when the loggers have to close up
 *  \param iflags initial flags for the loggers
 *  \param streamer_cbdata callback data passed through to Streamer()
 *
 *  \retval 0 always
 *
 *  NOTE(review): source line 281 (the statement guarded by
 *  'if (progress == 0)') is not visible in this listing.
 */
274 static int TcpDataLogger (Flow *f, TcpSession *ssn, TcpStream *stream,
275  bool eof, uint8_t iflags, void *streamer_cbdata)
276 {
277  uint8_t flags = iflags;
278  uint64_t progress = STREAM_LOG_PROGRESS(stream);
279 
280  if (progress == 0)
282 
283  struct StreamLogData log_data = { flags, streamer_cbdata, f };
 /* 'progress' is both the start position and the variable that receives
  * the progress value reported back */
284  StreamReassembleLog(ssn, stream,
285  StreamLogFunc, &log_data,
286  progress, &progress, eof);
287 
 /* only ever move the log progress forward */
288  if (progress > STREAM_LOG_PROGRESS(stream)) {
289  uint32_t slide = progress - STREAM_LOG_PROGRESS(stream);
290  stream->log_progress_rel += slide;
291  }
292 
293  if (eof) {
294  Streamer(streamer_cbdata, f, NULL, 0, 0, flags|OUTPUT_STREAMING_FLAG_CLOSE);
295  }
296  return 0;
297 }
298 
/** \brief Per packet log function of the streaming logger
 *
 *  Builds the callback data for Streamer() from the registered logger list
 *  and this thread's store list, then, depending on which logger types are
 *  set in the per thread 'loggers' mask, runs TcpDataLogger() for the
 *  packet's TCP session and/or HttpBodyIterator() for HTTP flows that have
 *  an app layer state.
 *
 *  \param tv thread vars
 *  \param p packet; its flow selects the session and app layer state
 *  \param thread_data OutputLoggerThreadData of this thread, must not be NULL
 *
 *  \retval TM_ECODE_OK on every path visible in this listing
 *
 *  NOTE(review): source lines 323, 328, 330, 334 and 336 are not visible in
 *  this listing: the statement for the 'no flow' case and the statements
 *  that set the direction in 'flags'.
 */
299 static TmEcode OutputStreamingLog(ThreadVars *tv, Packet *p, void *thread_data)
300 {
301  BUG_ON(thread_data == NULL);
302 
303  if (list == NULL) {
304  /* No child loggers. */
305  return TM_ECODE_OK;
306  }
307 
308  OutputLoggerThreadData *op_thread_data = (OutputLoggerThreadData *)thread_data;
309  OutputStreamingLogger *logger = list;
310  OutputLoggerThreadStore *store = op_thread_data->store;
311 
 /* the last member (the streaming type) starts at 0 and is set right
  * before each logger type is run below */
312  StreamerCallbackData streamer_cbdata = { logger, store, tv, p , 0};
313 
314  BUG_ON(logger == NULL && store != NULL);
315  BUG_ON(logger != NULL && store == NULL);
316  BUG_ON(logger == NULL && store == NULL);
317 
318  uint8_t flags = 0;
319  Flow * const f = p->flow;
320 
321  /* no flow, no streaming */
322  if (f == NULL) {
324  }
325 
 /* the packet direction test differs between non-inline and inline mode */
326  if (!(StreamTcpInlineMode())) {
327  if (PKT_IS_TOCLIENT(p)) {
329  } else {
331  }
332  } else {
333  if (PKT_IS_TOSERVER(p)) {
335  } else {
337  }
338  }
339 
340  if (op_thread_data->loggers & (1<<STREAMING_TCP_DATA)) {
341  TcpSession *ssn = f->protoctx;
342  if (ssn) {
 /* close when the session state is TCP_CLOSED or beyond, or when this
  * is the pseudo packet marking the end of the stream */
343  int close = (ssn->state >= TCP_CLOSED);
344  close |= ((p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0);
345  SCLogDebug("close ? %s", close ? "yes" : "no");
346 
 /* to-server data is taken from the client stream, to-client data
  * from the server stream */
347  TcpStream *stream = flags & OUTPUT_STREAMING_FLAG_TOSERVER ? &ssn->client : &ssn->server;
348  streamer_cbdata.type = STREAMING_TCP_DATA;
349  TcpDataLogger(f, ssn, stream, close, flags, (void *)&streamer_cbdata);
350  }
351  }
352  if (op_thread_data->loggers & (1<<STREAMING_HTTP_BODIES)) {
353  if (f->alproto == ALPROTO_HTTP && f->alstate != NULL) {
 /* without a TCP session 'close' stays 0 */
354  int close = 0;
355  TcpSession *ssn = f->protoctx;
356  if (ssn) {
357  close = (ssn->state >= TCP_CLOSED);
358  close |= ((p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0);
359  }
360  SCLogDebug("close ? %s", close ? "yes" : "no");
361  streamer_cbdata.type = STREAMING_HTTP_BODIES;
362  HttpBodyIterator(f, close, (void *)&streamer_cbdata, flags);
363  }
364  }
365 
366  return TM_ECODE_OK;
367 }
368 
369 /** \brief thread init for the tx logger
370  * This will run the thread init functions for the individual registered
371  * loggers */
372 static TmEcode OutputStreamingLogThreadInit(ThreadVars *tv, const void *initdata, void **data) {
373  OutputLoggerThreadData *td = SCMalloc(sizeof(*td));
374  if (td == NULL)
375  return TM_ECODE_FAILED;
376  memset(td, 0x00, sizeof(*td));
377 
378  *data = (void *)td;
379 
380  SCLogDebug("OutputStreamingLogThreadInit happy (*data %p)", *data);
381 
382  OutputStreamingLogger *logger = list;
383  while (logger) {
384  if (logger->ThreadInit) {
385  void *retptr = NULL;
386  if (logger->ThreadInit(tv, (void *)logger->output_ctx, &retptr) == TM_ECODE_OK) {
387  OutputLoggerThreadStore *ts = SCMalloc(sizeof(*ts));
388 /* todo */ BUG_ON(ts == NULL);
389  memset(ts, 0x00, sizeof(*ts));
390 
391  /* store thread handle */
392  ts->thread_data = retptr;
393 
394  if (td->store == NULL) {
395  td->store = ts;
396  } else {
397  OutputLoggerThreadStore *tmp = td->store;
398  while (tmp->next != NULL)
399  tmp = tmp->next;
400  tmp->next = ts;
401  }
402 
403  SCLogInfo("%s is now set up", logger->name);
404  }
405  }
406 
407  td->loggers |= (1<<logger->type);
408 
409  logger = logger->next;
410  }
411 
412  return TM_ECODE_OK;
413 }
414 
415 static TmEcode OutputStreamingLogThreadDeinit(ThreadVars *tv, void *thread_data) {
416  OutputLoggerThreadData *op_thread_data = (OutputLoggerThreadData *)thread_data;
417  OutputLoggerThreadStore *store = op_thread_data->store;
418  OutputStreamingLogger *logger = list;
419 
420  while (logger && store) {
421  if (logger->ThreadDeinit) {
422  logger->ThreadDeinit(tv, store->thread_data);
423  }
424 
425  OutputLoggerThreadStore *next_store = store->next;
426  SCFree(store);
427  logger = logger->next;
428  store = next_store;
429  }
430 
431  SCFree(op_thread_data);
432  return TM_ECODE_OK;
433 }
434 
435 static void OutputStreamingLogExitPrintStats(ThreadVars *tv, void *thread_data) {
436  OutputLoggerThreadData *op_thread_data = (OutputLoggerThreadData *)thread_data;
437  OutputLoggerThreadStore *store = op_thread_data->store;
438  OutputStreamingLogger *logger = list;
439 
440  while (logger && store) {
441  if (logger->ThreadExitPrintStats) {
442  logger->ThreadExitPrintStats(tv, store->thread_data);
443  }
444 
445  logger = logger->next;
446  store = store->next;
447  }
448 }
449 
451  OutputRegisterRootLogger(OutputStreamingLogThreadInit,
452  OutputStreamingLogThreadDeinit, OutputStreamingLogExitPrintStats,
453  OutputStreamingLog);
454 }
455 
457 {
458  OutputStreamingLogger *logger = list;
459  while (logger) {
460  OutputStreamingLogger *next_logger = logger->next;
461  SCFree(logger);
462  logger = next_logger;
463  }
464  list = NULL;
465 }
uint16_t flags
#define PACKET_PROFILING_LOGGER_START(p, id)
#define SCLogDebug(...)
Definition: util-debug.h:335
struct Flow_ * flow
Definition: decode.h:443
struct HtpBodyChunk_ * next
#define OUTPUT_STREAMING_FLAG_OPEN
TcpStreamCnf stream_config
Definition: stream-tcp.h:106
#define BUG_ON(x)
uint8_t proto
Definition: flow.h:343
int AppLayerParserGetStateProgressCompletionStatus(AppProto alproto, uint8_t direction)
#define PKT_IS_TOCLIENT(p)
Definition: decode.h:256
LoggerId
int AppLayerParserGetStateProgress(uint8_t ipproto, AppProto alproto, void *alstate, uint8_t flags)
get the progress value for a tx/protocol
OutputLoggerThreadStore * store
void(* ThreadExitPrintStatsFunc)(ThreadVars *, void *)
Definition: tm-modules.h:41
uint64_t AppLayerParserGetTxCnt(const Flow *f, void *alstate)
struct OutputStreamingLogger_ * next
enum OutputStreamingType type
OutputStreamingLogger * logger
OutputLoggerThreadStore * store
Definition: output-file.c:44
#define OUTPUT_STREAMING_FLAG_TOCLIENT
StreamingBufferSegment sbseg
HtpBody request_body
uint8_t FlowGetDisruptionFlags(const Flow *f, uint8_t flags)
get &#39;disruption&#39; flags: GAP/DEPTH/PASS
Definition: flow.c:1077
OutputStreamingType
void * protoctx
Definition: flow.h:395
int StreamReassembleLog(TcpSession *ssn, TcpStream *stream, StreamReassembleRawFunc Callback, void *cb_data, uint64_t progress_in, uint64_t *progress_out, bool eof)
TmEcode(* ThreadDeinitFunc)(ThreadVars *, void *)
Definition: tm-modules.h:40
void * alstate
Definition: flow.h:433
void * AppLayerParserGetTx(uint8_t ipproto, AppProto alproto, void *alstate, uint64_t tx_id)
ThreadExitPrintStatsFunc ThreadExitPrintStats
HtpBody response_body
struct OutputLoggerThreadStore_ * next
Definition: output-file.c:38
int OutputRegisterStreamingLogger(LoggerId id, const char *name, StreamingLogger LogFunc, OutputCtx *output_ctx, enum OutputStreamingType type, ThreadInitFunc ThreadInit, ThreadDeinitFunc ThreadDeinit, ThreadExitPrintStatsFunc ThreadExitPrintStats)
uint8_t type
StreamingBuffer * sb
void OutputRegisterRootLogger(ThreadInitFunc ThreadInit, ThreadDeinitFunc ThreadDeinit, ThreadExitPrintStatsFunc ThreadExitPrintStats, OutputLogFunc LogFunc)
Definition: output.c:999
#define STREAM_TOCLIENT
Definition: stream.h:32
#define PKT_IS_TOSERVER(p)
Definition: decode.h:255
htp_conn_t * conn
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1093
HtpBodyChunk * last
#define SCReturnInt(x)
Definition: util-debug.h:341
#define OUTPUT_STREAMING_FLAG_CLOSE
enum OutputStreamingType type
void StreamingBufferSegmentGetData(const StreamingBuffer *sb, const StreamingBufferSegment *seg, const uint8_t **data, uint32_t *data_len)
void OutputStreamingLoggerRegister(void)
int(* StreamingLogger)(ThreadVars *, void *thread_data, const Flow *f, const uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags)
#define SCMalloc(a)
Definition: util-mem.h:166
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:254
uint32_t log_progress_rel
#define SCFree(a)
Definition: util-mem.h:228
struct OutputLoggerThreadData_ OutputLoggerThreadData
bool streaming_log_api
Definition: stream-tcp.h:65
uint16_t tx_id
uint64_t ts
#define STREAM_TOSERVER
Definition: stream.h:31
ThreadInitFunc ThreadInit
#define OUTPUT_STREAMING_FLAG_TRANSACTION
HtpBodyChunk * first
Per thread variable structure.
Definition: threadvars.h:57
TmEcode(* ThreadInitFunc)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:39
AppProto alproto
application level protocol
Definition: flow.h:404
#define PACKET_PROFILING_LOGGER_END(p, id)
uint32_t flags
Definition: decode.h:441
StreamingLogger LogFunc
int StreamTcpInlineMode(void)
See if stream engine is operating in inline mode.
Definition: stream-tcp.c:6282
ThreadDeinitFunc ThreadDeinit
Flow data structure.
Definition: flow.h:324
#define OUTPUT_STREAMING_FLAG_TOSERVER
struct StreamerCallbackData_ StreamerCallbackData
struct OutputStreamingLogger_ OutputStreamingLogger
#define STREAM_LOG_PROGRESS(stream)
void OutputStreamingShutdown(void)
struct OutputLoggerThreadStore_ OutputLoggerThreadStore