31 #ifdef HAVE_LIBHIREDIS
33 #ifdef HAVE_LIBEVENT_PTHREADS
34 #include <event2/thread.h>
/* String constants for the redis logger: the three redis commands that
 * SCConfLogOpenRedis can select (LPUSH, RPUSH, PUBLISH), plus the fallback
 * key and server address it applies when none is set. */
37 static const char * redis_lpush_cmd =
"LPUSH";
38 static const char * redis_rpush_cmd =
"RPUSH";
39 static const char * redis_publish_cmd =
"PUBLISH";
40 static const char * redis_default_key =
"suricata";
41 static const char * redis_default_server =
"127.0.0.1";
/* Forward declarations for the synchronous (re)connect routine and the close
 * handler. SCLogFileCloseRedis is assigned to log_ctx->Close by the open and
 * reopen routines in this file. */
43 static int SCConfLogReopenSyncRedis(
LogFileCtx *log_ctx);
44 static void SCLogFileCloseRedis(
LogFileCtx *log_ctx);
/* Initialisation hook for the redis logger. Takes no arguments and returns
 * nothing. When built with HAVE_LIBEVENT_PTHREADS it calls
 * evthread_use_pthreads(); no other work is visible here. */
49 void SCLogRedisInit(
void)
51 #ifdef HAVE_LIBEVENT_PTHREADS
52 evthread_use_pthreads();
/* Allocate one SCLogRedisContext through SCCalloc(1, sizeof(...)).
 * Returns a SCLogRedisContext pointer per the signature. */
58 static SCLogRedisContext *SCLogRedisContextAlloc(
void)
60 SCLogRedisContext*
ctx = (SCLogRedisContext*)
SCCalloc(1,
sizeof(SCLogRedisContext));
/* NOTE(review): the condition guarding this FatalError is not visible;
 * presumably it is a NULL check on ctx — confirm. */
62 FatalError(
"Unable to allocate redis context");
/* Forward declaration of the asynchronous (re)connect routine, which is
 * defined further down and called from SCLogRedisWriteAsync. */
78 static int SCConfLogReopenAsyncRedis(
LogFileCtx *log_ctx);
79 #include <hiredis/adapters/libevent.h>
/* Allocate one SCLogRedisContext through SCCalloc(1, sizeof(...)) for the
 * asynchronous mode; SCConfLogOpenRedis stores the result in log_ctx->redis.
 * Returns a SCLogRedisContext pointer per the signature. */
83 static SCLogRedisContext *SCLogRedisContextAsyncAlloc(
void)
85 SCLogRedisContext*
ctx = (SCLogRedisContext*)
SCCalloc(1,
sizeof(SCLogRedisContext));
/* NOTE(review): the condition guarding this FatalError is not visible;
 * presumably it is a NULL check on ctx — confirm. */
87 FatalError(
"Unable to allocate redis context");
/* Reply callback handed to redisAsyncCommand by SCLogRedisWriteAsync.
 *
 * ac:       the hiredis async context (not referenced in the visible lines).
 * r:        the reply, read as a redisReply pointer.
 * privdata: caller-supplied pointer. NOTE(review): log_ctx is used below but
 *           its declaration is not visible; presumably it is derived from
 *           privdata — confirm. */
106 static void SCRedisAsyncCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
108 redisReply *reply = r;
110 SCLogRedisContext *
ctx = log_ctx->redis;
/* Logged only while the context still counts as connected. The enclosing
 * condition is not visible (presumably a missing/NULL reply — confirm). */
113 if (
ctx->connected > 0)
114 SCLogInfo(
"Missing reply from redis, disconnected.");
/* Ask the event loop on ctx->ev_base to stop. */
118 event_base_loopbreak(
ctx->ev_base);
/* Reply callback for the "ECHO suricata" probe sent by
 * SCLogAsyncRedisSendEcho.
 *
 * ac:       the hiredis async context (not referenced in the visible lines).
 * r:        the reply, read as a redisReply pointer.
 * privdata: the SCLogRedisContext that the sender passed in.
 *
 * NOTE(review): how the checks below nest, and where ctx->connected is
 * updated, is not visible in this view. */
