suricata
util-log-redis.c
Go to the documentation of this file.
1 /* vi: set et ts=4: */
2 /* Copyright (C) 2007-2016 Open Information Security Foundation
3  *
4  * You can copy, redistribute or modify this Program under the terms of
5  * the GNU General Public License version 2 as published by the Free
6  * Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * version 2 along with this program; if not, write to the Free Software
15  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
16  * 02110-1301, USA.
17  */
18 
19 /**
20  * \file
21  *
22  * \author Paulo Pacheco <fooinha@gmail.com>
23  *
24  * File-like output for logging: redis
25  */
26 #include "suricata-common.h" /* errno.h, string.h, etc. */
27 #include "util-log-redis.h"
28 #include "util-logopenfile.h"
29 #include "util-byte.h"
30 
31 #ifdef HAVE_LIBHIREDIS
32 
33 #ifdef HAVE_LIBEVENT_PTHREADS
34 #include <event2/thread.h>
35 #endif /* HAVE_LIBEVENT_PTHREADS */
36 
37 static const char * redis_lpush_cmd = "LPUSH";
38 static const char * redis_rpush_cmd = "RPUSH";
39 static const char * redis_publish_cmd = "PUBLISH";
40 static const char * redis_default_key = "suricata";
41 static const char * redis_default_server = "127.0.0.1";
42 
43 static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx);
44 static void SCLogFileCloseRedis(LogFileCtx *log_ctx);
45 
46 /**
47  * \brief SCLogRedisInit() - Initializes global stuff before threads
48  */
49 void SCLogRedisInit()
50 {
51 #ifdef HAVE_LIBEVENT_PTHREADS
52  evthread_use_pthreads();
53 #endif /* HAVE_LIBEVENT_PTHREADS */
54 }
55 
56 /** \brief SCLogRedisContextAlloc() - Allocates and initalizes redis context
57  */
58 static SCLogRedisContext *SCLogRedisContextAlloc(void)
59 {
60  SCLogRedisContext* ctx = (SCLogRedisContext*) SCCalloc(1, sizeof(SCLogRedisContext));
61  if (ctx == NULL) {
62  FatalError(SC_ERR_FATAL, "Unable to allocate redis context");
63  }
64  ctx->sync = NULL;
65 #if HAVE_LIBEVENT
66  ctx->ev_base = NULL;
67  ctx->async = NULL;
68 #endif
69  ctx->batch_count = 0;
70  ctx->last_push = 0;
71  ctx->tried = 0;
72 
73  return ctx;
74 }
75 
76 #ifdef HAVE_LIBEVENT
77 
78 static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx);
79 #include <hiredis/adapters/libevent.h>
80 
81 /** \brief SCLogRedisAsyncContextAlloc() - Allocates and initalizes redis context with async
82  */
83 static SCLogRedisContext *SCLogRedisContextAsyncAlloc(void)
84 {
85  SCLogRedisContext* ctx = (SCLogRedisContext*) SCCalloc(1, sizeof(SCLogRedisContext));
86  if (unlikely(ctx == NULL)) {
87  FatalError(SC_ERR_FATAL, "Unable to allocate redis context");
88  }
89 
90  ctx->sync = NULL;
91  ctx->async = NULL;
92  ctx->ev_base = NULL;
93  ctx->connected = 0;
94  ctx->batch_count = 0;
95  ctx->last_push = 0;
96  ctx->tried = 0;
97 
98  return ctx;
99 }
100 
101 /** \brief SCRedisAsyncCommandCallback() Callback when reply from redis happens.
102  * \param ac redis async context
103  * \param r redis reply
104  * \param privvata opaque datq with pointer to LogFileCtx
105  */
106 static void SCRedisAsyncCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
107 {
108  redisReply *reply = r;
109  LogFileCtx *log_ctx = privdata;
110  SCLogRedisContext *ctx = log_ctx->redis;
111 
112  if (reply == NULL) {
113  if (ctx->connected > 0)
114  SCLogInfo("Missing reply from redis, disconnected.");
115  ctx->connected = 0;
116  } else {
117  ctx->connected = 1;
118  event_base_loopbreak(ctx->ev_base);
119  }
120 }
121 
122 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an ECHO command reply
123  * This is used to check if redis is connected.
124  * \param ac redis async context
125  * \param r redis reply
126  * \param privvata opaque datq with pointer to LogFileCtx
127  */
128 static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
129 {
130  redisReply *reply = r;
131  SCLogRedisContext * ctx = privdata;
132 
133  if (reply) {
134  if (ctx->connected == 0) {
135  SCLogNotice("Connected to Redis.");
136  ctx->connected = 1;
137  ctx->tried = 0;
138  }
139  } else {
140  ctx->connected = 0;
141  if (ctx->tried == 0) {
142  SCLogWarning(SC_ERR_SOCKET, "Failed to connect to Redis... (will keep trying)");
143  }
144  ctx->tried = time(NULL);
145  }
146  event_base_loopbreak(ctx->ev_base);
147 }
148 
149 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
150  * Emits and awaits response for an async ECHO command.
151  * It's used for check if redis is alive.
152  * \param ctx redis context
153  */
154 static void SCLogAsyncRedisSendEcho(SCLogRedisContext * ctx)
155 {
156  redisAsyncCommand(ctx->async, SCRedisAsyncEchoCommandCallback, ctx, "ECHO suricata");
157  event_base_dispatch(ctx->ev_base);
158 }
159 
160 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
161  * This is used to terminate connection with redis.
162  * \param ac redis async context
163  * \param r redis reply
164  * \param privvata opaque datq with pointer to LogFileCtx
165  */
166 static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
167 {
168  SCLogInfo("Disconnecting from redis!");
169 }
170 
171 /** \brief QUIT command
172  * Emits and awaits response for an async QUIT command.
173  * It's used to disconnect with redis
174  * \param ctx redis context
175  */
176 static void SCLogAsyncRedisSendQuit(SCLogRedisContext * ctx)
177 {
178  if (ctx->connected) {
179  redisAsyncCommand(ctx->async, SCRedisAsyncQuitCommandCallback, ctx, "QUIT");
180  SCLogInfo("QUIT Command sent to redis. Connection will terminate!");
181  }
182 
183  redisAsyncFree(ctx->async);
184  event_base_dispatch(ctx->ev_base);
185  ctx->async = NULL;
186  event_base_free(ctx->ev_base);
187  ctx->ev_base = NULL;
188  ctx->connected = 0;
189 }
190 
191 /** \brief SCConfLogReopenAsyncRedis() Open or re-opens connection to redis for logging.
192  * \param log_ctx Log file context allocated by caller
193  */
194 static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx)
195 {
196  SCLogRedisContext * ctx = log_ctx->redis;
197  const char *redis_server = log_ctx->redis_setup.server;
198  int redis_port = log_ctx->redis_setup.port;
199 
200  /* only try to reconnect once per second */
201  if (ctx->tried >= time(NULL)) {
202  return -1;
203  }
204 
205  if (strchr(redis_server, '/') == NULL) {
206  ctx->async = redisAsyncConnect(redis_server, redis_port);
207  } else {
208  ctx->async = redisAsyncConnectUnix(redis_server);
209  }
210 
211  if (ctx->ev_base != NULL) {
212  event_base_free(ctx->ev_base);
213  }
214 
215  if (ctx->async == NULL) {
216  SCLogError(SC_ERR_MEM_ALLOC, "Error allocate redis async.");
217  ctx->tried = time(NULL);
218  return -1;
219  }
220 
221  if (ctx->async != NULL && ctx->async->err) {
222  SCLogError(SC_ERR_SOCKET, "Error setting to redis async: [%s].", ctx->async->errstr);
223  ctx->tried = time(NULL);
224  return -1;
225  }
226 
227  ctx->ev_base = event_base_new();
228 
229  if (ctx->ev_base == NULL) {
230  ctx->tried = time(NULL);
231  redisAsyncFree(ctx->async);
232  ctx->async = NULL;
233  return -1;
234  }
235 
236  redisLibeventAttach(ctx->async, ctx->ev_base);
237 
238  log_ctx->redis = ctx;
239  log_ctx->Close = SCLogFileCloseRedis;
240  return 0;
241 }
242 
243 
244 /** \brief SCLogRedisWriteAsync() writes string to redis output in async mode
245  * \param file_ctx Log file context allocated by caller
246  * \param string Buffer to output
247  */
248 static int SCLogRedisWriteAsync(LogFileCtx *file_ctx, const char *string, size_t string_len)
249 {
250  SCLogRedisContext *ctx = file_ctx->redis;
251 
252  if (! ctx->connected) {
253  if (SCConfLogReopenAsyncRedis(file_ctx) == -1) {
254  return -1;
255  }
256  if (ctx->tried == 0) {
257  SCLogNotice("Trying to connect to Redis");
258  }
259  SCLogAsyncRedisSendEcho(ctx);
260  }
261 
262  if (!ctx->connected) {
263  return -1;
264  }
265 
266  if (ctx->async == NULL) {
267  return -1;
268  }
269 
270  redisAsyncCommand(ctx->async,
271  SCRedisAsyncCommandCallback,
272  file_ctx,
273  "%s %s %s",
274  file_ctx->redis_setup.command,
275  file_ctx->redis_setup.key,
276  string);
277 
278  event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK);
279 
280  return 0;
281 }
282 
283 #endif// HAVE_LIBEVENT
284 
285 /** \brief SCConfLogReopenSyncRedis() Open or re-opens connection to redis for logging.
286  * \param log_ctx Log file context allocated by caller
287  */
288 static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx)
289 {
290  SCLogRedisContext * ctx = log_ctx->redis;
291 
292  /* only try to reconnect once per second */
293  if (ctx->tried >= time(NULL)) {
294  return -1;
295  }
296 
297  const char *redis_server = log_ctx->redis_setup.server;
298  int redis_port = log_ctx->redis_setup.port;
299 
300  if (ctx->sync != NULL) {
301  redisFree(ctx->sync);
302  }
303 
304  if (strchr(redis_server, '/') == NULL) {
305  ctx->sync = redisConnect(redis_server, redis_port);
306  } else {
307  ctx->sync = redisConnectUnix(redis_server);
308  }
309  if (ctx->sync == NULL) {
310  SCLogError(SC_ERR_SOCKET, "Error connecting to redis server.");
311  ctx->tried = time(NULL);
312  return -1;
313  }
314  if (ctx->sync->err) {
315  SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: [%s].", ctx->sync->errstr);
316  redisFree(ctx->sync);
317  ctx->sync = NULL;
318  ctx->tried = time(NULL);
319  return -1;
320  }
321  SCLogInfo("Connected to redis server [%s].", log_ctx->redis_setup.server);
322 
323  log_ctx->redis = ctx;
324  log_ctx->Close = SCLogFileCloseRedis;
325  return 0;
326 }
327 /** \brief SCLogRedisWriteSync() writes string to redis output in sync mode
328  * \param file_ctx Log file context allocated by caller
329  * \param string Buffer to output
330  */
331 static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
332 {
333  SCLogRedisContext * ctx = file_ctx->redis;
334  int ret = -1;
335  redisContext *redis = ctx->sync;
336  if (redis == NULL) {
337  SCConfLogReopenSyncRedis(file_ctx);
338  redis = ctx->sync;
339  if (redis == NULL) {
340  SCLogDebug("Redis after re-open is not available.");
341  return -1;
342  }
343  }
344 
345  /* synchronous mode */
346  if (file_ctx->redis_setup.batch_size) {
347  redisAppendCommand(redis, "%s %s %s",
348  file_ctx->redis_setup.command,
349  file_ctx->redis_setup.key,
350  string);
351  time_t now = time(NULL);
352  if ((ctx->batch_count == file_ctx->redis_setup.batch_size) || (ctx->last_push < now)) {
353  redisReply *reply;
354  int i;
355  int batch_size = ctx->batch_count;
356  ctx->batch_count = 0;
357  ctx->last_push = now;
358  for (i = 0; i <= batch_size; i++) {
359  if (redisGetReply(redis, (void **)&reply) == REDIS_OK) {
360  freeReplyObject(reply);
361  ret = 0;
362  } else {
363  if (redis->err) {
364  SCLogInfo("Error when fetching reply: %s (%d)",
365  redis->errstr,
366  redis->err);
367  }
368  switch (redis->err) {
369  case REDIS_ERR_EOF:
370  case REDIS_ERR_IO:
371  SCLogInfo("Reopening connection to redis server");
372  SCConfLogReopenSyncRedis(file_ctx);
373  redis = ctx->sync;
374  if (redis) {
375  SCLogInfo("Reconnected to redis server");
376  redisAppendCommand(redis, "%s %s %s",
377  file_ctx->redis_setup.command,
378  file_ctx->redis_setup.key,
379  string);
380  ctx->batch_count++;
381  return 0;
382  } else {
383  SCLogInfo("Unable to reconnect to redis server");
384  return -1;
385  }
386  break;
387  default:
389  "Unsupported error code %d",
390  redis->err);
391  return -1;
392  }
393  }
394  }
395  } else {
396  ctx->batch_count++;
397  }
398  } else {
399  redisReply *reply = redisCommand(redis, "%s %s %s",
400  file_ctx->redis_setup.command,
401  file_ctx->redis_setup.key,
402  string);
403  /* We may lose the reply if disconnection happens*/
404  if (reply) {
405  switch (reply->type) {
406  case REDIS_REPLY_ERROR:
407  SCLogWarning(SC_ERR_SOCKET, "Redis error: %s", reply->str);
408  SCConfLogReopenSyncRedis(file_ctx);
409  break;
410  case REDIS_REPLY_INTEGER:
411  SCLogDebug("Redis integer %lld", reply->integer);
412  ret = 0;
413  break;
414  default:
416  "Redis default triggered with %d", reply->type);
417  SCConfLogReopenSyncRedis(file_ctx);
418  break;
419  }
420  freeReplyObject(reply);
421  } else {
422  SCConfLogReopenSyncRedis(file_ctx);
423  }
424  }
425  return ret;
426 }
427 
428 /**
429  * \brief LogFileWriteRedis() writes log data to redis output.
430  * \param log_ctx Log file context allocated by caller
431  * \param string buffer with data to write
432  * \param string_len data length
433  * \retval 0 on sucess;
434  * \retval -1 on failure;
435  */
436 int LogFileWriteRedis(void *lf_ctx, const char *string, size_t string_len)
437 {
438  LogFileCtx *file_ctx = lf_ctx;
439  if (file_ctx == NULL) {
440  return -1;
441  }
442 
443 #if HAVE_LIBEVENT
444  /* async mode on */
445  if (file_ctx->redis_setup.is_async) {
446  return SCLogRedisWriteAsync(file_ctx, string, string_len);
447  }
448 #endif
449  /* sync mode */
450  if (! file_ctx->redis_setup.is_async) {
451  return SCLogRedisWriteSync(file_ctx, string);
452  }
453  return -1;
454 }
455 
456 /** \brief configure and initializes redis output logging
457  * \param conf ConfNode structure for the output section in question
458  * \param log_ctx Log file context allocated by caller
459  * \retval 0 on success
460  */
461 int SCConfLogOpenRedis(ConfNode *redis_node, void *lf_ctx)
462 {
463  LogFileCtx *log_ctx = lf_ctx;
464 
465  const char *redis_port = NULL;
466  const char *redis_mode = NULL;
467 
468  int is_async = 0;
469 
470  if (redis_node) {
471  log_ctx->redis_setup.server = ConfNodeLookupChildValue(redis_node, "server");
472  log_ctx->redis_setup.key = ConfNodeLookupChildValue(redis_node, "key");
473 
474  redis_port = ConfNodeLookupChildValue(redis_node, "port");
475  redis_mode = ConfNodeLookupChildValue(redis_node, "mode");
476 
477  (void)ConfGetChildValueBool(redis_node, "async", &is_async);
478  }
479  if (!log_ctx->redis_setup.server) {
480  log_ctx->redis_setup.server = redis_default_server;
481  SCLogInfo("Using default redis server (127.0.0.1)");
482  }
483  if (!redis_port)
484  redis_port = "6379";
485  if (!redis_mode)
486  redis_mode = "list";
487  if (!log_ctx->redis_setup.key) {
488  log_ctx->redis_setup.key = redis_default_key;
489  }
490 
491 #ifndef HAVE_LIBEVENT
492  if (is_async) {
493  SCLogWarning(SC_ERR_NO_REDIS_ASYNC, "async option not available.");
494  }
495  is_async = 0;
496 #endif //ifndef HAVE_LIBEVENT
497 
498  log_ctx->redis_setup.is_async = is_async;
499  log_ctx->redis_setup.batch_size = 0;
500  if (redis_node) {
501  ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining");
502  if (pipelining) {
503  int enabled = 0;
504  int ret;
505  intmax_t val;
506  ret = ConfGetChildValueBool(pipelining, "enabled", &enabled);
507  if (ret && enabled) {
508  ret = ConfGetChildValueInt(pipelining, "batch-size", &val);
509  if (ret) {
510  log_ctx->redis_setup.batch_size = val;
511  } else {
512  log_ctx->redis_setup.batch_size = 10;
513  }
514  }
515  }
516  } else {
517  log_ctx->redis_setup.batch_size = 0;
518  }
519 
520  if (!strcmp(redis_mode, "list") || !strcmp(redis_mode,"lpush")) {
521  log_ctx->redis_setup.command = redis_lpush_cmd;
522  } else if(!strcmp(redis_mode, "rpush")){
523  log_ctx->redis_setup.command = redis_rpush_cmd;
524  } else if(!strcmp(redis_mode,"channel") || !strcmp(redis_mode,"publish")) {
525  log_ctx->redis_setup.command = redis_publish_cmd;
526  } else {
527  FatalError(SC_ERR_FATAL, "Invalid redis mode");
528  }
529 
530  /* store server params for reconnection */
531  if (!log_ctx->redis_setup.server) {
532  FatalError(SC_ERR_FATAL, "Error allocating redis server string");
533  }
534  if (StringParseUint16(&log_ctx->redis_setup.port, 10, 0, (const char *)redis_port) < 0) {
535  FatalError(SC_ERR_INVALID_VALUE, "Invalid value for redis port: %s", redis_port);
536  }
537  log_ctx->Close = SCLogFileCloseRedis;
538 
539 #ifdef HAVE_LIBEVENT
540  if (is_async) {
541  log_ctx->redis = SCLogRedisContextAsyncAlloc();
542  }
543 #endif /*HAVE_LIBEVENT*/
544  if (! is_async) {
545  log_ctx->redis = SCLogRedisContextAlloc();
546  SCConfLogReopenSyncRedis(log_ctx);
547  }
548  return 0;
549 }
550 
551 /** \brief SCLogFileCloseRedis() Closes redis log more
552  * \param log_ctx Log file context allocated by caller
553  */
554 void SCLogFileCloseRedis(LogFileCtx *log_ctx)
555 {
556  SCLogRedisContext * ctx = log_ctx->redis;
557  if (ctx == NULL) {
558  return;
559  }
560  /* asynchronous */
561  if (log_ctx->redis_setup.is_async) {
562 #if HAVE_LIBEVENT == 1
563  if (ctx->async) {
564  if (ctx->connected > 0) {
565  SCLogAsyncRedisSendQuit(ctx);
566  }
567  if (ctx->ev_base != NULL) {
568  event_base_free(ctx->ev_base);
569  ctx->ev_base = NULL;
570  }
571  }
572 #endif
573  }
574 
575  /* synchronous */
576  if (!log_ctx->redis_setup.is_async) {
577  if (ctx->sync) {
578  redisReply *reply;
579  int i;
580  for (i = 0; i < ctx->batch_count; i++) {
581  redisGetReply(ctx->sync, (void **)&reply);
582  if (reply) {
583  freeReplyObject(reply);
584  }
585  }
586  redisFree(ctx->sync);
587  ctx->sync = NULL;
588  }
589  ctx->tried = 0;
590  ctx->batch_count = 0;
591  }
592 
593  if (ctx != NULL) {
594  SCFree(ctx);
595  }
596 }
597 
598 #endif //#ifdef HAVE_LIBHIREDIS
util-byte.h
ConfGetChildValueInt
int ConfGetChildValueInt(const ConfNode *base, const char *name, intmax_t *val)
Definition: conf.c:468
StringParseUint16
int StringParseUint16(uint16_t *res, int base, uint16_t len, const char *str)
Definition: util-byte.c:336
SC_ERR_INVALID_VALUE
@ SC_ERR_INVALID_VALUE
Definition: util-error.h:160
SC_ERR_SOCKET
@ SC_ERR_SOCKET
Definition: util-error.h:232
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:298
LogFileCtx_
Definition: util-logopenfile.h:60
util-log-redis.h
SCLogInfo
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:217
ConfNodeLookupChild
ConfNode * ConfNodeLookupChild(const ConfNode *node, const char *name)
Lookup a child configuration node by name.
Definition: conf.c:814
suricata-common.h
SCLogError
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:257
FatalError
#define FatalError(x,...)
Definition: util-debug.h:532
ConfGetChildValueBool
int ConfGetChildValueBool(const ConfNode *base, const char *name, int *val)
Definition: conf.c:529
SCLogWarning
#define SCLogWarning(err_code,...)
Macro used to log WARNING messages.
Definition: util-debug.h:244
SCFree
#define SCFree(p)
Definition: util-mem.h:61
ConfNode_
Definition: conf.h:32
util-logopenfile.h
SC_ERR_FATAL
@ SC_ERR_FATAL
Definition: util-error.h:203
SC_ERR_MEM_ALLOC
@ SC_ERR_MEM_ALLOC
Definition: util-error.h:31
LogFileCtx_::Close
void(* Close)(struct LogFileCtx_ *fp)
Definition: util-logopenfile.h:79
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:232
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
SC_ERR_NO_REDIS_ASYNC
@ SC_ERR_NO_REDIS_ASYNC
Definition: util-error.h:333
ConfNodeLookupChildValue
const char * ConfNodeLookupChildValue(const ConfNode *node, const char *name)
Lookup the value of a child configuration node by name.
Definition: conf.c:842