suricata
util-log-redis.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2021 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Paulo Pacheco <fooinha@gmail.com>
22  *
23  * File-like output for logging: redis
24  */
25 #include "suricata-common.h" /* errno.h, string.h, etc. */
26 #include "util-log-redis.h"
27 #include "util-logopenfile.h"
28 #include "util-byte.h"
29 #include "util-debug.h"
30 
31 #ifdef HAVE_LIBHIREDIS
32 
33 #ifdef HAVE_LIBEVENT_PTHREADS
34 #include <event2/thread.h>
35 #endif /* HAVE_LIBEVENT_PTHREADS */
36 
/* Redis commands selected by the configured "mode". */
static const char *redis_lpush_cmd = "LPUSH";
static const char *redis_rpush_cmd = "RPUSH";
static const char *redis_publish_cmd = "PUBLISH";

/* Fallback values used when "key" / "server" are not configured. */
static const char *redis_default_key = "suricata";
static const char *redis_default_server = "127.0.0.1";
42 
43 static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx);
44 static void SCLogFileCloseRedis(LogFileCtx *log_ctx);
45 
/**
 * \brief Global, one-time setup for the redis logger.
 *
 * Calls evthread_use_pthreads() when the build has HAVE_LIBEVENT_PTHREADS
 * defined; otherwise the function does nothing.
 */
