31 #ifdef HAVE_LIBHIREDIS
33 #ifdef HAVE_LIBEVENT_PTHREADS
34 #include <event2/thread.h>
/* Redis command names. SCConfLogOpenRedis stores one of these in
 * redis_setup.command according to the configured mode string. */
37 static const char *redis_lpush_cmd =
"LPUSH";
38 static const char *redis_rpush_cmd =
"RPUSH";
39 static const char *redis_publish_cmd =
"PUBLISH";
40 static const char *redis_xadd_cmd =
"XADD";
/* Fallbacks applied by SCConfLogOpenRedis when no key or server is set. */
41 static const char *redis_default_key =
"suricata";
42 static const char *redis_default_server =
"127.0.0.1";
/* Default command format string. The write paths pass it to the hiredis
 * command calls together with (command, key, payload string). */
43 static const char *redis_default_format =
"%s %s %s";
/* Format selected by SCConfLogOpenRedis for the stream / xadd mode. */
44 static const char *redis_stream_format =
"%s %s * eve %s";
/* Template expanded once with snprintf() in SCConfLogOpenRedis: each %s is
 * filled with a literal percent-s, %c with = or ~ and %d with maxlen, so the
 * result is itself a command format string. */
45 static const char *redis_stream_format_maxlen_tmpl =
"%s %s MAXLEN %c %d * eve %s";
/* Forward declarations; both names are referenced further down. */
47 static int SCConfLogReopenSyncRedis(
LogFileCtx *log_ctx);
48 static void SCLogFileCloseRedis(
LogFileCtx *log_ctx);
/* Assigned to maxlen during stream-mode setup in SCConfLogOpenRedis;
 * presumably the default MAXLEN cap - the guarding condition is not visible. */
50 #define REDIS_MAX_STREAM_LENGTH_DEFAULT 100000
/**
 * \brief Initialisation hook for the redis logging code.
 *
 * When built with HAVE_LIBEVENT_PTHREADS it calls evthread_use_pthreads();
 * no other work is visible in this function.
 */
55 void SCLogRedisInit(
void)
57 #ifdef HAVE_LIBEVENT_PTHREADS
58 evthread_use_pthreads();
/**
 * \brief Allocate a redis context for the synchronous code path.
 *
 * The context is obtained zeroed from SCCalloc().
 * NOTE(review): the condition guarding FatalError() and the remaining
 * initialisation / return are not visible here - presumably the fatal path
 * is taken when the allocation returns NULL; confirm.
 */
64 static SCLogRedisContext *SCLogRedisContextAlloc(
void)
66 SCLogRedisContext*
ctx = (SCLogRedisContext*)
SCCalloc(1,
sizeof(SCLogRedisContext));
68 FatalError(
"Unable to allocate redis context");
/* Forward declaration; the definition follows the async helpers below. */
84 static int SCConfLogReopenAsyncRedis(
LogFileCtx *log_ctx);
85 #include <hiredis/adapters/libevent.h>
/**
 * \brief Allocate a redis context for the async code path.
 *
 * The memory comes zeroed from SCCalloc(); the state is then set to
 * REDIS_STATE_DISCONNECTED and batch_count to 0.
 * NOTE(review): the condition guarding FatalError() and the return statement
 * are not visible here - confirm the fatal path is the NULL-allocation case.
 */
89 static SCLogRedisContext *SCLogRedisContextAsyncAlloc(
void)
91 SCLogRedisContext*
ctx = (SCLogRedisContext*)
SCCalloc(1,
sizeof(SCLogRedisContext));
93 FatalError(
"Unable to allocate redis context");
/* Start disconnected; the write path connects when it sees this state. */
99 ctx->state = REDIS_STATE_DISCONNECTED;
100 ctx->batch_count = 0;
/**
 * \brief Reply callback registered by SCLogRedisWriteAsync for each command.
 *
 * \param ac       hiredis async context (not referenced in the visible lines)
 * \param r        reply pointer, viewed as a redisReply
 * \param privdata callback cookie; SCLogRedisWriteAsync passes its LogFileCtx
 *
 * The visible handling logs only when the state was not already
 * REDIS_STATE_DISCONNECTED, marks the context disconnected and breaks out of
 * the event loop.
 * NOTE(review): the check selecting this path and the derivation of log_ctx
 * are not visible; the log text suggests a NULL reply - confirm.
 */