128 static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
130 redisReply *reply = r;
131 SCLogRedisContext *
ctx = privdata;
134 if (
ctx->connected == 0) {
/* Warn only when no failed attempt has been recorded yet (tried == 0). */
141 if (
ctx->tried == 0) {
142 SCLogWarning(
"Failed to connect to Redis... (will keep trying)");
/* Record the time of this attempt; the reopen routine compares against it. */
144 ctx->tried = time(NULL);
/* Ask the event loop on ctx->ev_base to stop. */
146 event_base_loopbreak(
ctx->ev_base);
/* Queue an "ECHO suricata" command on ctx->async, with
 * SCRedisAsyncEchoCommandCallback as the reply handler and ctx as its private
 * data, then run the event loop on ctx->ev_base via event_base_dispatch().
 * The echo callback calls event_base_loopbreak() on the same base. */
154 static void SCLogAsyncRedisSendEcho(SCLogRedisContext *
ctx)
156 redisAsyncCommand(
ctx->async, SCRedisAsyncEchoCommandCallback,
ctx,
"ECHO suricata");
157 event_base_dispatch(
ctx->ev_base);
/* Reply callback registered for the "QUIT" command issued by
 * SCLogAsyncRedisSendQuit. Only the signature is visible in this view. */
166 static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
/* Tear down the asynchronous connection held in ctx: send QUIT if connected,
 * then release the async context and the event base. */
176 static void SCLogAsyncRedisSendQuit(SCLogRedisContext *
ctx)
/* Only a connected context gets a QUIT command. */
178 if (
ctx->connected) {
179 redisAsyncCommand(
ctx->async, SCRedisAsyncQuitCommandCallback,
ctx,
"QUIT");
180 SCLogInfo(
"QUIT Command sent to redis. Connection will terminate!");
/* Release the async context, run the event loop, then free the event base,
 * in that order. */
183 redisAsyncFree(
ctx->async);
184 event_base_dispatch(
ctx->ev_base);
186 event_base_free(
ctx->ev_base);
/* (Re)establish the asynchronous redis connection for log_ctx.
 *
 * Reads the server and port from log_ctx->redis_setup, creates a hiredis
 * async context and a fresh libevent base, attaches one to the other, and
 * installs SCLogFileCloseRedis as the close handler. On each visible failure
 * path ctx->tried is set to the current time.
 *
 * Returns int. The return statements are not visible here; the caller
 * SCLogRedisWriteAsync compares the result against -1. */
194 static int SCConfLogReopenAsyncRedis(
LogFileCtx *log_ctx)
196 SCLogRedisContext *
ctx = log_ctx->redis;
197 const char *redis_server = log_ctx->redis_setup.server;
198 int redis_port = log_ctx->redis_setup.port;
/* True while the clock has not moved past the last recorded failed attempt.
 * The branch body is not visible (presumably an early return that limits how
 * often a retry is made — confirm). */
201 if (
ctx->tried >= time(NULL)) {
/* No '/' in the server string: connect by host and port. The other connect
 * call takes only the server string (presumably the else branch, treating the
 * string as a Unix socket path — confirm). */
205 if (strchr(redis_server,
'/') == NULL) {
206 ctx->async = redisAsyncConnect(redis_server, redis_port);
208 ctx->async = redisAsyncConnectUnix(redis_server);
/* Free any event base left over from an earlier attempt. */
211 if (
ctx->ev_base != NULL) {
212 event_base_free(
ctx->ev_base);
/* No async context could be created: record the attempt time. */
216 if (
ctx->async == NULL) {
218 ctx->tried = time(NULL);
/* A context was created but carries an error: log it and record the attempt
 * time. */
222 if (
ctx->async != NULL &&
ctx->async->err) {
223 SCLogError(
"Error setting to redis async: [%s].",
ctx->async->errstr);
224 ctx->tried = time(NULL);
228 ctx->ev_base = event_base_new();
/* No event base could be created: record the attempt time and free the async
 * context. */
230 if (
ctx->ev_base == NULL) {
231 ctx->tried = time(NULL);
232 redisAsyncFree(
ctx->async);
/* Attach the async context to the new event base. */
237 redisLibeventAttach(
ctx->async,
ctx->ev_base);
/* Store the context back on log_ctx and install the close handler. */
239 log_ctx->redis =
ctx;
240 log_ctx->
Close = SCLogFileCloseRedis;
/* Write one log record through the asynchronous redis connection.
 *
 * file_ctx:   log context; its ->redis member holds the SCLogRedisContext.
 * string:     record text (the argument line that uses it is not visible).
 * string_len: length of the record.
 *
 * Returns int; the return statements are not visible in this view. */
249 static int SCLogRedisWriteAsync(
LogFileCtx *file_ctx,
const char *
string,
size_t string_len)
251 SCLogRedisContext *
ctx = file_ctx->redis;
/* Not connected: try to reopen. A result of -1 is handled in a branch whose
 * body is not visible. */
253 if (!
ctx->connected) {
254 if (SCConfLogReopenAsyncRedis(file_ctx) == -1) {
257 if (
ctx->tried == 0) {
/* Send an ECHO probe; its callback is where the outcome is handled. */
260 SCLogAsyncRedisSendEcho(
ctx);
/* Check the connection state again after the reopen/probe steps. */
263 if (!
ctx->connected) {
267 if (
ctx->async == NULL) {
/* Queue the configured command and key, with SCRedisAsyncCommandCallback as
 * the reply handler. The remaining arguments are not visible here. */
271 redisAsyncCommand(
ctx->async,
272 SCRedisAsyncCommandCallback,
275 file_ctx->redis_setup.command,
276 file_ctx->redis_setup.key,
/* Run the event loop with the EVLOOP_NONBLOCK flag. */
279 event_base_loop(
ctx->ev_base, EVLOOP_NONBLOCK);
284 #endif// HAVE_LIBEVENT
/* (Re)establish the synchronous redis connection for log_ctx.
 *
 * Frees any existing ctx->sync, connects again using the server and port in
 * log_ctx->redis_setup, and installs SCLogFileCloseRedis as the close handler.
 * On each visible failure path ctx->tried is set to the current time.
 *
 * Returns int; the return statements are not visible in this view. */
289 static int SCConfLogReopenSyncRedis(
LogFileCtx *log_ctx)
291 SCLogRedisContext *
ctx = log_ctx->redis;
/* True while the clock has not moved past the last recorded failed attempt.
 * The branch body is not visible (presumably an early return — confirm). */
294 if (
ctx->tried >= time(NULL)) {
298 const char *redis_server = log_ctx->redis_setup.server;
299 int redis_port = log_ctx->redis_setup.port;
/* Free the previous connection, if there is one, before reconnecting. */
301 if (
ctx->sync != NULL) {
302 redisFree(
ctx->sync);
/* No '/' in the server string: connect by host and port. The other connect
 * call takes only the server string (presumably the else branch, treating the
 * string as a Unix socket path — confirm). */
305 if (strchr(redis_server,
'/') == NULL) {
306 ctx->sync = redisConnect(redis_server, redis_port);
308 ctx->sync = redisConnectUnix(redis_server);
/* No context was returned: log and record the attempt time. */
310 if (
ctx->sync == NULL) {
311 SCLogError(
"Error connecting to redis server.");
312 ctx->tried = time(NULL);
/* A context was returned but carries an error: log it, free the context and
 * record the attempt time. */
315 if (
ctx->sync->err) {
316 SCLogError(
"Error connecting to redis server: [%s].",
ctx->sync->errstr);
317 redisFree(
ctx->sync);
319 ctx->tried = time(NULL);
322 SCLogInfo(
"Connected to redis server [%s].", log_ctx->redis_setup.server);
/* Store the context back on log_ctx and install the close handler. */
324 log_ctx->redis =
ctx;
325 log_ctx->
Close = SCLogFileCloseRedis;
/* Write one log record through the synchronous redis connection.
 *
 * file_ctx: log context; its ->redis member holds the SCLogRedisContext.
 * string:   record text (the argument lines that use it are not visible).
 *
 * There are two visible paths. With a non-zero redis_setup.batch_size the
 * command is appended with redisAppendCommand() and replies are fetched in
 * bulk later. Otherwise the command is sent with redisCommand() and its
 * reply is inspected straight away.
 *
 * Returns int; the return statements are not visible in this view. */
332 static int SCLogRedisWriteSync(
LogFileCtx *file_ctx,
const char *
string)
334 SCLogRedisContext *
ctx = file_ctx->redis;
336 redisContext *redis =
ctx->sync;
/* Reopen attempt. The guarding condition is not visible (presumably
 * redis == NULL — confirm). */
338 SCConfLogReopenSyncRedis(file_ctx);
341 SCLogDebug(
"Redis after re-open is not available.");
/* Batched mode: append the command without reading its reply yet. */
347 if (file_ctx->redis_setup.batch_size) {
348 redisAppendCommand(redis,
"%s %s %s",
349 file_ctx->redis_setup.command,
350 file_ctx->redis_setup.key,
/* Fetch replies once the batch is full or last_push is older than now. */
352 time_t now = time(NULL);
353 if ((
ctx->batch_count == file_ctx->redis_setup.batch_size) || (
ctx->last_push < now)) {
/* Copy the counter, then reset it and stamp the push time before the loop. */
356 int batch_size =
ctx->batch_count;
357 ctx->batch_count = 0;
358 ctx->last_push = now;
/* NOTE(review): the <= bound makes this loop fetch batch_count + 1 replies.
 * Presumably the extra one belongs to the command appended just above; the
 * lines that increment batch_count are not visible here — confirm. */
359 for (i = 0; i <= batch_size; i++) {
360 if (redisGetReply(redis, (
void **)&reply) == REDIS_OK) {
361 freeReplyObject(reply);
365 SCLogInfo(
"Error when fetching reply: %s (%d)",
/* Choose a recovery action from the context's error code; the case labels
 * are not visible. */
369 switch (redis->err) {
372 SCLogInfo(
"Reopening connection to redis server");
373 SCConfLogReopenSyncRedis(file_ctx);
/* After a reconnect the command is appended again. */
376 SCLogInfo(
"Reconnected to redis server");
377 redisAppendCommand(redis,
"%s %s %s",
378 file_ctx->redis_setup.command,
379 file_ctx->redis_setup.key,
384 SCLogInfo(
"Unable to reconnect to redis server");
/* Unbatched mode: send the command and get its reply in one call. */
398 redisReply *reply = redisCommand(redis,
"%s %s %s",
399 file_ctx->redis_setup.command,
400 file_ctx->redis_setup.key,
/* Error replies and reply types with no case of their own trigger a reopen.
 * Integer replies are only logged at debug level. */
404 switch (reply->type) {
405 case REDIS_REPLY_ERROR:
407 SCConfLogReopenSyncRedis(file_ctx);
409 case REDIS_REPLY_INTEGER:
410 SCLogDebug(
"Redis integer %lld", reply->integer);
414 SCLogError(
"Redis default triggered with %d", reply->type);
415 SCConfLogReopenSyncRedis(file_ctx);
418 freeReplyObject(reply);
/* Reopen on a path whose condition is not visible (presumably when
 * redisCommand returned no reply — confirm). */
420 SCConfLogReopenSyncRedis(file_ctx);
/* Public write entry point: pass the record to the asynchronous or the
 * synchronous writer, depending on file_ctx->redis_setup.is_async.
 *
 * lf_ctx:     opaque context pointer. NOTE(review): file_ctx is used below but
 *             its declaration is not visible; presumably it is lf_ctx cast to
 *             LogFileCtx * — confirm.
 * string:     record text.
 * string_len: length of the record; only the asynchronous writer receives it.
 *
 * Returns the int result of whichever writer was called. */
434 int LogFileWriteRedis(
void *lf_ctx,
const char *
string,
size_t string_len)
/* The body of this NULL check is not visible. */
437 if (file_ctx == NULL) {
443 if (file_ctx->redis_setup.is_async) {
444 return SCLogRedisWriteAsync(file_ctx,
string, string_len);
448 if (! file_ctx->redis_setup.is_async) {
449 return SCLogRedisWriteSync(file_ctx,
string);
/* Configure the redis logger and fill in log_ctx->redis_setup.
 *
 * redis_node: configuration node (the lookups on it are not visible here).
 * lf_ctx:     opaque context pointer. NOTE(review): log_ctx is used below but
 *             its declaration is not visible; presumably it is lf_ctx cast to
 *             LogFileCtx * — confirm.
 *
 * Returns int; the return statements are not visible in this view. */
459 int SCConfLogOpenRedis(
ConfNode *redis_node,
void *lf_ctx)
/* The condition guarding this FatalError is not visible. */
464 FatalError(
"redis does not support threaded output");
467 const char *redis_port = NULL;
468 const char *redis_mode = NULL;
/* Use the default server when none is set. */
481 if (!log_ctx->redis_setup.server) {
482 log_ctx->redis_setup.server = redis_default_server;
483 SCLogInfo(
"Using default redis server (127.0.0.1)");
/* Use the default key when none is set. */
489 if (!log_ctx->redis_setup.key) {
490 log_ctx->redis_setup.key = redis_default_key;
/* The contents of this no-libevent section are not visible. */
493 #ifndef HAVE_LIBEVENT
498 #endif //ifndef HAVE_LIBEVENT
500 log_ctx->redis_setup.is_async = is_async;
/* batch_size starts at 0 and is later set to val, 10 or 0. The conditions
 * that choose between them are only partly visible. */
501 log_ctx->redis_setup.batch_size = 0;
509 if (ret && enabled) {
512 log_ctx->redis_setup.batch_size = val;
514 log_ctx->redis_setup.batch_size = 10;
519 log_ctx->redis_setup.batch_size = 0;
/* Map the mode string to a command: "list" or "lpush" gives LPUSH, "rpush"
 * gives RPUSH, "channel" or "publish" gives PUBLISH. */
522 if (!strcmp(redis_mode,
"list") || !strcmp(redis_mode,
"lpush")) {
523 log_ctx->redis_setup.command = redis_lpush_cmd;
524 }
else if(!strcmp(redis_mode,
"rpush")){
525 log_ctx->redis_setup.command = redis_rpush_cmd;
526 }
else if(!strcmp(redis_mode,
"channel") || !strcmp(redis_mode,
"publish")) {
527 log_ctx->redis_setup.command = redis_publish_cmd;
/* NOTE(review): this is a second NULL check on server; judging by the
 * message it presumably follows a string duplication that is not visible
 * here — confirm. */
533 if (!log_ctx->redis_setup.server) {
534 FatalError(
"Error allocating redis server string");
/* Parse the port string (base 10) into redis_setup.port; a negative result
 * from the parser is fatal. */
536 if (
StringParseUint16(&log_ctx->redis_setup.port, 10, 0, (
const char *)redis_port) < 0) {
537 FatalError(
"Invalid value for redis port: %s", redis_port);
539 log_ctx->
Close = SCLogFileCloseRedis;
/* Two allocation paths are visible: the async allocator, and the sync
 * allocator followed by an immediate connect attempt. The condition that
 * chooses between them is not visible (presumably is_async — confirm). */
543 log_ctx->redis = SCLogRedisContextAsyncAlloc();
547 log_ctx->redis = SCLogRedisContextAlloc();
548 SCConfLogReopenSyncRedis(log_ctx);
/* NOTE(review): the function header for these lines is not visible in this
 * view; presumably this is the body of SCLogFileCloseRedis(LogFileCtx *log_ctx)
 * declared near the top of the file — confirm.
 *
 * Shuts down the connection held in log_ctx->redis, taking the async or the
 * sync path depending on log_ctx->redis_setup.is_async. */
558 SCLogRedisContext *
ctx = log_ctx->redis;
563 if (log_ctx->redis_setup.is_async) {
564 #if HAVE_LIBEVENT == 1
/* Async path: send QUIT and tear down if still connected. */
566 if (
ctx->connected > 0) {
567 SCLogAsyncRedisSendQuit(
ctx);
/* NOTE(review): SCLogAsyncRedisSendQuit also calls
 * event_base_free(ctx->ev_base). Whether ev_base is reset to NULL before this
 * check is not visible here; confirm that it cannot be freed twice. */
569 if (
ctx->ev_base != NULL) {
570 event_base_free(
ctx->ev_base);
/* Sync path: fetch and free one reply per command counted in batch_count,
 * then free the connection. */
578 if (!log_ctx->redis_setup.is_async) {
582 for (i = 0; i <
ctx->batch_count; i++) {
583 redisGetReply(
ctx->sync, (
void **)&reply);
585 freeReplyObject(reply);
588 redisFree(
ctx->sync);
592 ctx->batch_count = 0;
600 #endif //#ifdef HAVE_LIBHIREDIS