void SCLogRedisInit(void)
{
#if defined(HAVE_LIBEVENT_PTHREADS)
    evthread_use_pthreads();
#endif /* HAVE_LIBEVENT_PTHREADS */
}
55 
56 /** \brief SCLogRedisContextAlloc() - Allocates and initializes redis context
57  */
58 static SCLogRedisContext *SCLogRedisContextAlloc(void)
59 {
60  SCLogRedisContext* ctx = (SCLogRedisContext*) SCCalloc(1, sizeof(SCLogRedisContext));
61  if (ctx == NULL) {
62  FatalError("Unable to allocate redis context");
63  }
64  ctx->sync = NULL;
65 #if HAVE_LIBEVENT
66  ctx->ev_base = NULL;
67  ctx->async = NULL;
68 #endif
69  ctx->batch_count = 0;
70  ctx->last_push = 0;
71  ctx->tried = 0;
72 
73  return ctx;
74 }
75 
76 #ifdef HAVE_LIBEVENT
77 
78 static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx);
79 #include <hiredis/adapters/libevent.h>
80 
81 /** \brief SCLogRedisAsyncContextAlloc() - Allocates and initializes redis context with async
82  */
83 static SCLogRedisContext *SCLogRedisContextAsyncAlloc(void)
84 {
85  SCLogRedisContext* ctx = (SCLogRedisContext*) SCCalloc(1, sizeof(SCLogRedisContext));
86  if (unlikely(ctx == NULL)) {
87  FatalError("Unable to allocate redis context");
88  }
89 
90  ctx->sync = NULL;
91  ctx->async = NULL;
92  ctx->ev_base = NULL;
93  ctx->connected = 0;
94  ctx->batch_count = 0;
95  ctx->last_push = 0;
96  ctx->tried = 0;
97 
98  return ctx;
99 }
100 
101 /** \brief SCRedisAsyncCommandCallback() Callback when reply from redis happens.
102  * \param ac redis async context
103  * \param r redis reply
104  * \param privdata opaque data with pointer to LogFileCtx
105  */
106 static void SCRedisAsyncCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
107 {
108  redisReply *reply = r;
109  LogFileCtx *log_ctx = privdata;
110  SCLogRedisContext *ctx = log_ctx->redis;
111 
112  if (reply == NULL) {
113  if (ctx->connected > 0)
114  SCLogInfo("Missing reply from redis, disconnected.");
115  ctx->connected = 0;
116  } else {
117  ctx->connected = 1;
118  event_base_loopbreak(ctx->ev_base);
119  }
120 }
121 
122 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an ECHO command reply
123  * This is used to check if redis is connected.
124  * \param ac redis async context
125  * \param r redis reply
126  * \param privdata opaque data with pointer to LogFileCtx
127  */
128 static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
129 {
130  redisReply *reply = r;
131  SCLogRedisContext * ctx = privdata;
132 
133  if (reply) {
134  if (ctx->connected == 0) {
135  SCLogNotice("Connected to Redis.");
136  ctx->connected = 1;
137  ctx->tried = 0;
138  }
139  } else {
140  ctx->connected = 0;
141  if (ctx->tried == 0) {
142  SCLogWarning("Failed to connect to Redis... (will keep trying)");
143  }
144  ctx->tried = time(NULL);
145  }
146  event_base_loopbreak(ctx->ev_base);
147 }
148 
149 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
150  * Emits and awaits response for an async ECHO command.
151  * It's used for check if redis is alive.
152  * \param ctx redis context
153  */
154 static void SCLogAsyncRedisSendEcho(SCLogRedisContext * ctx)
155 {
156  redisAsyncCommand(ctx->async, SCRedisAsyncEchoCommandCallback, ctx, "ECHO suricata");
157  event_base_dispatch(ctx->ev_base);
158 }
159 
160 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
161  * This is used to terminate connection with redis.
162  * \param ac redis async context
163  * \param r redis reply
164  * \param privdata opaque data with pointer to LogFileCtx
165  */
166 static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
167 {
168  SCLogInfo("Disconnecting from redis!");
169 }
170 
171 /** \brief QUIT command
172  * Emits and awaits response for an async QUIT command.
173  * It's used to disconnect with redis
174  * \param ctx redis context
175  */
176 static void SCLogAsyncRedisSendQuit(SCLogRedisContext * ctx)
177 {
178  if (ctx->connected) {
179  redisAsyncCommand(ctx->async, SCRedisAsyncQuitCommandCallback, ctx, "QUIT");
180  SCLogInfo("QUIT Command sent to redis. Connection will terminate!");
181  }
182 
183  redisAsyncFree(ctx->async);
184  event_base_dispatch(ctx->ev_base);
185  ctx->async = NULL;
186  event_base_free(ctx->ev_base);
187  ctx->ev_base = NULL;
188  ctx->connected = 0;
189 }
190 
191 /** \brief SCConfLogReopenAsyncRedis() Open or re-opens connection to redis for logging.
192  * \param log_ctx Log file context allocated by caller
193  */
194 static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx)
195 {
196  SCLogRedisContext * ctx = log_ctx->redis;
197  const char *redis_server = log_ctx->redis_setup.server;
198  int redis_port = log_ctx->redis_setup.port;
199 
200  /* only try to reconnect once per second */
201  if (ctx->tried >= time(NULL)) {
202  return -1;
203  }
204 
205  if (strchr(redis_server, '/') == NULL) {
206  ctx->async = redisAsyncConnect(redis_server, redis_port);
207  } else {
208  ctx->async = redisAsyncConnectUnix(redis_server);
209  }
210 
211  if (ctx->ev_base != NULL) {
212  event_base_free(ctx->ev_base);
213  ctx->ev_base = NULL;
214  }
215 
216  if (ctx->async == NULL) {
217  SCLogError("Error allocate redis async.");
218  ctx->tried = time(NULL);
219  return -1;
220  }
221 
222  if (ctx->async != NULL && ctx->async->err) {
223  SCLogError("Error setting to redis async: [%s].", ctx->async->errstr);
224  ctx->tried = time(NULL);
225  return -1;
226  }
227 
228  ctx->ev_base = event_base_new();
229 
230  if (ctx->ev_base == NULL) {
231  ctx->tried = time(NULL);
232  redisAsyncFree(ctx->async);
233  ctx->async = NULL;
234  return -1;
235  }
236 
237  redisLibeventAttach(ctx->async, ctx->ev_base);
238 
239  log_ctx->redis = ctx;
240  log_ctx->Close = SCLogFileCloseRedis;
241  return 0;
242 }
243 
244 
245 /** \brief SCLogRedisWriteAsync() writes string to redis output in async mode
246  * \param file_ctx Log file context allocated by caller
247  * \param string Buffer to output
248  */
249 static int SCLogRedisWriteAsync(LogFileCtx *file_ctx, const char *string, size_t string_len)
250 {
251  SCLogRedisContext *ctx = file_ctx->redis;
252 
253  if (! ctx->connected) {
254  if (SCConfLogReopenAsyncRedis(file_ctx) == -1) {
255  return -1;
256  }
257  if (ctx->tried == 0) {
258  SCLogNotice("Trying to connect to Redis");
259  }
260  SCLogAsyncRedisSendEcho(ctx);
261  }
262 
263  if (!ctx->connected) {
264  return -1;
265  }
266 
267  if (ctx->async == NULL) {
268  return -1;
269  }
270 
271  redisAsyncCommand(ctx->async,
272  SCRedisAsyncCommandCallback,
273  file_ctx,
274  "%s %s %s",
275  file_ctx->redis_setup.command,
276  file_ctx->redis_setup.key,
277  string);
278 
279  event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK);
280 
281  return 0;
282 }
283 
284 #endif// HAVE_LIBEVENT
285 
286 /** \brief SCConfLogReopenSyncRedis() Open or re-opens connection to redis for logging.
287  * \param log_ctx Log file context allocated by caller
288  */
289 static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx)
290 {
291  SCLogRedisContext * ctx = log_ctx->redis;
292 
293  /* only try to reconnect once per second */
294  if (ctx->tried >= time(NULL)) {
295  return -1;
296  }
297 
298  const char *redis_server = log_ctx->redis_setup.server;
299  int redis_port = log_ctx->redis_setup.port;
300 
301  if (ctx->sync != NULL) {
302  redisFree(ctx->sync);
303  }
304 
305  if (strchr(redis_server, '/') == NULL) {
306  ctx->sync = redisConnect(redis_server, redis_port);
307  } else {
308  ctx->sync = redisConnectUnix(redis_server);
309  }
310  if (ctx->sync == NULL) {
311  SCLogError("Error connecting to redis server.");
312  ctx->tried = time(NULL);
313  return -1;
314  }
315  if (ctx->sync->err) {
316  SCLogError("Error connecting to redis server: [%s].", ctx->sync->errstr);
317  redisFree(ctx->sync);
318  ctx->sync = NULL;
319  ctx->tried = time(NULL);
320  return -1;
321  }
322  SCLogInfo("Connected to redis server [%s].", log_ctx->redis_setup.server);
323 
324  log_ctx->redis = ctx;
325  log_ctx->Close = SCLogFileCloseRedis;
326  return 0;
327 }
328 /** \brief SCLogRedisWriteSync() writes string to redis output in sync mode
329  * \param file_ctx Log file context allocated by caller
330  * \param string Buffer to output
331  */
332 static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
333 {
334  SCLogRedisContext * ctx = file_ctx->redis;
335  int ret = -1;
336  redisContext *redis = ctx->sync;
337  if (redis == NULL) {
338  SCConfLogReopenSyncRedis(file_ctx);
339  redis = ctx->sync;
340  if (redis == NULL) {
341  SCLogDebug("Redis after re-open is not available.");
342  return -1;
343  }
344  }
345 
346  /* synchronous mode */
347  if (file_ctx->redis_setup.batch_size) {
348  redisAppendCommand(redis, "%s %s %s",
349  file_ctx->redis_setup.command,
350  file_ctx->redis_setup.key,
351  string);
352  time_t now = time(NULL);
353  if ((ctx->batch_count == file_ctx->redis_setup.batch_size) || (ctx->last_push < now)) {
354  redisReply *reply;
355  int i;
356  int batch_size = ctx->batch_count;
357  ctx->batch_count = 0;
358  ctx->last_push = now;
359  for (i = 0; i <= batch_size; i++) {
360  if (redisGetReply(redis, (void **)&reply) == REDIS_OK) {
361  freeReplyObject(reply);
362  ret = 0;
363  } else {
364  if (redis->err) {
365  SCLogInfo("Error when fetching reply: %s (%d)",
366  redis->errstr,
367  redis->err);
368  }
369  switch (redis->err) {
370  case REDIS_ERR_EOF:
371  case REDIS_ERR_IO:
372  SCLogInfo("Reopening connection to redis server");
373  SCConfLogReopenSyncRedis(file_ctx);
374  redis = ctx->sync;
375  if (redis) {
376  SCLogInfo("Reconnected to redis server");
377  redisAppendCommand(redis, "%s %s %s",
378  file_ctx->redis_setup.command,
379  file_ctx->redis_setup.key,
380  string);
381  ctx->batch_count++;
382  return 0;
383  } else {
384  SCLogInfo("Unable to reconnect to redis server");
385  return -1;
386  }
387  break;
388  default:
389  SCLogWarning("Unsupported error code %d", redis->err);
390  return -1;
391  }
392  }
393  }
394  } else {
395  ctx->batch_count++;
396  }
397  } else {
398  redisReply *reply = redisCommand(redis, "%s %s %s",
399  file_ctx->redis_setup.command,
400  file_ctx->redis_setup.key,
401  string);
402  /* We may lose the reply if disconnection happens*/
403  if (reply) {
404  switch (reply->type) {
405  case REDIS_REPLY_ERROR:
406  SCLogWarning("Redis error: %s", reply->str);
407  SCConfLogReopenSyncRedis(file_ctx);
408  break;
409  case REDIS_REPLY_INTEGER:
410  SCLogDebug("Redis integer %lld", reply->integer);
411  ret = 0;
412  break;
413  default:
414  SCLogError("Redis default triggered with %d", reply->type);
415  SCConfLogReopenSyncRedis(file_ctx);
416  break;
417  }
418  freeReplyObject(reply);
419  } else {
420  SCConfLogReopenSyncRedis(file_ctx);
421  }
422  }
423  return ret;
424 }
425 
426 /**
427  * \brief LogFileWriteRedis() writes log data to redis output.
428  * \param log_ctx Log file context allocated by caller
429  * \param string buffer with data to write
430  * \param string_len data length
431  * \retval 0 on success;
432  * \retval -1 on failure;
433  */
434 int LogFileWriteRedis(void *lf_ctx, const char *string, size_t string_len)
435 {
436  LogFileCtx *file_ctx = lf_ctx;
437  if (file_ctx == NULL) {
438  return -1;
439  }
440 
441 #if HAVE_LIBEVENT
442  /* async mode on */
443  if (file_ctx->redis_setup.is_async) {
444  return SCLogRedisWriteAsync(file_ctx, string, string_len);
445  }
446 #endif
447  /* sync mode */
448  if (! file_ctx->redis_setup.is_async) {
449  return SCLogRedisWriteSync(file_ctx, string);
450  }
451  return -1;
452 }
453 
454 /** \brief configure and initializes redis output logging
455  * \param conf ConfNode structure for the output section in question
456  * \param log_ctx Log file context allocated by caller
457  * \retval 0 on success
458  */
459 int SCConfLogOpenRedis(ConfNode *redis_node, void *lf_ctx)
460 {
461  LogFileCtx *log_ctx = lf_ctx;
462 
463  if (log_ctx->threaded) {
464  FatalError("redis does not support threaded output");
465  }
466 
467  const char *redis_port = NULL;
468  const char *redis_mode = NULL;
469 
470  int is_async = 0;
471 
472  if (redis_node) {
473  log_ctx->redis_setup.server = ConfNodeLookupChildValue(redis_node, "server");
474  log_ctx->redis_setup.key = ConfNodeLookupChildValue(redis_node, "key");
475 
476  redis_port = ConfNodeLookupChildValue(redis_node, "port");
477  redis_mode = ConfNodeLookupChildValue(redis_node, "mode");
478 
479  (void)ConfGetChildValueBool(redis_node, "async", &is_async);
480  }
481  if (!log_ctx->redis_setup.server) {
482  log_ctx->redis_setup.server = redis_default_server;
483  SCLogInfo("Using default redis server (127.0.0.1)");
484  }
485  if (!redis_port)
486  redis_port = "6379";
487  if (!redis_mode)
488  redis_mode = "list";
489  if (!log_ctx->redis_setup.key) {
490  log_ctx->redis_setup.key = redis_default_key;
491  }
492 
493 #ifndef HAVE_LIBEVENT
494  if (is_async) {
495  SCLogWarning("async option not available.");
496  }
497  is_async = 0;
498 #endif //ifndef HAVE_LIBEVENT
499 
500  log_ctx->redis_setup.is_async = is_async;
501  log_ctx->redis_setup.batch_size = 0;
502  if (redis_node) {
503  ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining");
504  if (pipelining) {
505  int enabled = 0;
506  int ret;
507  intmax_t val;
508  ret = ConfGetChildValueBool(pipelining, "enabled", &enabled);
509  if (ret && enabled) {
510  ret = ConfGetChildValueInt(pipelining, "batch-size", &val);
511  if (ret) {
512  log_ctx->redis_setup.batch_size = val;
513  } else {
514  log_ctx->redis_setup.batch_size = 10;
515  }
516  }
517  }
518  } else {
519  log_ctx->redis_setup.batch_size = 0;
520  }
521 
522  if (!strcmp(redis_mode, "list") || !strcmp(redis_mode,"lpush")) {
523  log_ctx->redis_setup.command = redis_lpush_cmd;
524  } else if(!strcmp(redis_mode, "rpush")){
525  log_ctx->redis_setup.command = redis_rpush_cmd;
526  } else if(!strcmp(redis_mode,"channel") || !strcmp(redis_mode,"publish")) {
527  log_ctx->redis_setup.command = redis_publish_cmd;
528  } else {
529  FatalError("Invalid redis mode");
530  }
531 
532  /* store server params for reconnection */
533  if (!log_ctx->redis_setup.server) {
534  FatalError("Error allocating redis server string");
535  }
536  if (StringParseUint16(&log_ctx->redis_setup.port, 10, 0, (const char *)redis_port) < 0) {
537  FatalError("Invalid value for redis port: %s", redis_port);
538  }
539  log_ctx->Close = SCLogFileCloseRedis;
540 
541 #ifdef HAVE_LIBEVENT
542  if (is_async) {
543  log_ctx->redis = SCLogRedisContextAsyncAlloc();
544  }
545 #endif /*HAVE_LIBEVENT*/
546  if (! is_async) {
547  log_ctx->redis = SCLogRedisContextAlloc();
548  SCConfLogReopenSyncRedis(log_ctx);
549  }
550  return 0;
551 }
552 
553 /** \brief SCLogFileCloseRedis() Closes redis log more
554  * \param log_ctx Log file context allocated by caller
555  */
556 void SCLogFileCloseRedis(LogFileCtx *log_ctx)
557 {
558  SCLogRedisContext * ctx = log_ctx->redis;
559  if (ctx == NULL) {
560  return;
561  }
562  /* asynchronous */
563  if (log_ctx->redis_setup.is_async) {
564 #if HAVE_LIBEVENT == 1
565  if (ctx->async) {
566  if (ctx->connected > 0) {
567  SCLogAsyncRedisSendQuit(ctx);
568  }
569  if (ctx->ev_base != NULL) {
570  event_base_free(ctx->ev_base);
571  ctx->ev_base = NULL;
572  }
573  }
574 #endif
575  }
576 
577  /* synchronous */
578  if (!log_ctx->redis_setup.is_async) {
579  if (ctx->sync) {
580  redisReply *reply;
581  int i;
582  for (i = 0; i < ctx->batch_count; i++) {
583  redisGetReply(ctx->sync, (void **)&reply);
584  if (reply) {
585  freeReplyObject(reply);
586  }
587  }
588  redisFree(ctx->sync);
589  ctx->sync = NULL;
590  }
591  ctx->tried = 0;
592  ctx->batch_count = 0;
593  }
594 
595  if (ctx != NULL) {
596  SCFree(ctx);
597  }
598 }
599 
600 #endif //#ifdef HAVE_LIBHIREDIS
util-byte.h
ConfGetChildValueInt
int ConfGetChildValueInt(const ConfNode *base, const char *name, intmax_t *val)
Definition: conf.c:434
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
LogFileCtx_
Definition: util-logopenfile.h:76
StringParseUint16
int StringParseUint16(uint16_t *res, int base, size_t len, const char *str)
Definition: util-byte.c:337
util-log-redis.h
util-debug.h
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
SCLogInfo
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:224
ConfNodeLookupChild
ConfNode * ConfNodeLookupChild(const ConfNode *node, const char *name)
Lookup a child configuration node by name.
Definition: conf.c:786
suricata-common.h
FatalError
#define FatalError(...)
Definition: util-debug.h:502
ConfGetChildValueBool
int ConfGetChildValueBool(const ConfNode *base, const char *name, int *val)
Definition: conf.c:501
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
SCFree
#define SCFree(p)
Definition: util-mem.h:61
ConfNode_
Definition: conf.h:32
util-logopenfile.h
LogFileCtx_::Close
void(* Close)(struct LogFileCtx_ *fp)
Definition: util-logopenfile.h:92
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
LogFileCtx_::threaded
bool threaded
Definition: util-logopenfile.h:101
ConfNodeLookupChildValue
const char * ConfNodeLookupChildValue(const ConfNode *node, const char *name)
Lookup the value of a child configuration node by name.
Definition: conf.c:814