suricata
flow-hash.c
Go to the documentation of this file.
1 /* Copyright (C) 2007-2024 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Victor Julien <victor@inliniac.net>
22  * \author Pablo Rincon Crespo <pablo.rincon.crespo@gmail.com>
23  *
24  * Flow Hashing functions.
25  */
26 
27 #include "suricata-common.h"
28 #include "threads.h"
29 
30 #include "decode.h"
31 #include "detect-engine-state.h"
32 
33 #include "flow.h"
34 #include "flow-hash.h"
35 #include "flow-util.h"
36 #include "flow-private.h"
37 #include "flow-manager.h"
38 #include "flow-storage.h"
39 #include "flow-timeout.h"
40 #include "flow-spare-pool.h"
41 #include "flow-callbacks.h"
42 #include "app-layer-parser.h"
43 
44 #include "util-time.h"
45 #include "util-debug.h"
46 #include "util-device-private.h"
47 
48 #include "util-hash-lookup3.h"
49 
50 #include "conf.h"
51 #include "output.h"
52 #include "output-flow.h"
53 #include "stream-tcp.h"
54 #include "util-exception-policy.h"
55 
57 
58 
/* The flow hash table: an array of buckets. The lookup code in this file
 * selects a bucket with flow_hash[hash % flow_config.hash_size]. */
59 FlowBucket *flow_hash;
/* Atomics that are only declared here (extern). In this file flow_flags is
 * read and OR-ed to test and raise the FLOW_EMERGENCY bit. */
60 SC_ATOMIC_EXTERN(unsigned int, flow_prune_idx);
61 SC_ATOMIC_EXTERN(unsigned int, flow_flags);
62 
/* Forward declaration; called from FlowGetNew() when the memcap check fails. */
63 static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv, const SCTime_t ts);
64 
/** \brief ordering predicate for two raw IPv6 addresses
 *
 * Walks the four 32-bit words front to back and decides on the first
 * pair that differs.
 *
 * \note the words are compared exactly as stored, without any SCNtohl
 *       conversion. The result is only meant to give the FlowHashKey6
 *       struct a consistent fill order, not a true numerical ordering
 *       of the addresses.
 *
 * \warning do not use elsewhere unless you know what you're doing.
 *          detect-engine-address-ipv6.c's AddressIPv6GtU32 is likely
 *          what you are looking for.
 *
 * \param a first address, 4 words
 * \param b second address, 4 words
 *
 * \retval 1 the first differing word of a is larger than that of b
 * \retval 0 otherwise, including when both addresses are equal
 */
