suricata
util-log-redis.c
Go to the documentation of this file.
1 /* vi: set et ts=4: */
2 /* Copyright (C) 2007-2021 Open Information Security Foundation
3  *
4  * You can copy, redistribute or modify this Program under the terms of
5  * the GNU General Public License version 2 as published by the Free
6  * Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * version 2 along with this program; if not, write to the Free Software
15  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
16  * 02110-1301, USA.
17  */
18 
19 /**
20  * \file
21  *
22  * \author Paulo Pacheco <fooinha@gmail.com>
23  *
24  * File-like output for logging: redis
25  */
26 #include "suricata-common.h" /* errno.h, string.h, etc. */
27 #include "util-log-redis.h"
28 #include "util-logopenfile.h"
29 #include "util-byte.h"
30 
31 #ifdef HAVE_LIBHIREDIS
32 
33 #ifdef HAVE_LIBEVENT_PTHREADS
34 #include <event2/thread.h>
35 #endif /* HAVE_LIBEVENT_PTHREADS */
36 
/* Redis command verbs chosen from the configured "mode", followed by the
 * fallback key and server used when the configuration does not provide them.
 * Both the characters and the pointers are const: these are fixed tables,
 * never re-targeted at runtime. */
static const char *const redis_lpush_cmd = "LPUSH";
static const char *const redis_rpush_cmd = "RPUSH";
static const char *const redis_publish_cmd = "PUBLISH";
static const char *const redis_default_key = "suricata";
static const char *const redis_default_server = "127.0.0.1";
42 
43 static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx);
44 static void SCLogFileCloseRedis(LogFileCtx *log_ctx);
45 
/**
 * \brief SCLogRedisInit() - Initializes global stuff before threads
 *
 * When built with HAVE_LIBEVENT_PTHREADS this calls evthread_use_pthreads();
 * in any other build it is a no-op.
 */
