suricata
util-log-redis.c
Go to the documentation of this file.
1 /* vi: set et ts=4: */
2 /* Copyright (C) 2007-2021 Open Information Security Foundation
3  *
4  * You can copy, redistribute or modify this Program under the terms of
5  * the GNU General Public License version 2 as published by the Free
6  * Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * version 2 along with this program; if not, write to the Free Software
15  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
16  * 02110-1301, USA.
17  */
18 
19 /**
20  * \file
21  *
22  * \author Paulo Pacheco <fooinha@gmail.com>
23  *
24  * File-like output for logging: redis
25  */
26 #include "suricata-common.h" /* errno.h, string.h, etc. */
27 #include "util-log-redis.h"
28 #include "util-logopenfile.h"
29 #include "util-byte.h"
30 #include "util-debug.h"
31 
32 #ifdef HAVE_LIBHIREDIS
33 
34 #ifdef HAVE_LIBEVENT_PTHREADS
35 #include <event2/thread.h>
36 #endif /* HAVE_LIBEVENT_PTHREADS */
37 
/* Redis commands that the configured "mode" can select. */
static const char *redis_lpush_cmd = "LPUSH";
static const char *redis_rpush_cmd = "RPUSH";
static const char *redis_publish_cmd = "PUBLISH";

/* Values used when the configuration names no key / no server. */
static const char *redis_default_key = "suricata";
static const char *redis_default_server = "127.0.0.1";
43 
44 static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx);
45 static void SCLogFileCloseRedis(LogFileCtx *log_ctx);
46 
/**
 * \brief One-time global setup for redis logging, meant to run before
 *        threads are started.
 *
 * Calls evthread_use_pthreads() when the build has libevent pthreads
 * support; otherwise this function does nothing.
 */