static inline int FlowHashRawAddressIPv6GtU32(const uint32_t *a, const uint32_t *b)
{
    uint8_t word = 0;

    /* skip the common prefix */
    while (word < 4 && a[word] == b[word]) {
        word++;
    }
    if (word == 4) {
        return 0;
    }
    return (a[word] > b[word]) ? 1 : 0;
}
86 
87 typedef struct FlowHashKey4_ {
88  union {
89  struct {
90  uint32_t addrs[2];
91  uint16_t ports[2];
92  uint8_t proto; /**< u8 so proto and recur and livedev add up to u32 */
93  uint8_t recur;
94  uint16_t livedev;
96  uint16_t pad[1];
97  };
98  const uint32_t u32[6];
99  };
101 
102 typedef struct FlowHashKey6_ {
103  union {
104  struct {
105  uint32_t src[4], dst[4];
106  uint16_t ports[2];
107  uint8_t proto; /**< u8 so proto and recur and livedev add up to u32 */
108  uint8_t recur;
109  uint16_t livedev;
111  uint16_t pad[1];
112  };
113  const uint32_t u32[12];
114  };
116 
117 uint32_t FlowGetIpPairProtoHash(const Packet *p)
118 {
119  uint32_t hash = 0;
120  if (PacketIsIPv4(p)) {
121  FlowHashKey4 fhk = {
122  .pad[0] = 0,
123  };
124 
125  int ai = (p->src.addr_data32[0] > p->dst.addr_data32[0]);
126  fhk.addrs[1 - ai] = p->src.addr_data32[0];
127  fhk.addrs[ai] = p->dst.addr_data32[0];
128 
129  fhk.ports[0] = 0xfedc;
130  fhk.ports[1] = 0xba98;
131 
132  fhk.proto = (uint8_t)p->proto;
133  /* g_recurlvl_mask sets the recursion_level to 0 if
134  * decoder.recursion-level.use-for-tracking is disabled.
135  */
136  fhk.recur = (uint8_t)p->recursion_level & g_recurlvl_mask;
137  /* g_vlan_mask sets the vlan_ids to 0 if vlan.use-for-tracking
138  * is disabled. */
139  fhk.vlan_id[0] = p->vlan_id[0] & g_vlan_mask;
140  fhk.vlan_id[1] = p->vlan_id[1] & g_vlan_mask;
141  fhk.vlan_id[2] = p->vlan_id[2] & g_vlan_mask;
142 
143  hash = hashword(fhk.u32, ARRAY_SIZE(fhk.u32), flow_config.hash_rand);
144  } else if (PacketIsIPv6(p)) {
145  FlowHashKey6 fhk = {
146  .pad[0] = 0,
147  };
148  if (FlowHashRawAddressIPv6GtU32(p->src.addr_data32, p->dst.addr_data32)) {
149  fhk.src[0] = p->src.addr_data32[0];
150  fhk.src[1] = p->src.addr_data32[1];
151  fhk.src[2] = p->src.addr_data32[2];
152  fhk.src[3] = p->src.addr_data32[3];
153  fhk.dst[0] = p->dst.addr_data32[0];
154  fhk.dst[1] = p->dst.addr_data32[1];
155  fhk.dst[2] = p->dst.addr_data32[2];
156  fhk.dst[3] = p->dst.addr_data32[3];
157  } else {
158  fhk.src[0] = p->dst.addr_data32[0];
159  fhk.src[1] = p->dst.addr_data32[1];
160  fhk.src[2] = p->dst.addr_data32[2];
161  fhk.src[3] = p->dst.addr_data32[3];
162  fhk.dst[0] = p->src.addr_data32[0];
163  fhk.dst[1] = p->src.addr_data32[1];
164  fhk.dst[2] = p->src.addr_data32[2];
165  fhk.dst[3] = p->src.addr_data32[3];
166  }
167 
168  fhk.ports[0] = 0xfedc;
169  fhk.ports[1] = 0xba98;
170  fhk.proto = (uint8_t)p->proto;
171  fhk.recur = (uint8_t)p->recursion_level & g_recurlvl_mask;
172  fhk.vlan_id[0] = p->vlan_id[0] & g_vlan_mask;
173  fhk.vlan_id[1] = p->vlan_id[1] & g_vlan_mask;
174  fhk.vlan_id[2] = p->vlan_id[2] & g_vlan_mask;
175 
176  hash = hashword(fhk.u32, ARRAY_SIZE(fhk.u32), flow_config.hash_rand);
177  }
178  return hash;
179 }
180 
181 /* calculate the hash key for this packet
182  *
183  * we're using:
184  * hash_rand -- set at init time
185  * source port
186  * destination port
187  * source address
188  * destination address
189  * recursion level -- for tunnels, make sure different tunnel layers can
190  * never get mixed up.
191  *
192  * For ICMP we only consider UNREACHABLE errors atm.
193  */
194 static inline uint32_t FlowGetHash(const Packet *p)
195 {
196  uint32_t hash = 0;
197 
198  if (PacketIsIPv4(p)) {
199  if (PacketIsTCP(p) || PacketIsUDP(p)) {
200  FlowHashKey4 fhk = { .pad[0] = 0 };
201 
202  int ai = (p->src.addr_data32[0] > p->dst.addr_data32[0]);
203  fhk.addrs[1-ai] = p->src.addr_data32[0];
204  fhk.addrs[ai] = p->dst.addr_data32[0];
205 
206  const int pi = (p->sp > p->dp);
207  fhk.ports[1-pi] = p->sp;
208  fhk.ports[pi] = p->dp;
209 
210  fhk.proto = p->proto;
211  /* g_recurlvl_mask sets the recursion_level to 0 if
212  * decoder.recursion-level.use-for-tracking is disabled.
213  */
215  /* g_livedev_mask sets the livedev ids to 0 if livedev.use-for-tracking
216  * is disabled. */
217  uint16_t devid = p->livedev ? p->livedev->id : 0;
218  fhk.livedev = devid & g_livedev_mask;
219  /* g_vlan_mask sets the vlan_ids to 0 if vlan.use-for-tracking
220  * is disabled. */
221  fhk.vlan_id[0] = p->vlan_id[0] & g_vlan_mask;
222  fhk.vlan_id[1] = p->vlan_id[1] & g_vlan_mask;
223  fhk.vlan_id[2] = p->vlan_id[2] & g_vlan_mask;
224 
225  hash = hashword(fhk.u32, ARRAY_SIZE(fhk.u32), flow_config.hash_rand);
226 
227  } else if (ICMPV4_DEST_UNREACH_IS_VALID(p)) {
228  uint32_t psrc = IPV4_GET_RAW_IPSRC_U32(PacketGetICMPv4EmbIPv4(p));
229  uint32_t pdst = IPV4_GET_RAW_IPDST_U32(PacketGetICMPv4EmbIPv4(p));
230  FlowHashKey4 fhk = { .pad[0] = 0 };
231 
232  const int ai = (psrc > pdst);
233  fhk.addrs[1-ai] = psrc;
234  fhk.addrs[ai] = pdst;
235 
236  const int pi = (p->l4.vars.icmpv4.emb_sport > p->l4.vars.icmpv4.emb_dport);
237  fhk.ports[1 - pi] = p->l4.vars.icmpv4.emb_sport;
238  fhk.ports[pi] = p->l4.vars.icmpv4.emb_dport;
239 
240  fhk.proto = ICMPV4_GET_EMB_PROTO(p);
242  uint16_t devid = p->livedev ? p->livedev->id : 0;
243  fhk.livedev = devid & g_livedev_mask;
244  fhk.vlan_id[0] = p->vlan_id[0] & g_vlan_mask;
245  fhk.vlan_id[1] = p->vlan_id[1] & g_vlan_mask;
246  fhk.vlan_id[2] = p->vlan_id[2] & g_vlan_mask;
247 
248  hash = hashword(fhk.u32, ARRAY_SIZE(fhk.u32), flow_config.hash_rand);
249 
250  } else {
251  FlowHashKey4 fhk = { .pad[0] = 0 };
252  const int ai = (p->src.addr_data32[0] > p->dst.addr_data32[0]);
253  fhk.addrs[1-ai] = p->src.addr_data32[0];
254  fhk.addrs[ai] = p->dst.addr_data32[0];
255  fhk.ports[0] = 0xfeed;
256  fhk.ports[1] = 0xbeef;
257  fhk.proto = p->proto;
259  uint16_t devid = p->livedev ? p->livedev->id : 0;
260  fhk.livedev = devid & g_livedev_mask;
261  fhk.vlan_id[0] = p->vlan_id[0] & g_vlan_mask;
262  fhk.vlan_id[1] = p->vlan_id[1] & g_vlan_mask;
263  fhk.vlan_id[2] = p->vlan_id[2] & g_vlan_mask;
264 
265  hash = hashword(fhk.u32, ARRAY_SIZE(fhk.u32), flow_config.hash_rand);
266  }
267  } else if (PacketIsIPv6(p)) {
268  FlowHashKey6 fhk = { .pad[0] = 0 };
269  if (FlowHashRawAddressIPv6GtU32(p->src.addr_data32, p->dst.addr_data32)) {
270  fhk.src[0] = p->src.addr_data32[0];
271  fhk.src[1] = p->src.addr_data32[1];
272  fhk.src[2] = p->src.addr_data32[2];
273  fhk.src[3] = p->src.addr_data32[3];
274  fhk.dst[0] = p->dst.addr_data32[0];
275  fhk.dst[1] = p->dst.addr_data32[1];
276  fhk.dst[2] = p->dst.addr_data32[2];
277  fhk.dst[3] = p->dst.addr_data32[3];
278  } else {
279  fhk.src[0] = p->dst.addr_data32[0];
280  fhk.src[1] = p->dst.addr_data32[1];
281  fhk.src[2] = p->dst.addr_data32[2];
282  fhk.src[3] = p->dst.addr_data32[3];
283  fhk.dst[0] = p->src.addr_data32[0];
284  fhk.dst[1] = p->src.addr_data32[1];
285  fhk.dst[2] = p->src.addr_data32[2];
286  fhk.dst[3] = p->src.addr_data32[3];
287  }
288 
289  const int pi = (p->sp > p->dp);
290  fhk.ports[1-pi] = p->sp;
291  fhk.ports[pi] = p->dp;
292  fhk.proto = p->proto;
294  uint16_t devid = p->livedev ? p->livedev->id : 0;
295  fhk.livedev = devid & g_livedev_mask;
296  fhk.vlan_id[0] = p->vlan_id[0] & g_vlan_mask;
297  fhk.vlan_id[1] = p->vlan_id[1] & g_vlan_mask;
298  fhk.vlan_id[2] = p->vlan_id[2] & g_vlan_mask;
299 
300  hash = hashword(fhk.u32, ARRAY_SIZE(fhk.u32), flow_config.hash_rand);
301  }
302 
303  return hash;
304 }
305 
306 /**
307  * Basic hashing function for FlowKey
308  *
309  * \note Function only used for bypass and TCP or UDP flows
310  *
311  * \note this is only used at start to create Flow from pinned maps
312  * so fairness is not an issue
313  */
314 uint32_t FlowKeyGetHash(FlowKey *fk)
315 {
316  uint32_t hash = 0;
317 
318  if (fk->src.family == AF_INET) {
319  FlowHashKey4 fhk = {
320  .pad[0] = 0,
321  };
322  int ai = (fk->src.address.address_un_data32[0] > fk->dst.address.address_un_data32[0]);
323  fhk.addrs[1-ai] = fk->src.address.address_un_data32[0];
324  fhk.addrs[ai] = fk->dst.address.address_un_data32[0];
325 
326  const int pi = (fk->sp > fk->dp);
327  fhk.ports[1-pi] = fk->sp;
328  fhk.ports[pi] = fk->dp;
329 
330  fhk.proto = fk->proto;
332  fhk.livedev = fk->livedev_id & g_livedev_mask;
333  fhk.vlan_id[0] = fk->vlan_id[0] & g_vlan_mask;
334  fhk.vlan_id[1] = fk->vlan_id[1] & g_vlan_mask;
335  fhk.vlan_id[2] = fk->vlan_id[2] & g_vlan_mask;
336 
337  hash = hashword(fhk.u32, ARRAY_SIZE(fhk.u32), flow_config.hash_rand);
338  } else {
339  FlowHashKey6 fhk = {
340  .pad[0] = 0,
341  };
342  if (FlowHashRawAddressIPv6GtU32(fk->src.address.address_un_data32,
344  fhk.src[0] = fk->src.address.address_un_data32[0];
345  fhk.src[1] = fk->src.address.address_un_data32[1];
346  fhk.src[2] = fk->src.address.address_un_data32[2];
347  fhk.src[3] = fk->src.address.address_un_data32[3];
348  fhk.dst[0] = fk->dst.address.address_un_data32[0];
349  fhk.dst[1] = fk->dst.address.address_un_data32[1];
350  fhk.dst[2] = fk->dst.address.address_un_data32[2];
351  fhk.dst[3] = fk->dst.address.address_un_data32[3];
352  } else {
353  fhk.src[0] = fk->dst.address.address_un_data32[0];
354  fhk.src[1] = fk->dst.address.address_un_data32[1];
355  fhk.src[2] = fk->dst.address.address_un_data32[2];
356  fhk.src[3] = fk->dst.address.address_un_data32[3];
357  fhk.dst[0] = fk->src.address.address_un_data32[0];
358  fhk.dst[1] = fk->src.address.address_un_data32[1];
359  fhk.dst[2] = fk->src.address.address_un_data32[2];
360  fhk.dst[3] = fk->src.address.address_un_data32[3];
361  }
362 
363  const int pi = (fk->sp > fk->dp);
364  fhk.ports[1-pi] = fk->sp;
365  fhk.ports[pi] = fk->dp;
366  fhk.proto = fk->proto;
368  fhk.livedev = fk->livedev_id & g_livedev_mask;
369  fhk.vlan_id[0] = fk->vlan_id[0] & g_vlan_mask;
370  fhk.vlan_id[1] = fk->vlan_id[1] & g_vlan_mask;
371  fhk.vlan_id[2] = fk->vlan_id[2] & g_vlan_mask;
372 
373  hash = hashword(fhk.u32, ARRAY_SIZE(fhk.u32), flow_config.hash_rand);
374  }
375  return hash;
376 }
377 
/** \brief check two addresses, given as four 32-bit words each, for equality
 *
 *  \retval true all four words are equal
 *  \retval false at least one word differs
 */
static inline bool CmpAddrs(const uint32_t addr1[4], const uint32_t addr2[4])
{
    for (int w = 0; w < 4; w++) {
        if (addr1[w] != addr2[w]) {
            return false;
        }
    }
    return true;
}
383 
384 static inline bool CmpAddrsAndPorts(const uint32_t src1[4],
385  const uint32_t dst1[4], Port src_port1, Port dst_port1,
386  const uint32_t src2[4], const uint32_t dst2[4], Port src_port2,
387  Port dst_port2)
388 {
389  /* Compare the source and destination addresses. If they are not equal,
390  * compare the first source address with the second destination address,
391  * and vice versa. Likewise for ports. */
392  return (CmpAddrs(src1, src2) && CmpAddrs(dst1, dst2) &&
393  src_port1 == src_port2 && dst_port1 == dst_port2) ||
394  (CmpAddrs(src1, dst2) && CmpAddrs(dst1, src2) &&
395  src_port1 == dst_port2 && dst_port1 == src_port2);
396 }
397 
398 static inline bool CmpVlanIds(
399  const uint16_t vlan_id1[VLAN_MAX_LAYERS], const uint16_t vlan_id2[VLAN_MAX_LAYERS])
400 {
401  return ((vlan_id1[0] ^ vlan_id2[0]) & g_vlan_mask) == 0 &&
402  ((vlan_id1[1] ^ vlan_id2[1]) & g_vlan_mask) == 0 &&
403  ((vlan_id1[2] ^ vlan_id2[2]) & g_vlan_mask) == 0;
404 }
405 
406 static inline bool CmpLiveDevIds(const LiveDevice *livedev, const uint16_t id)
407 {
408  uint16_t devid = livedev ? livedev->id : 0;
409  return (((devid ^ id) & g_livedev_mask) == 0);
410 }
411 
412 /* Since two or more flows can have the same hash key, we need to compare
413  * the flow with the current packet or flow key. */
414 static inline bool CmpFlowPacket(const Flow *f, const Packet *p)
415 {
416  const uint32_t *f_src = f->src.address.address_un_data32;
417  const uint32_t *f_dst = f->dst.address.address_un_data32;
418  const uint32_t *p_src = p->src.address.address_un_data32;
419  const uint32_t *p_dst = p->dst.address.address_un_data32;
420  return CmpAddrsAndPorts(f_src, f_dst, f->sp, f->dp, p_src, p_dst, p->sp, p->dp) &&
421  f->proto == p->proto &&
422  (f->recursion_level == p->recursion_level || g_recurlvl_mask == 0) &&
423  CmpVlanIds(f->vlan_id, p->vlan_id) && (f->livedev == p->livedev || g_livedev_mask == 0);
424 }
425 
426 static inline bool CmpFlowKey(const Flow *f, const FlowKey *k)
427 {
428  const uint32_t *f_src = f->src.address.address_un_data32;
429  const uint32_t *f_dst = f->dst.address.address_un_data32;
430  const uint32_t *k_src = k->src.address.address_un_data32;
431  const uint32_t *k_dst = k->dst.address.address_un_data32;
432  return CmpAddrsAndPorts(f_src, f_dst, f->sp, f->dp, k_src, k_dst, k->sp, k->dp) &&
433  f->proto == k->proto &&
434  (f->recursion_level == k->recursion_level || g_recurlvl_mask == 0) &&
435  CmpVlanIds(f->vlan_id, k->vlan_id) && CmpLiveDevIds(f->livedev, k->livedev_id);
436 }
437 
/** \brief direction independent comparison of two address/ICMP type tuples
 *
 *  \retval true the tuples are equal, either as given or with source and
 *               destination of the second tuple swapped
 *  \retval false otherwise
 */
static inline bool CmpAddrsAndICMPTypes(const uint32_t src1[4],
        const uint32_t dst1[4], uint8_t icmp_s_type1, uint8_t icmp_d_type1,
        const uint32_t src2[4], const uint32_t dst2[4], uint8_t icmp_s_type2,
        uint8_t icmp_d_type2)
{
    /* same direction: source against source, destination against destination */
    if (CmpAddrs(src1, src2) && CmpAddrs(dst1, dst2) &&
            icmp_s_type1 == icmp_s_type2 && icmp_d_type1 == icmp_d_type2) {
        return true;
    }

    /* opposite direction: source against destination and vice versa,
     * likewise for the icmp types */
    return CmpAddrs(src1, dst2) && CmpAddrs(dst1, src2) &&
           icmp_s_type1 == icmp_d_type2 && icmp_d_type1 == icmp_s_type2;
}
451 
452 static inline bool CmpFlowICMPPacket(const Flow *f, const Packet *p)
453 {
454  const uint32_t *f_src = f->src.address.address_un_data32;
455  const uint32_t *f_dst = f->dst.address.address_un_data32;
456  const uint32_t *p_src = p->src.address.address_un_data32;
457  const uint32_t *p_dst = p->dst.address.address_un_data32;
458  return CmpAddrsAndICMPTypes(f_src, f_dst, f->icmp_s.type, f->icmp_d.type, p_src, p_dst,
459  p->icmp_s.type, p->icmp_d.type) &&
460  f->proto == p->proto &&
461  (f->recursion_level == p->recursion_level || g_recurlvl_mask == 0) &&
462  CmpVlanIds(f->vlan_id, p->vlan_id) && (f->livedev == p->livedev || g_livedev_mask == 0);
463 }
464 
465 /**
466  * \brief See if a ICMP packet belongs to a flow by comparing the embedded
467  * packet in the ICMP error packet to the flow.
468  *
469  * \param f flow
470  * \param p ICMP packet
471  *
472  * \retval 1 match
473  * \retval 0 no match
474  */
475 static inline int FlowCompareICMPv4(Flow *f, const Packet *p)
476 {
478  /* first check the direction of the flow, in other words, the client ->
479  * server direction as it's most likely the ICMP error will be a
480  * response to the clients traffic */
481  if ((f->src.addr_data32[0] == IPV4_GET_RAW_IPSRC_U32(PacketGetICMPv4EmbIPv4(p))) &&
482  (f->dst.addr_data32[0] == IPV4_GET_RAW_IPDST_U32(PacketGetICMPv4EmbIPv4(p))) &&
483  f->sp == p->l4.vars.icmpv4.emb_sport && f->dp == p->l4.vars.icmpv4.emb_dport &&
484  f->proto == ICMPV4_GET_EMB_PROTO(p) &&
485  (f->recursion_level == p->recursion_level || g_recurlvl_mask == 0) &&
486  CmpVlanIds(f->vlan_id, p->vlan_id) &&
487  (f->livedev == p->livedev || g_livedev_mask == 0)) {
488  return 1;
489 
490  /* check the less likely case where the ICMP error was a response to
491  * a packet from the server. */
492  } else if ((f->dst.addr_data32[0] == IPV4_GET_RAW_IPSRC_U32(PacketGetICMPv4EmbIPv4(p))) &&
493  (f->src.addr_data32[0] == IPV4_GET_RAW_IPDST_U32(PacketGetICMPv4EmbIPv4(p))) &&
494  f->dp == p->l4.vars.icmpv4.emb_sport && f->sp == p->l4.vars.icmpv4.emb_dport &&
495  f->proto == ICMPV4_GET_EMB_PROTO(p) &&
496  (f->recursion_level == p->recursion_level || g_recurlvl_mask == 0) &&
497  CmpVlanIds(f->vlan_id, p->vlan_id) &&
498  (f->livedev == p->livedev || g_livedev_mask == 0)) {
499  return 1;
500  }
501 
502  /* no match, fall through */
503  } else {
504  /* just treat ICMP as a normal proto for now */
505  return CmpFlowICMPPacket(f, p);
506  }
507 
508  return 0;
509 }
510 
511 /**
512  * \brief See if a IP-ESP packet belongs to a flow by comparing the SPI
513  *
514  * \param f flow
515  * \param p ESP packet
516  *
517  * \retval 1 match
518  * \retval 0 no match
519  */
520 static inline int FlowCompareESP(Flow *f, const Packet *p)
521 {
522  const uint32_t *f_src = f->src.address.address_un_data32;
523  const uint32_t *f_dst = f->dst.address.address_un_data32;
524  const uint32_t *p_src = p->src.address.address_un_data32;
525  const uint32_t *p_dst = p->dst.address.address_un_data32;
526 
527  return CmpAddrs(f_src, p_src) && CmpAddrs(f_dst, p_dst) && f->proto == p->proto &&
528  (f->recursion_level == p->recursion_level || g_recurlvl_mask == 0) &&
529  CmpVlanIds(f->vlan_id, p->vlan_id) && f->esp.spi == ESP_GET_SPI(PacketGetESP(p)) &&
530  (f->livedev == p->livedev || g_livedev_mask == 0);
531 }
532 
/* NOTE(review): the signature line of this function is absent from this
 * listing (the numbering jumps from 532 to 534); restore it from the upstream
 * file. The body marks the packet p as wanting a flow and caches its flow
 * hash, as computed by FlowGetHash(), in p->flow_hash. */
534 {
535  p->flags |= PKT_WANTS_FLOW;
536  p->flow_hash = FlowGetHash(p);
537 }
538 
539 static inline int FlowCompare(Flow *f, const Packet *p)
540 {
541  if (p->proto == IPPROTO_ICMP) {
542  return FlowCompareICMPv4(f, p);
543  } else if (PacketIsESP(p)) {
544  return FlowCompareESP(f, p);
545  }
546  return CmpFlowPacket(f, p);
547 }
548 
549 /**
550  * \brief Check if we should create a flow based on a packet
551  *
552  * We use this check to filter out flow creation based on:
553  * - ICMP error messages
554  * - TCP flags (emergency mode only)
555  *
556  * \param p packet
557  * \retval true
558  * \retval false
559  */
560 static inline bool FlowCreateCheck(const Packet *p, const bool emerg)
561 {
562  /* if we're in emergency mode, don't try to create a flow for a TCP
563  * that is not a TCP SYN packet. */
564  if (emerg) {
565  if (PacketIsTCP(p)) {
566  const TCPHdr *tcph = PacketGetTCP(p);
/* The first operand holds when, of SYN/ACK/RST/FIN, only SYN is set.
 * NOTE(review): the second operand of the `||` and the opening brace of
 * this `if` are absent from this listing (the numbering jumps from 567 to
 * 569); restore them from the upstream file before building. */
567  if (((tcph->th_flags & (TH_SYN | TH_ACK | TH_RST | TH_FIN)) == TH_SYN) ||
569  ;
570  } else {
571  return false;
572  }
573  }
574  }
575 
/* ICMPv4 error messages never create a flow, in or out of emergency mode. */
576  if (PacketIsICMPv4(p)) {
577  if (ICMPV4_IS_ERROR_MSG(p->icmp_s.type)) {
578  return false;
579  }
580  }
581 
582  return true;
583 }
584 
/* Per protocol flow accounting, selected by the IP protocol number.
 * NOTE(review): the statements that do the accounting are absent from this
 * listing (numbering gaps at 591-592, 595, 598, 601 and 604), so what is
 * updated for each protocol cannot be told from here; restore them from the
 * upstream file. */
585 static inline void FlowUpdateCounter(ThreadVars *tv, DecodeThreadVars *dtv,
586  uint8_t proto)
587 {
/* In unittest builds the whole body is skipped when tv or dtv is NULL. */
588 #ifdef UNITTESTS
589  if (tv && dtv) {
590 #endif
593  switch (proto){
594  case IPPROTO_UDP:
596  break;
597  case IPPROTO_TCP:
599  break;
600  case IPPROTO_ICMP:
602  break;
603  case IPPROTO_ICMPV6:
605  break;
606  }
607 #ifdef UNITTESTS
608  }
609 #endif
610 }
611 
612 /** \internal
613  * \brief try to fetch a new set of flows from the master flow pool.
614  *
615  * If in emergency mode, do this only once a second at max to avoid trying
616  * to synchronise per packet in the worst case. */
/* NOTE(review): nothing visible in this listing assigns f after its NULL
 * initialisation (numbering gaps at 626 and 634), so the statements that take
 * a flow from the refreshed spare queue are presumably on the missing lines;
 * the same goes for the bookkeeping inside the spare_sync block (gaps at 642,
 * 648, 651 and 653). Restore them from the upstream file. */
617 static inline Flow *FlowSpareSync(ThreadVars *tv, FlowLookupStruct *fls,
618  const Packet *p, const bool emerg)
619 {
620  Flow *f = NULL;
621  bool spare_sync = false;
622  if (emerg) {
/* in emergency mode only refill when the packet time, in seconds, is past
 * the stamp stored at the last failed attempt */
623  if ((uint32_t)SCTIME_SECS(p->ts) > fls->emerg_spare_sync_stamp) {
624  fls->spare_queue = FlowSpareGetFromPool(); /* local empty, (re)populate and try again */
625  spare_sync = true;
627  if (f == NULL) {
628  /* wait till next full sec before retrying */
629  fls->emerg_spare_sync_stamp = (uint32_t)SCTIME_SECS(p->ts);
630  }
631  }
632  } else {
633  fls->spare_queue = FlowSpareGetFromPool(); /* local empty, (re)populate and try again */
635  spare_sync = true;
636  }
/* In unittest builds the block below is skipped when tv or fls->dtv is NULL. */
637 #ifdef UNITTESTS
638  if (tv && fls->dtv) {
639 #endif
640  if (spare_sync) {
641  if (f != NULL) {
643  if (fls->spare_queue.len < 99) {
644  /* When a new flow pool is fetched it has 100 flows in sync,
645  * so there should be 99 left if we're in full sync.
646  * If len is below 99, means the spare sync is incomplete */
647  /* Track these instances */
649  }
650  } else if (fls->spare_queue.len == 0) {
652  }
654  }
655 #ifdef UNITTESTS
656  }
657 #endif
658  return f;
659 }
660 
661 static void FlowExceptionPolicyStatsIncr(
662  ThreadVars *tv, FlowLookupStruct *fls, enum ExceptionPolicy policy)
663 {
664 #ifdef UNITTESTS
665  if (tv == NULL || fls->dtv == NULL) {
666  return;
667  }
668 #endif
669  uint16_t id = fls->dtv->counter_flow_memcap_eps.eps_id[policy];
670  if (likely(id > 0)) {
671  StatsIncr(tv, id);
672  }
673 }
674 
/* Called from FlowGetNew() on the paths where no flow can be handed out.
 * NOTE(review): one statement is absent from this listing (the numbering
 * jumps from 676 to 678) and p is unused in the visible code; restore the
 * missing line from the upstream file. */
675 static inline void NoFlowHandleIPS(ThreadVars *tv, FlowLookupStruct *fls, Packet *p)
676 {
/* account for the configured flow memcap exception policy */
678  FlowExceptionPolicyStatsIncr(tv, fls, flow_config.memcap_policy);
679 }
680 
681 /**
682  * \brief Get a new flow
683  *
684  * Get a new flow. We're checking memcap first and will try to make room
685  * if the memcap is reached.
686  *
687  * \param tv thread vars
688  * \param fls lookup support vars
689  *
690  * \retval f *LOCKED* flow on success, NULL on error or if we should not create
691  * a new flow.
692  */
/* NOTE(review): several lines of this function are absent from this listing
 * (numbering gaps at 699, 708, 718-719, 728, 737 and 752). Among them is the
 * declaration of f, presumably initialised from the thread's spare queue as
 * the comment at 707 suggests. Restore them from the upstream file. */
693 static Flow *FlowGetNew(ThreadVars *tv, FlowLookupStruct *fls, Packet *p)
694 {
695  const bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);
/* debug builds only: refuse the flow when g_eps_flow_memcap is set (not
 * UINT64_MAX) and equals this packet's pcap_cnt */
696 #ifdef DEBUG
697  if (g_eps_flow_memcap != UINT64_MAX && g_eps_flow_memcap == p->pcap_cnt) {
698  NoFlowHandleIPS(tv, fls, p);
700  return NULL;
701  }
702 #endif
703  if (!FlowCreateCheck(p, emerg)) {
704  return NULL;
705  }
706 
707  /* get a flow from the spare queue */
709  if (f == NULL) {
710  f = FlowSpareSync(tv, fls, p, emerg);
711  }
712  if (f == NULL) {
713  /* If we reached the max memcap, we get a used flow */
714  if (!(FLOW_CHECK_MEMCAP(sizeof(Flow) + FlowStorageSize()))) {
715  /* declare state of emergency */
716  if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) {
717  SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY);
720  }
721 
722  f = FlowGetUsedFlow(tv, fls->dtv, p->ts);
723  if (f == NULL) {
724  NoFlowHandleIPS(tv, fls, p);
725 #ifdef UNITTESTS
726  if (tv != NULL && fls->dtv != NULL) {
727 #endif
729 #ifdef UNITTESTS
730  }
731 #endif
732  return NULL;
733  }
734 #ifdef UNITTESTS
735  if (tv != NULL && fls->dtv != NULL) {
736 #endif
738 #ifdef UNITTESTS
739  }
740 #endif
741  /* flow is still locked from FlowGetUsedFlow() */
742  FlowUpdateCounter(tv, fls->dtv, p->proto);
743  return f;
744  }
745 
746  /* now see if we can alloc a new flow */
747  f = FlowAlloc();
748  if (f == NULL) {
749 #ifdef UNITTESTS
750  if (tv != NULL && fls->dtv != NULL) {
751 #endif
753 #ifdef UNITTESTS
754  }
755 #endif
756  NoFlowHandleIPS(tv, fls, p);
757  return NULL;
758  }
759 
760  /* flow is initialized but *unlocked* */
761  } else {
762  /* flow has been recycled before it went into the spare queue */
763 
764  /* flow is initialized (recycled) but *unlocked* */
765  }
766 
/* Both the freshly allocated and the spare flow reach this point unlocked;
 * take the write lock so the caller always gets a locked flow. */
