31 #ifdef HAVE_LIBHIREDIS
33 #ifdef HAVE_LIBEVENT_PTHREADS
34 #include <event2/thread.h>
/* Redis command names; SCConfLogOpenRedis picks one from the configured mode
 * ("list"/"lpush", "rpush", "channel"/"publish", "stream"/"xadd"). */
37 static const char *redis_lpush_cmd =
"LPUSH";
38 static const char *redis_rpush_cmd =
"RPUSH";
39 static const char *redis_publish_cmd =
"PUBLISH";
40 static const char *redis_xadd_cmd =
"XADD";
/* Fallbacks applied by SCConfLogOpenRedis when the setup has no key / server. */
41 static const char *redis_default_key =
"suricata";
42 static const char *redis_default_server =
"127.0.0.1";
/* Format strings handed to the hiredis command calls together with
 * (command, key, payload): plain "<cmd> <key> <payload>" by default. */
43 static const char *redis_default_format =
"%s %s %s";
/* XADD form: "*" is passed as the entry id and "eve" as the field name. */
44 static const char *redis_stream_format =
"%s %s * eve %s";
/* Template for the XADD form with a MAXLEN clause. It is expanded once with
 * snprintf in SCConfLogOpenRedis: the %c / %d slots receive the trim operator
 * ('=' or '~') and the length, while the three %s slots are re-emitted as
 * literal "%s" so the result is itself a (command, key, payload) format. */
45 static const char *redis_stream_format_maxlen_tmpl =
"%s %s MAXLEN %c %d * eve %s";
/* Forward declarations: the sync (re)connect routine and the close routine
 * that is installed as log_ctx->Close by the connect/open functions. */
47 static int SCConfLogReopenSyncRedis(
LogFileCtx *log_ctx);
48 static void SCLogFileCloseRedis(
LogFileCtx *log_ctx);
/* Stream length assigned to `maxlen` in SCConfLogOpenRedis for stream mode.
 * NOTE(review): presumably used only when no length is configured -- confirm. */
50 #define REDIS_MAX_STREAM_LENGTH_DEFAULT 100000
/**
 * \brief Global initialisation hook for the redis logger.
 *
 * When built with HAVE_LIBEVENT_PTHREADS this calls evthread_use_pthreads()
 * so libevent is set up for use with pthreads. Takes no arguments and
 * returns nothing.
 */
55 void SCLogRedisInit(
void)
57 #ifdef HAVE_LIBEVENT_PTHREADS
58 evthread_use_pthreads();
/**
 * \brief Allocate a single SCLogRedisContext with SCCalloc.
 *
 * SCConfLogOpenRedis uses this allocator on its synchronous path.
 * NOTE(review): SCCalloc is presumably calloc-like (zero-filled) -- confirm.
 *
 * \retval pointer to the new context
 */
64 static SCLogRedisContext *SCLogRedisContextAlloc(
void)
66 SCLogRedisContext*
ctx = (SCLogRedisContext*)
SCCalloc(1,
sizeof(SCLogRedisContext));
/* NOTE(review): presumably guarded by a NULL check on ctx, i.e. allocation
 * failure is fatal instead of being reported to the caller -- confirm. */
68 FatalError(
"Unable to allocate redis context");
84 static int SCConfLogReopenAsyncRedis(
LogFileCtx *log_ctx);
85 #include <hiredis/adapters/libevent.h>
/**
 * \brief Allocate a single SCLogRedisContext for the asynchronous path.
 *
 * Same allocation as the synchronous allocator, followed by explicit field
 * initialisation (batch_count is set to 0 here).
 *
 * \retval pointer to the new context
 */
89 static SCLogRedisContext *SCLogRedisContextAsyncAlloc(
void)
91 SCLogRedisContext*
ctx = (SCLogRedisContext*)
SCCalloc(1,
sizeof(SCLogRedisContext));
/* NOTE(review): presumably guarded by a NULL check on ctx -- confirm. */
93 FatalError(
"Unable to allocate redis context");
100 ctx->batch_count = 0;
/**
 * \brief hiredis async callback for the log-write command.
 *
 * Registered by SCLogRedisWriteAsync, which passes the LogFileCtx as privdata.
 *
 * \param ac       async connection the reply belongs to
 * \param r        reply object, used as a redisReply
 * \param privdata user pointer supplied when the command was queued
 */
