31 #ifdef HAVE_LIBHIREDIS
33 #ifdef HAVE_LIBEVENT_PTHREADS
34 #include <event2/thread.h>
37 static const char * redis_lpush_cmd =
"LPUSH";
38 static const char * redis_rpush_cmd =
"RPUSH";
39 static const char * redis_publish_cmd =
"PUBLISH";
40 static const char * redis_default_key =
"suricata";
41 static const char * redis_default_server =
"127.0.0.1";
43 static int SCConfLogReopenSyncRedis(
LogFileCtx *log_ctx);
44 static void SCLogFileCloseRedis(
LogFileCtx *log_ctx);
51 #ifdef HAVE_LIBEVENT_PTHREADS
52 evthread_use_pthreads();
58 static SCLogRedisContext *SCLogRedisContextAlloc(
void)
60 SCLogRedisContext* ctx = (SCLogRedisContext*)
SCCalloc(1,
sizeof(SCLogRedisContext));
78 static int SCConfLogReopenAsyncRedis(
LogFileCtx *log_ctx);
79 #include <hiredis/adapters/libevent.h>
83 static SCLogRedisContext *SCLogRedisContextAsyncAlloc(
void)
85 SCLogRedisContext* ctx = (SCLogRedisContext*)
SCCalloc(1,
sizeof(SCLogRedisContext));
106 static void SCRedisAsyncCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
108 redisReply *reply = r;
110 SCLogRedisContext *ctx = log_ctx->redis;
113 if (ctx->connected > 0)
114 SCLogInfo(
"Missing reply from redis, disconnected.");
118 event_base_loopbreak(ctx->ev_base);
128 static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
130 redisReply *reply = r;
131 SCLogRedisContext * ctx = privdata;
134 if (ctx->connected == 0) {
141 if (ctx->tried == 0) {
144 ctx->tried = time(NULL);
146 event_base_loopbreak(ctx->ev_base);
154 static void SCLogAsyncRedisSendEcho(SCLogRedisContext * ctx)
156 redisAsyncCommand(ctx->async, SCRedisAsyncEchoCommandCallback, ctx,
"ECHO suricata");
157 event_base_dispatch(ctx->ev_base);
166 static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
176 static void SCLogAsyncRedisSendQuit(SCLogRedisContext * ctx)
178 if (ctx->connected) {
179 redisAsyncCommand(ctx->async, SCRedisAsyncQuitCommandCallback, ctx,
"QUIT");
180 SCLogInfo(
"QUIT Command sent to redis. Connection will terminate!");
183 redisAsyncFree(ctx->async);
184 event_base_dispatch(ctx->ev_base);
186 event_base_free(ctx->ev_base);
194 static int SCConfLogReopenAsyncRedis(
LogFileCtx *log_ctx)
196 SCLogRedisContext * ctx = log_ctx->redis;
197 const char *redis_server = log_ctx->redis_setup.server;
198 int redis_port = log_ctx->redis_setup.port;
201 if (ctx->tried >= time(NULL)) {
205 if (strchr(redis_server,
'/') == NULL) {
206 ctx->async = redisAsyncConnect(redis_server, redis_port);
208 ctx->async = redisAsyncConnectUnix(redis_server);
211 if (ctx->ev_base != NULL) {
212 event_base_free(ctx->ev_base);
216 if (ctx->async == NULL) {
218 ctx->tried = time(NULL);
222 if (ctx->async != NULL && ctx->async->err) {
224 ctx->tried = time(NULL);
228 ctx->ev_base = event_base_new();
230 if (ctx->ev_base == NULL) {
231 ctx->tried = time(NULL);
232 redisAsyncFree(ctx->async);
237 redisLibeventAttach(ctx->async, ctx->ev_base);
239 log_ctx->redis = ctx;
240 log_ctx->
Close = SCLogFileCloseRedis;
249 static int SCLogRedisWriteAsync(
LogFileCtx *file_ctx,
const char *
string,
size_t string_len)
251 SCLogRedisContext *ctx = file_ctx->redis;
253 if (! ctx->connected) {
254 if (SCConfLogReopenAsyncRedis(file_ctx) == -1) {
257 if (ctx->tried == 0) {
260 SCLogAsyncRedisSendEcho(ctx);
263 if (!ctx->connected) {
267 if (ctx->async == NULL) {
271 redisAsyncCommand(ctx->async,
272 SCRedisAsyncCommandCallback,
275 file_ctx->redis_setup.command,
276 file_ctx->redis_setup.key,
279 event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK);
284 #endif// HAVE_LIBEVENT
289 static int SCConfLogReopenSyncRedis(
LogFileCtx *log_ctx)
291 SCLogRedisContext * ctx = log_ctx->redis;
294 if (ctx->tried >= time(NULL)) {
298 const char *redis_server = log_ctx->redis_setup.server;
299 int redis_port = log_ctx->redis_setup.port;
301 if (ctx->sync != NULL) {
302 redisFree(ctx->sync);
305 if (strchr(redis_server,
'/') == NULL) {
306 ctx->sync = redisConnect(redis_server, redis_port);
308 ctx->sync = redisConnectUnix(redis_server);
310 if (ctx->sync == NULL) {
312 ctx->tried = time(NULL);
315 if (ctx->sync->err) {
317 redisFree(ctx->sync);
319 ctx->tried = time(NULL);
322 SCLogInfo(
"Connected to redis server [%s].", log_ctx->redis_setup.server);
324 log_ctx->redis = ctx;
325 log_ctx->
Close = SCLogFileCloseRedis;
332 static int SCLogRedisWriteSync(
LogFileCtx *file_ctx,
const char *
string)
334 SCLogRedisContext * ctx = file_ctx->redis;
336 redisContext *redis = ctx->sync;
338 SCConfLogReopenSyncRedis(file_ctx);
341 SCLogDebug(
"Redis after re-open is not available.");
347 if (file_ctx->redis_setup.batch_size) {
348 redisAppendCommand(redis,
"%s %s %s",
349 file_ctx->redis_setup.command,
350 file_ctx->redis_setup.key,
352 time_t now = time(NULL);
353 if ((ctx->batch_count == file_ctx->redis_setup.batch_size) || (ctx->last_push < now)) {
356 int batch_size = ctx->batch_count;
357 ctx->batch_count = 0;
358 ctx->last_push = now;
359 for (i = 0; i <= batch_size; i++) {
360 if (redisGetReply(redis, (
void **)&reply) == REDIS_OK) {
361 freeReplyObject(reply);
365 SCLogInfo(
"Error when fetching reply: %s (%d)",
369 switch (redis->err) {
372 SCLogInfo(
"Reopening connection to redis server");
373 SCConfLogReopenSyncRedis(file_ctx);
376 SCLogInfo(
"Reconnected to redis server");
377 redisAppendCommand(redis,
"%s %s %s",
378 file_ctx->redis_setup.command,
379 file_ctx->redis_setup.key,
384 SCLogInfo(
"Unable to reconnect to redis server");
390 "Unsupported error code %d",
400 redisReply *reply = redisCommand(redis,
"%s %s %s",
401 file_ctx->redis_setup.command,
402 file_ctx->redis_setup.key,
406 switch (reply->type) {
407 case REDIS_REPLY_ERROR:
409 SCConfLogReopenSyncRedis(file_ctx);
411 case REDIS_REPLY_INTEGER:
412 SCLogDebug(
"Redis integer %lld", reply->integer);
417 "Redis default triggered with %d", reply->type);
418 SCConfLogReopenSyncRedis(file_ctx);
421 freeReplyObject(reply);
423 SCConfLogReopenSyncRedis(file_ctx);
437 int LogFileWriteRedis(
void *lf_ctx,
const char *
string,
size_t string_len)
440 if (file_ctx == NULL) {
446 if (file_ctx->redis_setup.is_async) {
447 return SCLogRedisWriteAsync(file_ctx,
string, string_len);
451 if (! file_ctx->redis_setup.is_async) {
452 return SCLogRedisWriteSync(file_ctx,
string);
462 int SCConfLogOpenRedis(
ConfNode *redis_node,
void *lf_ctx)
470 const char *redis_port = NULL;
471 const char *redis_mode = NULL;
484 if (!log_ctx->redis_setup.server) {
485 log_ctx->redis_setup.server = redis_default_server;
486 SCLogInfo(
"Using default redis server (127.0.0.1)");
492 if (!log_ctx->redis_setup.key) {
493 log_ctx->redis_setup.key = redis_default_key;
496 #ifndef HAVE_LIBEVENT
501 #endif //ifndef HAVE_LIBEVENT
503 log_ctx->redis_setup.is_async = is_async;
504 log_ctx->redis_setup.batch_size = 0;
512 if (ret && enabled) {
515 log_ctx->redis_setup.batch_size = val;
517 log_ctx->redis_setup.batch_size = 10;
522 log_ctx->redis_setup.batch_size = 0;
525 if (!strcmp(redis_mode,
"list") || !strcmp(redis_mode,
"lpush")) {
526 log_ctx->redis_setup.command = redis_lpush_cmd;
527 }
else if(!strcmp(redis_mode,
"rpush")){
528 log_ctx->redis_setup.command = redis_rpush_cmd;
529 }
else if(!strcmp(redis_mode,
"channel") || !strcmp(redis_mode,
"publish")) {
530 log_ctx->redis_setup.command = redis_publish_cmd;
536 if (!log_ctx->redis_setup.server) {
539 if (
StringParseUint16(&log_ctx->redis_setup.port, 10, 0, (
const char *)redis_port) < 0) {
542 log_ctx->
Close = SCLogFileCloseRedis;
546 log_ctx->redis = SCLogRedisContextAsyncAlloc();
550 log_ctx->redis = SCLogRedisContextAlloc();
551 SCConfLogReopenSyncRedis(log_ctx);
561 SCLogRedisContext * ctx = log_ctx->redis;
566 if (log_ctx->redis_setup.is_async) {
567 #if HAVE_LIBEVENT == 1
569 if (ctx->connected > 0) {
570 SCLogAsyncRedisSendQuit(ctx);
572 if (ctx->ev_base != NULL) {
573 event_base_free(ctx->ev_base);
581 if (!log_ctx->redis_setup.is_async) {
585 for (i = 0; i < ctx->batch_count; i++) {
586 redisGetReply(ctx->sync, (
void **)&reply);
588 freeReplyObject(reply);
591 redisFree(ctx->sync);
595 ctx->batch_count = 0;
603 #endif //#ifdef HAVE_LIBHIREDIS