767  FLOWLOCK_WRLOCK(f);
768  FlowUpdateCounter(tv, fls->dtv, p->proto);
769  return f;
770 }
771 
/* Get a new flow to take the place of old_f in bucket fb, for the case where
 * the caller detected TCP session reuse. The new flow is put at the head of
 * the bucket list, initialised from the packet and given the thread ids of
 * the old flow. Returns the new, locked flow, or NULL if none could be had.
 * NOTE(review): some lines are absent from this listing (numbering gaps at
 * 778, 784, 802 and 807); restore them from the upstream file. */
772 static Flow *TcpReuseReplace(ThreadVars *tv, FlowLookupStruct *fls, FlowBucket *fb, Flow *old_f,
773  const uint32_t hash, Packet *p)
774 {
775 #ifdef UNITTESTS
776  if (tv != NULL && fls->dtv != NULL) {
777 #endif
779 #ifdef UNITTESTS
780  }
781 #endif
782  /* get some settings that we move over to the new flow */
783  FlowThreadId thread_id[2] = { old_f->thread_id[0], old_f->thread_id[1] };
785 
786  /* flow is unlocked by caller */
787 
788  /* Get a new flow. It will be either a locked flow or NULL */
789  Flow *f = FlowGetNew(tv, fls, p);
790  if (f == NULL) {
791  return NULL;
792  }
793 
794  /* put at the start of the list */
795  f->next = fb->head;
796  fb->head = f;
797 
798  /* initialize and return */
799  FlowInit(tv, f, p);
800  f->flow_hash = hash;
801  f->fb = fb;
803 
804  f->thread_id[0] = thread_id[0];
805  f->thread_id[1] = thread_id[1];
806 
808  return f;
809 }
810 
811 static inline bool FlowBelongsToUs(const ThreadVars *tv, const Flow *f)
812 {
813 #ifdef UNITTESTS
814  if (RunmodeIsUnittests()) {
815  return true;
816  }
817 #endif
818  return f->thread_id[0] == tv->id;
819 }
820 
/* Unlink flow f from the list of bucket fb. prev_f is the flow in front of f
 * in that list, or NULL.
 * NOTE(review): two lines are absent from this listing (numbering gaps at
 * 824 and 837). In the first branch the visible code only clears f->fb and
 * f->next; where the flow is handed to is presumably on the missing line,
 * restore it from the upstream file. */
