suricata
util-log-redis.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2024 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Paulo Pacheco <fooinha@gmail.com>
22  *
23  * File-like output for logging: redis
24  */
25 #include "suricata-common.h" /* errno.h, string.h, etc. */
26 #include "util-log-redis.h"
27 #include "util-logopenfile.h"
28 #include "util-byte.h"
29 #include "util-debug.h"
30 
31 #ifdef HAVE_LIBHIREDIS
32 
33 #ifdef HAVE_LIBEVENT_PTHREADS
34 #include <event2/thread.h>
35 #endif /* HAVE_LIBEVENT_PTHREADS */
36 
/* Redis command verbs; SCConfLogOpenRedis() selects one from the "mode" setting. */
static const char *const redis_lpush_cmd = "LPUSH";
static const char *const redis_rpush_cmd = "RPUSH";
static const char *const redis_publish_cmd = "PUBLISH";
static const char *const redis_xadd_cmd = "XADD";

/* Fallback values used when the configuration omits "key" or "server". */
static const char *const redis_default_key = "suricata";
static const char *const redis_default_server = "127.0.0.1";

/* hiredis command formats, filled with <command> <key> <payload>. */
static const char *const redis_default_format = "%s %s %s";
static const char *const redis_stream_format = "%s %s * eve %s";
/* Template expanded once with snprintf() into a command format that trims the
 * stream: %c is the trim operator ('=' or '~') and %d the maximum length, so
 * the length must be passed as an int. */
static const char *const redis_stream_format_maxlen_tmpl = "%s %s MAXLEN %c %d * eve %s";
46 
47 static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx);
48 static void SCLogFileCloseRedis(LogFileCtx *log_ctx);
49 
50 #define REDIS_MAX_STREAM_LENGTH_DEFAULT 100000
51 
/**
 * \brief One-time global setup for the redis output.
 *
 * When built with HAVE_LIBEVENT_PTHREADS this calls evthread_use_pthreads();
 * otherwise it does nothing.
 */
