suricata
log-flush.c
Go to the documentation of this file.
1 /* Copyright (C) 2024 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Jeff Lucovsky <jlucovsky@oisf.net>
22  */
23 
24 #include "suricata-common.h"
25 #include "suricata.h"
26 #include "detect.h"
27 #include "detect-engine.h"
28 #include "flow-worker.h"
29 #include "log-flush.h"
30 #include "tm-threads.h"
31 #include "conf.h"
32 #include "conf-yaml-loader.h"
33 #include "util-privs.h"
34 
35 /**
36  * \brief Trigger detect threads to flush their output logs
37  *
38  * This function is intended to be called at regular intervals to force
39  * buffered log data to be persisted
40  */
41 static void WorkerFlushLogs(void)
42 {
43  SCEnter();
44 
45  /* count detect threads in use */
46  uint32_t no_of_detect_tvs = TmThreadCountThreadsByTmmFlags(TM_FLAG_DETECT_TM);
47  /* can be zero in unix socket mode */
48  if (no_of_detect_tvs == 0) {
49  return;
50  }
51 
52  /* prepare swap structures */
53  void *fw_threads[no_of_detect_tvs];
54  ThreadVars *detect_tvs[no_of_detect_tvs];
55  memset(fw_threads, 0x00, (no_of_detect_tvs * sizeof(void *)));
56  memset(detect_tvs, 0x00, (no_of_detect_tvs * sizeof(ThreadVars *)));
57 
58  /* start by initiating the log flushes */
59 
60  uint32_t i = 0;
62  /* get reference to tv's and setup fw_threads array */
63  for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
64  if ((tv->tmm_flags & TM_FLAG_DETECT_TM) == 0) {
65  continue;
66  }
67  for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
68  TmModule *tm = TmModuleGetById(s->tm_id);
69  if (!(tm->flags & TM_FLAG_DETECT_TM)) {
70  continue;
71  }
72 
73  if (suricata_ctl_flags != 0) {
75  goto error;
76  }
77 
78  fw_threads[i] = FlowWorkerGetThreadData(SC_ATOMIC_GET(s->slot_data));
79  if (fw_threads[i]) {
80  FlowWorkerSetFlushAck(fw_threads[i]);
81  SCLogDebug("Setting flush-ack for thread %s[i=%d]", tv->printable_name, i);
82  detect_tvs[i] = tv;
83  }
84 
85  i++;
86  break;
87  }
88  }
89  BUG_ON(i != no_of_detect_tvs);
90 
92 
93  SCLogDebug("Creating flush pseudo packets for %d threads", no_of_detect_tvs);
94  InjectPacketsForFlush(detect_tvs, no_of_detect_tvs);
95 
96  uint32_t threads_done = 0;
97 retry:
98  for (i = 0; i < no_of_detect_tvs; i++) {
99  if (suricata_ctl_flags != 0) {
100  threads_done = no_of_detect_tvs;
101  break;
102  }
103  usleep(1000);
104  if (fw_threads[i] && FlowWorkerGetFlushAck(fw_threads[i])) {
105  SCLogDebug("thread slot %d has ack'd flush request", i);
106  threads_done++;
107  } else if (detect_tvs[i]) {
108  SCLogDebug("thread slot %d not yet ack'd flush request", i);
109  TmThreadsCaptureBreakLoop(detect_tvs[i]);
110  }
111  }
112  if (threads_done < no_of_detect_tvs) {
113  threads_done = 0;
114  SleepMsec(250);
115  goto retry;
116  }
117 
118 error:
119  return;
120 }
121 
/**
 * \brief Read the configured output flush interval
 *
 * Looks up "heartbeat.output-flush-interval". The value is forced to 0
 * when ConfGetInt() returns 0 or when the value lies outside the accepted
 * range of 0 to 60 (inclusive).
 *
 * \retval interval in the range 0-60
 */