112 static void SCRedisAsyncCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
114 redisReply *reply = r;
/* NOTE(review): log_ctx is declared on a line not shown here; presumably it
 * is privdata cast back to LogFileCtx * -- confirm. */
116 SCLogRedisContext *
ctx = log_ctx->redis;
/* Only report the disconnect if we believed we were connected. */
119 if (
ctx->connected > 0)
120 SCLogInfo(
"Missing reply from redis, disconnected.");
/* Stop the running event loop so control returns to the caller. */
124 event_base_loopbreak(
ctx->ev_base);
/**
 * \brief hiredis async callback for the "ECHO suricata" probe.
 *
 * Registered by SCLogAsyncRedisSendEcho with the SCLogRedisContext as
 * privdata. Updates the connection bookkeeping (connected / tried) and
 * breaks out of the event loop that the sender is blocked in.
 *
 * \param ac       async connection the reply belongs to
 * \param r        reply object, used as a redisReply
 * \param privdata the SCLogRedisContext passed when the probe was queued
 */
134 static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
136 redisReply *reply = r;
137 SCLogRedisContext *
ctx = privdata;
/* NOTE(review): presumably the "reply received" branch -- first successful
 * probe while not yet marked connected. Confirm against the full body. */
140 if (
ctx->connected == 0) {
/* Warn only on the first failed attempt (tried is still 0), so repeated
 * retries do not flood the log. */
147 if (
ctx->tried == 0) {
148 SCLogWarning(
"Failed to connect to Redis... (will keep trying)");
/* Remember when we last tried; the reopen routine compares this with
 * time(NULL) before attempting again. */
150 ctx->tried = time(NULL);
/* Wake up event_base_dispatch() in SCLogAsyncRedisSendEcho. */
152 event_base_loopbreak(
ctx->ev_base);
/**
 * \brief Probe the async connection with "ECHO suricata".
 *
 * Queues the command with SCRedisAsyncEchoCommandCallback and then runs the
 * event loop; the callback calls event_base_loopbreak() to end the dispatch.
 *
 * \param ctx redis context holding the async connection and its event base
 */
160 static void SCLogAsyncRedisSendEcho(SCLogRedisContext *
ctx)
162 redisAsyncCommand(
ctx->async, SCRedisAsyncEchoCommandCallback,
ctx,
"ECHO suricata");
/* Runs the loop until the echo callback breaks it. */
163 event_base_dispatch(
ctx->ev_base);
/**
 * \brief hiredis async callback for the QUIT command.
 *
 * Registered by SCLogAsyncRedisSendQuit with the SCLogRedisContext as
 * privdata. NOTE(review): the body is not shown here; behaviour unverified.
 *
 * \param ac       async connection the reply belongs to
 * \param r        reply object
 * \param privdata user pointer supplied when QUIT was queued
 */
172 static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
/**
 * \brief Shut down the async connection and its event base.
 *
 * If still connected, a QUIT command is queued first; afterwards the async
 * context is freed, the event loop is run, and the event base is freed.
 *
 * \param ctx redis context holding the async connection and its event base
 */
182 static void SCLogAsyncRedisSendQuit(SCLogRedisContext *
ctx)
184 if (
ctx->connected) {
185 redisAsyncCommand(
ctx->async, SCRedisAsyncQuitCommandCallback,
ctx,
"QUIT");
186 SCLogInfo(
"QUIT Command sent to redis. Connection will terminate!");
/* NOTE(review): the async context is freed before the loop is dispatched;
 * confirm this order is intended. */
189 redisAsyncFree(
ctx->async);
190 event_base_dispatch(
ctx->ev_base);
/* NOTE(review): the close routine later tests ctx->ev_base != NULL and
 * frees it again; confirm async / ev_base are reset to NULL around here. */
192 event_base_free(
ctx->ev_base);
/**
 * \brief (Re)establish the asynchronous redis connection.
 *
 * Connects over TCP or a unix socket depending on the server string, creates
 * a fresh libevent base, attaches the connection to it and installs the
 * close callback on the log context.
 *
 * \param log_ctx log file context carrying redis_setup and the redis context
 *
 * \retval -1 on failure (SCLogRedisWriteAsync tests for this value); the
 *            success value is on a line not shown here
 */
200 static int SCConfLogReopenAsyncRedis(
LogFileCtx *log_ctx)
202 SCLogRedisContext *
ctx = log_ctx->redis;
203 const char *redis_server = log_ctx->redis_setup.server;
204 int redis_port = log_ctx->redis_setup.port;
/* Rate limit: `tried` holds the time of the last failed attempt, so this is
 * true while we are still within the same second.
 * NOTE(review): presumably an early failure return -- confirm. */
207 if (
ctx->tried >= time(NULL)) {
/* A server string without '/' is treated as a host name / address;
 * otherwise it is used as a unix socket path. */
211 if (strchr(redis_server,
'/') == NULL) {
212 ctx->async = redisAsyncConnect(redis_server, redis_port);
214 ctx->async = redisAsyncConnectUnix(redis_server);
/* Drop the event base left over from a previous connection. */
217 if (
ctx->ev_base != NULL) {
218 event_base_free(
ctx->ev_base);
/* Allocation of the async context failed: record the attempt time. */
222 if (
ctx->async == NULL) {
224 ctx->tried = time(NULL);
/* Context exists but hiredis flagged an error while setting it up. */
228 if (
ctx->async != NULL &&
ctx->async->err) {
229 SCLogError(
"Error setting to redis async: [%s].",
ctx->async->errstr);
230 ctx->tried = time(NULL);
234 ctx->ev_base = event_base_new();
/* Without an event base the connection is unusable: release it again.
 * NOTE(review): confirm ctx->async is reset to NULL after the free. */
236 if (
ctx->ev_base == NULL) {
237 ctx->tried = time(NULL);
238 redisAsyncFree(
ctx->async);
/* Hook the hiredis connection into the libevent loop. */
243 redisLibeventAttach(
ctx->async,
ctx->ev_base);
245 log_ctx->redis =
ctx;
246 log_ctx->
Close = SCLogFileCloseRedis;
/**
 * \brief Queue one log record on the asynchronous redis connection.
 *
 * Reconnects first when the context is not marked connected, probes the new
 * connection with an ECHO, then queues the configured command and gives the
 * event loop one non-blocking pass.
 *
 * \param file_ctx   log file context carrying redis_setup and the redis context
 * \param string     record to send
 * \param string_len length of the record
 *
 * \retval int status; the return statements are on lines not shown here
 */
255 static int SCLogRedisWriteAsync(
LogFileCtx *file_ctx,
const char *
string,
size_t string_len)
257 SCLogRedisContext *
ctx = file_ctx->redis;
259 if (!
ctx->connected) {
260 if (SCConfLogReopenAsyncRedis(file_ctx) == -1) {
/* tried == 0 means no failed attempt has been recorded yet. */
263 if (
ctx->tried == 0) {
/* Blocks in the event loop until the echo callback has run, which is what
 * updates ctx->connected. */
266 SCLogAsyncRedisSendEcho(
ctx);
/* Still not connected after reopen + probe. */
269 if (!
ctx->connected) {
273 if (
ctx->async == NULL) {
/* Queue the write; file_ctx is the privdata seen by the callback. */
277 redisAsyncCommand(
ctx->async, SCRedisAsyncCommandCallback, file_ctx,
278 file_ctx->redis_setup.format, file_ctx->redis_setup.command, file_ctx->redis_setup.key,
/* Process whatever is ready without blocking the logging thread. */
281 event_base_loop(
ctx->ev_base, EVLOOP_NONBLOCK);
286 #endif// HAVE_LIBEVENT
/**
 * \brief (Re)establish the synchronous redis connection.
 *
 * Frees any existing connection, connects over TCP or a unix socket
 * depending on the server string, and installs the close callback on the
 * log context.
 *
 * \param log_ctx log file context carrying redis_setup and the redis context
 *
 * \retval int status; the return statements are on lines not shown here
 */
291 static int SCConfLogReopenSyncRedis(
LogFileCtx *log_ctx)
293 SCLogRedisContext *
ctx = log_ctx->redis;
/* Rate limit: `tried` holds the time of the last failed attempt, so this is
 * true while we are still within the same second.
 * NOTE(review): presumably an early failure return -- confirm. */
296 if (
ctx->tried >= time(NULL)) {
300 const char *redis_server = log_ctx->redis_setup.server;
301 int redis_port = log_ctx->redis_setup.port;
/* Release the previous connection before creating a new one. */
303 if (
ctx->sync != NULL) {
304 redisFree(
ctx->sync);
/* A server string without '/' is treated as a host name / address;
 * otherwise it is used as a unix socket path. */
307 if (strchr(redis_server,
'/') == NULL) {
308 ctx->sync = redisConnect(redis_server, redis_port);
310 ctx->sync = redisConnectUnix(redis_server);
/* No context at all: record the attempt time. */
312 if (
ctx->sync == NULL) {
313 SCLogError(
"Error connecting to redis server.");
314 ctx->tried = time(NULL);
/* Context exists but the connect failed: log the hiredis error text and
 * release the context.
 * NOTE(review): confirm ctx->sync is reset to NULL after redisFree so that
 * callers testing for NULL do not see a dangling pointer. */
317 if (
ctx->sync->err) {
318 SCLogError(
"Error connecting to redis server: [%s].",
ctx->sync->errstr);
319 redisFree(
ctx->sync);
321 ctx->tried = time(NULL);
324 SCLogInfo(
"Connected to redis server [%s].", log_ctx->redis_setup.server);
326 log_ctx->redis =
ctx;
327 log_ctx->
Close = SCLogFileCloseRedis;
/**
 * \brief Send one log record over the synchronous redis connection.
 *
 * With a non-zero batch_size the command is appended to the hiredis output
 * buffer and replies are collected once the batch is full or the clock has
 * moved on; otherwise the command is sent and its reply handled right away.
 * Connection errors trigger a reconnect.
 *
 * \param file_ctx log file context carrying redis_setup and the redis context
 * \param string   record to send
 *
 * \retval int status; the return statements are on lines not shown here
 */
334 static int SCLogRedisWriteSync(
LogFileCtx *file_ctx,
const char *
string)
336 SCLogRedisContext *
ctx = file_ctx->redis;
338 redisContext *redis =
ctx->sync;
/* NOTE(review): the reopen replaces ctx->sync, so the local `redis` taken
 * above is stale afterwards; confirm it is re-read from ctx->sync. */
340 SCConfLogReopenSyncRedis(file_ctx);
343 SCLogDebug(
"Redis after re-open is not available.");
/* Batched (pipelined) mode. */
349 if (file_ctx->redis_setup.batch_size) {
350 redisAppendCommand(redis, file_ctx->redis_setup.format, file_ctx->redis_setup.command,
351 file_ctx->redis_setup.key,
string);
352 time_t now = time(NULL);
/* Flush when the batch is full or when last_push is older than the current
 * second, so records are not held back indefinitely on a quiet sensor. */
353 if ((
ctx->batch_count == file_ctx->redis_setup.batch_size) || (
ctx->last_push < now)) {
356 int batch_size =
ctx->batch_count;
357 ctx->batch_count = 0;
358 ctx->last_push = now;
/* Runs batch_size + 1 times. NOTE(review): presumably the extra iteration
 * collects the reply for the command appended just above, which is not yet
 * included in batch_count -- confirm. */
359 for (i = 0; i <= batch_size; i++) {
360 if (redisGetReply(redis, (
void **)&reply) == REDIS_OK) {
361 freeReplyObject(reply);
365 SCLogInfo(
"Error when fetching reply: %s (%d)",
/* Decide by hiredis error code whether a reconnect is attempted. */
369 switch (redis->err) {
372 SCLogInfo(
"Reopening connection to redis server");
373 SCConfLogReopenSyncRedis(file_ctx);
/* After a successful reconnect the current record is queued again on the
 * new connection. */
376 SCLogInfo(
"Reconnected to redis server");
377 redisAppendCommand(redis, file_ctx->redis_setup.format,
378 file_ctx->redis_setup.command, file_ctx->redis_setup.key,
383 SCLogInfo(
"Unable to reconnect to redis server");
/* Unbatched mode: send and wait for the reply. */
397 redisReply *reply = redisCommand(redis, file_ctx->redis_setup.format,
398 file_ctx->redis_setup.command, file_ctx->redis_setup.key,
string);
/* NOTE(review): redisCommand can return NULL on error; confirm the switch
 * is guarded by a NULL check on reply. */
401 switch (reply->type) {
402 case REDIS_REPLY_ERROR:
404 SCConfLogReopenSyncRedis(file_ctx);
406 case REDIS_REPLY_INTEGER:
407 SCLogDebug(
"Redis integer %lld", reply->integer);
410 case REDIS_REPLY_STRING:
/* Any other reply type is unexpected here and is treated like a failure. */
415 SCLogError(
"Redis default triggered with %d", reply->type);
416 SCConfLogReopenSyncRedis(file_ctx);
419 freeReplyObject(reply);
/* NOTE(review): presumably the "no reply" branch -- reconnect. Confirm. */
421 SCConfLogReopenSyncRedis(file_ctx);
/**
 * \brief Public write entry point: send one record to redis.
 *
 * Dispatches to the asynchronous or synchronous writer according to
 * redis_setup.is_async and returns that writer's result.
 *
 * \param lf_ctx     opaque log file context
 * \param string     record to send
 * \param string_len length of the record (forwarded to the async writer only)
 *
 * \retval int result of the selected writer
 */
435 int LogFileWriteRedis(
void *lf_ctx,
const char *
string,
size_t string_len)
/* NOTE(review): file_ctx is declared on a line not shown here; presumably
 * it is lf_ctx cast to LogFileCtx * -- confirm. */
438 if (file_ctx == NULL) {
444 if (file_ctx->redis_setup.is_async) {
445 return SCLogRedisWriteAsync(file_ctx,
string, string_len);
449 if (! file_ctx->redis_setup.is_async) {
450 return SCLogRedisWriteSync(file_ctx,
string);
/**
 * \brief Configure the redis output from its config node and open it.
 *
 * Fills log_ctx->redis_setup (server, key, port, async flag, batch size,
 * command and format), installs the close callback, allocates the redis
 * context and, on the synchronous path, connects immediately. Invalid
 * settings end in FatalError.
 *
 * \param redis_node configuration node of the redis output
 * \param lf_ctx     opaque log file context
 *
 * \retval int status; the return statements are on lines not shown here
 */
460 int SCConfLogOpenRedis(
ConfNode *redis_node,
void *lf_ctx)
465 FatalError(
"redis does not support threaded output");
468 const char *redis_port = NULL;
469 const char *redis_mode = NULL;
/* Fall back to the built-in server address and key when none is set. */
482 if (!log_ctx->redis_setup.server) {
483 log_ctx->redis_setup.server = redis_default_server;
484 SCLogInfo(
"Using default redis server (127.0.0.1)");
490 if (!log_ctx->redis_setup.key) {
491 log_ctx->redis_setup.key = redis_default_key;
494 #ifndef HAVE_LIBEVENT
499 #endif //ifndef HAVE_LIBEVENT
501 log_ctx->redis_setup.is_async = is_async;
/* Batching is off unless the configuration enables it below. */
502 log_ctx->redis_setup.batch_size = 0;
510 if (ret && enabled) {
513 log_ctx->redis_setup.batch_size = val;
515 log_ctx->redis_setup.batch_size = 10;
520 log_ctx->redis_setup.batch_size = 0;
/* Map the configured mode onto a redis command; only stream mode changes
 * the format string. */
523 log_ctx->redis_setup.format = redis_default_format;
524 if (!strcmp(redis_mode,
"list") || !strcmp(redis_mode,
"lpush")) {
525 log_ctx->redis_setup.command = redis_lpush_cmd;
526 }
else if(!strcmp(redis_mode,
"rpush")){
527 log_ctx->redis_setup.command = redis_rpush_cmd;
528 }
else if(!strcmp(redis_mode,
"channel") || !strcmp(redis_mode,
"publish")) {
529 log_ctx->redis_setup.command = redis_publish_cmd;
530 }
else if (!strcmp(redis_mode,
"stream") || !strcmp(redis_mode,
"xadd")) {
533 log_ctx->redis_setup.command = redis_xadd_cmd;
534 log_ctx->redis_setup.format = redis_stream_format;
539 maxlen = REDIS_MAX_STREAM_LENGTH_DEFAULT;
/* Build the MAXLEN variant of the XADD format once, up front.
 * NOTE(review): no NULL check on the SCCalloc result is visible before it
 * is handed to snprintf -- confirm. */
545 log_ctx->redis_setup.stream_format =
SCCalloc(100,
sizeof(
char));
/* The "%s" arguments re-emit the placeholders, so the result is again a
 * (command, key, payload) format; '=' requests exact trimming, '~'
 * approximate trimming. */
546 snprintf(log_ctx->redis_setup.stream_format, 100, redis_stream_format_maxlen_tmpl,
"%s",
547 "%s", exact ?
'=' :
'~', maxlen,
"%s");
548 log_ctx->redis_setup.format = log_ctx->redis_setup.stream_format;
551 FatalError(
"Invalid redis mode: %s", redis_mode);
555 if (!log_ctx->redis_setup.server) {
556 FatalError(
"Error allocating redis server string");
/* The port is parsed as a base-10 16-bit unsigned value. */
558 if (
StringParseUint16(&log_ctx->redis_setup.port, 10, 0, (
const char *)redis_port) < 0) {
559 FatalError(
"Invalid value for redis port: %s", redis_port);
561 log_ctx->
Close = SCLogFileCloseRedis;
/* Async path: only the context is allocated here. */
565 log_ctx->redis = SCLogRedisContextAsyncAlloc();
/* Sync path: allocate the context and connect right away. */
569 log_ctx->redis = SCLogRedisContextAlloc();
570 SCConfLogReopenSyncRedis(log_ctx);
/* Body of the close routine. NOTE(review): the signature line is not shown
 * here; presumably this is SCLogFileCloseRedis(LogFileCtx *log_ctx), the
 * function installed as log_ctx->Close -- confirm.
 * Tears down whichever connection type (async or sync) is in use. */
580 SCLogRedisContext *
ctx = log_ctx->redis;
585 if (log_ctx->redis_setup.is_async) {
586 #if HAVE_LIBEVENT == 1
/* Connected: send QUIT and let the quit helper release the connection. */
588 if (
ctx->connected > 0) {
589 SCLogAsyncRedisSendQuit(
ctx);
/* NOTE(review): SCLogAsyncRedisSendQuit already calls event_base_free() on
 * ctx->ev_base; confirm it also resets the pointer to NULL, otherwise this
 * frees the event base a second time. */
591 if (
ctx->ev_base != NULL) {
592 event_base_free(
ctx->ev_base);
600 if (!log_ctx->redis_setup.is_async) {
/* Collect and discard the replies still outstanding for batched commands
 * before the connection is released. */
604 for (i = 0; i <
ctx->batch_count; i++) {
605 redisGetReply(
ctx->sync, (
void **)&reply);
607 freeReplyObject(reply);
610 redisFree(
ctx->sync);
614 ctx->batch_count = 0;
622 #endif //#ifdef HAVE_LIBHIREDIS