32 #ifdef HAVE_LIBHIREDIS
34 #ifdef HAVE_LIBEVENT_PTHREADS
35 #include <event2/thread.h>
/* Redis command verbs that can be selected as the output command, followed
 * by the default key and default server address that SCConfLogOpenRedis
 * falls back to when the corresponding setting is unset. */
38 static const char * redis_lpush_cmd =
"LPUSH";
39 static const char * redis_rpush_cmd =
"RPUSH";
40 static const char * redis_publish_cmd =
"PUBLISH";
41 static const char * redis_default_key =
"suricata";
42 static const char * redis_default_server =
"127.0.0.1";
/* Forward declarations: the synchronous (re)connect routine and the close
 * handler that is installed as log_ctx->Close. */
44 static int SCConfLogReopenSyncRedis(
LogFileCtx *log_ctx);
45 static void SCLogFileCloseRedis(
LogFileCtx *log_ctx);
/**
 * \brief Global initialisation hook for the redis logger.
 *
 * When HAVE_LIBEVENT_PTHREADS is defined this calls evthread_use_pthreads();
 * no other work is visible in this function.
 */
50 void SCLogRedisInit(
void)
52 #ifdef HAVE_LIBEVENT_PTHREADS
53 evthread_use_pthreads();
/**
 * \brief Allocate a zero-filled SCLogRedisContext with SCCalloc.
 *
 * NOTE(review): the guard around FatalError is not shown here; presumably it
 * fires only when SCCalloc returns NULL -- confirm.
 */
59 static SCLogRedisContext *SCLogRedisContextAlloc(
void)
61 SCLogRedisContext* ctx = (SCLogRedisContext*)
SCCalloc(1,
sizeof(SCLogRedisContext));
63 FatalError(
"Unable to allocate redis context");
/* Forward declaration of the asynchronous (re)connect routine. */
79 static int SCConfLogReopenAsyncRedis(
LogFileCtx *log_ctx);
80 #include <hiredis/adapters/libevent.h>
/**
 * \brief Allocate a zero-filled SCLogRedisContext for asynchronous mode.
 *
 * The allocation itself is the same SCCalloc call used by
 * SCLogRedisContextAlloc.
 * NOTE(review): the guard around FatalError is not shown here; presumably it
 * fires only when SCCalloc returns NULL -- confirm.
 */
84 static SCLogRedisContext *SCLogRedisContextAsyncAlloc(
void)
86 SCLogRedisContext* ctx = (SCLogRedisContext*)
SCCalloc(1,
sizeof(SCLogRedisContext));
88 FatalError(
"Unable to allocate redis context");
/**
 * \brief hiredis async callback attached to the log-write command.
 *
 * \param ac       async redis context the reply belongs to
 * \param r        reply pointer, read as a redisReply
 * \param privdata opaque pointer supplied when the command was queued
 *
 * NOTE(review): the declaration of log_ctx is not shown here; presumably it
 * is derived from privdata -- confirm.
 */
107 static void SCRedisAsyncCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
109 redisReply *reply = r;
111 SCLogRedisContext *ctx = log_ctx->redis;
/* Only report the lost connection if we believed we were connected. */
114 if (ctx->connected > 0)
115 SCLogInfo(
"Missing reply from redis, disconnected.");
/* Stop the libevent loop so the dispatching caller regains control. */
119 event_base_loopbreak(ctx->ev_base);
/**
 * \brief hiredis async callback for the ECHO probe command.
 *
 * \param ac       async redis context the reply belongs to
 * \param r        reply pointer, read as a redisReply
 * \param privdata the SCLogRedisContext passed when ECHO was queued
 *
 * Inspects and updates ctx->connected / ctx->tried and then breaks out of
 * the event loop. NOTE(review): the conditions that select between the
 * "connected" and "failed" paths are only partly shown here.
 */
129 static void SCRedisAsyncEchoCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
131 redisReply *reply = r;
132 SCLogRedisContext * ctx = privdata;
135 if (ctx->connected == 0) {
/* Warn only while no failed attempt has been recorded yet (tried == 0). */
142 if (ctx->tried == 0) {
143 SCLogWarning(
"Failed to connect to Redis... (will keep trying)");
/* Record the time of this attempt; the reopen routines compare it to time(NULL). */
145 ctx->tried = time(NULL);
147 event_base_loopbreak(ctx->ev_base);
/**
 * \brief Queue an "ECHO suricata" command and run the event loop.
 *
 * \param ctx redis context; also handed to the callback as privdata
 *
 * SCRedisAsyncEchoCommandCallback is registered as the reply handler and
 * calls event_base_loopbreak on the same event base.
 */
155 static void SCLogAsyncRedisSendEcho(SCLogRedisContext * ctx)
157 redisAsyncCommand(ctx->async, SCRedisAsyncEchoCommandCallback, ctx,
"ECHO suricata");
158 event_base_dispatch(ctx->ev_base);
/**
 * \brief hiredis async callback registered for the QUIT command.
 *
 * \param ac       async redis context the reply belongs to
 * \param r        reply pointer
 * \param privdata opaque pointer supplied when QUIT was queued
 *
 * NOTE(review): only the signature is shown here; the body's behaviour is
 * not documented to avoid guessing.
 */
167 static void SCRedisAsyncQuitCommandCallback(redisAsyncContext *ac,
void *r,
void *privdata)
/**
 * \brief Send QUIT (when connected) and tear down the async connection.
 *
 * \param ctx redis context holding the async handle and event base
 *
 * After the optional QUIT the async context is freed, the event loop is
 * dispatched once more, and the event base is freed.
 */
177 static void SCLogAsyncRedisSendQuit(SCLogRedisContext * ctx)
179 if (ctx->connected) {
180 redisAsyncCommand(ctx->async, SCRedisAsyncQuitCommandCallback, ctx,
"QUIT");
181 SCLogInfo(
"QUIT Command sent to redis. Connection will terminate!");
184 redisAsyncFree(ctx->async);
185 event_base_dispatch(ctx->ev_base);
/* NOTE(review): callers that also free ev_base rely on the pointer being
 * cleared after this call -- confirm it is reset to NULL. */
187 event_base_free(ctx->ev_base);
/**
 * \brief (Re)open the asynchronous redis connection for a log context.
 *
 * \param log_ctx log file context carrying redis_setup and the redis context
 *
 * Connects over TCP when the server string contains no '/', otherwise over
 * a unix socket, creates a fresh libevent base and attaches the async
 * context to it. Every failure path shown records the attempt time in
 * ctx->tried. NOTE(review): return values are not shown here; the caller
 * tests for -1.
 */
195 static int SCConfLogReopenAsyncRedis(
LogFileCtx *log_ctx)
197 SCLogRedisContext * ctx = log_ctx->redis;
198 const char *redis_server = log_ctx->redis_setup.server;
199 int redis_port = log_ctx->redis_setup.port;
/* Rate limit: taken while the last failed attempt is not older than the
 * current time() value. */
202 if (ctx->tried >= time(NULL)) {
/* A '/' in the server string selects a unix socket path. */
206 if (strchr(redis_server,
'/') == NULL) {
207 ctx->async = redisAsyncConnect(redis_server, redis_port);
209 ctx->async = redisAsyncConnectUnix(redis_server);
/* Drop any event base left over from a previous connection. */
212 if (ctx->ev_base != NULL) {
213 event_base_free(ctx->ev_base);
217 if (ctx->async == NULL) {
219 ctx->tried = time(NULL);
/* hiredis returns a context with err set when the connect setup failed. */
223 if (ctx->async != NULL && ctx->async->err) {
224 SCLogError(
"Error setting to redis async: [%s].", ctx->async->errstr);
225 ctx->tried = time(NULL);
229 ctx->ev_base = event_base_new();
/* Without an event base the async context cannot be driven; release it. */
231 if (ctx->ev_base == NULL) {
232 ctx->tried = time(NULL);
233 redisAsyncFree(ctx->async);
238 redisLibeventAttach(ctx->async, ctx->ev_base);
240 log_ctx->redis = ctx;
241 log_ctx->
Close = SCLogFileCloseRedis;
/**
 * \brief Write one log record to redis through the async connection.
 *
 * \param file_ctx   log file context carrying redis_setup and redis context
 * \param string     record payload
 * \param string_len payload length
 *
 * If not connected, a reopen is attempted and an ECHO probe is sent; the
 * record is then queued with SCRedisAsyncCommandCallback and the event loop
 * is pumped once without blocking. NOTE(review): the return statements and
 * the remaining redisAsyncCommand arguments are not shown here.
 */
250 static int SCLogRedisWriteAsync(
LogFileCtx *file_ctx,
const char *
string,
size_t string_len)
252 SCLogRedisContext *ctx = file_ctx->redis;
254 if (! ctx->connected) {
255 if (SCConfLogReopenAsyncRedis(file_ctx) == -1) {
258 if (ctx->tried == 0) {
/* Probe the new connection; the echo callback updates ctx state. */
261 SCLogAsyncRedisSendEcho(ctx);
/* Re-check after the reconnect/probe attempt. */
264 if (!ctx->connected) {
268 if (ctx->async == NULL) {
272 redisAsyncCommand(ctx->async,
273 SCRedisAsyncCommandCallback,
276 file_ctx->redis_setup.command,
277 file_ctx->redis_setup.key,
/* EVLOOP_NONBLOCK: process ready events only, do not wait for new ones. */
280 event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK);
285 #endif// HAVE_LIBEVENT
/**
 * \brief (Re)open the synchronous redis connection for a log context.
 *
 * \param log_ctx log file context carrying redis_setup and the redis context
 *
 * Frees any existing sync connection, then connects over TCP when the
 * server string contains no '/', otherwise over a unix socket. Failures
 * record the attempt time in ctx->tried. NOTE(review): return values are
 * not shown here.
 */
290 static int SCConfLogReopenSyncRedis(
LogFileCtx *log_ctx)
292 SCLogRedisContext * ctx = log_ctx->redis;
/* Rate limit: taken while the last failed attempt is not older than the
 * current time() value. */
295 if (ctx->tried >= time(NULL)) {
299 const char *redis_server = log_ctx->redis_setup.server;
300 int redis_port = log_ctx->redis_setup.port;
/* Release the previous connection before creating a new one. */
302 if (ctx->sync != NULL) {
303 redisFree(ctx->sync);
/* A '/' in the server string selects a unix socket path. */
306 if (strchr(redis_server,
'/') == NULL) {
307 ctx->sync = redisConnect(redis_server, redis_port);
309 ctx->sync = redisConnectUnix(redis_server);
311 if (ctx->sync == NULL) {
312 SCLogError(
"Error connecting to redis server.");
313 ctx->tried = time(NULL);
/* hiredis returns a context with err set when the connection failed. */
316 if (ctx->sync->err) {
317 SCLogError(
"Error connecting to redis server: [%s].", ctx->sync->errstr);
/* NOTE(review): ctx->sync must be reset to NULL after this free, otherwise
 * the redisFree at the top of the next call frees it again -- confirm. */
318 redisFree(ctx->sync);
320 ctx->tried = time(NULL);
323 SCLogInfo(
"Connected to redis server [%s].", log_ctx->redis_setup.server);
325 log_ctx->redis = ctx;
326 log_ctx->
Close = SCLogFileCloseRedis;
/**
 * \brief Write one log record to redis through the synchronous connection.
 *
 * \param file_ctx log file context carrying redis_setup and redis context
 * \param string   record payload
 *
 * Two modes are shown: a pipelined mode when redis_setup.batch_size is
 * non-zero (commands are appended and replies collected in bulk) and a
 * direct mode that issues redisCommand per record. NOTE(review): guard
 * conditions, return statements and the final command argument are not
 * shown here.
 */
333 static int SCLogRedisWriteSync(
LogFileCtx *file_ctx,
const char *
string)
335 SCLogRedisContext * ctx = file_ctx->redis;
337 redisContext *redis = ctx->sync;
/* NOTE(review): presumably only reached when redis is NULL -- confirm. */
339 SCConfLogReopenSyncRedis(file_ctx);
342 SCLogDebug(
"Redis after re-open is not available.");
/* Pipelined mode: queue the command now, collect replies later. */
348 if (file_ctx->redis_setup.batch_size) {
349 redisAppendCommand(redis,
"%s %s %s",
350 file_ctx->redis_setup.command,
351 file_ctx->redis_setup.key,
353 time_t now = time(NULL);
/* Flush when the batch is full or when last_push is older than now. */
354 if ((ctx->batch_count == file_ctx->redis_setup.batch_size) || (ctx->last_push < now)) {
/* Snapshot the count and reset the batch state before draining. */
357 int batch_size = ctx->batch_count;
358 ctx->batch_count = 0;
359 ctx->last_push = now;
/* Inclusive bound: batch_size + 1 replies are read. NOTE(review):
 * presumably the extra one belongs to the command appended above, which
 * has not been counted in batch_count yet -- confirm. */
360 for (i = 0; i <= batch_size; i++) {
361 if (redisGetReply(redis, (
void **)&reply) == REDIS_OK) {
362 freeReplyObject(reply);
366 SCLogInfo(
"Error when fetching reply: %s (%d)",
/* Error handling depends on the hiredis error code. */
370 switch (redis->err) {
373 SCLogInfo(
"Reopening connection to redis server");
374 SCConfLogReopenSyncRedis(file_ctx);
377 SCLogInfo(
"Reconnected to redis server");
/* Re-queue the current record on the new connection. */
378 redisAppendCommand(redis,
"%s %s %s",
379 file_ctx->redis_setup.command,
380 file_ctx->redis_setup.key,
385 SCLogInfo(
"Unable to reconnect to redis server");
/* Direct mode: one blocking command per record. */
399 redisReply *reply = redisCommand(redis,
"%s %s %s",
400 file_ctx->redis_setup.command,
401 file_ctx->redis_setup.key,
405 switch (reply->type) {
406 case REDIS_REPLY_ERROR:
408 SCConfLogReopenSyncRedis(file_ctx);
410 case REDIS_REPLY_INTEGER:
411 SCLogDebug(
"Redis integer %lld", reply->integer);
/* Any other reply type is unexpected: log it and reconnect. */
415 SCLogError(
"Redis default triggered with %d", reply->type);
416 SCConfLogReopenSyncRedis(file_ctx);
419 freeReplyObject(reply);
/* NOTE(review): presumably the no-reply (NULL) path -- confirm. */
421 SCConfLogReopenSyncRedis(file_ctx);
/**
 * \brief Public entry point: write one log record to redis.
 *
 * \param lf_ctx     opaque log file context
 * \param string     record payload
 * \param string_len payload length (only forwarded to the async writer)
 *
 * Dispatches on redis_setup.is_async to SCLogRedisWriteAsync or
 * SCLogRedisWriteSync and returns the chosen writer's result.
 * NOTE(review): the declaration of file_ctx and the value returned for a
 * NULL context are not shown here.
 */
435 int LogFileWriteRedis(
void *lf_ctx,
const char *
string,
size_t string_len)
438 if (file_ctx == NULL) {
444 if (file_ctx->redis_setup.is_async) {
445 return SCLogRedisWriteAsync(file_ctx,
string, string_len);
449 if (! file_ctx->redis_setup.is_async) {
450 return SCLogRedisWriteSync(file_ctx,
string);
/**
 * \brief Configure a log file context for redis output from a config node.
 *
 * \param redis_node configuration node holding the redis settings
 * \param lf_ctx     opaque log file context to populate
 *
 * Fills log_ctx->redis_setup (server, key, async flag, batch size, command,
 * port), installs SCLogFileCloseRedis as the close handler and allocates
 * the redis context; in sync mode a connection attempt is made right away.
 * NOTE(review): the config lookups, several guard conditions and the return
 * statements are not shown here.
 */
460 int SCConfLogOpenRedis(
ConfNode *redis_node,
void *lf_ctx)
465 FatalError(
"redis does not support threaded output");
468 const char *redis_port = NULL;
469 const char *redis_mode = NULL;
/* Fall back to the built-in default server when none is set. */
482 if (!log_ctx->redis_setup.server) {
483 log_ctx->redis_setup.server = redis_default_server;
484 SCLogInfo(
"Using default redis server (127.0.0.1)");
/* Fall back to the built-in default key when none is set. */
490 if (!log_ctx->redis_setup.key) {
491 log_ctx->redis_setup.key = redis_default_key;
494 #ifndef HAVE_LIBEVENT
499 #endif //ifndef HAVE_LIBEVENT
501 log_ctx->redis_setup.is_async = is_async;
/* Batching (pipelining) is off unless enabled below. */
502 log_ctx->redis_setup.batch_size = 0;
510 if (ret && enabled) {
513 log_ctx->redis_setup.batch_size = val;
/* NOTE(review): 10 looks like the fallback batch size used when no
 * explicit value is available -- confirm against the missing branch. */
515 log_ctx->redis_setup.batch_size = 10;
520 log_ctx->redis_setup.batch_size = 0;
/* Map the configured mode name onto the redis command verb. */
523 if (!strcmp(redis_mode,
"list") || !strcmp(redis_mode,
"lpush")) {
524 log_ctx->redis_setup.command = redis_lpush_cmd;
525 }
else if(!strcmp(redis_mode,
"rpush")){
526 log_ctx->redis_setup.command = redis_rpush_cmd;
527 }
else if(!strcmp(redis_mode,
"channel") || !strcmp(redis_mode,
"publish")) {
528 log_ctx->redis_setup.command = redis_publish_cmd;
534 if (!log_ctx->redis_setup.server) {
535 FatalError(
"Error allocating redis server string");
/* Parse the port as a base-10 16-bit unsigned value; negative result is fatal. */
537 if (
StringParseUint16(&log_ctx->redis_setup.port, 10, 0, (
const char *)redis_port) < 0) {
538 FatalError(
"Invalid value for redis port: %s", redis_port);
540 log_ctx->
Close = SCLogFileCloseRedis;
/* NOTE(review): presumably the async branch -- confirm. */
544 log_ctx->redis = SCLogRedisContextAsyncAlloc();
/* Sync mode: allocate the context and try to connect immediately. */
548 log_ctx->redis = SCLogRedisContextAlloc();
549 SCConfLogReopenSyncRedis(log_ctx);
/* Close-handler body. NOTE(review): the signature line is not shown here;
 * presumably this is SCLogFileCloseRedis(LogFileCtx *log_ctx) -- confirm.
 * Async mode sends QUIT and releases the event base; sync mode drains the
 * pending pipelined replies and frees the connection. */
559 SCLogRedisContext * ctx = log_ctx->redis;
564 if (log_ctx->redis_setup.is_async) {
565 #if HAVE_LIBEVENT == 1
567 if (ctx->connected > 0) {
568 SCLogAsyncRedisSendQuit(ctx);
/* NOTE(review): SCLogAsyncRedisSendQuit also calls event_base_free on
 * ctx->ev_base; this free is only safe if that call clears the pointer. */
570 if (ctx->ev_base != NULL) {
571 event_base_free(ctx->ev_base);
579 if (!log_ctx->redis_setup.is_async) {
/* Collect one reply per command still counted in the current batch. */
583 for (i = 0; i < ctx->batch_count; i++) {
584 redisGetReply(ctx->sync, (
void **)&reply);
586 freeReplyObject(reply);
589 redisFree(ctx->sync);
593 ctx->batch_count = 0;
601 #endif //#ifdef HAVE_LIBHIREDIS