suricata
util-log-redis.c
Go to the documentation of this file.
1 /* vi: set et ts=4: */
2 /* Copyright (C) 2007-2016 Open Information Security Foundation
3  *
4  * You can copy, redistribute or modify this Program under the terms of
5  * the GNU General Public License version 2 as published by the Free
6  * Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * version 2 along with this program; if not, write to the Free Software
15  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
16  * 02110-1301, USA.
17  */
18 
19 /**
20  * \file
21  *
22  * \author Paulo Pacheco <fooinha@gmail.com>
23  *
24  * File-like output for logging: redis
25  */
26 #include "suricata-common.h" /* errno.h, string.h, etc. */
27 #include "util-log-redis.h"
28 #include "util-logopenfile.h"
29 
30 #ifdef HAVE_LIBHIREDIS
31 
32 #ifdef HAVE_LIBEVENT_PTHREADS
33 #include <event2/thread.h>
34 #endif /* HAVE_LIBEVENT_PTHREADS */
35 
36 static const char * redis_lpush_cmd = "LPUSH";
37 static const char * redis_rpush_cmd = "RPUSH";
38 static const char * redis_publish_cmd = "PUBLISH";
39 static const char * redis_default_key = "suricata";
40 static const char * redis_default_server = "127.0.0.1";
41 
42 static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx);
43 static void SCLogFileCloseRedis(LogFileCtx *log_ctx);
44 
45 /**
46  * \brief SCLogRedisInit() - Initializes global stuff before threads
47  */
48 void SCLogRedisInit()
49 {
50 #ifdef HAVE_LIBEVENT_PTHREADS
51  evthread_use_pthreads();
52 #endif /* HAVE_LIBEVENT_PTHREADS */
53 }
54 
55 /** \brief SCLogRedisContextAlloc() - Allocates and initalizes redis context
56  */
57 static SCLogRedisContext *SCLogRedisContextAlloc(void)
58 {
59  SCLogRedisContext* ctx = (SCLogRedisContext*) SCCalloc(1, sizeof(SCLogRedisContext));
60  if (ctx == NULL) {
61  SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis context");
62  exit(EXIT_FAILURE);
63  }
64  ctx->sync = NULL;
65 #if HAVE_LIBEVENT
66  ctx->ev_base = NULL;
67  ctx->async = NULL;
68 #endif
69  ctx->batch_count = 0;
70  ctx->tried = 0;
71 
72  return ctx;
73 }
74 
75 #ifdef HAVE_LIBEVENT
76 
77 static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx);
78 #include <hiredis/adapters/libevent.h>
79 
80 /** \brief SCLogRedisAsyncContextAlloc() - Allocates and initalizes redis context with async
81  */
82 static SCLogRedisContext *SCLogRedisContextAsyncAlloc(void)
83 {
84  SCLogRedisContext* ctx = (SCLogRedisContext*) SCCalloc(1, sizeof(SCLogRedisContext));
85  if (unlikely(ctx == NULL)) {
86  SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis context");
87  exit(EXIT_FAILURE);
88  }
89 
90  ctx->sync = NULL;
91  ctx->async = NULL;
92  ctx->ev_base = NULL;
93  ctx->connected = 0;
94  ctx->batch_count = 0;
95  ctx->tried = 0;
96 
97  return ctx;
98 }
99 
100 /** \brief SCRedisAsyncCommandCallback() Callback when reply from redis happens.
101  * \param ac redis async context
102  * \param r redis reply
103  * \param privvata opaque datq with pointer to LogFileCtx
104  */
105 static void SCRedisAsyncCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
106 {
107  redisReply *reply = r;
108  LogFileCtx *log_ctx = privdata;
109  SCLogRedisContext *ctx = log_ctx->redis;
110 
111  if (reply == NULL) {
112  if (ctx->connected > 0)
113  SCLogInfo("Missing reply from redis, disconnected.");
114  ctx->connected = 0;
115  } else {
116  ctx->connected = 1;
117  event_base_loopbreak(ctx->ev_base);
118  }
119 }
120 
121 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an ECHO command reply
122  * This is used to check if redis is connected.
123  * \param ac redis async context
124  * \param r redis reply
125  * \param privvata opaque datq with pointer to LogFileCtx
126  */
127 static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
128 {
129  redisReply *reply = r;
130  SCLogRedisContext * ctx = privdata;
131 
132  if (reply) {
133  if (ctx->connected == 0) {
134  SCLogNotice("Connected to Redis.");
135  ctx->connected = 1;
136  ctx->tried = 0;
137  }
138  } else {
139  ctx->connected = 0;
140  if (ctx->tried == 0) {
141  SCLogWarning(SC_ERR_SOCKET, "Failed to connect to Redis... (will keep trying)");
142  }
143  ctx->tried = time(NULL);
144  }
145  event_base_loopbreak(ctx->ev_base);
146 }
147 
148 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
149  * Emits and awaits response for an async ECHO command.
150  * It's used for check if redis is alive.
151  * \param ctx redis context
152  */
153 static void SCLogAsyncRedisSendEcho(SCLogRedisContext * ctx)
154 {
155  redisAsyncCommand(ctx->async, SCRedisAsyncEchoCommandCallback, ctx, "ECHO suricata");
156  event_base_dispatch(ctx->ev_base);
157 }
158 
159 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
160  * This is used to terminate connection with redis.
161  * \param ac redis async context
162  * \param r redis reply
163  * \param privvata opaque datq with pointer to LogFileCtx
164  */
165 static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
166 {
167  SCLogInfo("Disconnecting from redis!");
168 }
169 
170 /** \brief QUIT command
171  * Emits and awaits response for an async QUIT command.
172  * It's used to disconnect with redis
173  * \param ctx redis context
174  */
175 static void SCLogAsyncRedisSendQuit(SCLogRedisContext * ctx)
176 {
177  if (ctx->connected) {
178  redisAsyncCommand(ctx->async, SCRedisAsyncQuitCommandCallback, ctx, "QUIT");
179  SCLogInfo("QUIT Command sent to redis. Connection will terminate!");
180  }
181 
182  redisAsyncFree(ctx->async);
183  event_base_dispatch(ctx->ev_base);
184  ctx->async = NULL;
185  event_base_free(ctx->ev_base);
186  ctx->ev_base = NULL;
187  ctx->connected = 0;
188 }
189 
190 /** \brief SCConfLogReopenAsyncRedis() Open or re-opens connection to redis for logging.
191  * \param log_ctx Log file context allocated by caller
192  */
193 static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx)
194 {
195  SCLogRedisContext * ctx = log_ctx->redis;
196  const char *redis_server = log_ctx->redis_setup.server;
197  int redis_port = log_ctx->redis_setup.port;
198 
199  /* only try to reconnect once per second */
200  if (ctx->tried >= time(NULL)) {
201  return -1;
202  }
203 
204  ctx->async = redisAsyncConnect(redis_server, redis_port);
205 
206  if (ctx->ev_base != NULL) {
207  event_base_free(ctx->ev_base);
208  }
209 
210  if (ctx->async == NULL) {
211  SCLogError(SC_ERR_MEM_ALLOC, "Error allocate redis async.");
212  ctx->tried = time(NULL);
213  return -1;
214  }
215 
216  if (ctx->async != NULL && ctx->async->err) {
217  SCLogError(SC_ERR_SOCKET, "Error setting to redis async: [%s].", ctx->async->errstr);
218  ctx->tried = time(NULL);
219  return -1;
220  }
221 
222  ctx->ev_base = event_base_new();
223 
224  if (ctx->ev_base == NULL) {
225  ctx->tried = time(NULL);
226  redisAsyncFree(ctx->async);
227  ctx->async = NULL;
228  return -1;
229  }
230 
231  redisLibeventAttach(ctx->async, ctx->ev_base);
232 
233  log_ctx->redis = ctx;
234  log_ctx->Close = SCLogFileCloseRedis;
235  return 0;
236 }
237 
238 
239 /** \brief SCLogRedisWriteAsync() writes string to redis output in async mode
240  * \param file_ctx Log file context allocated by caller
241  * \param string Buffer to output
242  */
243 static int SCLogRedisWriteAsync(LogFileCtx *file_ctx, const char *string, size_t string_len)
244 {
245  SCLogRedisContext *ctx = file_ctx->redis;
246 
247  if (! ctx->connected) {
248  if (SCConfLogReopenAsyncRedis(file_ctx) == -1) {
249  return -1;
250  }
251  if (ctx->tried == 0) {
252  SCLogNotice("Trying to connect to Redis");
253  }
254  SCLogAsyncRedisSendEcho(ctx);
255  }
256 
257  if (!ctx->connected) {
258  return -1;
259  }
260 
261  if (ctx->async == NULL) {
262  return -1;
263  }
264 
265  redisAsyncCommand(ctx->async,
266  SCRedisAsyncCommandCallback,
267  file_ctx,
268  "%s %s %s",
269  file_ctx->redis_setup.command,
270  file_ctx->redis_setup.key,
271  string);
272 
273  event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK);
274 
275  return 0;
276 }
277 
278 #endif// HAVE_LIBEVENT
279 
280 /** \brief SCConfLogReopenSyncRedis() Open or re-opens connection to redis for logging.
281  * \param log_ctx Log file context allocated by caller
282  */
283 static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx)
284 {
285  SCLogRedisContext * ctx = log_ctx->redis;
286 
287  /* only try to reconnect once per second */
288  if (ctx->tried >= time(NULL)) {
289  return -1;
290  }
291 
292  const char *redis_server = log_ctx->redis_setup.server;
293  int redis_port = log_ctx->redis_setup.port;
294 
295  if (ctx->sync != NULL) {
296  redisFree(ctx->sync);
297  }
298  ctx->sync = redisConnect(redis_server, redis_port);
299  if (ctx->sync == NULL) {
300  SCLogError(SC_ERR_SOCKET, "Error connecting to redis server.");
301  ctx->tried = time(NULL);
302  return -1;
303  }
304  if (ctx->sync->err) {
305  SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: [%s].", ctx->sync->errstr);
306  redisFree(ctx->sync);
307  ctx->sync = NULL;
308  ctx->tried = time(NULL);
309  return -1;
310  }
311  SCLogInfo("Connected to redis server [%s].", log_ctx->redis_setup.server);
312 
313  log_ctx->redis = ctx;
314  log_ctx->Close = SCLogFileCloseRedis;
315  return 0;
316 }
317 /** \brief SCLogRedisWriteSync() writes string to redis output in sync mode
318  * \param file_ctx Log file context allocated by caller
319  * \param string Buffer to output
320  */
321 static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
322 {
323  SCLogRedisContext * ctx = file_ctx->redis;
324  int ret = -1;
325  redisContext *redis = ctx->sync;
326  if (redis == NULL) {
327  SCConfLogReopenSyncRedis(file_ctx);
328  redis = ctx->sync;
329  if (redis == NULL) {
330  SCLogDebug("Redis after re-open is not available.");
331  return -1;
332  }
333  }
334 
335  /* synchronous mode */
336  if (file_ctx->redis_setup.batch_size) {
337  redisAppendCommand(redis, "%s %s %s",
338  file_ctx->redis_setup.command,
339  file_ctx->redis_setup.key,
340  string);
341  if (ctx->batch_count == file_ctx->redis_setup.batch_size) {
342  redisReply *reply;
343  int i;
344  ctx->batch_count = 0;
345  for (i = 0; i <= file_ctx->redis_setup.batch_size; i++) {
346  if (redisGetReply(redis, (void **)&reply) == REDIS_OK) {
347  freeReplyObject(reply);
348  ret = 0;
349  } else {
350  if (redis->err) {
351  SCLogInfo("Error when fetching reply: %s (%d)",
352  redis->errstr,
353  redis->err);
354  }
355  switch (redis->err) {
356  case REDIS_ERR_EOF:
357  case REDIS_ERR_IO:
358  SCLogInfo("Reopening connection to redis server");
359  SCConfLogReopenSyncRedis(file_ctx);
360  redis = ctx->sync;
361  if (redis) {
362  SCLogInfo("Reconnected to redis server");
363  } else {
364  SCLogInfo("Unable to reconnect to redis server");
365  return -1;
366  }
367  break;
368  default:
370  "Unsupported error code %d",
371  redis->err);
372  return -1;
373  }
374  }
375  }
376  } else {
377  ctx->batch_count++;
378  }
379  } else {
380  redisReply *reply = redisCommand(redis, "%s %s %s",
381  file_ctx->redis_setup.command,
382  file_ctx->redis_setup.key,
383  string);
384  /* We may lose the reply if disconnection happens*/
385  if (reply) {
386  switch (reply->type) {
387  case REDIS_REPLY_ERROR:
388  SCLogWarning(SC_ERR_SOCKET, "Redis error: %s", reply->str);
389  SCConfLogReopenSyncRedis(file_ctx);
390  break;
391  case REDIS_REPLY_INTEGER:
392  SCLogDebug("Redis integer %lld", reply->integer);
393  ret = 0;
394  break;
395  default:
397  "Redis default triggered with %d", reply->type);
398  SCConfLogReopenSyncRedis(file_ctx);
399  break;
400  }
401  freeReplyObject(reply);
402  } else {
403  SCConfLogReopenSyncRedis(file_ctx);
404  }
405  }
406  return ret;
407 }
408 
409 /**
410  * \brief LogFileWriteRedis() writes log data to redis output.
411  * \param log_ctx Log file context allocated by caller
412  * \param string buffer with data to write
413  * \param string_len data length
414  * \retval 0 on sucess;
415  * \retval -1 on failure;
416  */
417 int LogFileWriteRedis(void *lf_ctx, const char *string, size_t string_len)
418 {
419  LogFileCtx *file_ctx = lf_ctx;
420  if (file_ctx == NULL) {
421  return -1;
422  }
423 
424 #if HAVE_LIBEVENT
425  /* async mode on */
426  if (file_ctx->redis_setup.is_async) {
427  return SCLogRedisWriteAsync(file_ctx, string, string_len);
428  }
429 #endif
430  /* sync mode */
431  if (! file_ctx->redis_setup.is_async) {
432  return SCLogRedisWriteSync(file_ctx, string);
433  }
434  return -1;
435 }
436 
437 /** \brief configure and initializes redis output logging
438  * \param conf ConfNode structure for the output section in question
439  * \param log_ctx Log file context allocated by caller
440  * \retval 0 on success
441  */
442 int SCConfLogOpenRedis(ConfNode *redis_node, void *lf_ctx)
443 {
444  LogFileCtx *log_ctx = lf_ctx;
445 
446  const char *redis_port = NULL;
447  const char *redis_mode = NULL;
448 
449  int is_async = 0;
450 
451  if (redis_node) {
452  log_ctx->redis_setup.server = ConfNodeLookupChildValue(redis_node, "server");
453  log_ctx->redis_setup.key = ConfNodeLookupChildValue(redis_node, "key");
454 
455  redis_port = ConfNodeLookupChildValue(redis_node, "port");
456  redis_mode = ConfNodeLookupChildValue(redis_node, "mode");
457 
458  (void)ConfGetChildValueBool(redis_node, "async", &is_async);
459  }
460  if (!log_ctx->redis_setup.server) {
461  log_ctx->redis_setup.server = redis_default_server;
462  SCLogInfo("Using default redis server (127.0.0.1)");
463  }
464  if (!redis_port)
465  redis_port = "6379";
466  if (!redis_mode)
467  redis_mode = "list";
468  if (!log_ctx->redis_setup.key) {
469  log_ctx->redis_setup.key = redis_default_key;
470  }
471 
472 #ifndef HAVE_LIBEVENT
473  if (is_async) {
474  SCLogWarning(SC_ERR_NO_REDIS_ASYNC, "async option not available.");
475  }
476  is_async = 0;
477 #endif //ifndef HAVE_LIBEVENT
478 
479  log_ctx->redis_setup.is_async = is_async;
480  log_ctx->redis_setup.batch_size = 0;
481  if (redis_node) {
482  ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining");
483  if (pipelining) {
484  int enabled = 0;
485  int ret;
486  intmax_t val;
487  ret = ConfGetChildValueBool(pipelining, "enabled", &enabled);
488  if (ret && enabled) {
489  ret = ConfGetChildValueInt(pipelining, "batch-size", &val);
490  if (ret) {
491  log_ctx->redis_setup.batch_size = val;
492  } else {
493  log_ctx->redis_setup.batch_size = 10;
494  }
495  }
496  }
497  } else {
498  log_ctx->redis_setup.batch_size = 0;
499  }
500 
501  if (!strcmp(redis_mode, "list") || !strcmp(redis_mode,"lpush")) {
502  log_ctx->redis_setup.command = redis_lpush_cmd;
503  } else if(!strcmp(redis_mode, "rpush")){
504  log_ctx->redis_setup.command = redis_rpush_cmd;
505  } else if(!strcmp(redis_mode,"channel") || !strcmp(redis_mode,"publish")) {
506  log_ctx->redis_setup.command = redis_publish_cmd;
507  } else {
508  SCLogError(SC_ERR_REDIS_CONFIG,"Invalid redis mode");
509  exit(EXIT_FAILURE);
510  }
511 
512  /* store server params for reconnection */
513  if (!log_ctx->redis_setup.server) {
514  SCLogError(SC_ERR_MEM_ALLOC, "Error allocating redis server string");
515  exit(EXIT_FAILURE);
516  }
517  log_ctx->redis_setup.port = atoi(redis_port);
518  log_ctx->Close = SCLogFileCloseRedis;
519 
520 #ifdef HAVE_LIBEVENT
521  if (is_async) {
522  log_ctx->redis = SCLogRedisContextAsyncAlloc();
523  }
524 #endif /*HAVE_LIBEVENT*/
525  if (! is_async) {
526  log_ctx->redis = SCLogRedisContextAlloc();
527  SCConfLogReopenSyncRedis(log_ctx);
528  }
529  return 0;
530 }
531 
532 /** \brief SCLogFileCloseRedis() Closes redis log more
533  * \param log_ctx Log file context allocated by caller
534  */
535 void SCLogFileCloseRedis(LogFileCtx *log_ctx)
536 {
537  SCLogRedisContext * ctx = log_ctx->redis;
538  if (ctx == NULL) {
539  return;
540  }
541  /* asynchronous */
542  if (log_ctx->redis_setup.is_async) {
543 #if HAVE_LIBEVENT == 1
544  if (ctx->async) {
545  if (ctx->connected > 0) {
546  SCLogAsyncRedisSendQuit(ctx);
547  }
548  if (ctx->ev_base != NULL) {
549  event_base_free(ctx->ev_base);
550  ctx->ev_base = NULL;
551  }
552  }
553 #endif
554  }
555 
556  /* synchronous */
557  if (!log_ctx->redis_setup.is_async) {
558  if (ctx->sync) {
559  redisReply *reply;
560  int i;
561  for (i = 0; i < ctx->batch_count; i++) {
562  redisGetReply(ctx->sync, (void **)&reply);
563  if (reply) {
564  freeReplyObject(reply);
565  }
566  }
567  redisFree(ctx->sync);
568  ctx->sync = NULL;
569  }
570  ctx->tried = 0;
571  ctx->batch_count = 0;
572  }
573 
574  if (ctx != NULL) {
575  SCFree(ctx);
576  }
577 }
578 
579 #endif //#ifdef HAVE_LIBHIREDIS
#define SCLogDebug(...)
Definition: util-debug.h:335
#define unlikely(expr)
Definition: util-optimize.h:35
ConfNode * ConfNodeLookupChild(const ConfNode *node, const char *name)
Lookup a child configuration node by name.
Definition: conf.c:815
void(* Close)(struct LogFileCtx_ *fp)
#define SCCalloc(nm, a)
Definition: util-mem.h:205
const char * ConfNodeLookupChildValue(const ConfNode *node, const char *name)
Lookup the value of a child configuration node by name.
Definition: conf.c:843
int ConfGetChildValueBool(const ConfNode *base, const char *name, int *val)
Definition: conf.c:530
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:294
#define SCLogWarning(err_code,...)
Macro used to log WARNING messages.
Definition: util-debug.h:281
Definition: conf.h:32
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:254
#define SCFree(a)
Definition: util-mem.h:236
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:269
int ConfGetChildValueInt(const ConfNode *base, const char *name, intmax_t *val)
Definition: conf.c:469