112 static void SCRedisAsyncCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
114 redisReply *reply = r;
116 SCLogRedisContext *
ctx = log_ctx->redis;
/* Avoid repeating the message while already marked disconnected. */
119 if (
ctx->state != REDIS_STATE_DISCONNECTED)
120 SCLogInfo(
"Missing reply from redis, disconnected.");
121 ctx->state = REDIS_STATE_DISCONNECTED;
123 event_base_loopbreak(
ctx->ev_base);
/**
 * \brief Reply callback for the AUTH command sent by SCLogAsyncRedisSendAuth.
 *
 * \param ac       hiredis async context (not referenced in the visible lines)
 * \param r        reply pointer, viewed as a redisReply
 * \param privdata callback cookie; SCLogAsyncRedisSendAuth passes its LogFileCtx
 *
 * Visible outcomes:
 *  - connection failure: state becomes REDIS_STATE_DISCONNECTED;
 *  - reply type other than REDIS_REPLY_ERROR: state becomes
 *    REDIS_STATE_AUTHENTICATED;
 *  - error reply: state becomes REDIS_STATE_AUTH_FAILED.
 * On both failure paths the warning is only emitted while ctx->tried is 0 and
 * ctx->tried is then set to the current time. The event loop is broken at the
 * end so the dispatching caller regains control.
 * NOTE(review): the branch conditions between these paths (presumably a NULL
 * reply check and an else) are not visible - confirm.
 */