821 static inline void MoveToWorkQueue(ThreadVars *tv, FlowLookupStruct *fls,
822  FlowBucket *fb, Flow *f, Flow *prev_f)
823 {
825 
826  /* remove from hash... */
827  if (prev_f) {
828  prev_f->next = f->next;
829  }
830  if (f == fb->head) {
831  fb->head = f->next;
832  }
833 
834  if (f->proto != IPPROTO_TCP || FlowBelongsToUs(tv, f)) { // TODO thread_id[] direction
835  f->fb = NULL;
836  f->next = NULL;
838  } else {
839  /* implied: TCP but our thread does not own it. So set it
840  * aside for the Flow Manager to pick it up. */
841  f->next = fb->evicted;
842  fb->evicted = f;
/* f->fb is left set in this branch and is used here to reset the bucket's
 * next_ts to 0 */
843  if (SC_ATOMIC_GET(f->fb->next_ts) != 0) {
844  SC_ATOMIC_SET(f->fb->next_ts, 0);
845  }
846  }
847 }
848 
849 static inline bool FlowIsTimedOut(
850  const FlowThreadId ftid, const Flow *f, const SCTime_t pktts, const bool emerg)
851 {
852  SCTime_t timesout_at;
853  if (emerg) {
855  timesout_at = SCTIME_ADD_SECS(f->lastts,
856  FlowGetFlowTimeoutDirect(flow_timeouts_emerg, f->flow_state, f->protomap));
857  } else {
858  timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy);
859  }
860  /* if time is live, we just use the pktts */
861  if (TimeModeIsLive() || ftid == f->thread_id[0] || f->thread_id[0] == 0) {
862  if (SCTIME_CMP_LT(pktts, timesout_at)) {
863  return false;
864  }
865  } else {
866  SCTime_t checkts = TmThreadsGetThreadTime(f->thread_id[0]);
867  /* do the timeout check */
868  if (SCTIME_CMP_LT(checkts, timesout_at)) {
869  return false;
870  }
871  }
872  return true;
873 }
874 
875 static inline uint16_t GetTvId(const ThreadVars *tv)
876 {
877  uint16_t tv_id;
878 #ifdef UNITTESTS
879  if (RunmodeIsUnittests()) {
880  tv_id = 0;
881  } else {
882  tv_id = (uint16_t)tv->id;
883  }
884 #else
885  tv_id = (uint16_t)tv->id;
886 #endif
887  return tv_id;
888 }
889 
890 /** \brief Get Flow for packet
891  *
892  * Hash retrieval function for flows. Looks up the hash bucket containing the
893  * flow pointer. Then compares the packet with the found flow to see if it is
894  * the flow we need. If it isn't, walk the list until the right flow is found.
895  *
896  * If the flow is not found or the bucket was empty, a new flow is taken from
897  * the spare pool. The pool will alloc new flows as long as we stay within our
898  * memcap limit.
899  *
900  * The p->flow pointer is updated to point to the flow.
901  *
902  * \param tv thread vars
903  * \param dtv decode thread vars (for flow log api thread data)
904  *
905  * \retval f *LOCKED* flow or NULL
906  */
/* NOTE(review): the signature line of this function is absent from this
 * listing (the numbering jumps from 906 to 908), as are two statements in the
 * body (gaps at 933 and 1003). The body uses tv, fls, p and dest; restore the
 * missing lines from the upstream file. */
