suricata
output-streaming.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2014 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  *
23  * Logger for streaming data
24  */
25 
26 #include "suricata-common.h"
27 #include "tm-modules.h"
28 #include "output.h"
29 #include "output-streaming.h"
30 #include "app-layer.h"
31 #include "app-layer-parser.h"
32 #include "app-layer-htp.h"
33 #include "util-print.h"
34 #include "conf.h"
35 #include "util-profiling.h"
36 #include "stream-tcp.h"
37 #include "stream-tcp-inline.h"
38 #include "stream-tcp-reassemble.h"
39 #include "util-validate.h"
40 
41 typedef struct OutputLoggerThreadStore_ {
42  void *thread_data;
45 
46 /** per thread data for this module, contains a list of per thread
47  * data for the streaming loggers. */
48 typedef struct OutputLoggerThreadData_ {
50  uint32_t loggers;
52 
53 /* logger instance, a module + an output ctx,
54  * it's perfectly valid to have multiple instances of the same
55  * log module (e.g. http.log) with different output ctxs. */
56 typedef struct OutputStreamingLogger_ {
60  const char *name;
67 
68 static OutputStreamingLogger *list = NULL;
69 
75 {
76  OutputStreamingLogger *op = SCMalloc(sizeof(*op));
77  if (op == NULL)
78  return -1;
79  memset(op, 0x00, sizeof(*op));
80 
81  op->LogFunc = LogFunc;
82  op->output_ctx = output_ctx;
83  op->name = name;
84  op->logger_id = id;
85  op->type = type;
86  op->ThreadInit = ThreadInit;
89 
90  if (list == NULL)
91  list = op;
92  else {
93  OutputStreamingLogger *t = list;
94  while (t->next)
95  t = t->next;
96  t->next = op;
97  }
98 
99  if (op->type == STREAMING_TCP_DATA) {
101  }
102 
103  SCLogDebug("OutputRegisterStreamingLogger happy");
104  return 0;
105 }
106 
107 typedef struct StreamerCallbackData_ {
114 
/**
 * \brief Hand one chunk of data to every registered logger of the active type
 *
 * The logger list and the per-thread store list are walked in lock-step, so
 * the N-th logger is always called with the N-th store entry's thread_data.
 * The walk stops as soon as either list runs out. Only loggers whose type
 * equals streamer_cbdata->type are invoked; the value returned by LogFunc is
 * not inspected.
 *
 * \param cbdata   pointer to a StreamerCallbackData (list heads, thread vars,
 *                 streaming type)
 * \param f        flow the data belongs to, passed to the loggers as const
 * \param data     chunk to log; callers in this file pass NULL together with
 *                 data_len 0 when they only want the loggers to close up
 * \param data_len length of data
 * \param tx_id    transaction id forwarded to the loggers
 * \param flags    streaming flags forwarded to the loggers
 *
 * \retval 0 always
 *
 * NOTE(review): listing lines 133 and 135 are absent from this extract;
 * presumably they bracket the LogFunc call with the profiling macros that use
 * 'p' -- confirm against the original file.
 */
115 static int Streamer(void *cbdata, Flow *f, const uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags)
116 {
117  StreamerCallbackData *streamer_cbdata = (StreamerCallbackData *)cbdata;
118  DEBUG_VALIDATE_BUG_ON(streamer_cbdata == NULL);
119  OutputStreamingLogger *logger = streamer_cbdata->logger;
120  OutputLoggerThreadStore *store = streamer_cbdata->store;
121  ThreadVars *tv = streamer_cbdata->tv;
122 #ifdef PROFILING
123  Packet *p = streamer_cbdata->p;
124 #endif
125  DEBUG_VALIDATE_BUG_ON(logger == NULL);
126  DEBUG_VALIDATE_BUG_ON(store == NULL);
127 
 /* logger and store advance together: entry N of one list belongs to
  * entry N of the other */
128  while (logger && store) {
129  DEBUG_VALIDATE_BUG_ON(logger->LogFunc == NULL);
130 
 /* skip loggers registered for a different streaming type */
131  if (logger->type == streamer_cbdata->type) {
132  SCLogDebug("logger %p", logger);
134  logger->LogFunc(tv, store->thread_data, (const Flow *)f, data, data_len, tx_id, flags);
136  }
137 
138  logger = logger->next;
139  store = store->next;
140 
 /* both lists are expected to end at the same time */
141  DEBUG_VALIDATE_BUG_ON(logger == NULL && store != NULL);
142  DEBUG_VALIDATE_BUG_ON(logger != NULL && store == NULL);
143  }
144 
145  return 0;
146 }
147 
148 /** \brief Http Body Iterator for logging
149  *
150  * Global logic:
151  *
152  * - For each tx
153  * - For each body chunk
154  * - Invoke Streamer
 *
 * Only chunks not yet marked as logged are handed to Streamer; each chunk is
 * marked as logged right after it was handed over, so a later call does not
 * emit it again. When a close is due (caller asked for it, or the tx reached
 * the 'done' progress value in both directions) and no chunk was emitted for
 * the tx in this call, Streamer is invoked once with NULL data.
 *
 * \param f      flow; f->alstate is read as an HtpState
 * \param close  non-zero when the caller wants the loggers to close up
 * \param cbdata opaque pointer handed through to Streamer
 * \param iflags OUTPUT_STREAMING_FLAG_TOSERVER selects the request body,
 *               otherwise OUTPUT_STREAMING_FLAG_TOCLIENT selects the
 *               response body
 *
 * \retval 0 always
 *
 * NOTE(review): listing lines 167-168, 170-171, 226, 230 and 250 are absent
 * from this extract: the initialisers of the two 'done' values, the
 * statements guarded by the two flag conditions and the tail of the final
 * Streamer call are therefore not visible here -- confirm against the
 * original file.
155  */
156 
157 static int HttpBodyIterator(Flow *f, int close, void *cbdata, uint8_t iflags)
158 {
159  SCLogDebug("called with %p, %d, %p, %02x", f, close, cbdata, iflags);
160 
161  HtpState *s = f->alstate;
162  if (s == NULL || s->conn == NULL) {
163  return 0;
164  }
165 
 /* progress values at which a tx counts as complete, per direction
  * (initialisers elided in this extract) */
166  const int tx_progress_done_value_ts =
169  const int tx_progress_done_value_tc =
172  const uint64_t total_txs = AppLayerParserGetTxCnt(f, f->alstate);
173 
174  uint64_t tx_id = 0;
175  for (tx_id = 0; tx_id < total_txs; tx_id++) { // TODO optimization store log tx
176  htp_tx_t *tx = AppLayerParserGetTx(f->proto, f->alproto, f->alstate, tx_id);
177  if (tx == NULL) {
178  continue;
179  }
180 
181  int tx_done = 0;
182  int tx_logged = 0;
 /* the to-client progress is only looked at once the to-server side
  * is done; tx_done requires both */
183  int tx_progress_ts = AppLayerParserGetStateProgress(
184  IPPROTO_TCP, ALPROTO_HTTP, tx, FlowGetDisruptionFlags(f, STREAM_TOSERVER));
185  if (tx_progress_ts >= tx_progress_done_value_ts) {
186  int tx_progress_tc = AppLayerParserGetStateProgress(
187  IPPROTO_TCP, ALPROTO_HTTP, tx, FlowGetDisruptionFlags(f, STREAM_TOCLIENT));
188  if (tx_progress_tc >= tx_progress_done_value_tc) {
189  tx_done = 1;
190  }
191  }
192 
193  SCLogDebug("tx %p", tx);
194  HtpTxUserData *htud = (HtpTxUserData *) htp_tx_get_user_data(tx);
195  if (htud != NULL) {
196  SCLogDebug("htud %p", htud);
 /* pick the body matching the direction flag; stays NULL when
  * neither direction flag is set */
197  HtpBody *body = NULL;
198  if (iflags & OUTPUT_STREAMING_FLAG_TOSERVER)
199  body = &htud->request_body;
200  else if (iflags & OUTPUT_STREAMING_FLAG_TOCLIENT)
201  body = &htud->response_body;
202 
203  if (body == NULL) {
204  SCLogDebug("no body");
205  goto next;
206  }
207  if (body->first == NULL) {
208  SCLogDebug("no body chunks");
209  goto next;
210  }
 /* shortcut: a logged tail chunk is taken to mean that every chunk
  * before it was logged as well */
211  if (body->last->logged == 1) {
212  SCLogDebug("all logged already");
213  goto next;
214  }
215 
216  // for each chunk
217  HtpBodyChunk *chunk = body->first;
218  for ( ; chunk != NULL; chunk = chunk->next) {
219  if (chunk->logged) {
220  SCLogDebug("logged %d", chunk->logged);
221  continue;
222  }
223 
224  uint8_t flags = iflags | OUTPUT_STREAMING_FLAG_TRANSACTION;
 /* NOTE(review): the statement guarded by this condition (listing
  * line 226) is elided; presumably it marks the first chunk of the
  * body as 'open' -- confirm */
225  if (chunk->sbseg.stream_offset == 0)
227  /* if we need to close and we're at the last segment in the list
228  * we add the 'close' flag so the logger can close up. */
229  if ((tx_done || close) && chunk->next == NULL) {
231  }
232 
233  const uint8_t *data = NULL;
234  uint32_t data_len = 0;
235  StreamingBufferSegmentGetData(body->sb, &chunk->sbseg, &data, &data_len);
236 
237  // invoke Streamer
238  Streamer(cbdata, f, data, data_len, tx_id, flags);
239  //PrintRawDataFp(stdout, data, data_len);
 /* remember that this chunk went out so it is skipped next time */
240  chunk->logged = 1;
241  tx_logged = 1;
242  }
243 
244  next:
245  /* if we need to close we need to invoke the Streamer for sure. If we
246  * logged no chunks, we call the Streamer with NULL data so it can
247  * close up. */
248  if (tx_logged == 0 && (close||tx_done)) {
249  Streamer(cbdata, f, NULL, 0, tx_id,
251  }
252  }
253  }
254  return 0;
255 }
256 
258  uint8_t flags;
260  Flow *f;
261 };
262 
/**
 * \brief Data callback that forwards a reassembled chunk to Streamer
 *
 * Used as the callback argument of StreamReassembleLog. The chunk is passed
 * on with tx id 0 and the flags currently stored in the StreamLogData.
 *
 * \param cb_data  pointer to a struct StreamLogData
 * \param data     chunk of stream data
 * \param data_len length of data
 *
 * \retval 0 always
 *
 * NOTE(review): listing line 270 is absent from this extract; going by the
 * comment in front of it, it presumably clears the 'open' flag in log->flags
 * so that only the first chunk carries it -- confirm against the original
 * file.
 */
263 static int StreamLogFunc(void *cb_data, const uint8_t *data, const uint32_t data_len)
264 {
265  struct StreamLogData *log = cb_data;
266 
267  Streamer(log->streamer_cbdata, log->f, data, data_len, 0, log->flags);
268 
269  /* hack: unset open flag after first run */
271 
272  return 0;
273 }
274 
/**
 * \brief Log stream data for one direction of a TCP session
 *
 * Starts at the value of STREAM_LOG_PROGRESS(stream), lets
 * StreamReassembleLog feed the data to StreamLogFunc, and afterwards moves
 * stream->log_progress_rel forward by the amount of progress reported back.
 * At end of stream the loggers get one extra call with NULL data and
 * OUTPUT_STREAMING_FLAG_CLOSE set.
 *
 * \param f               flow the session belongs to
 * \param ssn             TCP session
 * \param stream          the stream (direction) to log
 * \param eof             true when the stream has ended
 * \param iflags          streaming flags to start from
 * \param streamer_cbdata opaque pointer handed through to Streamer
 *
 * \retval 0 always
 *
 * NOTE(review): listing line 282 is absent from this extract; presumably it
 * adds the 'open' flag to 'flags' when nothing has been logged yet --
 * confirm against the original file.
 */
275 static int TcpDataLogger (Flow *f, TcpSession *ssn, TcpStream *stream,
276  bool eof, uint8_t iflags, void *streamer_cbdata)
277 {
278  uint8_t flags = iflags;
279  uint64_t progress = STREAM_LOG_PROGRESS(stream);
280 
281  if (progress == 0)
283 
 /* positional initialiser: flags, callback data, flow */
284  struct StreamLogData log_data = { flags, streamer_cbdata, f };
285  StreamReassembleLog(ssn, stream,
286  StreamLogFunc, &log_data,
287  progress, &progress, eof);
288 
 /* only ever move the log progress forward.
  * NOTE(review): the 64-bit difference is narrowed to uint32_t here */
289  if (progress > STREAM_LOG_PROGRESS(stream)) {
290  uint32_t slide = progress - STREAM_LOG_PROGRESS(stream);
291  stream->log_progress_rel += slide;
292  }
293 
 /* data-less call so the loggers can close up */
294  if (eof) {
295  Streamer(streamer_cbdata, f, NULL, 0, 0, flags|OUTPUT_STREAMING_FLAG_CLOSE);
296  }
297  return 0;
298 }
299 
/**
 * \brief Per-packet entry point of the streaming loggers
 *
 * Passed as the log function to OutputRegisterRootLogger. Depending on which
 * streaming types are set in the thread's 'loggers' bitmask it runs the TCP
 * data logger and/or the HTTP body iterator for the packet's flow.
 *
 * \param tv          thread vars
 * \param p           packet being processed; its flow pointer is used
 * \param thread_data the OutputLoggerThreadData of this thread
 *
 * \retval TM_ECODE_OK on every path visible in this extract
 *
 * NOTE(review): listing lines 324, 329, 331, 335 and 337 are absent from this
 * extract: the statement taken when there is no flow and the four statements
 * that presumably set the direction bit in 'flags' are not visible --
 * confirm against the original file.
 */
300 static TmEcode OutputStreamingLog(ThreadVars *tv, Packet *p, void *thread_data)
301 {
302  DEBUG_VALIDATE_BUG_ON(thread_data == NULL);
303 
304  if (list == NULL) {
305  /* No child loggers. */
306  return TM_ECODE_OK;
307  }
308 
309  OutputLoggerThreadData *op_thread_data = (OutputLoggerThreadData *)thread_data;
310  OutputStreamingLogger *logger = list;
311  OutputLoggerThreadStore *store = op_thread_data->store;
312 
 /* positional initialiser: logger list, store list, thread vars, packet,
  * and the type, which is filled in right before each use below */
313  StreamerCallbackData streamer_cbdata = { logger, store, tv, p , 0};
314 
315  DEBUG_VALIDATE_BUG_ON(logger == NULL && store != NULL);
316  DEBUG_VALIDATE_BUG_ON(logger != NULL && store == NULL);
317  DEBUG_VALIDATE_BUG_ON(logger == NULL && store == NULL);
318 
319  uint8_t flags = 0;
320  Flow * const f = p->flow;
321 
322  /* no flow, no streaming */
323  if (f == NULL) {
325  }
326 
 /* the packet direction is tested the other way around depending on
  * whether the stream engine runs in inline mode */
327  if (!(StreamTcpInlineMode())) {
328  if (PKT_IS_TOCLIENT(p)) {
330  } else {
332  }
333  } else {
334  if (PKT_IS_TOSERVER(p)) {
336  } else {
338  }
339  }
340 
341  if (op_thread_data->loggers & (1<<STREAMING_TCP_DATA)) {
342  TcpSession *ssn = f->protoctx;
343  if (ssn) {
 /* close when the session state is at or past TCP_CLOSED, or when
  * this is a pseudo stream-end packet */
344  int close = (ssn->state >= TCP_CLOSED);
345  close |= ((p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0);
346  SCLogDebug("close ? %s", close ? "yes" : "no");
347 
 /* to-server data is taken from the client stream, anything else from
  * the server stream */
348  TcpStream *stream = flags & OUTPUT_STREAMING_FLAG_TOSERVER ? &ssn->client : &ssn->server;
349  streamer_cbdata.type = STREAMING_TCP_DATA;
350  TcpDataLogger(f, ssn, stream, close, flags, (void *)&streamer_cbdata);
351  }
352  }
353  if (op_thread_data->loggers & (1<<STREAMING_HTTP_BODIES)) {
 /* body logging needs an HTTP flow that has app-layer state */
354  if (f->alproto == ALPROTO_HTTP && f->alstate != NULL) {
355  int close = 0;
356  TcpSession *ssn = f->protoctx;
357  if (ssn) {
358  close = (ssn->state >= TCP_CLOSED);
359  close |= ((p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0);
360  }
361  SCLogDebug("close ? %s", close ? "yes" : "no");
362  streamer_cbdata.type = STREAMING_HTTP_BODIES;
363  HttpBodyIterator(f, close, (void *)&streamer_cbdata, flags);
364  }
365  }
366 
367  return TM_ECODE_OK;
368 }
369 
370 /** \brief thread init for the tx logger
371  * This will run the thread init functions for the individual registered
372  * loggers */
373 static TmEcode OutputStreamingLogThreadInit(ThreadVars *tv, const void *initdata, void **data) {
374  OutputLoggerThreadData *td = SCMalloc(sizeof(*td));
375  if (td == NULL)
376  return TM_ECODE_FAILED;
377  memset(td, 0x00, sizeof(*td));
378 
379  *data = (void *)td;
380 
381  SCLogDebug("OutputStreamingLogThreadInit happy (*data %p)", *data);
382 
383  OutputStreamingLogger *logger = list;
384  while (logger) {
385  if (logger->ThreadInit) {
386  void *retptr = NULL;
387  if (logger->ThreadInit(tv, (void *)logger->output_ctx, &retptr) == TM_ECODE_OK) {
388  OutputLoggerThreadStore *ts = SCMalloc(sizeof(*ts));
389 /* todo */ BUG_ON(ts == NULL);
390  memset(ts, 0x00, sizeof(*ts));
391 
392  /* store thread handle */
393  ts->thread_data = retptr;
394 
395  if (td->store == NULL) {
396  td->store = ts;
397  } else {
398  OutputLoggerThreadStore *tmp = td->store;
399  while (tmp->next != NULL)
400  tmp = tmp->next;
401  tmp->next = ts;
402  }
403 
404  SCLogInfo("%s is now set up", logger->name);
405  }
406  }
407 
408  td->loggers |= (1<<logger->type);
409 
410  logger = logger->next;
411  }
412 
413  return TM_ECODE_OK;
414 }
415 
416 static TmEcode OutputStreamingLogThreadDeinit(ThreadVars *tv, void *thread_data) {
417  OutputLoggerThreadData *op_thread_data = (OutputLoggerThreadData *)thread_data;
418  OutputLoggerThreadStore *store = op_thread_data->store;
419  OutputStreamingLogger *logger = list;
420 
421  while (logger && store) {
422  if (logger->ThreadDeinit) {
423  logger->ThreadDeinit(tv, store->thread_data);
424  }
425 
426  OutputLoggerThreadStore *next_store = store->next;
427  SCFree(store);
428  logger = logger->next;
429  store = next_store;
430  }
431 
432  SCFree(op_thread_data);
433  return TM_ECODE_OK;
434 }
435 
436 static void OutputStreamingLogExitPrintStats(ThreadVars *tv, void *thread_data) {
437  OutputLoggerThreadData *op_thread_data = (OutputLoggerThreadData *)thread_data;
438  OutputLoggerThreadStore *store = op_thread_data->store;
439  OutputStreamingLogger *logger = list;
440 
441  while (logger && store) {
442  if (logger->ThreadExitPrintStats) {
443  logger->ThreadExitPrintStats(tv, store->thread_data);
444  }
445 
446  logger = logger->next;
447  store = store->next;
448  }
449 }
450 
452  OutputRegisterRootLogger(OutputStreamingLogThreadInit,
453  OutputStreamingLogThreadDeinit, OutputStreamingLogExitPrintStats,
454  OutputStreamingLog);
455 }
456 
458 {
459  OutputStreamingLogger *logger = list;
460  while (logger) {
461  OutputStreamingLogger *next_logger = logger->next;
462  SCFree(logger);
463  logger = next_logger;
464  }
465  list = NULL;
466 }
uint16_t flags
#define PACKET_PROFILING_LOGGER_START(p, id)
#define SCLogDebug(...)
Definition: util-debug.h:335
struct Flow_ * flow
Definition: decode.h:445
struct HtpBodyChunk_ * next
#define OUTPUT_STREAMING_FLAG_OPEN
TcpStreamCnf stream_config
Definition: stream-tcp.h:106
#define BUG_ON(x)
uint8_t proto
Definition: flow.h:344
int AppLayerParserGetStateProgressCompletionStatus(AppProto alproto, uint8_t direction)
#define PKT_IS_TOCLIENT(p)
Definition: decode.h:258
LoggerId
int AppLayerParserGetStateProgress(uint8_t ipproto, AppProto alproto, void *alstate, uint8_t flags)
get the progress value for a tx/protocol
OutputLoggerThreadStore * store
void(* ThreadExitPrintStatsFunc)(ThreadVars *, void *)
Definition: tm-modules.h:41
uint64_t AppLayerParserGetTxCnt(const Flow *f, void *alstate)
struct OutputStreamingLogger_ * next
enum OutputStreamingType type
OutputStreamingLogger * logger
OutputLoggerThreadStore * store
Definition: output-file.c:44
#define OUTPUT_STREAMING_FLAG_TOCLIENT
StreamingBufferSegment sbseg
HtpBody request_body
uint8_t FlowGetDisruptionFlags(const Flow *f, uint8_t flags)
get 'disruption' flags: GAP/DEPTH/PASS
Definition: flow.c:1077
OutputStreamingType
void * protoctx
Definition: flow.h:400
int StreamReassembleLog(TcpSession *ssn, TcpStream *stream, StreamReassembleRawFunc Callback, void *cb_data, uint64_t progress_in, uint64_t *progress_out, bool eof)
TmEcode(* ThreadDeinitFunc)(ThreadVars *, void *)
Definition: tm-modules.h:40
void * alstate
Definition: flow.h:438
void * AppLayerParserGetTx(uint8_t ipproto, AppProto alproto, void *alstate, uint64_t tx_id)
ThreadExitPrintStatsFunc ThreadExitPrintStats
HtpBody response_body
struct OutputLoggerThreadStore_ * next
Definition: output-file.c:38
int OutputRegisterStreamingLogger(LoggerId id, const char *name, StreamingLogger LogFunc, OutputCtx *output_ctx, enum OutputStreamingType type, ThreadInitFunc ThreadInit, ThreadDeinitFunc ThreadDeinit, ThreadExitPrintStatsFunc ThreadExitPrintStats)
uint8_t type
StreamingBuffer * sb
void OutputRegisterRootLogger(ThreadInitFunc ThreadInit, ThreadDeinitFunc ThreadDeinit, ThreadExitPrintStatsFunc ThreadExitPrintStats, OutputLogFunc LogFunc)
Definition: output.c:1002
#define STREAM_TOCLIENT
Definition: stream.h:32
#define PKT_IS_TOSERVER(p)
Definition: decode.h:257
htp_conn_t * conn
#define PKT_PSEUDO_STREAM_END
Definition: decode.h:1091
HtpBodyChunk * last
#define SCReturnInt(x)
Definition: util-debug.h:341
#define OUTPUT_STREAMING_FLAG_CLOSE
enum OutputStreamingType type
void StreamingBufferSegmentGetData(const StreamingBuffer *sb, const StreamingBufferSegment *seg, const uint8_t **data, uint32_t *data_len)
void OutputStreamingLoggerRegister(void)
int(* StreamingLogger)(ThreadVars *, void *thread_data, const Flow *f, const uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags)
#define SCMalloc(a)
Definition: util-mem.h:222
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:254
uint32_t log_progress_rel
#define SCFree(a)
Definition: util-mem.h:322
struct OutputLoggerThreadData_ OutputLoggerThreadData
bool streaming_log_api
Definition: stream-tcp.h:65
uint16_t tx_id
uint64_t ts
#define STREAM_TOSERVER
Definition: stream.h:31
ThreadInitFunc ThreadInit
#define OUTPUT_STREAMING_FLAG_TRANSACTION
HtpBodyChunk * first
Per thread variable structure.
Definition: threadvars.h:57
TmEcode(* ThreadInitFunc)(ThreadVars *, const void *, void **)
Definition: tm-modules.h:39
AppProto alproto
application level protocol
Definition: flow.h:409
#define PACKET_PROFILING_LOGGER_END(p, id)
uint32_t flags
Definition: decode.h:443
StreamingLogger LogFunc
int StreamTcpInlineMode(void)
See if stream engine is operating in inline mode.
Definition: stream-tcp.c:6280
ThreadDeinitFunc ThreadDeinit
Flow data structure.
Definition: flow.h:325
#define OUTPUT_STREAMING_FLAG_TOSERVER
struct StreamerCallbackData_ StreamerCallbackData
struct OutputStreamingLogger_ OutputStreamingLogger
#define STREAM_LOG_PROGRESS(stream)
void OutputStreamingShutdown(void)
#define DEBUG_VALIDATE_BUG_ON(exp)
struct OutputLoggerThreadStore_ OutputLoggerThreadStore