132 static void SCRedisAsyncAuthCallback(redisAsyncContext *ac,
void *r,
void *privdata)
134 redisReply *reply = r;
136 SCLogRedisContext *
ctx = log_ctx->redis;
/* Warn only on the first failure since ctx->tried was last 0. */
139 if (
ctx->tried == 0) {
140 SCLogWarning(
"Failed to connect to Redis... (will keep trying)");
142 ctx->state = REDIS_STATE_DISCONNECTED;
143 ctx->tried = time(NULL);
/* Any non-error reply is accepted as a successful AUTH. */
145 if (reply->type != REDIS_REPLY_ERROR) {
146 SCLogInfo(
"Redis authenticated successfully.");
147 ctx->state = REDIS_STATE_AUTHENTICATED;
150 if (
ctx->tried == 0) {
151 SCLogWarning(
"Redis AUTH failed: %s (will keep trying)", reply->str);
153 ctx->state = REDIS_STATE_AUTH_FAILED;
154 ctx->tried = time(NULL);
157 event_base_loopbreak(
ctx->ev_base);
/**
 * \brief Queue an AUTH command on the async connection and run the event loop.
 *
 * \param log_ctx log file context holding the redis context and credentials
 *
 * Sends the two-argument form (username and password) when a username is
 * configured, otherwise the password-only form. The reply is handled by
 * SCRedisAsyncAuthCallback, which breaks the loop started by
 * event_base_dispatch().
 */
163 static void SCLogAsyncRedisSendAuth(
LogFileCtx *log_ctx)
165 SCLogRedisContext *
ctx = log_ctx->redis;
/* ctx->tried holds the time of the last failure; this compares it with the
 * current second. NOTE(review): the body of this check is not visible -
 * presumably an early return that rate-limits retries; confirm. */
168 if (
ctx->tried >= time(NULL)) {
172 if (log_ctx->redis_setup.username != NULL) {
173 redisAsyncCommand(
ctx->async, SCRedisAsyncAuthCallback, log_ctx,
"AUTH %s %s",
174 log_ctx->redis_setup.username, log_ctx->redis_setup.password);
176 redisAsyncCommand(
ctx->async, SCRedisAsyncAuthCallback, log_ctx,
"AUTH %s",
177 log_ctx->redis_setup.password);
/* Run the loop until the callback calls event_base_loopbreak(). */
179 event_base_dispatch(
ctx->ev_base);
/**
 * \brief Reply callback for the ECHO probe sent by SCLogAsyncRedisSendEcho.
 *
 * \param ac       hiredis async context (not referenced in the visible lines)
 * \param r        reply pointer, viewed as a redisReply
 * \param privdata the SCLogRedisContext itself (not the LogFileCtx)
 *
 * Visible outcomes:
 *  - connection failure: state becomes REDIS_STATE_DISCONNECTED;
 *  - reply type other than REDIS_REPLY_ERROR: state becomes
 *    REDIS_STATE_CONNECTED;
 *  - error reply: state becomes REDIS_STATE_ECHO_FAILED; an error text that
 *    starts with NOAUTH gets a dedicated warning.
 * Warnings are only emitted while ctx->tried is 0; failure paths then store
 * the current time in ctx->tried. The event loop is broken at the end.
 * NOTE(review): the branch conditions between these paths are not visible -
 * confirm.
 */
188 static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
190 redisReply *reply = r;
191 SCLogRedisContext *
ctx = privdata;
194 if (
ctx->tried == 0) {
195 SCLogWarning(
"Failed to connect to Redis... (will keep trying)");
197 ctx->state = REDIS_STATE_DISCONNECTED;
198 ctx->tried = time(NULL);
200 if (reply->type != REDIS_REPLY_ERROR) {
202 ctx->state = REDIS_STATE_CONNECTED;
/* The server demands authentication but none was sent on this path. */
205 if (strncmp(reply->str,
"NOAUTH", 6) == 0) {
206 if (
ctx->tried == 0) {
207 SCLogWarning(
"Redis authentication required, but not configured.");
210 if (
ctx->tried == 0) {
211 SCLogWarning(
"Redis ECHO command failed: %s", reply->str);
214 ctx->state = REDIS_STATE_ECHO_FAILED;
215 ctx->tried = time(NULL);
218 event_base_loopbreak(
ctx->ev_base);
/**
 * \brief Queue an ECHO probe on the async connection and run the event loop.
 *
 * \param ctx redis context; also handed to the callback as its private data
 *
 * The reply is handled by SCRedisAsyncEchoCommandCallback, which updates
 * ctx->state and breaks the loop started by event_base_dispatch().
 */
225 static void SCLogAsyncRedisSendEcho(SCLogRedisContext *
ctx)
227 redisAsyncCommand(
ctx->async, SCRedisAsyncEchoCommandCallback,
ctx,
"ECHO suricata");
228 event_base_dispatch(
ctx->ev_base);
/**
 * \brief Reply callback registered for the QUIT command by
 *        SCLogAsyncRedisSendQuit.
 *
 * NOTE(review): the body is not visible here, so its behaviour is left
 * undocumented - fill in once confirmed.
 */
237 static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
/**
 * \brief Shut down the async redis connection.
 *
 * \param ctx redis context whose async handle and event base are released
 *
 * Sends QUIT when the state is not REDIS_STATE_DISCONNECTED, then frees the
 * async context, dispatches the event loop, frees the event base and marks
 * the context disconnected.
 * NOTE(review): whether ctx->async and ctx->ev_base are reset to NULL after
 * being freed is not visible here; callers test ctx->ev_base against NULL
 * before freeing it again, so confirm the reset exists.
 */
247 static void SCLogAsyncRedisSendQuit(SCLogRedisContext *
ctx)
249 if (
ctx->state != REDIS_STATE_DISCONNECTED) {
250 redisAsyncCommand(
ctx->async, SCRedisAsyncQuitCommandCallback,
ctx,
"QUIT");
251 SCLogInfo(
"QUIT Command sent to redis. Connection will terminate!");
254 redisAsyncFree(
ctx->async);
255 event_base_dispatch(
ctx->ev_base);
257 event_base_free(
ctx->ev_base);
259 ctx->state = REDIS_STATE_DISCONNECTED;
/**
 * \brief (Re)create the async redis connection and its libevent base.
 *
 * \param log_ctx log file context providing server, port and the redis context
 *
 * \retval -1 is one value callers test for (see SCLogRedisWriteAsync); the
 *         return statements themselves are not visible here.
 *
 * A server string without a slash is connected to as host and port, anything
 * else through redisAsyncConnectUnix(). Each visible failure path stores the
 * current time in ctx->tried.
 */
265 static int SCConfLogReopenAsyncRedis(
LogFileCtx *log_ctx)
267 SCLogRedisContext *
ctx = log_ctx->redis;
268 const char *redis_server = log_ctx->redis_setup.server;
269 int redis_port = log_ctx->redis_setup.port;
/* NOTE(review): body not visible - presumably returns early so that a retry
 * is not made within the same second as the last failure; confirm. */
272 if (
ctx->tried >= time(NULL)) {
276 if (strchr(redis_server,
'/') == NULL) {
277 ctx->async = redisAsyncConnect(redis_server, redis_port);
279 ctx->async = redisAsyncConnectUnix(redis_server);
/* Drop the event base left over from a previous connection. */
282 if (
ctx->ev_base != NULL) {
283 event_base_free(
ctx->ev_base);
287 if (
ctx->async == NULL) {
289 ctx->tried = time(NULL);
/* hiredis reports a failed connect through the err / errstr fields. */
293 if (
ctx->async != NULL &&
ctx->async->err) {
294 SCLogError(
"Error setting to redis async: [%s].",
ctx->async->errstr);
295 ctx->tried = time(NULL);
299 ctx->ev_base = event_base_new();
/* Without an event base the async context cannot be used; release it. */
301 if (
ctx->ev_base == NULL) {
302 ctx->tried = time(NULL);
303 redisAsyncFree(
ctx->async);
308 redisLibeventAttach(
ctx->async,
ctx->ev_base);
310 log_ctx->redis =
ctx;
311 log_ctx->
Close = SCLogFileCloseRedis;
/**
 * \brief Tell whether the async connection can accept event commands.
 *
 * \param file_ctx log file context holding the redis context and setup
 *
 * \retval true  when a password is configured and the state is
 *               REDIS_STATE_AUTHENTICATED, or when no password is configured
 *               and the state is REDIS_STATE_CONNECTED
 * \retval false otherwise
 */
318 static inline bool SCLogAsyncRedisIsReady(
LogFileCtx *file_ctx)
320 SCLogRedisContext *
ctx = file_ctx->redis;
322 return file_ctx->redis_setup.password ?
ctx->state == REDIS_STATE_AUTHENTICATED
323 :
ctx->state == REDIS_STATE_CONNECTED;
/**
 * \brief Send one record to redis over the async connection.
 *
 * \param file_ctx   log file context holding the redis context and setup
 * \param string     record to send
 * \param string_len length of the record (its use is not visible here)
 *
 * When the connection is not ready it is reopened if disconnected, then
 * probed with ECHO (no password configured) or authenticated with AUTH.
 * Readiness is re-checked before the command is queued, and the event loop
 * is then run once in non-blocking mode.
 * NOTE(review): the return statements and the bodies of several checks are
 * not visible - confirm the early-exit behaviour.
 */
330 static int SCLogRedisWriteAsync(
LogFileCtx *file_ctx,
const char *
string,
size_t string_len)
332 SCLogRedisContext *
ctx = file_ctx->redis;
334 if (!SCLogAsyncRedisIsReady(file_ctx)) {
335 if (
ctx->state == REDIS_STATE_DISCONNECTED) {
336 if (SCConfLogReopenAsyncRedis(file_ctx) == -1) {
340 if (
ctx->tried == 0) {
/* Handshake: ECHO probe without credentials, AUTH otherwise. */
343 if (file_ctx->redis_setup.password == NULL) {
345 SCLogAsyncRedisSendEcho(
ctx);
348 SCLogAsyncRedisSendAuth(file_ctx);
/* The handshake callbacks update ctx->state, so check again. */
352 if (!SCLogAsyncRedisIsReady(file_ctx)) {
356 if (
ctx->async == NULL) {
360 redisAsyncCommand(
ctx->async, SCRedisAsyncCommandCallback, file_ctx,
361 file_ctx->redis_setup.format, file_ctx->redis_setup.command, file_ctx->redis_setup.key,
/* Process pending I/O without blocking the caller. */
364 event_base_loop(
ctx->ev_base, EVLOOP_NONBLOCK);
369 #endif// HAVE_LIBEVENT
/**
 * \brief (Re)open the synchronous redis connection and validate it.
 *
 * \param log_ctx log file context providing server, port, credentials and
 *                the redis context
 *
 * Any existing connection is freed first. A server string without a slash is
 * connected to as host and port, anything else through redisConnectUnix().
 * After connecting, AUTH is sent when a password is configured (with the
 * username when one is set); an ECHO probe is also issued.
 * Each visible failure path frees the connection where one exists and stores
 * the current time in ctx->tried.
 * NOTE(review): the return statements, the declaration of reply for the AUTH
 * path and whether the ECHO probe is an else-branch of the password check
 * are not visible - confirm.
 */
374 static int SCConfLogReopenSyncRedis(
LogFileCtx *log_ctx)
376 SCLogRedisContext *
ctx = log_ctx->redis;
/* NOTE(review): body not visible - presumably returns early so that a retry
 * is not made within the same second as the last failure; confirm. */
379 if (
ctx->tried >= time(NULL)) {
383 const char *redis_server = log_ctx->redis_setup.server;
384 int redis_port = log_ctx->redis_setup.port;
386 if (
ctx->sync != NULL) {
387 redisFree(
ctx->sync);
390 if (strchr(redis_server,
'/') == NULL) {
391 ctx->sync = redisConnect(redis_server, redis_port);
393 ctx->sync = redisConnectUnix(redis_server);
395 if (
ctx->sync == NULL) {
396 SCLogError(
"Error connecting to redis server.");
397 ctx->tried = time(NULL);
/* hiredis reports a failed connect through the err / errstr fields. */
400 if (
ctx->sync->err) {
401 SCLogError(
"Error connecting to redis server: [%s].",
ctx->sync->errstr);
402 redisFree(
ctx->sync);
404 ctx->tried = time(NULL);
407 SCLogInfo(
"Connected to redis server [%s].", log_ctx->redis_setup.server);
409 if (log_ctx->redis_setup.password != NULL) {
411 if (log_ctx->redis_setup.username != NULL) {
412 reply = redisCommand(
ctx->sync,
"AUTH %s %s", log_ctx->redis_setup.username,
413 log_ctx->redis_setup.password);
415 reply = redisCommand(
ctx->sync,
"AUTH %s", log_ctx->redis_setup.password);
/* A NULL reply falls back to the connection error string for the log. */
418 if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
419 SCLogWarning(
"Redis AUTH failed: %s", reply ? reply->str :
ctx->sync->errstr);
421 freeReplyObject(reply);
423 redisFree(
ctx->sync);
425 ctx->tried = time(NULL);
428 freeReplyObject(reply);
429 SCLogInfo(
"Redis authenticated successfully.");
433 redisReply *reply = redisCommand(
ctx->sync,
"ECHO suricata");
434 if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
/* The server demands authentication but no password was configured. */
435 if (reply != NULL && strncmp(reply->str,
"NOAUTH", 6) == 0) {
436 SCLogWarning(
"Redis authentication required, but not configured.");
438 SCLogWarning(
"Redis ECHO failed: %s", reply ? reply->str :
ctx->sync->errstr);
441 freeReplyObject(reply);
443 redisFree(
ctx->sync);
445 ctx->tried = time(NULL);
448 freeReplyObject(reply);
450 log_ctx->redis =
ctx;
451 log_ctx->
Close = SCLogFileCloseRedis;
/**
 * \brief Send one record to redis over the synchronous connection.
 *
 * \param file_ctx log file context holding the redis context and setup
 * \param string   record to send
 *
 * With a non-zero batch_size the command is appended to the hiredis output
 * buffer and replies are collected once the batch is full or last_push is
 * older than the current second. Without batching the command is sent with
 * redisCommand() and the reply type is inspected; several reply paths
 * trigger a reconnect through SCConfLogReopenSyncRedis.
 * NOTE(review): return values, the declarations of reply and i, the NULL
 * checks and the batch_count increment are not visible - confirm.
 */
458 static int SCLogRedisWriteSync(
LogFileCtx *file_ctx,
const char *
string)
460 SCLogRedisContext *
ctx = file_ctx->redis;
/* Local copy of the connection handle. NOTE(review): a reopen replaces
 * ctx->sync, so confirm this copy is refreshed after each reopen below. */
462 redisContext *redis =
ctx->sync;
464 SCConfLogReopenSyncRedis(file_ctx);
467 SCLogDebug(
"Redis after re-open is not available.");
473 if (file_ctx->redis_setup.batch_size) {
474 redisAppendCommand(redis, file_ctx->redis_setup.format, file_ctx->redis_setup.command,
475 file_ctx->redis_setup.key,
string);
476 time_t now = time(NULL);
/* Flush when the batch is full or when last_push is before this second. */
477 if ((
ctx->batch_count == file_ctx->redis_setup.batch_size) || (
ctx->last_push < now)) {
480 int batch_size =
ctx->batch_count;
481 ctx->batch_count = 0;
482 ctx->last_push = now;
/* Inclusive bound: presumably batch_count earlier commands plus the one
 * appended above - TODO confirm against the batch_count increment. */
483 for (i = 0; i <= batch_size; i++) {
484 if (redisGetReply(redis, (
void **)&reply) == REDIS_OK) {
485 freeReplyObject(reply);
489 SCLogInfo(
"Error when fetching reply: %s (%d)",
493 switch (redis->err) {
496 SCLogInfo(
"Reopening connection to redis server");
497 SCConfLogReopenSyncRedis(file_ctx);
500 SCLogInfo(
"Reconnected to redis server");
501 redisAppendCommand(redis, file_ctx->redis_setup.format,
502 file_ctx->redis_setup.command, file_ctx->redis_setup.key,
507 SCLogInfo(
"Unable to reconnect to redis server");
/* Unbatched path: one blocking round trip per record. */
521 redisReply *reply = redisCommand(redis, file_ctx->redis_setup.format,
522 file_ctx->redis_setup.command, file_ctx->redis_setup.key,
string);
525 switch (reply->type) {
526 case REDIS_REPLY_ERROR:
528 SCConfLogReopenSyncRedis(file_ctx);
530 case REDIS_REPLY_INTEGER:
531 SCLogDebug(
"Redis integer %lld", reply->integer);
534 case REDIS_REPLY_STRING:
539 SCLogError(
"Redis default triggered with %d", reply->type);
540 SCConfLogReopenSyncRedis(file_ctx);
543 freeReplyObject(reply);
545 SCConfLogReopenSyncRedis(file_ctx);
/**
 * \brief Write one record to redis, choosing the async or sync path.
 *
 * \param lf_ctx     opaque pointer; file_ctx (derivation not visible) is
 *                   presumably this pointer viewed as a LogFileCtx - confirm
 * \param string     record to send
 * \param string_len length of the record; only forwarded to the async path
 *
 * \retval the value returned by SCLogRedisWriteAsync or SCLogRedisWriteSync
 *         for the paths visible here; the result for a NULL file_ctx is not
 *         visible.
 */
559 int LogFileWriteRedis(
void *lf_ctx,
const char *
string,
size_t string_len)
562 if (file_ctx == NULL) {
568 if (file_ctx->redis_setup.is_async) {
569 return SCLogRedisWriteAsync(file_ctx,
string, string_len);
573 if (! file_ctx->redis_setup.is_async) {
574 return SCLogRedisWriteSync(file_ctx,
string);
/**
 * \brief Configure the redis output from its configuration node.
 *
 * \param redis_node configuration node for the redis output
 * \param lf_ctx     opaque pointer; log_ctx (derivation not visible) is
 *                   presumably this pointer viewed as a LogFileCtx - confirm
 *
 * Fills log_ctx->redis_setup (server, key, credentials, async flag, batch
 * size, command and format), installs the close handler and allocates the
 * redis context. On the sync path a first connection attempt is made
 * immediately. Invalid settings end in FatalError().
 * NOTE(review): the configuration lookups, several conditions and the return
 * statements are not visible here.
 */
584 int SCConfLogOpenRedis(
SCConfNode *redis_node,
void *lf_ctx)
589 FatalError(
"redis does not support threaded output");
592 const char *redis_port = NULL;
593 const char *redis_mode = NULL;
608 if (!log_ctx->redis_setup.server) {
609 log_ctx->redis_setup.server = redis_default_server;
610 SCLogInfo(
"Using default redis server (127.0.0.1)");
616 if (!log_ctx->redis_setup.key) {
617 log_ctx->redis_setup.key = redis_default_key;
/* A username is only used together with a password in the AUTH commands. */
619 if (log_ctx->redis_setup.username && !log_ctx->redis_setup.password) {
620 SCLogWarning(
"Redis username configured without password; ignoring username.");
621 log_ctx->redis_setup.username = NULL;
624 #ifndef HAVE_LIBEVENT
629 #endif //ifndef HAVE_LIBEVENT
631 log_ctx->redis_setup.is_async = is_async;
632 log_ctx->redis_setup.batch_size = 0;
/* Batching: a configured value, a fallback of 10, or 0. The conditions that
 * pick between the three assignments are not visible here. */
640 if (ret && enabled) {
643 log_ctx->redis_setup.batch_size = val;
645 log_ctx->redis_setup.batch_size = 10;
650 log_ctx->redis_setup.batch_size = 0;
/* Map the mode string to a redis command; stream mode also swaps the format. */
653 log_ctx->redis_setup.format = redis_default_format;
654 if (!strcmp(redis_mode,
"list") || !strcmp(redis_mode,
"lpush")) {
655 log_ctx->redis_setup.command = redis_lpush_cmd;
656 }
else if(!strcmp(redis_mode,
"rpush")){
657 log_ctx->redis_setup.command = redis_rpush_cmd;
658 }
else if(!strcmp(redis_mode,
"channel") || !strcmp(redis_mode,
"publish")) {
659 log_ctx->redis_setup.command = redis_publish_cmd;
660 }
else if (!strcmp(redis_mode,
"stream") || !strcmp(redis_mode,
"xadd")) {
663 log_ctx->redis_setup.command = redis_xadd_cmd;
664 log_ctx->redis_setup.format = redis_stream_format;
669 maxlen = REDIS_MAX_STREAM_LENGTH_DEFAULT;
/* Build a MAXLEN-aware format string in a 100 byte heap buffer.
 * NOTE(review): no NULL check on the SCCalloc() result is visible before
 * snprintf() writes to it (the embedded line numbers are consecutive). */
675 log_ctx->redis_setup.stream_format =
SCCalloc(100,
sizeof(
char));
676 snprintf(log_ctx->redis_setup.stream_format, 100, redis_stream_format_maxlen_tmpl,
"%s",
677 "%s", exact ?
'=' :
'~', maxlen,
"%s");
678 log_ctx->redis_setup.format = log_ctx->redis_setup.stream_format;
681 FatalError(
"Invalid redis mode: %s", redis_mode);
685 if (!log_ctx->redis_setup.server) {
686 FatalError(
"Error allocating redis server string");
/* Parse the port string as a base 10 uint16; a negative result is fatal. */
688 if (
StringParseUint16(&log_ctx->redis_setup.port, 10, 0, (
const char *)redis_port) < 0) {
689 FatalError(
"Invalid value for redis port: %s", redis_port);
691 log_ctx->
Close = SCLogFileCloseRedis;
/* Async contexts start disconnected; the sync path connects right away. */
695 log_ctx->redis = SCLogRedisContextAsyncAlloc();
699 log_ctx->redis = SCLogRedisContextAlloc();
700 SCConfLogReopenSyncRedis(log_ctx);
/* NOTE(review): the function header is not visible here; this is presumably
 * the body of SCLogFileCloseRedis, the close handler installed on
 * log_ctx->Close elsewhere in this file - confirm.
 *
 * Async path: sends QUIT when not already disconnected, then frees the event
 * base if one is still set.
 * Sync path: reads one reply per batched command, frees the replies and then
 * the connection. batch_count is reset to 0 at the end. */
710 SCLogRedisContext *
ctx = log_ctx->redis;
715 if (log_ctx->redis_setup.is_async) {
716 #if HAVE_LIBEVENT == 1
718 if (
ctx->state != REDIS_STATE_DISCONNECTED) {
719 SCLogAsyncRedisSendQuit(
ctx);
721 if (
ctx->ev_base != NULL) {
722 event_base_free(
ctx->ev_base);
730 if (!log_ctx->redis_setup.is_async) {
/* Drain the replies still pending for appended (pipelined) commands. */
734 for (i = 0; i <
ctx->batch_count; i++) {
735 redisGetReply(
ctx->sync, (
void **)&reply);
737 freeReplyObject(reply);
740 redisFree(
ctx->sync);
744 ctx->batch_count = 0;
752 #endif //#ifdef HAVE_LIBHIREDIS