908 {
909  Flow *f = NULL;
910 
911  /* get our hash bucket and lock it */
912  const uint32_t hash = p->flow_hash;
913  FlowBucket *fb = &flow_hash[hash % flow_config.hash_size];
914  FBLOCK_LOCK(fb);
915 
916  SCLogDebug("fb %p fb->head %p", fb, fb->head);
917 
918  /* see if the bucket already has a flow */
919  if (fb->head == NULL) {
920  f = FlowGetNew(tv, fls, p);
921  if (f == NULL) {
922  FBLOCK_UNLOCK(fb);
923  return NULL;
924  }
925 
926  /* flow is locked */
927  fb->head = f;
928 
929  /* got one, now lock, initialize and return */
930  FlowInit(tv, f, p);
931  f->flow_hash = hash;
932  f->fb = fb;
934 
935  FlowReference(dest, f);
936 
937  FBLOCK_UNLOCK(fb);
938  return f;
939  }
940 
941  const uint16_t tv_id = GetTvId(tv);
942  const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0;
/* In emergency mode fb_nextts is forced to 0, which makes timeout_check
 * true for every packet. Otherwise flows are only checked for timeout once
 * the packet time, in seconds, has reached the bucket's next_ts. */
943  const uint32_t fb_nextts = !emerg ? SC_ATOMIC_GET(fb->next_ts) : 0;
944  const bool timeout_check = (fb_nextts <= (uint32_t)SCTIME_SECS(p->ts));
945  /* ok, we have a flow in the bucket. Let's find out if it is our flow */
946  Flow *prev_f = NULL; /* previous flow */
947  f = fb->head;
948  do {
949  Flow *next_f = NULL;
950  const bool our_flow = FlowCompare(f, p) != 0;
/* the flow lock is only taken for a matching flow or when timeouts are
 * being checked */
951  if (our_flow || timeout_check) {
952  FLOWLOCK_WRLOCK(f);
953  const bool timedout = (timeout_check && FlowIsTimedOut(tv_id, f, p->ts, emerg));
954  if (timedout) {
/* remember the successor before f is unlinked; prev_f is not advanced on
 * this path */
955  next_f = f->next;
956  MoveToWorkQueue(tv, fls, fb, f, prev_f);
957  FLOWLOCK_UNLOCK(f);
958  goto flow_removed;
959  } else if (our_flow) {
960  /* found a matching flow that is not timed out */
961  if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx))) {
962  Flow *new_f = TcpReuseReplace(tv, fls, fb, f, hash, p);
963  if (prev_f == NULL) /* if we have no prev it means new_f is now our prev */
964  prev_f = new_f;
965  MoveToWorkQueue(tv, fls, fb, f, prev_f); /* evict old flow */
966  FLOWLOCK_UNLOCK(f); /* unlock old replaced flow */
967 
968  if (new_f == NULL) {
969  FBLOCK_UNLOCK(fb);
970  return NULL;
971  }
972  f = new_f;
973  }
974  FlowReference(dest, f);
975  FBLOCK_UNLOCK(fb);
976  return f; /* return w/o releasing flow lock */
977  } else {
978  FLOWLOCK_UNLOCK(f);
979  }
980  }
981  /* unless we removed 'f', prev_f needs to point to
982  * current 'f' when adding a new flow below. */
983  prev_f = f;
984  next_f = f->next;
985 
986 flow_removed:
/* end of the list without a match: get a new flow and put it at the head
 * of the bucket */
987  if (next_f == NULL) {
988  f = FlowGetNew(tv, fls, p);
989  if (f == NULL) {
990  FBLOCK_UNLOCK(fb);
991  return NULL;
992  }
993 
994  /* flow is locked */
995 
996  f->next = fb->head;
997  fb->head = f;
998 
999  /* initialize and return */
1000  FlowInit(tv, f, p);
1001  f->flow_hash = hash;
1002  f->fb = fb;
1004  FlowReference(dest, f);
1005  FBLOCK_UNLOCK(fb);
1006  return f;
1007  }
1008  f = next_f;
1009  } while (f != NULL);
1010 
1011  /* should be unreachable */
1012  BUG_ON(1);
1013  return NULL;
1014 }
1015 
1016 /** \internal
1017  * \retval true if flow matches key
1018  * \retval false if flow does not match key, or unsupported protocol
1019  * \note only supports TCP & UDP
1020  */
1021 static inline bool FlowCompareKey(Flow *f, FlowKey *key)
1022 {
1023  if ((f->proto != IPPROTO_TCP) && (f->proto != IPPROTO_UDP))
1024  return false;
1025  return CmpFlowKey(f, key);
1026 }
1027 
1028 /** \brief Look for existing Flow using a flow id value
1029  *
1030  * Hash retrieval function for flows. Looks up the hash bucket containing the
1031  * flow pointer. Then compares the flow_id with the found flow's flow_id to see
1032  * if it is the flow we need.
1033  *
1034  * \param flow_id Flow ID of the flow to look for
1035  * \retval f *LOCKED* flow or NULL
1036  */
1038 {
1039  uint32_t hash = flow_id & 0x0000FFFF;
1040  FlowBucket *fb = &flow_hash[hash % flow_config.hash_size];
1041  FBLOCK_LOCK(fb);
1042  SCLogDebug("fb %p fb->head %p", fb, fb->head);
1043 
1044  for (Flow *f = fb->head; f != NULL; f = f->next) {
1045  if (FlowGetId(f) == flow_id) {
1046  /* found our flow, lock & return */
1047  FLOWLOCK_WRLOCK(f);
1048  FBLOCK_UNLOCK(fb);
1049  return f;
1050  }
1051  }
1052  FBLOCK_UNLOCK(fb);
1053  return NULL;
1054 }
1055 
1056 /** \brief Look for existing Flow using a FlowKey
1057  *
1058  * Hash retrieval function for flows. Looks up the hash bucket containing the
1059  * flow pointer. Then compares the key with the found flow to see if it is
1060  * the flow we need. If it isn't, walk the list until the right flow is found.
1061  *
1062  * \param key Pointer to FlowKey build using flow to look for
1063  * \param hash Value of the flow hash
1064  * \retval f *LOCKED* flow or NULL
1065  */
1066 static Flow *FlowGetExistingFlowFromHash(FlowKey *key, const uint32_t hash)
1067 {
1068  /* get our hash bucket and lock it */
1069  FlowBucket *fb = &flow_hash[hash % flow_config.hash_size];
1070  FBLOCK_LOCK(fb);
1071  SCLogDebug("fb %p fb->head %p", fb, fb->head);
1072 
1073  for (Flow *f = fb->head; f != NULL; f = f->next) {
1074  /* see if this is the flow we are looking for */
1075  if (FlowCompareKey(f, key)) {
1076  /* found our flow, lock & return */
1077  FLOWLOCK_WRLOCK(f);
1078  FBLOCK_UNLOCK(fb);
1079  return f;
1080  }
1081  }
1082 
1083  FBLOCK_UNLOCK(fb);
1084  return NULL;
1085 }
1086 
1087 /** \brief Get or create a Flow using a FlowKey
1088  *
1089  * Hash retrieval function for flows. Looks up the hash bucket containing the
1090  * flow pointer. Then compares the packet with the found flow to see if it is
1091  * the flow we need. If it isn't, walk the list until the right flow is found.
1092  * Return a new Flow if ever no Flow was found.
1093  *
1094  *
1095  * \param key Pointer to FlowKey build using flow to look for
1096  * \param ttime time to use for flow creation
1097  * \param hash Value of the flow hash
1098  * \retval f *LOCKED* flow or NULL
1099  */
1100 
1101 Flow *FlowGetFromFlowKey(FlowKey *key, struct timespec *ttime, const uint32_t hash)
1102 {
1103  Flow *f = FlowGetExistingFlowFromHash(key, hash);
1104 
1105  if (f != NULL) {
1106  return f;
1107  }
1108  /* TODO use spare pool */
1109  /* now see if we can alloc a new flow */
1110  f = FlowAlloc();
1111  if (f == NULL) {
1112  SCLogDebug("Can't get a spare flow at start");
1113  return NULL;
1114  }
1115  f->proto = key->proto;
1116  memcpy(&f->vlan_id[0], &key->vlan_id[0], sizeof(f->vlan_id));
1117  ;
1118  f->src.addr_data32[0] = key->src.addr_data32[0];
1119  f->src.addr_data32[1] = key->src.addr_data32[1];
1120  f->src.addr_data32[2] = key->src.addr_data32[2];
1121  f->src.addr_data32[3] = key->src.addr_data32[3];
1122  f->dst.addr_data32[0] = key->dst.addr_data32[0];
1123  f->dst.addr_data32[1] = key->dst.addr_data32[1];
1124  f->dst.addr_data32[2] = key->dst.addr_data32[2];
1125  f->dst.addr_data32[3] = key->dst.addr_data32[3];
1126  f->sp = key->sp;
1127  f->dp = key->dp;
1128  f->recursion_level = 0;
1129  // f->livedev is set by caller EBPFCreateFlowForKey
1130  f->flow_hash = hash;
1131  if (key->src.family == AF_INET) {
1132  f->flags |= FLOW_IPV4;
1133  } else if (key->src.family == AF_INET6) {
1134  f->flags |= FLOW_IPV6;
1135  }
1136 
1138  /* set timestamp to now */
1139  f->startts = SCTIME_FROM_TIMESPEC(ttime);
1140  f->lastts = f->startts;
1141 
1142  FlowBucket *fb = &flow_hash[hash % flow_config.hash_size];
1143  FBLOCK_LOCK(fb);
1144  f->fb = fb;
1145  f->next = fb->head;
1146  fb->head = f;
1147  FLOWLOCK_WRLOCK(f);
1148  FBLOCK_UNLOCK(fb);
1149  return f;
1150 }
1151 
/* Used by FlowGetUsedFlow(): the amount flow_prune_idx is advanced by on
 * each call, and the threshold its per-call bucket counter is checked
 * against before giving up. */