void SCLogRedisInit(void)
{
#ifdef HAVE_LIBEVENT_PTHREADS
    evthread_use_pthreads();
#endif /* HAVE_LIBEVENT_PTHREADS */
}
55 
56 /** \brief SCLogRedisContextAlloc() - Allocates and initializes redis context
57  */
58 static SCLogRedisContext *SCLogRedisContextAlloc(void)
59 {
60  SCLogRedisContext* ctx = (SCLogRedisContext*) SCCalloc(1, sizeof(SCLogRedisContext));
61  if (ctx == NULL) {
62  FatalError(SC_ERR_FATAL, "Unable to allocate redis context");
63  }
64  ctx->sync = NULL;
65 #if HAVE_LIBEVENT
66  ctx->ev_base = NULL;
67  ctx->async = NULL;
68 #endif
69  ctx->batch_count = 0;
70  ctx->last_push = 0;
71  ctx->tried = 0;
72 
73  return ctx;
74 }
75 
76 #ifdef HAVE_LIBEVENT
77 
78 static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx);
79 #include <hiredis/adapters/libevent.h>
80 
81 /** \brief SCLogRedisAsyncContextAlloc() - Allocates and initalizes redis context with async
82  */
83 static SCLogRedisContext *SCLogRedisContextAsyncAlloc(void)
84 {
85  SCLogRedisContext* ctx = (SCLogRedisContext*) SCCalloc(1, sizeof(SCLogRedisContext));
86  if (unlikely(ctx == NULL)) {
87  FatalError(SC_ERR_FATAL, "Unable to allocate redis context");
88  }
89 
90  ctx->sync = NULL;
91  ctx->async = NULL;
92  ctx->ev_base = NULL;
93  ctx->connected = 0;
94  ctx->batch_count = 0;
95  ctx->last_push = 0;
96  ctx->tried = 0;
97 
98  return ctx;
99 }
100 
101 /** \brief SCRedisAsyncCommandCallback() Callback when reply from redis happens.
102  * \param ac redis async context
103  * \param r redis reply
104  * \param privvata opaque datq with pointer to LogFileCtx
105  */
106 static void SCRedisAsyncCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
107 {
108  redisReply *reply = r;
109  LogFileCtx *log_ctx = privdata;
110  SCLogRedisContext *ctx = log_ctx->redis;
111 
112  if (reply == NULL) {
113  if (ctx->connected > 0)
114  SCLogInfo("Missing reply from redis, disconnected.");
115  ctx->connected = 0;
116  } else {
117  ctx->connected = 1;
118  event_base_loopbreak(ctx->ev_base);
119  }
120 }
121 
122 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an ECHO command reply
123  * This is used to check if redis is connected.
124  * \param ac redis async context
125  * \param r redis reply
126  * \param privvata opaque datq with pointer to LogFileCtx
127  */
128 static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
129 {
130  redisReply *reply = r;
131  SCLogRedisContext * ctx = privdata;
132 
133  if (reply) {
134  if (ctx->connected == 0) {
135  SCLogNotice("Connected to Redis.");
136  ctx->connected = 1;
137  ctx->tried = 0;
138  }
139  } else {
140  ctx->connected = 0;
141  if (ctx->tried == 0) {
142  SCLogWarning(SC_ERR_SOCKET, "Failed to connect to Redis... (will keep trying)");
143  }
144  ctx->tried = time(NULL);
145  }
146  event_base_loopbreak(ctx->ev_base);
147 }
148 
149 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
150  * Emits and awaits response for an async ECHO command.
151  * It's used for check if redis is alive.
152  * \param ctx redis context
153  */
154 static void SCLogAsyncRedisSendEcho(SCLogRedisContext * ctx)
155 {
156  redisAsyncCommand(ctx->async, SCRedisAsyncEchoCommandCallback, ctx, "ECHO suricata");
157  event_base_dispatch(ctx->ev_base);
158 }
159 
160 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
161  * This is used to terminate connection with redis.
162  * \param ac redis async context
163  * \param r redis reply
164  * \param privvata opaque datq with pointer to LogFileCtx
165  */
166 static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
167 {
168  SCLogInfo("Disconnecting from redis!");
169 }
170 
171 /** \brief QUIT command
172  * Emits and awaits response for an async QUIT command.
173  * It's used to disconnect with redis
174  * \param ctx redis context
175  */
176 static void SCLogAsyncRedisSendQuit(SCLogRedisContext * ctx)
177 {
178  if (ctx->connected) {
179  redisAsyncCommand(ctx->async, SCRedisAsyncQuitCommandCallback, ctx, "QUIT");
180  SCLogInfo("QUIT Command sent to redis. Connection will terminate!");
181  }
182 
183  redisAsyncFree(ctx->async);
184  event_base_dispatch(ctx->ev_base);
185  ctx->async = NULL;
186  event_base_free(ctx->ev_base);
187  ctx->ev_base = NULL;
188  ctx->connected = 0;
189 }
190 
191 /** \brief SCConfLogReopenAsyncRedis() Open or re-opens connection to redis for logging.
192  * \param log_ctx Log file context allocated by caller
193  */
194 static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx)
195 {
196  SCLogRedisContext * ctx = log_ctx->redis;
197  const char *redis_server = log_ctx->redis_setup.server;
198  int redis_port = log_ctx->redis_setup.port;
199 
200  /* only try to reconnect once per second */
201  if (ctx->tried >= time(NULL)) {
202  return -1;
203  }
204 
205  if (strchr(redis_server, '/') == NULL) {
206  ctx->async = redisAsyncConnect(redis_server, redis_port);
207  } else {
208  ctx->async = redisAsyncConnectUnix(redis_server);
209  }
210 
211  if (ctx->ev_base != NULL) {
212  event_base_free(ctx->ev_base);
213  ctx->ev_base = NULL;
214  }
215 
216  if (ctx->async == NULL) {
217  SCLogError(SC_ERR_MEM_ALLOC, "Error allocate redis async.");
218  ctx->tried = time(NULL);
219  return -1;
220  }
221 
222  if (ctx->async != NULL && ctx->async->err) {
223  SCLogError(SC_ERR_SOCKET, "Error setting to redis async: [%s].", ctx->async->errstr);
224  ctx->tried = time(NULL);
225  return -1;
226  }
227 
228  ctx->ev_base = event_base_new();
229 
230  if (ctx->ev_base == NULL) {
231  ctx->tried = time(NULL);
232  redisAsyncFree(ctx->async);
233  ctx->async = NULL;
234  return -1;
235  }
236 
237  redisLibeventAttach(ctx->async, ctx->ev_base);
238 
239  log_ctx->redis = ctx;
240  log_ctx->Close = SCLogFileCloseRedis;
241  return 0;
242 }
243 
244 
245 /** \brief SCLogRedisWriteAsync() writes string to redis output in async mode
246  * \param file_ctx Log file context allocated by caller
247  * \param string Buffer to output
248  */
249 static int SCLogRedisWriteAsync(LogFileCtx *file_ctx, const char *string, size_t string_len)
250 {
251  SCLogRedisContext *ctx = file_ctx->redis;
252 
253  if (! ctx->connected) {
254  if (SCConfLogReopenAsyncRedis(file_ctx) == -1) {
255  return -1;
256  }
257  if (ctx->tried == 0) {
258  SCLogNotice("Trying to connect to Redis");
259  }
260  SCLogAsyncRedisSendEcho(ctx);
261  }
262 
263  if (!ctx->connected) {
264  return -1;
265  }
266 
267  if (ctx->async == NULL) {
268  return -1;
269  }
270 
271  redisAsyncCommand(ctx->async,
272  SCRedisAsyncCommandCallback,
273  file_ctx,
274  "%s %s %s",
275  file_ctx->redis_setup.command,
276  file_ctx->redis_setup.key,
277  string);
278 
279  event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK);
280 
281  return 0;
282 }
283 
284 #endif// HAVE_LIBEVENT
285 
286 /** \brief SCConfLogReopenSyncRedis() Open or re-opens connection to redis for logging.
287  * \param log_ctx Log file context allocated by caller
288  */
289 static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx)
290 {
291  SCLogRedisContext * ctx = log_ctx->redis;
292 
293  /* only try to reconnect once per second */
294  if (ctx->tried >= time(NULL)) {
295  return -1;
296  }
297 
298  const char *redis_server = log_ctx->redis_setup.server;
299  int redis_port = log_ctx->redis_setup.port;
300 
301  if (ctx->sync != NULL) {
302  redisFree(ctx->sync);
303  }
304 
305  if (strchr(redis_server, '/') == NULL) {
306  ctx->sync = redisConnect(redis_server, redis_port);
307  } else {
308  ctx->sync = redisConnectUnix(redis_server);
309  }
310  if (ctx->sync == NULL) {
311  SCLogError(SC_ERR_SOCKET, "Error connecting to redis server.");
312  ctx->tried = time(NULL);
313  return -1;
314  }
315  if (ctx->sync->err) {
316  SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: [%s].", ctx->sync->errstr);
317  redisFree(ctx->sync);
318  ctx->sync = NULL;
319  ctx->tried = time(NULL);
320  return -1;
321  }
322  SCLogInfo("Connected to redis server [%s].", log_ctx->redis_setup.server);
323 
324  log_ctx->redis = ctx;
325  log_ctx->Close = SCLogFileCloseRedis;
326  return 0;
327 }
328 /** \brief SCLogRedisWriteSync() writes string to redis output in sync mode
329  * \param file_ctx Log file context allocated by caller
330  * \param string Buffer to output
331  */
332 static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
333 {
334  SCLogRedisContext * ctx = file_ctx->redis;
335  int ret = -1;
336  redisContext *redis = ctx->sync;
337  if (redis == NULL) {
338  SCConfLogReopenSyncRedis(file_ctx);
339  redis = ctx->sync;
340  if (redis == NULL) {
341  SCLogDebug("Redis after re-open is not available.");
342  return -1;
343  }
344  }
345 
346  /* synchronous mode */
347  if (file_ctx->redis_setup.batch_size) {
348  redisAppendCommand(redis, "%s %s %s",
349  file_ctx->redis_setup.command,
350  file_ctx->redis_setup.key,
351  string);
352  time_t now = time(NULL);
353  if ((ctx->batch_count == file_ctx->redis_setup.batch_size) || (ctx->last_push < now)) {
354  redisReply *reply;
355  int i;
356  int batch_size = ctx->batch_count;
357  ctx->batch_count = 0;
358  ctx->last_push = now;
359  for (i = 0; i <= batch_size; i++) {
360  if (redisGetReply(redis, (void **)&reply) == REDIS_OK) {
361  freeReplyObject(reply);
362  ret = 0;
363  } else {
364  if (redis->err) {
365  SCLogInfo("Error when fetching reply: %s (%d)",
366  redis->errstr,
367  redis->err);
368  }
369  switch (redis->err) {
370  case REDIS_ERR_EOF:
371  case REDIS_ERR_IO:
372  SCLogInfo("Reopening connection to redis server");
373  SCConfLogReopenSyncRedis(file_ctx);
374  redis = ctx->sync;
375  if (redis) {
376  SCLogInfo("Reconnected to redis server");
377  redisAppendCommand(redis, "%s %s %s",
378  file_ctx->redis_setup.command,
379  file_ctx->redis_setup.key,
380  string);
381  ctx->batch_count++;
382  return 0;
383  } else {
384  SCLogInfo("Unable to reconnect to redis server");
385  return -1;
386  }
387  break;
388  default:
390  "Unsupported error code %d",
391  redis->err);
392  return -1;
393  }
394  }
395  }
396  } else {
397  ctx->batch_count++;
398  }
399  } else {
400  redisReply *reply = redisCommand(redis, "%s %s %s",
401  file_ctx->redis_setup.command,
402  file_ctx->redis_setup.key,
403  string);
404  /* We may lose the reply if disconnection happens*/
405  if (reply) {
406  switch (reply->type) {
407  case REDIS_REPLY_ERROR:
408  SCLogWarning(SC_ERR_SOCKET, "Redis error: %s", reply->str);
409  SCConfLogReopenSyncRedis(file_ctx);
410  break;
411  case REDIS_REPLY_INTEGER:
412  SCLogDebug("Redis integer %lld", reply->integer);
413  ret = 0;
414  break;
415  default:
417  "Redis default triggered with %d", reply->type);
418  SCConfLogReopenSyncRedis(file_ctx);
419  break;
420  }
421  freeReplyObject(reply);
422  } else {
423  SCConfLogReopenSyncRedis(file_ctx);
424  }
425  }
426  return ret;
427 }
428 
429 /**
430  * \brief LogFileWriteRedis() writes log data to redis output.
431  * \param log_ctx Log file context allocated by caller
432  * \param string buffer with data to write
433  * \param string_len data length
434  * \retval 0 on success;
435  * \retval -1 on failure;
436  */
437 int LogFileWriteRedis(void *lf_ctx, const char *string, size_t string_len)
438 {
439  LogFileCtx *file_ctx = lf_ctx;
440  if (file_ctx == NULL) {
441  return -1;
442  }
443 
444 #if HAVE_LIBEVENT
445  /* async mode on */
446  if (file_ctx->redis_setup.is_async) {
447  return SCLogRedisWriteAsync(file_ctx, string, string_len);
448  }
449 #endif
450  /* sync mode */
451  if (! file_ctx->redis_setup.is_async) {
452  return SCLogRedisWriteSync(file_ctx, string);
453  }
454  return -1;
455 }
456 
457 /** \brief configure and initializes redis output logging
458  * \param conf ConfNode structure for the output section in question
459  * \param log_ctx Log file context allocated by caller
460  * \retval 0 on success
461  */
462 int SCConfLogOpenRedis(ConfNode *redis_node, void *lf_ctx)
463 {
464  LogFileCtx *log_ctx = lf_ctx;
465 
466  if (log_ctx->threaded) {
467  FatalError(SC_ERR_FATAL, "redis does not support threaded output");
468  }
469 
470  const char *redis_port = NULL;
471  const char *redis_mode = NULL;
472 
473  int is_async = 0;
474 
475  if (redis_node) {
476  log_ctx->redis_setup.server = ConfNodeLookupChildValue(redis_node, "server");
477  log_ctx->redis_setup.key = ConfNodeLookupChildValue(redis_node, "key");
478 
479  redis_port = ConfNodeLookupChildValue(redis_node, "port");
480  redis_mode = ConfNodeLookupChildValue(redis_node, "mode");
481 
482  (void)ConfGetChildValueBool(redis_node, "async", &is_async);
483  }
484  if (!log_ctx->redis_setup.server) {
485  log_ctx->redis_setup.server = redis_default_server;
486  SCLogInfo("Using default redis server (127.0.0.1)");
487  }
488  if (!redis_port)
489  redis_port = "6379";
490  if (!redis_mode)
491  redis_mode = "list";
492  if (!log_ctx->redis_setup.key) {
493  log_ctx->redis_setup.key = redis_default_key;
494  }
495 
496 #ifndef HAVE_LIBEVENT
497  if (is_async) {
498  SCLogWarning(SC_ERR_NO_REDIS_ASYNC, "async option not available.");
499  }
500  is_async = 0;
501 #endif //ifndef HAVE_LIBEVENT
502 
503  log_ctx->redis_setup.is_async = is_async;
504  log_ctx->redis_setup.batch_size = 0;
505  if (redis_node) {
506  ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining");
507  if (pipelining) {
508  int enabled = 0;
509  int ret;
510  intmax_t val;
511  ret = ConfGetChildValueBool(pipelining, "enabled", &enabled);
512  if (ret && enabled) {
513  ret = ConfGetChildValueInt(pipelining, "batch-size", &val);
514  if (ret) {
515  log_ctx->redis_setup.batch_size = val;
516  } else {
517  log_ctx->redis_setup.batch_size = 10;
518  }
519  }
520  }
521  } else {
522  log_ctx->redis_setup.batch_size = 0;
523  }
524 
525  if (!strcmp(redis_mode, "list") || !strcmp(redis_mode,"lpush")) {
526  log_ctx->redis_setup.command = redis_lpush_cmd;
527  } else if(!strcmp(redis_mode, "rpush")){
528  log_ctx->redis_setup.command = redis_rpush_cmd;
529  } else if(!strcmp(redis_mode,"channel") || !strcmp(redis_mode,"publish")) {
530  log_ctx->redis_setup.command = redis_publish_cmd;
531  } else {
532  FatalError(SC_ERR_FATAL, "Invalid redis mode");
533  }
534 
535  /* store server params for reconnection */
536  if (!log_ctx->redis_setup.server) {
537  FatalError(SC_ERR_FATAL, "Error allocating redis server string");
538  }
539  if (StringParseUint16(&log_ctx->redis_setup.port, 10, 0, (const char *)redis_port) < 0) {
540  FatalError(SC_ERR_INVALID_VALUE, "Invalid value for redis port: %s", redis_port);
541  }
542  log_ctx->Close = SCLogFileCloseRedis;
543 
544 #ifdef HAVE_LIBEVENT
545  if (is_async) {
546  log_ctx->redis = SCLogRedisContextAsyncAlloc();
547  }
548 #endif /*HAVE_LIBEVENT*/
549  if (! is_async) {
550  log_ctx->redis = SCLogRedisContextAlloc();
551  SCConfLogReopenSyncRedis(log_ctx);
552  }
553  return 0;
554 }
555 
556 /** \brief SCLogFileCloseRedis() Closes redis log more
557  * \param log_ctx Log file context allocated by caller
558  */
559 void SCLogFileCloseRedis(LogFileCtx *log_ctx)
560 {
561  SCLogRedisContext * ctx = log_ctx->redis;
562  if (ctx == NULL) {
563  return;
564  }
565  /* asynchronous */
566  if (log_ctx->redis_setup.is_async) {
567 #if HAVE_LIBEVENT == 1
568  if (ctx->async) {
569  if (ctx->connected > 0) {
570  SCLogAsyncRedisSendQuit(ctx);
571  }
572  if (ctx->ev_base != NULL) {
573  event_base_free(ctx->ev_base);
574  ctx->ev_base = NULL;
575  }
576  }
577 #endif
578  }
579 
580  /* synchronous */
581  if (!log_ctx->redis_setup.is_async) {
582  if (ctx->sync) {
583  redisReply *reply;
584  int i;
585  for (i = 0; i < ctx->batch_count; i++) {
586  redisGetReply(ctx->sync, (void **)&reply);
587  if (reply) {
588  freeReplyObject(reply);
589  }
590  }
591  redisFree(ctx->sync);
592  ctx->sync = NULL;
593  }
594  ctx->tried = 0;
595  ctx->batch_count = 0;
596  }
597 
598  if (ctx != NULL) {
599  SCFree(ctx);
600  }
601 }
602 
603 #endif //#ifdef HAVE_LIBHIREDIS
util-byte.h
ConfGetChildValueInt
int ConfGetChildValueInt(const ConfNode *base, const char *name, intmax_t *val)
Definition: conf.c:424
SC_ERR_INVALID_VALUE
@ SC_ERR_INVALID_VALUE
Definition: util-error.h:160
SC_ERR_SOCKET
@ SC_ERR_SOCKET
Definition: util-error.h:232
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:296
LogFileCtx_
Definition: util-logopenfile.h:64
StringParseUint16
int StringParseUint16(uint16_t *res, int base, size_t len, const char *str)
Definition: util-byte.c:336
util-log-redis.h
SCLogInfo
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:215
ConfNodeLookupChild
ConfNode * ConfNodeLookupChild(const ConfNode *node, const char *name)
Lookup a child configuration node by name.
Definition: conf.c:770
suricata-common.h
SCLogError
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:255
FatalError
#define FatalError(x,...)
Definition: util-debug.h:530
ConfGetChildValueBool
int ConfGetChildValueBool(const ConfNode *base, const char *name, int *val)
Definition: conf.c:485
SCLogWarning
#define SCLogWarning(err_code,...)
Macro used to log WARNING messages.
Definition: util-debug.h:242
SCFree
#define SCFree(p)
Definition: util-mem.h:61
ConfNode_
Definition: conf.h:32
util-logopenfile.h
SC_ERR_FATAL
@ SC_ERR_FATAL
Definition: util-error.h:203
SC_ERR_MEM_ALLOC
@ SC_ERR_MEM_ALLOC
Definition: util-error.h:31
LogFileCtx_::Close
void(* Close)(struct LogFileCtx_ *fp)
Definition: util-logopenfile.h:81
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:230
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
SC_ERR_NO_REDIS_ASYNC
@ SC_ERR_NO_REDIS_ASYNC
Definition: util-error.h:333
LogFileCtx_::threaded
bool threaded
Definition: util-logopenfile.h:90
ConfNodeLookupChildValue
const char * ConfNodeLookupChildValue(const ConfNode *node, const char *name)
Lookup the value of a child configuration node by name.
Definition: conf.c:798