void SCLogRedisInit(void)
{
#ifdef HAVE_LIBEVENT_PTHREADS
    evthread_use_pthreads();
#endif /* HAVE_LIBEVENT_PTHREADS */
}
56 
57 /** \brief SCLogRedisContextAlloc() - Allocates and initializes redis context
58  */
59 static SCLogRedisContext *SCLogRedisContextAlloc(void)
60 {
61  SCLogRedisContext* ctx = (SCLogRedisContext*) SCCalloc(1, sizeof(SCLogRedisContext));
62  if (ctx == NULL) {
63  FatalError("Unable to allocate redis context");
64  }
65  ctx->sync = NULL;
66 #if HAVE_LIBEVENT
67  ctx->ev_base = NULL;
68  ctx->async = NULL;
69 #endif
70  ctx->batch_count = 0;
71  ctx->last_push = 0;
72  ctx->tried = 0;
73 
74  return ctx;
75 }
76 
77 #ifdef HAVE_LIBEVENT
78 
79 static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx);
80 #include <hiredis/adapters/libevent.h>
81 
82 /** \brief SCLogRedisAsyncContextAlloc() - Allocates and initializes redis context with async
83  */
84 static SCLogRedisContext *SCLogRedisContextAsyncAlloc(void)
85 {
86  SCLogRedisContext* ctx = (SCLogRedisContext*) SCCalloc(1, sizeof(SCLogRedisContext));
87  if (unlikely(ctx == NULL)) {
88  FatalError("Unable to allocate redis context");
89  }
90 
91  ctx->sync = NULL;
92  ctx->async = NULL;
93  ctx->ev_base = NULL;
94  ctx->connected = 0;
95  ctx->batch_count = 0;
96  ctx->last_push = 0;
97  ctx->tried = 0;
98 
99  return ctx;
100 }
101 
102 /** \brief SCRedisAsyncCommandCallback() Callback when reply from redis happens.
103  * \param ac redis async context
104  * \param r redis reply
105  * \param privdata opaque data with pointer to LogFileCtx
106  */
107 static void SCRedisAsyncCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
108 {
109  redisReply *reply = r;
110  LogFileCtx *log_ctx = privdata;
111  SCLogRedisContext *ctx = log_ctx->redis;
112 
113  if (reply == NULL) {
114  if (ctx->connected > 0)
115  SCLogInfo("Missing reply from redis, disconnected.");
116  ctx->connected = 0;
117  } else {
118  ctx->connected = 1;
119  event_base_loopbreak(ctx->ev_base);
120  }
121 }
122 
123 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an ECHO command reply
124  * This is used to check if redis is connected.
125  * \param ac redis async context
126  * \param r redis reply
127  * \param privdata opaque data with pointer to LogFileCtx
128  */
129 static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
130 {
131  redisReply *reply = r;
132  SCLogRedisContext * ctx = privdata;
133 
134  if (reply) {
135  if (ctx->connected == 0) {
136  SCLogNotice("Connected to Redis.");
137  ctx->connected = 1;
138  ctx->tried = 0;
139  }
140  } else {
141  ctx->connected = 0;
142  if (ctx->tried == 0) {
143  SCLogWarning("Failed to connect to Redis... (will keep trying)");
144  }
145  ctx->tried = time(NULL);
146  }
147  event_base_loopbreak(ctx->ev_base);
148 }
149 
150 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
151  * Emits and awaits response for an async ECHO command.
152  * It's used for check if redis is alive.
153  * \param ctx redis context
154  */
155 static void SCLogAsyncRedisSendEcho(SCLogRedisContext * ctx)
156 {
157  redisAsyncCommand(ctx->async, SCRedisAsyncEchoCommandCallback, ctx, "ECHO suricata");
158  event_base_dispatch(ctx->ev_base);
159 }
160 
161 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
162  * This is used to terminate connection with redis.
163  * \param ac redis async context
164  * \param r redis reply
165  * \param privdata opaque data with pointer to LogFileCtx
166  */
167 static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
168 {
169  SCLogInfo("Disconnecting from redis!");
170 }
171 
172 /** \brief QUIT command
173  * Emits and awaits response for an async QUIT command.
174  * It's used to disconnect with redis
175  * \param ctx redis context
176  */
177 static void SCLogAsyncRedisSendQuit(SCLogRedisContext * ctx)
178 {
179  if (ctx->connected) {
180  redisAsyncCommand(ctx->async, SCRedisAsyncQuitCommandCallback, ctx, "QUIT");
181  SCLogInfo("QUIT Command sent to redis. Connection will terminate!");
182  }
183 
184  redisAsyncFree(ctx->async);
185  event_base_dispatch(ctx->ev_base);
186  ctx->async = NULL;
187  event_base_free(ctx->ev_base);
188  ctx->ev_base = NULL;
189  ctx->connected = 0;
190 }
191 
192 /** \brief SCConfLogReopenAsyncRedis() Open or re-opens connection to redis for logging.
193  * \param log_ctx Log file context allocated by caller
194  */
195 static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx)
196 {
197  SCLogRedisContext * ctx = log_ctx->redis;
198  const char *redis_server = log_ctx->redis_setup.server;
199  int redis_port = log_ctx->redis_setup.port;
200 
201  /* only try to reconnect once per second */
202  if (ctx->tried >= time(NULL)) {
203  return -1;
204  }
205 
206  if (strchr(redis_server, '/') == NULL) {
207  ctx->async = redisAsyncConnect(redis_server, redis_port);
208  } else {
209  ctx->async = redisAsyncConnectUnix(redis_server);
210  }
211 
212  if (ctx->ev_base != NULL) {
213  event_base_free(ctx->ev_base);
214  ctx->ev_base = NULL;
215  }
216 
217  if (ctx->async == NULL) {
218  SCLogError("Error allocate redis async.");
219  ctx->tried = time(NULL);
220  return -1;
221  }
222 
223  if (ctx->async != NULL && ctx->async->err) {
224  SCLogError("Error setting to redis async: [%s].", ctx->async->errstr);
225  ctx->tried = time(NULL);
226  return -1;
227  }
228 
229  ctx->ev_base = event_base_new();
230 
231  if (ctx->ev_base == NULL) {
232  ctx->tried = time(NULL);
233  redisAsyncFree(ctx->async);
234  ctx->async = NULL;
235  return -1;
236  }
237 
238  redisLibeventAttach(ctx->async, ctx->ev_base);
239 
240  log_ctx->redis = ctx;
241  log_ctx->Close = SCLogFileCloseRedis;
242  return 0;
243 }
244 
245 
246 /** \brief SCLogRedisWriteAsync() writes string to redis output in async mode
247  * \param file_ctx Log file context allocated by caller
248  * \param string Buffer to output
249  */
250 static int SCLogRedisWriteAsync(LogFileCtx *file_ctx, const char *string, size_t string_len)
251 {
252  SCLogRedisContext *ctx = file_ctx->redis;
253 
254  if (! ctx->connected) {
255  if (SCConfLogReopenAsyncRedis(file_ctx) == -1) {
256  return -1;
257  }
258  if (ctx->tried == 0) {
259  SCLogNotice("Trying to connect to Redis");
260  }
261  SCLogAsyncRedisSendEcho(ctx);
262  }
263 
264  if (!ctx->connected) {
265  return -1;
266  }
267 
268  if (ctx->async == NULL) {
269  return -1;
270  }
271 
272  redisAsyncCommand(ctx->async,
273  SCRedisAsyncCommandCallback,
274  file_ctx,
275  "%s %s %s",
276  file_ctx->redis_setup.command,
277  file_ctx->redis_setup.key,
278  string);
279 
280  event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK);
281 
282  return 0;
283 }
284 
285 #endif// HAVE_LIBEVENT
286 
287 /** \brief SCConfLogReopenSyncRedis() Open or re-opens connection to redis for logging.
288  * \param log_ctx Log file context allocated by caller
289  */
290 static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx)
291 {
292  SCLogRedisContext * ctx = log_ctx->redis;
293 
294  /* only try to reconnect once per second */
295  if (ctx->tried >= time(NULL)) {
296  return -1;
297  }
298 
299  const char *redis_server = log_ctx->redis_setup.server;
300  int redis_port = log_ctx->redis_setup.port;
301 
302  if (ctx->sync != NULL) {
303  redisFree(ctx->sync);
304  }
305 
306  if (strchr(redis_server, '/') == NULL) {
307  ctx->sync = redisConnect(redis_server, redis_port);
308  } else {
309  ctx->sync = redisConnectUnix(redis_server);
310  }
311  if (ctx->sync == NULL) {
312  SCLogError("Error connecting to redis server.");
313  ctx->tried = time(NULL);
314  return -1;
315  }
316  if (ctx->sync->err) {
317  SCLogError("Error connecting to redis server: [%s].", ctx->sync->errstr);
318  redisFree(ctx->sync);
319  ctx->sync = NULL;
320  ctx->tried = time(NULL);
321  return -1;
322  }
323  SCLogInfo("Connected to redis server [%s].", log_ctx->redis_setup.server);
324 
325  log_ctx->redis = ctx;
326  log_ctx->Close = SCLogFileCloseRedis;
327  return 0;
328 }
329 /** \brief SCLogRedisWriteSync() writes string to redis output in sync mode
330  * \param file_ctx Log file context allocated by caller
331  * \param string Buffer to output
332  */
333 static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
334 {
335  SCLogRedisContext * ctx = file_ctx->redis;
336  int ret = -1;
337  redisContext *redis = ctx->sync;
338  if (redis == NULL) {
339  SCConfLogReopenSyncRedis(file_ctx);
340  redis = ctx->sync;
341  if (redis == NULL) {
342  SCLogDebug("Redis after re-open is not available.");
343  return -1;
344  }
345  }
346 
347  /* synchronous mode */
348  if (file_ctx->redis_setup.batch_size) {
349  redisAppendCommand(redis, "%s %s %s",
350  file_ctx->redis_setup.command,
351  file_ctx->redis_setup.key,
352  string);
353  time_t now = time(NULL);
354  if ((ctx->batch_count == file_ctx->redis_setup.batch_size) || (ctx->last_push < now)) {
355  redisReply *reply;
356  int i;
357  int batch_size = ctx->batch_count;
358  ctx->batch_count = 0;
359  ctx->last_push = now;
360  for (i = 0; i <= batch_size; i++) {
361  if (redisGetReply(redis, (void **)&reply) == REDIS_OK) {
362  freeReplyObject(reply);
363  ret = 0;
364  } else {
365  if (redis->err) {
366  SCLogInfo("Error when fetching reply: %s (%d)",
367  redis->errstr,
368  redis->err);
369  }
370  switch (redis->err) {
371  case REDIS_ERR_EOF:
372  case REDIS_ERR_IO:
373  SCLogInfo("Reopening connection to redis server");
374  SCConfLogReopenSyncRedis(file_ctx);
375  redis = ctx->sync;
376  if (redis) {
377  SCLogInfo("Reconnected to redis server");
378  redisAppendCommand(redis, "%s %s %s",
379  file_ctx->redis_setup.command,
380  file_ctx->redis_setup.key,
381  string);
382  ctx->batch_count++;
383  return 0;
384  } else {
385  SCLogInfo("Unable to reconnect to redis server");
386  return -1;
387  }
388  break;
389  default:
390  SCLogWarning("Unsupported error code %d", redis->err);
391  return -1;
392  }
393  }
394  }
395  } else {
396  ctx->batch_count++;
397  }
398  } else {
399  redisReply *reply = redisCommand(redis, "%s %s %s",
400  file_ctx->redis_setup.command,
401  file_ctx->redis_setup.key,
402  string);
403  /* We may lose the reply if disconnection happens*/
404  if (reply) {
405  switch (reply->type) {
406  case REDIS_REPLY_ERROR:
407  SCLogWarning("Redis error: %s", reply->str);
408  SCConfLogReopenSyncRedis(file_ctx);
409  break;
410  case REDIS_REPLY_INTEGER:
411  SCLogDebug("Redis integer %lld", reply->integer);
412  ret = 0;
413  break;
414  default:
415  SCLogError("Redis default triggered with %d", reply->type);
416  SCConfLogReopenSyncRedis(file_ctx);
417  break;
418  }
419  freeReplyObject(reply);
420  } else {
421  SCConfLogReopenSyncRedis(file_ctx);
422  }
423  }
424  return ret;
425 }
426 
427 /**
428  * \brief LogFileWriteRedis() writes log data to redis output.
429  * \param log_ctx Log file context allocated by caller
430  * \param string buffer with data to write
431  * \param string_len data length
432  * \retval 0 on success;
433  * \retval -1 on failure;
434  */
435 int LogFileWriteRedis(void *lf_ctx, const char *string, size_t string_len)
436 {
437  LogFileCtx *file_ctx = lf_ctx;
438  if (file_ctx == NULL) {
439  return -1;
440  }
441 
442 #if HAVE_LIBEVENT
443  /* async mode on */
444  if (file_ctx->redis_setup.is_async) {
445  return SCLogRedisWriteAsync(file_ctx, string, string_len);
446  }
447 #endif
448  /* sync mode */
449  if (! file_ctx->redis_setup.is_async) {
450  return SCLogRedisWriteSync(file_ctx, string);
451  }
452  return -1;
453 }
454 
455 /** \brief configure and initializes redis output logging
456  * \param conf ConfNode structure for the output section in question
457  * \param log_ctx Log file context allocated by caller
458  * \retval 0 on success
459  */
460 int SCConfLogOpenRedis(ConfNode *redis_node, void *lf_ctx)
461 {
462  LogFileCtx *log_ctx = lf_ctx;
463 
464  if (log_ctx->threaded) {
465  FatalError("redis does not support threaded output");
466  }
467 
468  const char *redis_port = NULL;
469  const char *redis_mode = NULL;
470 
471  int is_async = 0;
472 
473  if (redis_node) {
474  log_ctx->redis_setup.server = ConfNodeLookupChildValue(redis_node, "server");
475  log_ctx->redis_setup.key = ConfNodeLookupChildValue(redis_node, "key");
476 
477  redis_port = ConfNodeLookupChildValue(redis_node, "port");
478  redis_mode = ConfNodeLookupChildValue(redis_node, "mode");
479 
480  (void)ConfGetChildValueBool(redis_node, "async", &is_async);
481  }
482  if (!log_ctx->redis_setup.server) {
483  log_ctx->redis_setup.server = redis_default_server;
484  SCLogInfo("Using default redis server (127.0.0.1)");
485  }
486  if (!redis_port)
487  redis_port = "6379";
488  if (!redis_mode)
489  redis_mode = "list";
490  if (!log_ctx->redis_setup.key) {
491  log_ctx->redis_setup.key = redis_default_key;
492  }
493 
494 #ifndef HAVE_LIBEVENT
495  if (is_async) {
496  SCLogWarning("async option not available.");
497  }
498  is_async = 0;
499 #endif //ifndef HAVE_LIBEVENT
500 
501  log_ctx->redis_setup.is_async = is_async;
502  log_ctx->redis_setup.batch_size = 0;
503  if (redis_node) {
504  ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining");
505  if (pipelining) {
506  int enabled = 0;
507  int ret;
508  intmax_t val;
509  ret = ConfGetChildValueBool(pipelining, "enabled", &enabled);
510  if (ret && enabled) {
511  ret = ConfGetChildValueInt(pipelining, "batch-size", &val);
512  if (ret) {
513  log_ctx->redis_setup.batch_size = val;
514  } else {
515  log_ctx->redis_setup.batch_size = 10;
516  }
517  }
518  }
519  } else {
520  log_ctx->redis_setup.batch_size = 0;
521  }
522 
523  if (!strcmp(redis_mode, "list") || !strcmp(redis_mode,"lpush")) {
524  log_ctx->redis_setup.command = redis_lpush_cmd;
525  } else if(!strcmp(redis_mode, "rpush")){
526  log_ctx->redis_setup.command = redis_rpush_cmd;
527  } else if(!strcmp(redis_mode,"channel") || !strcmp(redis_mode,"publish")) {
528  log_ctx->redis_setup.command = redis_publish_cmd;
529  } else {
530  FatalError("Invalid redis mode");
531  }
532 
533  /* store server params for reconnection */
534  if (!log_ctx->redis_setup.server) {
535  FatalError("Error allocating redis server string");
536  }
537  if (StringParseUint16(&log_ctx->redis_setup.port, 10, 0, (const char *)redis_port) < 0) {
538  FatalError("Invalid value for redis port: %s", redis_port);
539  }
540  log_ctx->Close = SCLogFileCloseRedis;
541 
542 #ifdef HAVE_LIBEVENT
543  if (is_async) {
544  log_ctx->redis = SCLogRedisContextAsyncAlloc();
545  }
546 #endif /*HAVE_LIBEVENT*/
547  if (! is_async) {
548  log_ctx->redis = SCLogRedisContextAlloc();
549  SCConfLogReopenSyncRedis(log_ctx);
550  }
551  return 0;
552 }
553 
554 /** \brief SCLogFileCloseRedis() Closes redis log more
555  * \param log_ctx Log file context allocated by caller
556  */
557 void SCLogFileCloseRedis(LogFileCtx *log_ctx)
558 {
559  SCLogRedisContext * ctx = log_ctx->redis;
560  if (ctx == NULL) {
561  return;
562  }
563  /* asynchronous */
564  if (log_ctx->redis_setup.is_async) {
565 #if HAVE_LIBEVENT == 1
566  if (ctx->async) {
567  if (ctx->connected > 0) {
568  SCLogAsyncRedisSendQuit(ctx);
569  }
570  if (ctx->ev_base != NULL) {
571  event_base_free(ctx->ev_base);
572  ctx->ev_base = NULL;
573  }
574  }
575 #endif
576  }
577 
578  /* synchronous */
579  if (!log_ctx->redis_setup.is_async) {
580  if (ctx->sync) {
581  redisReply *reply;
582  int i;
583  for (i = 0; i < ctx->batch_count; i++) {
584  redisGetReply(ctx->sync, (void **)&reply);
585  if (reply) {
586  freeReplyObject(reply);
587  }
588  }
589  redisFree(ctx->sync);
590  ctx->sync = NULL;
591  }
592  ctx->tried = 0;
593  ctx->batch_count = 0;
594  }
595 
596  if (ctx != NULL) {
597  SCFree(ctx);
598  }
599 }
600 
601 #endif //#ifdef HAVE_LIBHIREDIS
util-byte.h
ConfGetChildValueInt
int ConfGetChildValueInt(const ConfNode *base, const char *name, intmax_t *val)
Definition: conf.c:434
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
LogFileCtx_
Definition: util-logopenfile.h:72
StringParseUint16
int StringParseUint16(uint16_t *res, int base, size_t len, const char *str)
Definition: util-byte.c:337
util-log-redis.h
util-debug.h
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:249
SCLogInfo
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:224
ConfNodeLookupChild
ConfNode * ConfNodeLookupChild(const ConfNode *node, const char *name)
Lookup a child configuration node by name.
Definition: conf.c:786
suricata-common.h
FatalError
#define FatalError(...)
Definition: util-debug.h:502
ConfGetChildValueBool
int ConfGetChildValueBool(const ConfNode *base, const char *name, int *val)
Definition: conf.c:501
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:261
SCFree
#define SCFree(p)
Definition: util-mem.h:61
ConfNode_
Definition: conf.h:32
util-logopenfile.h
LogFileCtx_::Close
void(* Close)(struct LogFileCtx_ *fp)
Definition: util-logopenfile.h:89
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:237
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
LogFileCtx_::threaded
bool threaded
Definition: util-logopenfile.h:98
ConfNodeLookupChildValue
const char * ConfNodeLookupChildValue(const ConfNode *node, const char *name)
Lookup the value of a child configuration node by name.
Definition: conf.c:814