void SCLogRedisInit(void)
{
#if defined(HAVE_LIBEVENT_PTHREADS)
    /* the return value is deliberately not inspected */
    (void)evthread_use_pthreads();
#endif /* HAVE_LIBEVENT_PTHREADS */
}
61 
62 /** \brief SCLogRedisContextAlloc() - Allocates and initializes redis context
63  */
64 static SCLogRedisContext *SCLogRedisContextAlloc(void)
65 {
66  SCLogRedisContext* ctx = (SCLogRedisContext*) SCCalloc(1, sizeof(SCLogRedisContext));
67  if (ctx == NULL) {
68  FatalError("Unable to allocate redis context");
69  }
70  ctx->sync = NULL;
71 #if HAVE_LIBEVENT
72  ctx->ev_base = NULL;
73  ctx->async = NULL;
74 #endif
75  ctx->batch_count = 0;
76  ctx->last_push = 0;
77  ctx->tried = 0;
78 
79  return ctx;
80 }
81 
82 #ifdef HAVE_LIBEVENT
83 
84 static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx);
85 #include <hiredis/adapters/libevent.h>
86 
87 /** \brief SCLogRedisAsyncContextAlloc() - Allocates and initializes redis context with async
88  */
89 static SCLogRedisContext *SCLogRedisContextAsyncAlloc(void)
90 {
91  SCLogRedisContext* ctx = (SCLogRedisContext*) SCCalloc(1, sizeof(SCLogRedisContext));
92  if (unlikely(ctx == NULL)) {
93  FatalError("Unable to allocate redis context");
94  }
95 
96  ctx->sync = NULL;
97  ctx->async = NULL;
98  ctx->ev_base = NULL;
99  ctx->state = REDIS_STATE_DISCONNECTED;
100  ctx->batch_count = 0;
101  ctx->last_push = 0;
102  ctx->tried = 0;
103 
104  return ctx;
105 }
106 
107 /** \brief SCRedisAsyncCommandCallback() Callback when reply from redis happens.
108  * \param ac redis async context
109  * \param r redis reply
110  * \param privdata opaque data with pointer to LogFileCtx
111  */
112 static void SCRedisAsyncCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
113 {
114  redisReply *reply = r;
115  LogFileCtx *log_ctx = privdata;
116  SCLogRedisContext *ctx = log_ctx->redis;
117 
118  if (reply == NULL) {
119  if (ctx->state != REDIS_STATE_DISCONNECTED)
120  SCLogInfo("Missing reply from redis, disconnected.");
121  ctx->state = REDIS_STATE_DISCONNECTED;
122  } else {
123  event_base_loopbreak(ctx->ev_base);
124  }
125 }
126 
127 /** \brief SCRedisAsyncAuthCallback() Callback when AUTH reply from redis happens.
128  * \param ac redis async context
129  * \param r redis reply
130  * \param privdata opaque data with pointer to LogFileCtx
131  */
132 static void SCRedisAsyncAuthCallback(redisAsyncContext *ac, void *r, void *privdata)
133 {
134  redisReply *reply = r;
135  LogFileCtx *log_ctx = privdata;
136  SCLogRedisContext *ctx = log_ctx->redis;
137 
138  if (reply == NULL) {
139  if (ctx->tried == 0) {
140  SCLogWarning("Failed to connect to Redis... (will keep trying)");
141  }
142  ctx->state = REDIS_STATE_DISCONNECTED;
143  ctx->tried = time(NULL);
144  } else {
145  if (reply->type != REDIS_REPLY_ERROR) {
146  SCLogInfo("Redis authenticated successfully.");
147  ctx->state = REDIS_STATE_AUTHENTICATED;
148  ctx->tried = 0;
149  } else {
150  if (ctx->tried == 0) {
151  SCLogWarning("Redis AUTH failed: %s (will keep trying)", reply->str);
152  }
153  ctx->state = REDIS_STATE_AUTH_FAILED;
154  ctx->tried = time(NULL);
155  }
156  }
157  event_base_loopbreak(ctx->ev_base);
158 }
159 
160 /** \brief SCLogAsyncRedisSendAuth() - Authenticates with redis
161  * \param log_ctx Log file context allocated by caller
162  */
163 static void SCLogAsyncRedisSendAuth(LogFileCtx *log_ctx)
164 {
165  SCLogRedisContext *ctx = log_ctx->redis;
166 
167  /* only try to reauth once per second */
168  if (ctx->tried >= time(NULL)) {
169  return;
170  }
171 
172  if (log_ctx->redis_setup.username != NULL) {
173  redisAsyncCommand(ctx->async, SCRedisAsyncAuthCallback, log_ctx, "AUTH %s %s",
174  log_ctx->redis_setup.username, log_ctx->redis_setup.password);
175  } else {
176  redisAsyncCommand(ctx->async, SCRedisAsyncAuthCallback, log_ctx, "AUTH %s",
177  log_ctx->redis_setup.password);
178  }
179  event_base_dispatch(ctx->ev_base);
180 }
181 
182 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an ECHO command reply
183  * This is used to check if redis is connected.
184  * \param ac redis async context
185  * \param r redis reply
186  * \param privdata opaque data with pointer to LogFileCtx
187  */
188 static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
189 {
190  redisReply *reply = r;
191  SCLogRedisContext * ctx = privdata;
192 
193  if (reply == NULL) {
194  if (ctx->tried == 0) {
195  SCLogWarning("Failed to connect to Redis... (will keep trying)");
196  }
197  ctx->state = REDIS_STATE_DISCONNECTED;
198  ctx->tried = time(NULL);
199  } else {
200  if (reply->type != REDIS_REPLY_ERROR) {
201  SCLogNotice("Connected to Redis.");
202  ctx->state = REDIS_STATE_CONNECTED;
203  ctx->tried = 0;
204  } else {
205  if (strncmp(reply->str, "NOAUTH", 6) == 0) {
206  if (ctx->tried == 0) {
207  SCLogWarning("Redis authentication required, but not configured.");
208  }
209  } else {
210  if (ctx->tried == 0) {
211  SCLogWarning("Redis ECHO command failed: %s", reply->str);
212  }
213  }
214  ctx->state = REDIS_STATE_ECHO_FAILED;
215  ctx->tried = time(NULL);
216  }
217  }
218  event_base_loopbreak(ctx->ev_base);
219 }
220 
221 /** \brief SCLogAsyncRedisSendEcho() - Emits and awaits response for an async ECHO command.
222  * It's used for check if redis is alive.
223  * \param ctx redis context
224  */
225 static void SCLogAsyncRedisSendEcho(SCLogRedisContext * ctx)
226 {
227  redisAsyncCommand(ctx->async, SCRedisAsyncEchoCommandCallback, ctx, "ECHO suricata");
228  event_base_dispatch(ctx->ev_base);
229 }
230 
231 /** \brief SCRedisAsyncEchoCommandCallback() Callback for an QUIT command reply
232  * This is used to terminate connection with redis.
233  * \param ac redis async context
234  * \param r redis reply
235  * \param privdata opaque data with pointer to LogFileCtx
236  */
237 static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac, void *r, void *privdata)
238 {
239  SCLogInfo("Disconnecting from redis!");
240 }
241 
242 /** \brief QUIT command
243  * Emits and awaits response for an async QUIT command.
244  * It's used to disconnect with redis
245  * \param ctx redis context
246  */
247 static void SCLogAsyncRedisSendQuit(SCLogRedisContext * ctx)
248 {
249  if (ctx->state != REDIS_STATE_DISCONNECTED) {
250  redisAsyncCommand(ctx->async, SCRedisAsyncQuitCommandCallback, ctx, "QUIT");
251  SCLogInfo("QUIT Command sent to redis. Connection will terminate!");
252  }
253 
254  redisAsyncFree(ctx->async);
255  event_base_dispatch(ctx->ev_base);
256  ctx->async = NULL;
257  event_base_free(ctx->ev_base);
258  ctx->ev_base = NULL;
259  ctx->state = REDIS_STATE_DISCONNECTED;
260 }
261 
262 /** \brief SCConfLogReopenAsyncRedis() Open or re-opens connection to redis for logging.
263  * \param log_ctx Log file context allocated by caller
264  */
265 static int SCConfLogReopenAsyncRedis(LogFileCtx *log_ctx)
266 {
267  SCLogRedisContext * ctx = log_ctx->redis;
268  const char *redis_server = log_ctx->redis_setup.server;
269  int redis_port = log_ctx->redis_setup.port;
270 
271  /* only try to reconnect once per second */
272  if (ctx->tried >= time(NULL)) {
273  return -1;
274  }
275 
276  if (strchr(redis_server, '/') == NULL) {
277  ctx->async = redisAsyncConnect(redis_server, redis_port);
278  } else {
279  ctx->async = redisAsyncConnectUnix(redis_server);
280  }
281 
282  if (ctx->ev_base != NULL) {
283  event_base_free(ctx->ev_base);
284  ctx->ev_base = NULL;
285  }
286 
287  if (ctx->async == NULL) {
288  SCLogError("Error allocate redis async.");
289  ctx->tried = time(NULL);
290  return -1;
291  }
292 
293  if (ctx->async != NULL && ctx->async->err) {
294  SCLogError("Error setting to redis async: [%s].", ctx->async->errstr);
295  ctx->tried = time(NULL);
296  return -1;
297  }
298 
299  ctx->ev_base = event_base_new();
300 
301  if (ctx->ev_base == NULL) {
302  ctx->tried = time(NULL);
303  redisAsyncFree(ctx->async);
304  ctx->async = NULL;
305  return -1;
306  }
307 
308  redisLibeventAttach(ctx->async, ctx->ev_base);
309 
310  log_ctx->redis = ctx;
311  log_ctx->Close = SCLogFileCloseRedis;
312  return 0;
313 }
314 
315 /** \brief SCLogAsyncRedisIsReady() Determines whether is ready to send data.
316  * \param file_ctx Log file context allocated by caller
317  */
318 static inline bool SCLogAsyncRedisIsReady(LogFileCtx *file_ctx)
319 {
320  SCLogRedisContext *ctx = file_ctx->redis;
321 
322  return file_ctx->redis_setup.password ? ctx->state == REDIS_STATE_AUTHENTICATED
323  : ctx->state == REDIS_STATE_CONNECTED;
324 }
325 
326 /** \brief SCLogRedisWriteAsync() writes string to redis output in async mode
327  * \param file_ctx Log file context allocated by caller
328  * \param string Buffer to output
329  */
330 static int SCLogRedisWriteAsync(LogFileCtx *file_ctx, const char *string, size_t string_len)
331 {
332  SCLogRedisContext *ctx = file_ctx->redis;
333 
334  if (!SCLogAsyncRedisIsReady(file_ctx)) {
335  if (ctx->state == REDIS_STATE_DISCONNECTED) {
336  if (SCConfLogReopenAsyncRedis(file_ctx) == -1) {
337  return -1;
338  }
339  }
340  if (ctx->tried == 0) {
341  SCLogNotice("Trying to connect to Redis");
342  }
343  if (file_ctx->redis_setup.password == NULL) {
344  // Just verify the connection is alive with ECHO
345  SCLogAsyncRedisSendEcho(ctx);
346  } else {
347  // Send AUTH to authenticate and verify the connection is alive
348  SCLogAsyncRedisSendAuth(file_ctx);
349  }
350  }
351 
352  if (!SCLogAsyncRedisIsReady(file_ctx)) {
353  return -1;
354  }
355 
356  if (ctx->async == NULL) {
357  return -1;
358  }
359 
360  redisAsyncCommand(ctx->async, SCRedisAsyncCommandCallback, file_ctx,
361  file_ctx->redis_setup.format, file_ctx->redis_setup.command, file_ctx->redis_setup.key,
362  string);
363 
364  event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK);
365 
366  return 0;
367 }
368 
369 #endif// HAVE_LIBEVENT
370 
371 /** \brief SCConfLogReopenSyncRedis() Open or re-opens connection to redis for logging.
372  * \param log_ctx Log file context allocated by caller
373  */
374 static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx)
375 {
376  SCLogRedisContext * ctx = log_ctx->redis;
377 
378  /* only try to reconnect once per second */
379  if (ctx->tried >= time(NULL)) {
380  return -1;
381  }
382 
383  const char *redis_server = log_ctx->redis_setup.server;
384  int redis_port = log_ctx->redis_setup.port;
385 
386  if (ctx->sync != NULL) {
387  redisFree(ctx->sync);
388  }
389 
390  if (strchr(redis_server, '/') == NULL) {
391  ctx->sync = redisConnect(redis_server, redis_port);
392  } else {
393  ctx->sync = redisConnectUnix(redis_server);
394  }
395  if (ctx->sync == NULL) {
396  SCLogError("Error connecting to redis server.");
397  ctx->tried = time(NULL);
398  return -1;
399  }
400  if (ctx->sync->err) {
401  SCLogError("Error connecting to redis server: [%s].", ctx->sync->errstr);
402  redisFree(ctx->sync);
403  ctx->sync = NULL;
404  ctx->tried = time(NULL);
405  return -1;
406  }
407  SCLogInfo("Connected to redis server [%s].", log_ctx->redis_setup.server);
408 
409  if (log_ctx->redis_setup.password != NULL) {
410  redisReply *reply;
411  if (log_ctx->redis_setup.username != NULL) {
412  reply = redisCommand(ctx->sync, "AUTH %s %s", log_ctx->redis_setup.username,
413  log_ctx->redis_setup.password);
414  } else {
415  reply = redisCommand(ctx->sync, "AUTH %s", log_ctx->redis_setup.password);
416  }
417 
418  if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
419  SCLogWarning("Redis AUTH failed: %s", reply ? reply->str : ctx->sync->errstr);
420  if (reply) {
421  freeReplyObject(reply);
422  }
423  redisFree(ctx->sync);
424  ctx->sync = NULL;
425  ctx->tried = time(NULL);
426  return -1;
427  }
428  freeReplyObject(reply);
429  SCLogInfo("Redis authenticated successfully.");
430  }
431 
432  /* Check if we are really ready to write logs */
433  redisReply *reply = redisCommand(ctx->sync, "ECHO suricata");
434  if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
435  if (reply != NULL && strncmp(reply->str, "NOAUTH", 6) == 0) {
436  SCLogWarning("Redis authentication required, but not configured.");
437  } else {
438  SCLogWarning("Redis ECHO failed: %s", reply ? reply->str : ctx->sync->errstr);
439  }
440  if (reply) {
441  freeReplyObject(reply);
442  }
443  redisFree(ctx->sync);
444  ctx->sync = NULL;
445  ctx->tried = time(NULL);
446  return -1;
447  }
448  freeReplyObject(reply);
449 
450  log_ctx->redis = ctx;
451  log_ctx->Close = SCLogFileCloseRedis;
452  return 0;
453 }
454 /** \brief SCLogRedisWriteSync() writes string to redis output in sync mode
455  * \param file_ctx Log file context allocated by caller
456  * \param string Buffer to output
457  */
458 static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
459 {
460  SCLogRedisContext * ctx = file_ctx->redis;
461  int ret = -1;
462  redisContext *redis = ctx->sync;
463  if (redis == NULL) {
464  SCConfLogReopenSyncRedis(file_ctx);
465  redis = ctx->sync;
466  if (redis == NULL) {
467  SCLogDebug("Redis after re-open is not available.");
468  return -1;
469  }
470  }
471 
472  /* synchronous mode */
473  if (file_ctx->redis_setup.batch_size) {
474  redisAppendCommand(redis, file_ctx->redis_setup.format, file_ctx->redis_setup.command,
475  file_ctx->redis_setup.key, string);
476  time_t now = time(NULL);
477  if ((ctx->batch_count == file_ctx->redis_setup.batch_size) || (ctx->last_push < now)) {
478  redisReply *reply;
479  int i;
480  int batch_size = ctx->batch_count;
481  ctx->batch_count = 0;
482  ctx->last_push = now;
483  for (i = 0; i <= batch_size; i++) {
484  if (redisGetReply(redis, (void **)&reply) == REDIS_OK) {
485  freeReplyObject(reply);
486  ret = 0;
487  } else {
488  if (redis->err) {
489  SCLogInfo("Error when fetching reply: %s (%d)",
490  redis->errstr,
491  redis->err);
492  }
493  switch (redis->err) {
494  case REDIS_ERR_EOF:
495  case REDIS_ERR_IO:
496  SCLogInfo("Reopening connection to redis server");
497  SCConfLogReopenSyncRedis(file_ctx);
498  redis = ctx->sync;
499  if (redis) {
500  SCLogInfo("Reconnected to redis server");
501  redisAppendCommand(redis, file_ctx->redis_setup.format,
502  file_ctx->redis_setup.command, file_ctx->redis_setup.key,
503  string);
504  ctx->batch_count++;
505  return 0;
506  } else {
507  SCLogInfo("Unable to reconnect to redis server");
508  return -1;
509  }
510  break;
511  default:
512  SCLogWarning("Unsupported error code %d", redis->err);
513  return -1;
514  }
515  }
516  }
517  } else {
518  ctx->batch_count++;
519  }
520  } else {
521  redisReply *reply = redisCommand(redis, file_ctx->redis_setup.format,
522  file_ctx->redis_setup.command, file_ctx->redis_setup.key, string);
523  /* We may lose the reply if disconnection happens*/
524  if (reply) {
525  switch (reply->type) {
526  case REDIS_REPLY_ERROR:
527  SCLogWarning("Redis error: %s", reply->str);
528  SCConfLogReopenSyncRedis(file_ctx);
529  break;
530  case REDIS_REPLY_INTEGER:
531  SCLogDebug("Redis integer %lld", reply->integer);
532  ret = 0;
533  break;
534  case REDIS_REPLY_STRING:
535  SCLogDebug("Redis string %s", reply->str);
536  ret = 0;
537  break;
538  default:
539  SCLogError("Redis default triggered with %d", reply->type);
540  SCConfLogReopenSyncRedis(file_ctx);
541  break;
542  }
543  freeReplyObject(reply);
544  } else {
545  SCConfLogReopenSyncRedis(file_ctx);
546  }
547  }
548  return ret;
549 }
550 
551 /**
552  * \brief LogFileWriteRedis() writes log data to redis output.
553  * \param log_ctx Log file context allocated by caller
554  * \param string buffer with data to write
555  * \param string_len data length
556  * \retval 0 on success;
557  * \retval -1 on failure;
558  */
559 int LogFileWriteRedis(void *lf_ctx, const char *string, size_t string_len)
560 {
561  LogFileCtx *file_ctx = lf_ctx;
562  if (file_ctx == NULL) {
563  return -1;
564  }
565 
566 #if HAVE_LIBEVENT
567  /* async mode on */
568  if (file_ctx->redis_setup.is_async) {
569  return SCLogRedisWriteAsync(file_ctx, string, string_len);
570  }
571 #endif
572  /* sync mode */
573  if (! file_ctx->redis_setup.is_async) {
574  return SCLogRedisWriteSync(file_ctx, string);
575  }
576  return -1;
577 }
578 
579 /** \brief configure and initializes redis output logging
580  * \param conf ConfNode structure for the output section in question
581  * \param log_ctx Log file context allocated by caller
582  * \retval 0 on success
583  */
584 int SCConfLogOpenRedis(SCConfNode *redis_node, void *lf_ctx)
585 {
586  LogFileCtx *log_ctx = lf_ctx;
587 
588  if (log_ctx->threaded) {
589  FatalError("redis does not support threaded output");
590  }
591 
592  const char *redis_port = NULL;
593  const char *redis_mode = NULL;
594 
595  int is_async = 0;
596 
597  if (redis_node) {
598  log_ctx->redis_setup.server = SCConfNodeLookupChildValue(redis_node, "server");
599  log_ctx->redis_setup.key = SCConfNodeLookupChildValue(redis_node, "key");
600  log_ctx->redis_setup.username = SCConfNodeLookupChildValue(redis_node, "username");
601  log_ctx->redis_setup.password = SCConfNodeLookupChildValue(redis_node, "password");
602 
603  redis_port = SCConfNodeLookupChildValue(redis_node, "port");
604  redis_mode = SCConfNodeLookupChildValue(redis_node, "mode");
605 
606  (void)SCConfGetChildValueBool(redis_node, "async", &is_async);
607  }
608  if (!log_ctx->redis_setup.server) {
609  log_ctx->redis_setup.server = redis_default_server;
610  SCLogInfo("Using default redis server (127.0.0.1)");
611  }
612  if (!redis_port)
613  redis_port = "6379";
614  if (!redis_mode)
615  redis_mode = "list";
616  if (!log_ctx->redis_setup.key) {
617  log_ctx->redis_setup.key = redis_default_key;
618  }
619  if (log_ctx->redis_setup.username && !log_ctx->redis_setup.password) {
620  SCLogWarning("Redis username configured without password; ignoring username.");
621  log_ctx->redis_setup.username = NULL;
622  }
623 
624 #ifndef HAVE_LIBEVENT
625  if (is_async) {
626  SCLogWarning("async option not available.");
627  }
628  is_async = 0;
629 #endif //ifndef HAVE_LIBEVENT
630 
631  log_ctx->redis_setup.is_async = is_async;
632  log_ctx->redis_setup.batch_size = 0;
633  if (redis_node) {
634  SCConfNode *pipelining = SCConfNodeLookupChild(redis_node, "pipelining");
635  if (pipelining) {
636  int enabled = 0;
637  int ret;
638  intmax_t val;
639  ret = SCConfGetChildValueBool(pipelining, "enabled", &enabled);
640  if (ret && enabled) {
641  ret = SCConfGetChildValueInt(pipelining, "batch-size", &val);
642  if (ret) {
643  log_ctx->redis_setup.batch_size = val;
644  } else {
645  log_ctx->redis_setup.batch_size = 10;
646  }
647  }
648  }
649  } else {
650  log_ctx->redis_setup.batch_size = 0;
651  }
652 
653  log_ctx->redis_setup.format = redis_default_format;
654  if (!strcmp(redis_mode, "list") || !strcmp(redis_mode,"lpush")) {
655  log_ctx->redis_setup.command = redis_lpush_cmd;
656  } else if(!strcmp(redis_mode, "rpush")){
657  log_ctx->redis_setup.command = redis_rpush_cmd;
658  } else if(!strcmp(redis_mode,"channel") || !strcmp(redis_mode,"publish")) {
659  log_ctx->redis_setup.command = redis_publish_cmd;
660  } else if (!strcmp(redis_mode, "stream") || !strcmp(redis_mode, "xadd")) {
661  int exact;
662  intmax_t maxlen;
663  log_ctx->redis_setup.command = redis_xadd_cmd;
664  log_ctx->redis_setup.format = redis_stream_format;
665  if (SCConfGetChildValueBool(redis_node, "stream-trim-exact", &exact) == 0) {
666  exact = 0;
667  }
668  if (SCConfGetChildValueInt(redis_node, "stream-maxlen", &maxlen) == 0) {
669  maxlen = REDIS_MAX_STREAM_LENGTH_DEFAULT;
670  }
671  if (maxlen > 0) {
672  /* we do not need a lot of space here since we only build another
673  format string, whose length is limited by the length of the
674  maxlen integer formatted as a string */
675  log_ctx->redis_setup.stream_format = SCCalloc(100, sizeof(char));
676  snprintf(log_ctx->redis_setup.stream_format, 100, redis_stream_format_maxlen_tmpl, "%s",
677  "%s", exact ? '=' : '~', maxlen, "%s");
678  log_ctx->redis_setup.format = log_ctx->redis_setup.stream_format;
679  }
680  } else {
681  FatalError("Invalid redis mode: %s", redis_mode);
682  }
683 
684  /* store server params for reconnection */
685  if (!log_ctx->redis_setup.server) {
686  FatalError("Error allocating redis server string");
687  }
688  if (StringParseUint16(&log_ctx->redis_setup.port, 10, 0, (const char *)redis_port) < 0) {
689  FatalError("Invalid value for redis port: %s", redis_port);
690  }
691  log_ctx->Close = SCLogFileCloseRedis;
692 
693 #ifdef HAVE_LIBEVENT
694  if (is_async) {
695  log_ctx->redis = SCLogRedisContextAsyncAlloc();
696  }
697 #endif /*HAVE_LIBEVENT*/
698  if (! is_async) {
699  log_ctx->redis = SCLogRedisContextAlloc();
700  SCConfLogReopenSyncRedis(log_ctx);
701  }
702  return 0;
703 }
704 
705 /** \brief SCLogFileCloseRedis() Closes redis log more
706  * \param log_ctx Log file context allocated by caller
707  */
708 void SCLogFileCloseRedis(LogFileCtx *log_ctx)
709 {
710  SCLogRedisContext * ctx = log_ctx->redis;
711  if (ctx == NULL) {
712  return;
713  }
714  /* asynchronous */
715  if (log_ctx->redis_setup.is_async) {
716 #if HAVE_LIBEVENT == 1
717  if (ctx->async) {
718  if (ctx->state != REDIS_STATE_DISCONNECTED) {
719  SCLogAsyncRedisSendQuit(ctx);
720  }
721  if (ctx->ev_base != NULL) {
722  event_base_free(ctx->ev_base);
723  ctx->ev_base = NULL;
724  }
725  }
726 #endif
727  }
728 
729  /* synchronous */
730  if (!log_ctx->redis_setup.is_async) {
731  if (ctx->sync) {
732  redisReply *reply;
733  int i;
734  for (i = 0; i < ctx->batch_count; i++) {
735  redisGetReply(ctx->sync, (void **)&reply);
736  if (reply) {
737  freeReplyObject(reply);
738  }
739  }
740  redisFree(ctx->sync);
741  ctx->sync = NULL;
742  }
743  ctx->tried = 0;
744  ctx->batch_count = 0;
745  }
746 
747  if (ctx != NULL) {
748  SCFree(ctx);
749  }
750 }
751 
752 #endif //#ifdef HAVE_LIBHIREDIS
util-byte.h
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:282
ctx
struct Thresholds ctx
LogFileCtx_
Definition: util-logopenfile.h:72
StringParseUint16
int StringParseUint16(uint16_t *res, int base, size_t len, const char *str)
Definition: util-byte.c:296
SCConfGetChildValueBool
int SCConfGetChildValueBool(const SCConfNode *base, const char *name, int *val)
Definition: conf.c:516
util-log-redis.h
SCConfNodeLookupChildValue
const char * SCConfNodeLookupChildValue(const SCConfNode *node, const char *name)
Lookup the value of a child configuration node by name.
Definition: conf.c:855
util-debug.h
SCLogWarning
#define SCLogWarning(...)
Macro used to log WARNING messages.
Definition: util-debug.h:262
SCConfGetChildValueInt
int SCConfGetChildValueInt(const SCConfNode *base, const char *name, intmax_t *val)
Definition: conf.c:450
SCLogInfo
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:232
SCConfNodeLookupChild
SCConfNode * SCConfNodeLookupChild(const SCConfNode *node, const char *name)
Lookup a child configuration node by name.
Definition: conf.c:827
suricata-common.h
FatalError
#define FatalError(...)
Definition: util-debug.h:517
SCLogError
#define SCLogError(...)
Macro used to log ERROR messages.
Definition: util-debug.h:274
SCFree
#define SCFree(p)
Definition: util-mem.h:61
util-logopenfile.h
LogFileCtx_::Close
void(* Close)(struct LogFileCtx_ *fp)
Definition: util-logopenfile.h:88
SCLogNotice
#define SCLogNotice(...)
Macro used to log NOTICE messages.
Definition: util-debug.h:250
SCCalloc
#define SCCalloc(nm, sz)
Definition: util-mem.h:53
SCConfNode_
Definition: conf.h:37
LogFileCtx_::threaded
bool threaded
Definition: util-logopenfile.h:98