#define FLOW_GET_NEW_TRIES 5
1154 /* inline locking wrappers to make profiling easier */
1155 
1156 static inline int GetUsedTryLockBucket(FlowBucket *fb)
1157 {
1158  int r = FBLOCK_TRYLOCK(fb);
1159  return r;
1160 }
1161 static inline int GetUsedTryLockFlow(Flow *f)
1162 {
1163  int r = FLOWLOCK_TRYWRLOCK(f);
1164  return r;
1165 }
1166 static inline uint32_t GetUsedAtomicUpdate(const uint32_t val)
1167 {
1168  uint32_t r = SC_ATOMIC_ADD(flow_prune_idx, val);
1169  return r;
1170 }
1171 
1172 /** \internal
1173  * \brief check if flow has just seen an update.
1174  */
1175 static inline bool StillAlive(const Flow *f, const SCTime_t ts)
1176 {
1177  switch (f->flow_state) {
1178  case FLOW_STATE_NEW:
1179  if (SCTIME_SECS(ts) - SCTIME_SECS(f->lastts) <= 1) {
1180  return true;
1181  }
1182  break;
1184  if (SCTIME_SECS(ts) - SCTIME_SECS(f->lastts) <= 5) {
1185  return true;
1186  }
1187  break;
1188  case FLOW_STATE_CLOSED:
1189  if (SCTIME_SECS(ts) - SCTIME_SECS(f->lastts) <= 3) {
1190  return true;
1191  }
1192  break;
1193  default:
1194  if (SCTIME_SECS(ts) - SCTIME_SECS(f->lastts) < 30) {
1195  return true;
1196  }
1197  break;
1198  }
1199  return false;
1200 }
1201 
/* Add 'value' to the decode thread counter 'cnt'. Expects 'tv' and 'dtv' to
 * be in scope at the point of use. In unit test builds the update is skipped
 * when either of them is NULL.
 *
 * Both variants are wrapped in do { } while (0) so that the macro expands to
 * exactly one statement and has to be terminated with a ';' by the caller.
 * This keeps it safe inside an unbraced if/else. */
#ifdef UNITTESTS
#define STATSADDUI64(cnt, value)                                                                   \
    do {                                                                                           \
        if (tv && dtv) {                                                                           \
            StatsAddUI64(tv, dtv->cnt, (value));                                                   \
        }                                                                                          \
    } while (0)
#else
#define STATSADDUI64(cnt, value)                                                                   \
    do {                                                                                           \
        StatsAddUI64(tv, dtv->cnt, (value));                                                       \
    } while (0)