static int OutputFlushInterval(void)
{
    intmax_t output_flush_interval = 0;
    if (ConfGetInt("heartbeat.output-flush-interval", &output_flush_interval) == 0) {
        /* don't rely on the output argument when the lookup reported 0 */
        output_flush_interval = 0;
    }
    if (output_flush_interval < 0 || output_flush_interval > 60) {
        SCLogConfig("heartbeat.output-flush-interval must be between 0 and 60; using 0");
        output_flush_interval = 0;
    }

    return (int)output_flush_interval;
}
135 
136 static void *LogFlusherWakeupThread(void *arg)
137 {
138  int output_flush_interval = OutputFlushInterval();
139  /* This was checked by the logic creating this thread */
140  BUG_ON(output_flush_interval == 0);
141 
142  SCLogConfig("Using output-flush-interval of %d seconds", output_flush_interval);
143  /*
144  * Calculate the number of sleep intervals based on the output flush interval. This is necessary
145  * because this thread pauses a fixed amount of time to react to shutdown situations more
146  * quickly.
147  */
148  const int log_flush_sleep_time = 500; /* milliseconds */
149  const int flush_wait_count = (1000 * output_flush_interval) / log_flush_sleep_time;
150 
151  ThreadVars *tv_local = (ThreadVars *)arg;
152  SCSetThreadName(tv_local->name);
153 
154  if (tv_local->thread_setup_flags != 0)
155  TmThreadSetupOptions(tv_local);
156 
157  /* Set the threads capability */
158  tv_local->cap_flags = 0;
159  SCDropCaps(tv_local);
160 
162 
163  int wait_count = 0;
164  uint64_t worker_flush_count = 0;
165  bool run = TmThreadsWaitForUnpause(tv_local);
166  while (run) {
167  usleep(log_flush_sleep_time * 1000);
168 
169  if (++wait_count == flush_wait_count) {
170  worker_flush_count++;
171  WorkerFlushLogs();
172  wait_count = 0;
173  }
174 
175  if (TmThreadsCheckFlag(tv_local, THV_KILL)) {
176  break;
177  }
178  }
179 
181  TmThreadWaitForFlag(tv_local, THV_DEINIT);
182  TmThreadsSetFlag(tv_local, THV_CLOSED);
183  SCLogInfo("%s: initiated %" PRIu64 " flushes", tv_local->name, worker_flush_count);
184  return NULL;
185 }
186 
187 void LogFlushThreads(void)
188 {
189  if (0 == OutputFlushInterval()) {
190  SCLogConfig("log flusher thread not used with heartbeat.output-flush-interval of 0");
191  return;
192  }
193 
194  ThreadVars *tv_log_flush =
195  TmThreadCreateMgmtThread(thread_name_heartbeat, LogFlusherWakeupThread, 1);
196  if (!tv_log_flush || (TmThreadSpawn(tv_log_flush) != 0)) {
197  FatalError("Unable to create and start log flush thread");
198  }
199 }
tm-threads.h
ConfGetInt
int ConfGetInt(const char *name, intmax_t *val)
Retrieve a configuration value as an integer.
Definition: conf.c:399
log-flush.h
TmThreadSpawn
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1663
detect-engine.h
TmThreadSetupOptions
TmEcode TmThreadSetupOptions(ThreadVars *tv)
Set the thread options (cpu affinitythread). Priority should be already set by pthread_create.
Definition: tm-threads.c:854
ThreadVars_::name
char name[16]
Definition: threadvars.h:65
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:269
TmThreadsSetFlag
void TmThreadsSetFlag(ThreadVars *tv, uint32_t flag)
Set a thread flag.
Definition: tm-threads.c:101
TmThreadWaitForFlag
void TmThreadWaitForFlag(ThreadVars *tv, uint32_t flags)
Waits till the specified flag(s) is(are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1760
TM_FLAG_DETECT_TM
#define TM_FLAG_DETECT_TM
Definition: tm-modules.h:35
THV_DEINIT
#define THV_DEINIT
Definition: threadvars.h:45
SCSetThreadName
#define SCSetThreadName(n)
Definition: threads.h:303
THV_RUNNING
#define THV_RUNNING
Definition: threadvars.h:55
TmThreadCountThreadsByTmmFlags
uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags)
returns a count of all the threads that match the flag
Definition: tm-threads.c:2010
SCMutexLock
#define SCMutexLock(mut)
Definition: threads-debug.h:117
tv_root
ThreadVars * tv_root[TVT_MAX]
Definition: tm-threads.c:82
util-privs.h
SCDropCaps
#define SCDropCaps(...)
Definition: util-privs.h:89
ThreadVars_::cap_flags
uint8_t cap_flags
Definition: threadvars.h:81
THV_RUNNING_DONE
#define THV_RUNNING_DONE
Definition: threadvars.h:46
ThreadVars_::tmm_flags
uint8_t tmm_flags
Definition: threadvars.h:79
ThreadVars_::tm_slots
struct TmSlot_ * tm_slots
Definition: threadvars.h:96
SCMutexUnlock
#define SCMutexUnlock(mut)
Definition: threads-debug.h:119
flow-worker.h
SCEnter
#define SCEnter(...)
Definition: util-debug.h:271
TVT_PPT
@ TVT_PPT
Definition: tm-threads-common.h:87
detect.h
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
THV_KILL
#define THV_KILL
Definition: threadvars.h:40
ThreadVars_::next
struct ThreadVars_ * next
Definition: threadvars.h:125
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:309
tv_root_lock
SCMutex tv_root_lock
Definition: tm-threads.c:85
FlowWorkerGetFlushAck
bool FlowWorkerGetFlushAck(void *flow_worker)
Definition: flow-worker.c:742
conf-yaml-loader.h
conf.h
TmModuleGetById
TmModule * TmModuleGetById(int id)
Returns a TM Module by its id.
Definition: tm-modules.c:69
TmSlot_
Definition: tm-threads.h:53
ThreadVars_::thread_setup_flags
uint8_t thread_setup_flags
Definition: threadvars.h:69
TmThreadCreateMgmtThread
ThreadVars * TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int mucond)
Creates and returns the TV instance for a Management thread(MGMT). This function supports only custom...
Definition: tm-threads.c:1073
SCLogInfo
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:224
TmModule_
Definition: tm-modules.h:43
THV_INIT_DONE
#define THV_INIT_DONE
Definition: threadvars.h:37
SleepMsec
#define SleepMsec(msec)
Definition: tm-threads.h:45
LogFlushThreads
void LogFlushThreads(void)
Definition: log-flush.c:187
suricata-common.h
FlowWorkerGetThreadData
void * FlowWorkerGetThreadData(void *flow_worker)
Definition: flow-worker.c:737
thread_name_heartbeat
const char * thread_name_heartbeat
Definition: runmodes.c:76
TmThreadsWaitForUnpause
bool TmThreadsWaitForUnpause(ThreadVars *tv)
Wait for a thread to become unpaused.
Definition: tm-threads.c:364
FlowWorkerSetFlushAck
void FlowWorkerSetFlushAck(void *flow_worker)
Definition: flow-worker.c:748
FatalError
#define FatalError(...)
Definition: util-debug.h:502
ThreadVars_::printable_name
char * printable_name
Definition: threadvars.h:66
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
SCLogConfig
struct SCLogConfig_ SCLogConfig
Holds the config state used by the logging api.
InjectPacketsForFlush
void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs)
Definition: detect-engine.c:2329
suricata.h
TmSlot_::slot_next
struct TmSlot_ * slot_next
Definition: tm-threads.h:62
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
TmThreadsCheckFlag
int TmThreadsCheckFlag(ThreadVars *tv, uint32_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:93
THV_CLOSED
#define THV_CLOSED
Definition: threadvars.h:42
TmModule_::flags
uint8_t flags
Definition: tm-modules.h:76
suricata_ctl_flags
volatile uint8_t suricata_ctl_flags
Definition: suricata.c:169