#endif
1211 
1212 /** \internal
1213  * \brief Get a flow from the hash directly.
1214  *
1215  * Called in conditions where the spare queue is empty and memcap is reached.
1216  *
1217  * Walks the hash until a flow can be freed. Timeouts are disregarded.
1218  * "flow_prune_idx" atomic int makes sure we don't start at the
1219  * top each time since that would clear the top of the hash leading to longer
1220  * and longer search times under high pressure (observed).
1221  *
1222  * \param tv thread vars
1223  * \param dtv decode thread vars (for flow log api thread data)
1224  *
1225  * \retval f flow or NULL
1226  */
1227 static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv, const SCTime_t ts)
1228 {
1229  uint32_t idx = GetUsedAtomicUpdate(FLOW_GET_NEW_TRIES) % flow_config.hash_size;
1230  uint32_t tried = 0;
1231 
1232  while (1) {
1233  if (tried++ > FLOW_GET_NEW_TRIES) {
1234  STATSADDUI64(counter_flow_get_used_eval, tried);
1235  break;
1236  }
1237  if (++idx >= flow_config.hash_size)
1238  idx = 0;
1239 
1240  FlowBucket *fb = &flow_hash[idx];
1241 
1242  if (SC_ATOMIC_GET(fb->next_ts) == UINT_MAX)
1243  continue;
1244 
1245  if (GetUsedTryLockBucket(fb) != 0) {
1246  STATSADDUI64(counter_flow_get_used_eval_busy, 1);
1247  continue;
1248  }
1249 
1250  Flow *f = fb->head;
1251  if (f == NULL) {
1252  FBLOCK_UNLOCK(fb);
1253  continue;
1254  }
1255 
1256  if (GetUsedTryLockFlow(f) != 0) {
1257  STATSADDUI64(counter_flow_get_used_eval_busy, 1);
1258  FBLOCK_UNLOCK(fb);
1259  continue;
1260  }
1261 
1262  if (StillAlive(f, ts)) {
1263  STATSADDUI64(counter_flow_get_used_eval_reject, 1);
1264  FBLOCK_UNLOCK(fb);
1265  FLOWLOCK_UNLOCK(f);
1266  continue;
1267  }
1268 
1269  /* remove from the hash */
1270  fb->head = f->next;
1271  f->next = NULL;
1272  f->fb = NULL;
1273  FBLOCK_UNLOCK(fb);
1274 
1275  /* rest of the flags is updated on-demand in output */
1277  if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
1279 
1280  /* invoke flow log api */
1281 #ifdef UNITTESTS
1282  if (dtv) {
1283 #endif
1286  }
1287 #ifdef UNITTESTS
1288  }
1289 #endif
1290 
1292  FlowClearMemory(f, f->protomap);
1293 
1294  /* leave locked */
1295 
1296  STATSADDUI64(counter_flow_get_used_eval, tried);
1297  return f;
1298  }
1299 
1300  STATSADDUI64(counter_flow_get_used_failed, 1);
1301  return NULL;
1302 }
FlowHashKey4_::ports
uint16_t ports[2]
Definition: flow-hash.c:91
ESP_GET_SPI
#define ESP_GET_SPI(esph)
Get the spi field off a packet.
Definition: decode-esp.h:29
FlowLookupStruct_::work_queue
FlowQueuePrivate work_queue
Definition: flow.h:538
util-device-private.h
Flow_::icmp_s
struct Flow_::@129::@135 icmp_s
OutputFlowLog
TmEcode OutputFlowLog(ThreadVars *tv, void *thread_data, Flow *f)
Run flow logger(s)
Definition: output-flow.c:87
Packet_::proto
uint8_t proto
Definition: decode.h:523
DecodeThreadVars_::counter_flow_udp
uint16_t counter_flow_udp
Definition: decode.h:1036
ts
uint64_t ts
Definition: source-erf-file.c:55
ExceptionPolicyApply
void ExceptionPolicyApply(Packet *p, enum ExceptionPolicy policy, enum PacketDropReason drop_reason)
Definition: util-exception-policy.c:136
g_livedev_mask
uint16_t g_livedev_mask
Definition: suricata.c:206
DecodeThreadVars_::counter_flow_active
uint16_t counter_flow_active
Definition: decode.h:1034
ICMPV4_GET_EMB_PROTO
#define ICMPV4_GET_EMB_PROTO(p)
Definition: decode-icmpv4.h:243
Flow_::recursion_level
uint8_t recursion_level
Definition: flow.h:371
StatsIncr
void StatsIncr(ThreadVars *tv, uint16_t id)
Increments the local counter.
Definition: counters.c:166
IPV4_GET_RAW_IPDST_U32
#define IPV4_GET_RAW_IPDST_U32(ip4h)
Definition: decode-ipv4.h:110
hashword
uint32_t hashword(const uint32_t *k, size_t length, uint32_t initval)
Definition: util-hash-lookup3.c:172
FLOW_STATE_ESTABLISHED
@ FLOW_STATE_ESTABLISHED
Definition: flow.h:497
flow-util.h
FBLOCK_LOCK
#define FBLOCK_LOCK(fb)
Definition: flow-hash.h:73
FlowLookupStruct_::dtv
DecodeThreadVars * dtv
Definition: flow.h:537
DecodeThreadVars_::counter_flow_icmp4
uint16_t counter_flow_icmp4
Definition: decode.h:1037
Flow_::startts
SCTime_t startts
Definition: flow.h:485
TcpSessionPacketSsnReuse
bool TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, const void *tcp_ssn)
Definition: stream-tcp.c:5931
stream-tcp.h
FlowKey_::src
Address src
Definition: flow.h:302
unlikely
#define unlikely(expr)
Definition: util-optimize.h:35
SC_ATOMIC_SET
#define SC_ATOMIC_SET(name, val)
Set the value for the atomic variable.
Definition: util-atomic.h:386
PKT_DROP_REASON_FLOW_MEMCAP
@ PKT_DROP_REASON_FLOW_MEMCAP
Definition: decode.h:385
FlowCnf_::hash_size
uint32_t hash_size
Definition: flow.h:286
FlowAddress_::address_un_data32
uint32_t address_un_data32[4]
Definition: flow.h:312
FlowSpareGetFromPool
FlowQueuePrivate FlowSpareGetFromPool(void)
Definition: flow-spare-pool.c:173
SCLogDebug
#define SCLogDebug(...)
Definition: util-debug.h:275
Packet_::pcap_cnt
uint64_t pcap_cnt
Definition: decode.h:626
DecodeThreadVars_::counter_flow_spare_sync_avg
uint16_t counter_flow_spare_sync_avg
Definition: decode.h:1050
Flow_::proto
uint8_t proto
Definition: flow.h:370
Packet_::flags
uint32_t flags
Definition: decode.h:544
FlowKeyGetHash
uint32_t FlowKeyGetHash(FlowKey *fk)
Definition: flow-hash.c:314
threads.h
ICMPV4_DEST_UNREACH_IS_VALID
#define ICMPV4_DEST_UNREACH_IS_VALID(p)
Definition: decode-icmpv4.h:253
TH_RST
#define TH_RST
Definition: decode-tcp.h:36
flow-private.h
Flow_
Flow data structure.
Definition: flow.h:348
FlowHashKey4_::vlan_id
uint16_t vlan_id[VLAN_MAX_LAYERS]
Definition: flow-hash.c:95
TH_FIN
#define TH_FIN
Definition: decode-tcp.h:34
Flow_::protomap
uint8_t protomap
Definition: flow.h:437
LiveDevice_
Definition: util-device-private.h:32
SC_ATOMIC_ADD
#define SC_ATOMIC_ADD(name, val)
add a value to our atomic variable
Definition: util-atomic.h:332
FlowProtoTimeout_
Definition: flow.h:510
LiveDevice_::id
uint16_t id
Definition: util-device-private.h:38
FLOWLOCK_TRYWRLOCK
#define FLOWLOCK_TRYWRLOCK(fb)
Definition: flow.h:264
PKT_WANTS_FLOW
#define PKT_WANTS_FLOW
Definition: decode.h:1296
flow-hash.h
FlowLookupStruct_
Definition: flow.h:534
FlowHashKey4
struct FlowHashKey4_ FlowHashKey4
FBLOCK_TRYLOCK
#define FBLOCK_TRYLOCK(fb)
Definition: flow-hash.h:74
TcpStreamCnf_
Definition: stream-tcp.h:54
DecodeThreadVars_::counter_flow_tcp
uint16_t counter_flow_tcp
Definition: decode.h:1035
Address_::address_un_data32
uint32_t address_un_data32[4]
Definition: decode.h:115
proto
uint8_t proto
Definition: decode-template.h:0
FlowHashKey6_::recur
uint8_t recur
Definition: flow-hash.c:108
Flow_::dp
Port dp
Definition: flow.h:364
Packet_::icmp_s
struct Packet_::@33::@40 icmp_s
stream_config
TcpStreamCnf stream_config
Definition: stream-tcp.c:219
FlowQueuePrivate_::len
uint32_t len
Definition: flow-queue.h:43
Flow_::protoctx
void * protoctx
Definition: flow.h:433
FLOW_IPV4
#define FLOW_IPV4
Definition: flow.h:100
DecodeThreadVars_::counter_flow_get_used
uint16_t counter_flow_get_used
Definition: decode.h:1041
g_recurlvl_mask
uint8_t g_recurlvl_mask
Definition: suricata.c:210
TmThreadsGetThreadTime
SCTime_t TmThreadsGetThreadTime(const int idx)
Definition: tm-threads.c:2361
FLOWLOCK_UNLOCK
#define FLOWLOCK_UNLOCK(fb)
Definition: flow.h:265
Flow_::flow_state
FlowStateType flow_state
Definition: flow.h:404
DecodeThreadVars_::counter_flow_spare_sync_empty
uint16_t counter_flow_spare_sync_empty
Definition: decode.h:1048
DecodeThreadVars_::counter_flow_tcp_reuse
uint16_t counter_flow_tcp_reuse
Definition: decode.h:1039
DecodeThreadVars_::counter_flow_total
uint16_t counter_flow_total
Definition: decode.h:1033
FlowWakeupFlowManagerThread
void FlowWakeupFlowManagerThread(void)
Definition: flow-manager.c:84
FLOW_CHECK_MEMCAP
#define FLOW_CHECK_MEMCAP(size)
check if a memory alloc would fit in the memcap
Definition: flow-util.h:134
FlowLookupStruct_::emerg_spare_sync_stamp
uint32_t emerg_spare_sync_stamp
Definition: flow.h:539
flow-spare-pool.h
DecodeThreadVars_::counter_flow_spare_sync
uint16_t counter_flow_spare_sync
Definition: decode.h:1047
Flow_::dst
FlowAddress dst
Definition: flow.h:351
Flow_::fb
struct FlowBucket_ * fb
Definition: flow.h:483
FlowHashKey6_::ports
uint16_t ports[2]
Definition: flow-hash.c:106
FlowHashKey4_::u32
const uint32_t u32[6]
Definition: flow-hash.c:98
TCPHdr_::th_flags
uint8_t th_flags
Definition: decode-tcp.h:155
decode.h
util-debug.h
SCFlowRunFinishCallbacks
void SCFlowRunFinishCallbacks(ThreadVars *tv, Flow *f)
Definition: flow-callbacks.c:122
FLOW_GET_NEW_TRIES
#define FLOW_GET_NEW_TRIES
Definition: flow-hash.c:1152
STATSADDUI64
#define STATSADDUI64(cnt, value)
Definition: flow-hash.c:1203
SCTIME_FROM_TIMESPEC
#define SCTIME_FROM_TIMESPEC(ts)
Definition: util-time.h:91
PacketL4::L4Vars::icmpv4
ICMPV4Vars icmpv4
Definition: decode.h:479
Packet_::ts
SCTime_t ts
Definition: decode.h:555
FlowHashKey4_::livedev
uint16_t livedev
Definition: flow-hash.c:94
util-exception-policy.h
FlowHashKey6_::proto
uint8_t proto
Definition: flow-hash.c:107
Flow_::lastts
SCTime_t lastts
Definition: flow.h:402
FLOWLOCK_WRLOCK
#define FLOWLOCK_WRLOCK(fb)
Definition: flow.h:262
SC_ATOMIC_EXTERN
SC_ATOMIC_EXTERN(unsigned int, flow_prune_idx)
ExceptionPolicyCounters_::eps_id
uint16_t eps_id[EXCEPTION_POLICY_MAX]
Definition: util-exception-policy-types.h:53
FlowKey_::recursion_level
uint8_t recursion_level
Definition: flow.h:305
ICMPV4Vars_::emb_dport
uint16_t emb_dport
Definition: decode-icmpv4.h:197
FlowAddress_::address
union FlowAddress_::@128 address
ThreadVars_
Per thread variable structure.
Definition: threadvars.h:58
Flow_::flow_end_flags
uint8_t flow_end_flags
Definition: flow.h:439
FlowStorageSize
unsigned int FlowStorageSize(void)
Definition: flow-storage.c:35
Packet_::sp
Port sp
Definition: decode.h:508
FlowHashKey6
struct FlowHashKey6_ FlowHashKey6
FlowHashKey4_::pad
uint16_t pad[1]
Definition: flow-hash.c:96
TH_ACK
#define TH_ACK
Definition: decode-tcp.h:38
util-time.h
FlowQueuePrivateGetFromTop
Flow * FlowQueuePrivateGetFromTop(FlowQueuePrivate *fqc)
Definition: flow-queue.c:151
app-layer-parser.h
FlowKey_::livedev_id
uint16_t livedev_id
Definition: flow.h:306
ThreadVars_::id
int id
Definition: threadvars.h:87
FlowHashKey6_::livedev
uint16_t livedev
Definition: flow-hash.c:109
BUG_ON
#define BUG_ON(x)
Definition: suricata-common.h:317
Address_::address
union Address_::@30 address
flow_timeouts_emerg
FlowProtoTimeout flow_timeouts_emerg[FLOW_PROTO_MAX]
Definition: flow.c:89
FlowThreadId
uint16_t FlowThreadId
Definition: flow.h:325
FlowKey_::sp
Port sp
Definition: flow.h:303
TimeModeIsLive
bool TimeModeIsLive(void)
Definition: util-time.c:111
FlowTimeoutsEmergency
void FlowTimeoutsEmergency(void)
Definition: flow-manager.c:103
FlowGetProtoMapping
uint8_t FlowGetProtoMapping(uint8_t proto)
Function to map the protocol to the defined FLOW_PROTO_* enumeration.
Definition: flow-util.c:99
Packet_
Definition: decode.h:501
FlowHashKey6_::pad
uint16_t pad[1]
Definition: flow-hash.c:111
FLOW_END_FLAG_TCPREUSE
#define FLOW_END_FLAG_TCPREUSE
Definition: flow.h:238
conf.h
Packet_::l4
struct PacketL4 l4
Definition: decode.h:601
Port
uint16_t Port
Definition: decode.h:218
FLOW_END_FLAG_EMERGENCY
#define FLOW_END_FLAG_EMERGENCY
Definition: flow.h:234
FBLOCK_UNLOCK
#define FBLOCK_UNLOCK(fb)
Definition: flow-hash.h:75
SCTime_t
Definition: util-time.h:40
Packet_::livedev
struct LiveDevice_ * livedev
Definition: decode.h:618
FlowHashKey4_
Definition: flow-hash.c:87
FlowClearMemory
int FlowClearMemory(Flow *f, uint8_t proto_map)
Function clear the flow memory before queueing it to spare flow queue.
Definition: flow.c:1096
STREAM_PKT_FLAG_TCP_SESSION_REUSE
#define STREAM_PKT_FLAG_TCP_SESSION_REUSE
Definition: stream-tcp-private.h:322
FlowCnf_::hash_rand
uint32_t hash_rand
Definition: flow.h:285
FlowHashKey4_::addrs
uint32_t addrs[2]
Definition: flow-hash.c:90
output-flow.h
detect-engine-state.h
Data structures and function prototypes for keeping state for the detection engine.
FlowHashKey6_
Definition: flow-hash.c:102
flow-timeout.h
Flow_::flow_hash
uint32_t flow_hash
Definition: flow.h:393
RunmodeIsUnittests
int RunmodeIsUnittests(void)
Definition: suricata.c:270
FlowUpdateState
void FlowUpdateState(Flow *f, const enum FlowState s)
Definition: flow.c:1161
Flow_::src
FlowAddress src
Definition: flow.h:351
Flow_::next
struct Flow_ * next
Definition: flow.h:388
dtv
DecodeThreadVars * dtv
Definition: fuzz_decodepcapfile.c:33
FlowHashKey6_::src
uint32_t src[4]
Definition: flow-hash.c:105
FlowLookupStruct_::spare_queue
FlowQueuePrivate spare_queue
Definition: flow.h:536
FlowGetIpPairProtoHash
uint32_t FlowGetIpPairProtoHash(const Packet *p)
Definition: flow-hash.c:117
SCTIME_CMP_LT
#define SCTIME_CMP_LT(a, b)
Definition: util-time.h:105
ARRAY_SIZE
#define ARRAY_SIZE(arr)
Definition: suricata-common.h:562
flow_hash
FlowBucket * flow_hash
Definition: flow-hash.c:59
flow-storage.h
TH_SYN
#define TH_SYN
Definition: decode-tcp.h:35
FLOW_STATE_NEW
@ FLOW_STATE_NEW
Definition: flow.h:496
DecodeThreadVars_::counter_flow_spare_sync_incomplete
uint16_t counter_flow_spare_sync_incomplete
Definition: decode.h:1049
flow-manager.h
suricata-common.h
IPV4_GET_RAW_IPSRC_U32
#define IPV4_GET_RAW_IPSRC_U32(ip4h)
Definition: decode-ipv4.h:108
FLOW_IPV6
#define FLOW_IPV6
Definition: flow.h:102
DecodeThreadVars_::counter_flow_icmp6
uint16_t counter_flow_icmp6
Definition: decode.h:1038
flow_config
FlowConfig flow_config
Definition: flow.c:93
FlowKey_::dst
Address dst
Definition: flow.h:302
SCTIME_SECS
#define SCTIME_SECS(t)
Definition: util-time.h:57
Packet_::icmp_d
struct Packet_::@35::@41 icmp_d
VLAN_MAX_LAYERS
#define VLAN_MAX_LAYERS
Definition: decode-vlan.h:51
DecodeThreadVars_::counter_flow_memcap_eps
ExceptionPolicyCounters counter_flow_memcap_eps
Definition: decode.h:1030
Flow_::icmp_d
struct Flow_::@131::@137 icmp_d
util-hash-lookup3.h
FlowGetFlowFromHash
Flow * FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow **dest)
Get Flow for packet.
Definition: flow-hash.c:907
Flow_::timeout_policy
uint32_t timeout_policy
Definition: flow.h:397
tv
ThreadVars * tv
Definition: fuzz_decodepcapfile.c:32
Flow_::livedev
struct LiveDevice_ * livedev
Definition: flow.h:390
FlowHashKey6_::dst
uint32_t dst[4]
Definition: flow-hash.c:105
TcpStreamCnf_::midstream
bool midstream
Definition: stream-tcp.h:70
FlowCnf_::memcap_policy
enum ExceptionPolicy memcap_policy
Definition: flow.h:294
StatsAddUI64
void StatsAddUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Adds a value of type uint64_t to the local counter.
Definition: counters.c:146
g_vlan_mask
uint16_t g_vlan_mask
Definition: suricata.c:202
Packet_::flow_hash
uint32_t flow_hash
Definition: decode.h:550
FlowKey_::proto
uint8_t proto
Definition: flow.h:304
FLOW_STATE_CLOSED
@ FLOW_STATE_CLOSED
Definition: flow.h:498
flow-callbacks.h
DecodeThreadVars_
Structure to hold thread specific data for all decode modules.
Definition: decode.h:963
FLOW_PROTO_MAX
@ FLOW_PROTO_MAX
Definition: flow-private.h:74
FlowGetFromFlowKey
Flow * FlowGetFromFlowKey(FlowKey *key, struct timespec *ttime, const uint32_t hash)
Get or create a Flow using a FlowKey.
Definition: flow-hash.c:1101
Flow_::flags
uint32_t flags
Definition: flow.h:413
Packet_::recursion_level
uint8_t recursion_level
Definition: decode.h:526
DecodeThreadVars_::output_flow_thread_data
void * output_flow_thread_data
Definition: decode.h:1056
FlowInit
void FlowInit(ThreadVars *tv, Flow *f, const Packet *p)
Definition: flow-util.c:147
FlowKey_
Definition: flow.h:301
FlowHashKey6_::vlan_id
uint16_t vlan_id[VLAN_MAX_LAYERS]
Definition: flow-hash.c:110
FlowHashKey4_::proto
uint8_t proto
Definition: flow-hash.c:92
FLOW_EMERGENCY
#define FLOW_EMERGENCY
Definition: flow-private.h:37
STREAM_PKT_FLAG_SET
#define STREAM_PKT_FLAG_SET(p, f)
Definition: stream-tcp-private.h:326
Address_::family
char family
Definition: decode.h:113
Packet_::dst
Address dst
Definition: decode.h:506
Flow_::vlan_id
uint16_t vlan_id[VLAN_MAX_LAYERS]
Definition: flow.h:372
DecodeThreadVars_::counter_flow_memcap
uint16_t counter_flow_memcap
Definition: decode.h:1029
FLOW_END_FLAG_TIMEOUT
#define FLOW_END_FLAG_TIMEOUT
Definition: flow.h:235
FlowHashKey6_::u32
const uint32_t u32[12]
Definition: flow-hash.c:113
Flow_::esp
struct Flow_::@129::@136 esp
Packet_::vlan_id
uint16_t vlan_id[VLAN_MAX_LAYERS]
Definition: decode.h:528
likely
#define likely(expr)
Definition: util-optimize.h:32
Flow_::sp
Port sp
Definition: flow.h:353
ICMPV4Vars_::emb_sport
uint16_t emb_sport
Definition: decode-icmpv4.h:196
SC_ATOMIC_GET
#define SC_ATOMIC_GET(name)
Get the value from the atomic variable.
Definition: util-atomic.h:375
FlowKey_::vlan_id
uint16_t vlan_id[VLAN_MAX_LAYERS]
Definition: flow.h:307
flow.h
FlowQueuePrivateAppendFlow
void FlowQueuePrivateAppendFlow(FlowQueuePrivate *fqc, Flow *f)
Definition: flow-queue.c:65
FlowAlloc
Flow * FlowAlloc(void)
allocate a flow
Definition: flow-util.c:56
SCTIME_ADD_SECS
#define SCTIME_ADD_SECS(ts, s)
Definition: util-time.h:64
FlowGetExistingFlowFromFlowId
Flow * FlowGetExistingFlowFromFlowId(uint64_t flow_id)
Look for existing Flow using a flow id value.
Definition: flow-hash.c:1037
Packet_::dp
Port dp
Definition: decode.h:516
ExceptionPolicy
ExceptionPolicy
Definition: util-exception-policy-types.h:25
FLOW_END_FLAG_FORCED
#define FLOW_END_FLAG_FORCED
Definition: flow.h:236
ICMPV4_IS_ERROR_MSG
#define ICMPV4_IS_ERROR_MSG(type)
Definition: decode-icmpv4.h:267
FlowKey_::dp
Port dp
Definition: flow.h:303
FlowSetupPacket
void FlowSetupPacket(Packet *p)
prepare packet for a life with flow Set PKT_WANTS_FLOW flag to indicate workers should do a flow look...
Definition: flow-hash.c:533
TCPHdr_
Definition: decode-tcp.h:149
Packet_::src
Address src
Definition: decode.h:505
PacketL4::vars
union PacketL4::L4Vars vars
FlowHashKey4_::recur
uint8_t recur
Definition: flow-hash.c:93
output.h
Flow_::thread_id
FlowThreadId thread_id[2]
Definition: flow.h:386
SC_ATOMIC_OR
#define SC_ATOMIC_OR(name, val)
Bitwise OR a value to our atomic variable.
Definition: util-atomic.h:350