suricata
util-napatech.c
Go to the documentation of this file.
1 /* Copyright (C) 2017 Open Information Security Foundation
2  *
3  * You can copy, redistribute or modify this Program under the terms of
4  * the GNU General Public License version 2 as published by the Free
5  * Software Foundation.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * version 2 along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15  * 02110-1301, USA.
16  */
17 
18 /**
19  * \file
20  *
21  * \author Napatech Inc.
22  * \author Phil Young <py@napatech.com>
23  *
24  *
25  */
26 
27 #include "suricata-common.h"
28 #ifdef HAVE_NAPATECH
29 #include "suricata.h"
30 #include "util-device.h"
31 #include "util-cpu.h"
32 #include "threadvars.h"
33 #include "tm-threads.h"
34 
35 /*-----------------------------------------------------------------------------
36  *-----------------------------------------------------------------------------
37  * Statistics code
38  *-----------------------------------------------------------------------------
39  */
40 
41 typedef struct StreamCounters_ {
42  uint16_t pkts;
43  uint16_t byte;
44  uint16_t drop;
46 
47 
49 
51 {
52 
53  return current_stats[id];
54 }
55 
60 };
61 
62 #define MAX_HOSTBUFFERS 8
63 
64 static uint16_t TestStreamConfig(
65  NtInfoStream_t hInfo,
66  NtStatStream_t hStatStream,
68  uint16_t num_inst)
69 {
70 
71  uint16_t num_active = 0;
72 
73  for (uint16_t inst = 0; inst < num_inst; ++inst) {
74  int status;
75  char buffer[80]; // Error buffer
76  NtStatistics_t stat; // Stat handle.
77 
78  /* Check to see if it is an active stream */
79  memset(&stat, 0, sizeof (NtStatistics_t));
80 
81  /* Read usage data for the chosen stream ID */
82  stat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
83  stat.u.usageData_v0.streamid = (uint8_t) stream_config[inst].stream_id;
84 
85  if ((status = NT_StatRead(hStatStream, &stat)) != NT_SUCCESS) {
86  /* Get the status code as text */
87  NT_ExplainError(status, buffer, sizeof (buffer));
89  "NT_StatRead():2 failed: %s\n", buffer);
90  return 0;
91  }
92 
93  if (stat.u.usageData_v0.data.numHostBufferUsed > 0) {
94  stream_config[inst].is_active = true;
95  num_active++;
96  } else {
97  stream_config[inst].is_active = false;
98  }
99  }
100 
101  return num_active;
102 }
103 
104 static uint32_t UpdateStreamStats(ThreadVars *tv,
105  NtInfoStream_t hInfo,
106  NtStatStream_t hStatStream,
107  uint16_t num_streams,
109  StreamCounters streamCounters[]
110  )
111 {
112  static uint64_t rxPktsStart[MAX_STREAMS] = {0};
113  static uint64_t rxByteStart[MAX_STREAMS] = {0};
114  static uint64_t dropStart[MAX_STREAMS] = {0};
115 
116  int status;
117  char error_buffer[80]; // Error buffer
118  NtInfo_t hStreamInfo;
119  NtStatistics_t hStat; // Stat handle.
120 
121  /* Query the system to get the number of streams currently instantiated */
122  hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM;
123  if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) {
124  NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
126  "NT_InfoRead() failed: %s\n", error_buffer);
127  exit(EXIT_FAILURE);
128  }
129 
130  uint16_t num_active;
131  if ((num_active = TestStreamConfig(hInfo, hStatStream,
132  stream_config, num_streams)) == 0) {
133  /* None of the configured streams are active */
134  return 0;
135  }
136 
137  /* At least one stream is active so proceed with the stats. */
138  uint16_t inst_id = 0;
139  uint32_t stream_cnt = 0;
140  for (stream_cnt = 0; stream_cnt < num_streams; ++stream_cnt) {
141 
142 
143  while (inst_id < num_streams) {
144  if (stream_config[inst_id].is_active) {
145  break;
146  } else {
147  ++inst_id;
148  }
149  }
150  if (inst_id == num_streams)
151  break;
152 
153  /* Read usage data for the chosen stream ID */
154  memset(&hStat, 0, sizeof (NtStatistics_t));
155  hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
156  hStat.u.usageData_v0.streamid = (uint8_t) stream_config[inst_id].stream_id;
157 
158  if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
159  /* Get the status code as text */
160  NT_ExplainError(status, error_buffer, sizeof (error_buffer));
162  "NT_StatRead() failed: %s\n", error_buffer);
163  return 0;
164  }
165 
166  uint16_t stream_id = stream_config[inst_id].stream_id;
167  if (stream_config[inst_id].is_active) {
168  uint64_t rxPktsTotal = 0;
169  uint64_t rxByteTotal = 0;
170  uint64_t dropTotal = 0;
171 
172  for (uint32_t hbCount = 0; hbCount < hStat.u.usageData_v0.data.numHostBufferUsed; hbCount++) {
173  if (unlikely(stream_config[inst_id].initialized == false)) {
174  rxPktsStart[stream_id] += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.frames;
175  rxByteStart[stream_id] += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.bytes;
176  dropStart[stream_id] += hStat.u.usageData_v0.data.hb[hbCount].stat.drop.frames;
177  stream_config[inst_id].initialized = true;
178  } else {
179  rxPktsTotal += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.frames;
180  rxByteTotal += hStat.u.usageData_v0.data.hb[hbCount].stat.rx.bytes;
181  dropTotal += hStat.u.usageData_v0.data.hb[hbCount].stat.drop.frames;
182  }
183  }
184 
185  current_stats[stream_id].current_packets = rxPktsTotal - rxPktsStart[stream_id];
186  current_stats[stream_id].current_bytes = rxByteTotal - rxByteStart[stream_id];
187  current_stats[stream_id].current_drops = dropTotal - dropStart[stream_id];
188  }
189 
190  StatsSetUI64(tv, streamCounters[inst_id].pkts, current_stats[stream_id].current_packets);
191  StatsSetUI64(tv, streamCounters[inst_id].byte, current_stats[stream_id].current_bytes);
192  StatsSetUI64(tv, streamCounters[inst_id].drop, current_stats[stream_id].current_drops);
193 
194  ++inst_id;
195  }
196  return num_active;
197 }
198 
199 static void *NapatechStatsLoop(void *arg)
200 {
201  ThreadVars *tv = (ThreadVars *) arg;
202 
203  int status;
204  char error_buffer[80]; // Error buffer
205  NtInfoStream_t hInfo;
206  NtStatStream_t hStatStream;
207 
208  NapatechStreamConfig stream_config[MAX_STREAMS];
209  uint16_t stream_cnt = NapatechGetStreamConfig(stream_config);
210 
211  /* Open the info and Statistics */
212  if ((status = NT_InfoOpen(&hInfo, "StatsLoopInfoStream")) != NT_SUCCESS) {
213  NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
214  SCLogError(SC_ERR_RUNMODE, "NT_InfoOpen() failed: %s\n", error_buffer);
215  return NULL;
216  }
217 
218  if ((status = NT_StatOpen(&hStatStream, "StatsLoopStatsStream")) != NT_SUCCESS) {
219  /* Get the status code as text */
220  NT_ExplainError(status, error_buffer, sizeof (error_buffer));
221  SCLogError(SC_ERR_RUNMODE, "NT_StatOpen() failed: %s\n", error_buffer);
222  return NULL;
223  }
224 
225  StreamCounters streamCounters[MAX_STREAMS];
226  for (int i = 0; i < stream_cnt; ++i) {
227  char *pkts_buf = SCCalloc(1, 32);
228  if (unlikely(pkts_buf == NULL)) {
230  "Failed to allocate memory for NAPATECH stream counter.");
231  exit(EXIT_FAILURE);
232  }
233 
234  snprintf(pkts_buf, 32, "nt%d.pkts", stream_config[i].stream_id);
235  streamCounters[i].pkts = StatsRegisterCounter(pkts_buf, tv);
236 
237  char *byte_buf = SCCalloc(1, 32);
238  if (unlikely(byte_buf == NULL)) {
240  "Failed to allocate memory for NAPATECH stream counter.");
241  exit(EXIT_FAILURE);
242  }
243  snprintf(byte_buf, 32, "nt%d.bytes", stream_config[i].stream_id);
244  streamCounters[i].byte = StatsRegisterCounter(byte_buf, tv);
245 
246  char *drop_buf = SCCalloc(1, 32);
247  if (unlikely(drop_buf == NULL)) {
249  "Failed to allocate memory for NAPATECH stream counter.");
250  exit(EXIT_FAILURE);
251  }
252  snprintf(drop_buf, 32, "nt%d.drop", stream_config[i].stream_id);
253  streamCounters[i].drop = StatsRegisterCounter(drop_buf, tv);
254  }
255 
256  StatsSetupPrivate(tv);
257 
258  for (int i = 0; i < stream_cnt; ++i) {
259  StatsSetUI64(tv, streamCounters[i].pkts, 0);
260  StatsSetUI64(tv, streamCounters[i].byte, 0);
261  StatsSetUI64(tv, streamCounters[i].drop, 0);
262  }
263 
264  uint32_t num_active = UpdateStreamStats(tv, hInfo, hStatStream,
265  stream_cnt, stream_config, streamCounters);
266 
267  if (!NapatechIsAutoConfigEnabled() && (num_active < stream_cnt)) {
268  SCLogInfo("num_active: %d, stream_cnt: %d", num_active, stream_cnt);
270  "Some or all of the configured streams are not created. Proceeding with active streams.");
271  }
272 
274  while (1) {
275  if (TmThreadsCheckFlag(tv, THV_KILL)) {
276  SCLogDebug("NapatechStatsLoop THV_KILL detected");
277  break;
278  }
279 
280  UpdateStreamStats(tv, hInfo, hStatStream,
281  stream_cnt, stream_config, streamCounters);
282 
284  usleep(1000000);
285  }
286 
287  /* CLEAN UP NT Resources and Close the info stream */
288  if ((status = NT_InfoClose(hInfo)) != NT_SUCCESS) {
289  NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
290  SCLogError(SC_ERR_RUNMODE, "NT_InfoClose() failed: %s\n", error_buffer);
291  return NULL;
292  }
293 
294  /* Close the statistics stream */
295  if ((status = NT_StatClose(hStatStream)) != NT_SUCCESS) {
296  /* Get the status code as text */
297  NT_ExplainError(status, error_buffer, sizeof (error_buffer));
298  SCLogError(SC_ERR_RUNMODE, "NT_StatClose() failed: %s\n", error_buffer);
299  return NULL;
300  }
301 
302 
303  SCLogDebug("Exiting NapatechStatsLoop");
307 
308  return NULL;
309 }
310 
311 #define MAX_HOSTBUFFER 4
312 #define MAX_STREAMS 256
313 #define HB_HIGHWATER 2048 //1982
314 
315 static bool RegisteredStream(uint16_t stream_id, uint16_t num_registered,
316  NapatechStreamConfig registered_streams[])
317 {
318  for (uint16_t reg_id = 0; reg_id < num_registered; ++reg_id) {
319  if (stream_id == registered_streams[reg_id].stream_id) {
320  return true;
321  }
322  }
323  return false;
324 }
325 
326 static uint32_t CountWorkerThreads(void)
327 {
328  int worker_count = 0;
329 
330  ConfNode *affinity;
331  ConfNode *root = ConfGetNode("threading.cpu-affinity");
332 
333  if (root != NULL) {
334  TAILQ_FOREACH(affinity, &root->head, next)
335  {
336  if (strcmp(affinity->val, "decode-cpu-set") == 0 ||
337  strcmp(affinity->val, "stream-cpu-set") == 0 ||
338  strcmp(affinity->val, "reject-cpu-set") == 0 ||
339  strcmp(affinity->val, "output-cpu-set") == 0) {
340  continue;
341  }
342 
343  if (strcmp(affinity->val, "worker-cpu-set") == 0) {
344  ConfNode *node = ConfNodeLookupChild(affinity->head.tqh_first, "cpu");
345  ConfNode *lnode;
346 
348 
349  TAILQ_FOREACH(lnode, &node->head, next)
350  {
351  uint8_t start, end;
352  if (strncmp(lnode->val, "all", 4) == 0) {
353  /* check that the string in the config file is correctly specified */
354  if (cpu_spec != CONFIG_SPECIFIER_UNDEFINED) {
356  "Only one Napatech port specifier type allowed.");
357  exit(EXIT_FAILURE);
358  }
359  cpu_spec = CONFIG_SPECIFIER_RANGE;
360  worker_count = UtilCpuGetNumProcessorsConfigured();
361  } else if (strchr(lnode->val, '-')) {
362  /* check that the string in the config file is correctly specified */
363  if (cpu_spec != CONFIG_SPECIFIER_UNDEFINED) {
365  "Only one Napatech port specifier type allowed.");
366  exit(EXIT_FAILURE);
367  }
368  cpu_spec = CONFIG_SPECIFIER_RANGE;
369 
370  char copystr[16];
371  strlcpy(copystr, lnode->val, 16);
372 
373  start = atoi(copystr);
374  end = atoi(strchr(copystr, '-') + 1);
375  worker_count = end - start + 1;
376 
377  } else {
378  /* check that the string in the config file is correctly specified */
379  if (cpu_spec == CONFIG_SPECIFIER_RANGE) {
381  "Napatech port range specifiers cannot be combined with individual stream specifiers.");
382  exit(EXIT_FAILURE);
383  }
384  cpu_spec = CONFIG_SPECIFIER_INDIVIDUAL;
385  ++worker_count;
386  }
387  }
388  break;
389  }
390  }
391  }
392  return worker_count;
393 }
394 
396 {
397  int status;
398  char error_buffer[80]; // Error buffer
399  NtStatStream_t hStatStream;
400  NtStatistics_t hStat; // Stat handle.
401  NtInfoStream_t info_stream;
402  NtInfo_t info;
403  uint16_t instance_cnt = 0;
404  int use_all_streams = 0;
405  int set_cpu_affinity = 0;
406  ConfNode *ntstreams;
407  uint16_t stream_id = 0;
408  uint16_t start = 0;
409  uint16_t end = 0;
410 
411  for (uint16_t i = 0; i < MAX_STREAMS; ++i) {
412  stream_config[i].stream_id = 0;
413  stream_config[i].is_active = false;
414  stream_config[i].initialized = false;
415  }
416 
417  if (ConfGetBool("napatech.use-all-streams", &use_all_streams) == 0) {
418  /* default is "no" */
419  use_all_streams = 0;
420  }
421 
422  if ((status = NT_InfoOpen(&info_stream, "SuricataStreamInfo")) != NT_SUCCESS) {
423  NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
425  "NT_InfoOpen failed: %s", error_buffer);
426  return -1;
427  }
428 
429  if ((status = NT_StatOpen(&hStatStream, "StatsStream")) != NT_SUCCESS) {
430  /* Get the status code as text */
431  NT_ExplainError(status, error_buffer, sizeof (error_buffer));
432  SCLogError(SC_ERR_RUNMODE, "NT_StatOpen() failed: %s\n", error_buffer);
433  return -1;
434  }
435 
436  if (use_all_streams) {
437  info.cmd = NT_INFO_CMD_READ_STREAM;
438  if ((status = NT_InfoRead(info_stream, &info)) != NT_SUCCESS) {
439  NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
441  "NT_InfoRead failed: %s", error_buffer);
442  return -1;
443  }
444 
445  while (instance_cnt < info.u.stream.data.count) {
446 
447  /*
448  * For each stream ID query the number of host-buffers used by
449  * the stream. If zero, then that streamID is not used; skip
450  * over it and continue until we get a streamID with a non-zero
451  * count of the host-buffers.
452  */
453  memset(&hStat, 0, sizeof (NtStatistics_t));
454 
455  /* Read usage data for the chosen stream ID */
456  hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
457  hStat.u.usageData_v0.streamid = (uint8_t) stream_id;
458 
459  if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
460  /* Get the status code as text */
461  NT_ExplainError(status, error_buffer, sizeof (error_buffer));
463  "NT_StatRead() failed: %s\n", error_buffer);
464  return -1;
465  }
466 
467  if (hStat.u.usageData_v0.data.numHostBufferUsed == 0) {
468  ++stream_id;
469  continue;
470  }
471 
472  /* if we get here it is an active stream */
473  stream_config[instance_cnt].stream_id = stream_id++;
474  stream_config[instance_cnt].is_active = true;
475  instance_cnt++;
476  }
477 
478  } else {
479  ConfGetBool("threading.set-cpu-affinity", &set_cpu_affinity);
480  if (NapatechIsAutoConfigEnabled() && (set_cpu_affinity == 1)) {
481  start = 0;
482  end = CountWorkerThreads() - 1;
483  } else {
484  /* When not using the default streams we need to
485  * parse the array of streams from the conf */
486  if ((ntstreams = ConfGetNode("napatech.streams")) == NULL) {
488  "Failed retrieving napatech.streams from Config");
489  if (NapatechIsAutoConfigEnabled() && (set_cpu_affinity == 0)) {
491  "if set-cpu-affinity: no in conf then napatech.streams must be defined");
492  }
493  exit(EXIT_FAILURE);
494  }
495 
496  /* Loop through all stream numbers in the array and register the devices */
497  ConfNode *stream;
498  enum CONFIG_SPECIFIER stream_spec = CONFIG_SPECIFIER_UNDEFINED;
499  instance_cnt = 0;
500 
501  TAILQ_FOREACH(stream, &ntstreams->head, next)
502  {
503 
504  if (stream == NULL) {
506  "Couldn't Parse Stream Configuration");
507  return -1;
508  }
509 
510  if (strchr(stream->val, '-')) {
511  if (stream_spec != CONFIG_SPECIFIER_UNDEFINED) {
513  "Only one Napatech stream range specifier allowed.");
514  return -1;
515  }
516  stream_spec = CONFIG_SPECIFIER_RANGE;
517 
518  char copystr[16];
519  strlcpy(copystr, stream->val, 16);
520 
521  start = atoi(copystr);
522  end = atoi(strchr(copystr, '-') + 1);
523  } else {
524  if (stream_spec == CONFIG_SPECIFIER_RANGE) {
526  "Napatech range and individual specifiers cannot be combined.");
527  exit(EXIT_FAILURE);
528  }
529  stream_spec = CONFIG_SPECIFIER_INDIVIDUAL;
530 
531  stream_config[instance_cnt].stream_id = atoi(stream->val);
532  start = stream_config[instance_cnt].stream_id;
533  end = stream_config[instance_cnt].stream_id;
534  }
535  }
536  }
537 
538  for (stream_id = start; stream_id <= end; ++stream_id) {
539  /* if we get here it is configured in the .yaml file */
540  stream_config[instance_cnt].stream_id = stream_id;
541 
542  /* Check to see if it is an active stream */
543  memset(&hStat, 0, sizeof (NtStatistics_t));
544 
545  /* Read usage data for the chosen stream ID */
546  hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
547  hStat.u.usageData_v0.streamid =
548  (uint8_t) stream_config[instance_cnt].stream_id;
549 
550  if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
551  /* Get the status code as text */
552  NT_ExplainError(status, error_buffer, sizeof (error_buffer));
554  "NT_StatRead() failed: %s\n", error_buffer);
555  return -1;
556  }
557 
558  if (hStat.u.usageData_v0.data.numHostBufferUsed > 0) {
559  stream_config[instance_cnt].is_active = true;
560  }
561  instance_cnt++;
562  }
563  }
564 
565  /* Close the statistics stream */
566  if ((status = NT_StatClose(hStatStream)) != NT_SUCCESS) {
567  /* Get the status code as text */
568  NT_ExplainError(status, error_buffer, sizeof (error_buffer));
569  SCLogError(SC_ERR_RUNMODE, "NT_StatClose() failed: %s\n", error_buffer);
570  return -1;
571  }
572 
573  if ((status = NT_InfoClose(info_stream)) != NT_SUCCESS) {
574  NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
576  "NT_InfoClose failed: %s", error_buffer);
577  return -1;
578  }
579 
580  return instance_cnt;
581 }
582 
583 static void *NapatechBufMonitorLoop(void *arg)
584 {
585  ThreadVars *tv = (ThreadVars *) arg;
586 
587  NtInfo_t hStreamInfo;
588  NtStatistics_t hStat; // Stat handle.
589  NtInfoStream_t hInfo;
590  NtStatStream_t hStatStream;
591 
592  char error_buffer[NT_ERRBUF_SIZE]; // Error buffer
593  int status; // Status variable
594 
595  const uint32_t alertInterval = 25;
596 
597  uint32_t OB_fill_level[MAX_STREAMS] = {0};
598  uint32_t OB_alert_level[MAX_STREAMS] = {0};
599  uint32_t ave_OB_fill_level[MAX_STREAMS] = {0};
600 
601  uint32_t HB_fill_level[MAX_STREAMS] = {0};
602  uint32_t HB_alert_level[MAX_STREAMS] = {0};
603  uint32_t ave_HB_fill_level[MAX_STREAMS] = {0};
604 
605  /* Open the info and Statistics */
606  if ((status = NT_InfoOpen(&hInfo, "InfoStream")) != NT_SUCCESS) {
607  NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
609  "NT_InfoOpen() failed: %s\n", error_buffer);
610  exit(EXIT_FAILURE);
611  }
612 
613  if ((status = NT_StatOpen(&hStatStream, "StatsStream")) != NT_SUCCESS) {
614  /* Get the status code as text */
615  NT_ExplainError(status, error_buffer, sizeof (error_buffer));
617  "NT_StatOpen() failed: %s\n", error_buffer);
618  exit(EXIT_FAILURE);
619  }
620 
621  /* Read the info on all streams instantiated in the system */
622  hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM;
623  if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) {
624  NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
626  "NT_InfoRead() failed: %s\n", error_buffer);
627  exit(EXIT_FAILURE);
628  }
629 
630  NapatechStreamConfig registered_streams[MAX_STREAMS];
631  int num_registered = NapatechGetStreamConfig(registered_streams);
632  if (num_registered == -1) {
633  exit(EXIT_FAILURE);
634  }
635 
637  while (1) {
638  if (TmThreadsCheckFlag(tv, THV_KILL)) {
639  SCLogDebug("NapatechBufMonitorLoop THV_KILL detected");
640  break;
641  }
642 
643  usleep(200000);
644 
645  /* Read the info on all streams instantiated in the system */
646  hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM;
647  if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) {
648  NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
650  "NT_InfoRead() failed: %s\n", error_buffer);
651  exit(EXIT_FAILURE);
652  }
653 
654  char pktCntStr[4096];
655  memset(pktCntStr, 0, sizeof (pktCntStr));
656 
657  uint32_t stream_id = 0;
658  uint32_t stream_cnt = 0;
659  uint32_t num_streams = hStreamInfo.u.stream.data.count;
660 
661  for (stream_cnt = 0; stream_cnt < num_streams; ++stream_cnt) {
662 
663  do {
664 
665  /* Read usage data for the chosen stream ID */
666  hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
667  hStat.u.usageData_v0.streamid = (uint8_t) stream_id;
668 
669  if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
670  /* Get the status code as text */
671  NT_ExplainError(status, error_buffer, sizeof (error_buffer));
673  "NT_StatRead() failed: %s\n", error_buffer);
674  exit(EXIT_FAILURE);
675  }
676 
677  if (hStat.u.usageData_v0.data.numHostBufferUsed == 0) {
678  ++stream_id;
679  continue;
680  }
681  } while (hStat.u.usageData_v0.data.numHostBufferUsed == 0);
682 
683  if (RegisteredStream(stream_id, num_registered, registered_streams)) {
684  ave_OB_fill_level[stream_id] = 0;
685  ave_HB_fill_level[stream_id] = 0;
686 
687  for (uint32_t hb_count = 0;
688  hb_count < hStat.u.usageData_v0.data.numHostBufferUsed;
689  hb_count++) {
690 
691  OB_fill_level[hb_count] =
692  ((100 * hStat.u.usageData_v0.data.hb[hb_count].onboardBuffering.used) /
693  hStat.u.usageData_v0.data.hb[hb_count].onboardBuffering.size);
694 
695  if (OB_fill_level[hb_count] > 100) {
696  OB_fill_level[hb_count] = 100;
697  }
698 
699  uint32_t bufSize = hStat.u.usageData_v0.data.hb[hb_count].enQueuedAdapter / 1024
700  + hStat.u.usageData_v0.data.hb[hb_count].deQueued / 1024
701  + hStat.u.usageData_v0.data.hb[hb_count].enQueued / 1024
702  - HB_HIGHWATER;
703 
704  HB_fill_level[hb_count] = (uint32_t)
705  ((100 * hStat.u.usageData_v0.data.hb[hb_count].deQueued / 1024) /
706  bufSize);
707 
708  ave_OB_fill_level[stream_id] += OB_fill_level[hb_count];
709  ave_HB_fill_level[stream_id] += HB_fill_level[hb_count];
710  }
711 
712  ave_OB_fill_level[stream_id] /= hStat.u.usageData_v0.data.numHostBufferUsed;
713  ave_HB_fill_level[stream_id] /= hStat.u.usageData_v0.data.numHostBufferUsed;
714 
715  /* Host Buffer Fill Level warnings... */
716  if (ave_HB_fill_level[stream_id] >= (HB_alert_level[stream_id] + alertInterval)) {
717 
718  while (ave_HB_fill_level[stream_id] >= HB_alert_level[stream_id]
719  + alertInterval) {
720 
721  HB_alert_level[stream_id] += alertInterval;
722  }
723  SCLogInfo("nt%d - Increasing Host Buffer Fill Level : %4d%%",
724  stream_id, ave_HB_fill_level[stream_id] - 1);
725  }
726 
727  if (HB_alert_level[stream_id] > 0) {
728  if ((ave_HB_fill_level[stream_id] <= (HB_alert_level[stream_id] - alertInterval))) {
729  SCLogInfo("nt%d - Decreasing Host Buffer Fill Level: %4d%%",
730  stream_id, ave_HB_fill_level[stream_id]);
731 
732  while (ave_HB_fill_level[stream_id] <= (HB_alert_level[stream_id] - alertInterval)) {
733  if ((HB_alert_level[stream_id]) > 0) {
734  HB_alert_level[stream_id] -= alertInterval;
735  } else break;
736  }
737  }
738  }
739 
740  /* On Board SDRAM Fill Level warnings... */
741  if (ave_OB_fill_level[stream_id] >= (OB_alert_level[stream_id] + alertInterval)) {
742  while (ave_OB_fill_level[stream_id] >= OB_alert_level[stream_id] + alertInterval) {
743  OB_alert_level[stream_id] += alertInterval;
744 
745  }
746  SCLogInfo("nt%d - Increasing Adapter SDRAM Fill Level: %4d%%",
747  stream_id, ave_OB_fill_level[stream_id]);
748  }
749 
750  if (OB_alert_level[stream_id] > 0) {
751  if ((ave_OB_fill_level[stream_id] <= (OB_alert_level[stream_id] - alertInterval))) {
752  SCLogInfo("nt%d - Decreasing Adapter SDRAM Fill Level : %4d%%",
753  stream_id, ave_OB_fill_level[stream_id]);
754 
755  while (ave_OB_fill_level[stream_id] <= (OB_alert_level[stream_id] - alertInterval)) {
756  if ((OB_alert_level[stream_id]) > 0) {
757  OB_alert_level[stream_id] -= alertInterval;
758  } else break;
759  }
760  }
761  }
762  }
763  ++stream_id;
764  }
765  }
766 
767  if ((status = NT_InfoClose(hInfo)) != NT_SUCCESS) {
768  NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
769  SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoClose() failed: %s\n", error_buffer);
770  exit(EXIT_FAILURE);
771  }
772 
773  /* Close the statistics stream */
774  if ((status = NT_StatClose(hStatStream)) != NT_SUCCESS) {
775  /* Get the status code as text */
776  NT_ExplainError(status, error_buffer, sizeof (error_buffer));
777  SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatClose() failed: %s\n", error_buffer);
778  exit(EXIT_FAILURE);
779  }
780 
781  SCLogDebug("Exiting NapatechStatsLoop");
785 
786  return NULL;
787 }
788 
790 {
791  /* Creates the Statistic threads */
792  ThreadVars *stats_tv = TmThreadCreate("NapatechStats",
793  NULL, NULL,
794  NULL, NULL,
795  "custom", NapatechStatsLoop, 0);
796 
797  if (stats_tv == NULL) {
799  "Error creating a thread for NapatechStats - Killing engine.");
800  exit(EXIT_FAILURE);
801  }
802 
803  if (TmThreadSpawn(stats_tv) != 0) {
805  "Failed to spawn thread for NapatechStats - Killing engine.");
806  exit(EXIT_FAILURE);
807  }
808 
809  ThreadVars *buf_monitor_tv = TmThreadCreate("NapatechBufMonitor",
810  NULL, NULL,
811  NULL, NULL,
812  "custom", NapatechBufMonitorLoop, 0);
813 
814  if (buf_monitor_tv == NULL) {
816  "Error creating a thread for NapatechBufMonitor - Killing engine.");
817  exit(EXIT_FAILURE);
818  }
819 
820  if (TmThreadSpawn(buf_monitor_tv) != 0) {
822  "Failed to spawn thread for NapatechBufMonitor - Killing engine.");
823  exit(EXIT_FAILURE);
824  }
825 
826 
827  return;
828 }
829 
830 bool NapatechSetupNuma(uint32_t stream, uint32_t numa)
831 {
832  uint32_t status = 0;
833  static NtConfigStream_t hconfig;
834 
835  char ntpl_cmd[64];
836  snprintf(ntpl_cmd, 64, "setup[numanode=%d] = streamid == %d", numa, stream);
837 
838  NtNtplInfo_t ntpl_info;
839 
840  if ((status = NT_ConfigOpen(&hconfig, "ConfigStream")) != NT_SUCCESS) {
841 
843  return false;
844  }
845 
846  if ((status = NT_NTPL(hconfig, ntpl_cmd, &ntpl_info, NT_NTPL_PARSER_VALIDATE_NORMAL)) == NT_SUCCESS) {
847  status = ntpl_info.ntplId;
848 
849  } else {
850  NAPATECH_NTPL_ERROR(ntpl_cmd, ntpl_info, status);
851  return false;
852  }
853 
854  return status;
855 }
856 
857 static bool NapatechSetHashmode(uint32_t *filter_id)
858 {
859  uint32_t status = 0;
860  const char *hash_mode;
861  static NtConfigStream_t hconfig;
862  char ntpl_cmd[64];
863  NtNtplInfo_t ntpl_info;
864 
865  *filter_id = 0;
866 
867  /* Get the hashmode from the conf file. */
868  ConfGetValue("napatech.hashmode", &hash_mode);
869 
870  snprintf(ntpl_cmd, 64, "hashmode = %s", hash_mode);
871 
872  /* Issue the NTPL command */
873  if ((status = NT_ConfigOpen(&hconfig, "ConfigStream")) != NT_SUCCESS) {
875  return false;
876  }
877 
878  if ((status = NT_NTPL(hconfig, ntpl_cmd, &ntpl_info,
879  NT_NTPL_PARSER_VALIDATE_NORMAL)) == NT_SUCCESS) {
880  *filter_id = ntpl_info.ntplId;
881  SCLogInfo("Napatech hashmode: %s ID: %d", hash_mode, status);
882  } else {
883  NAPATECH_NTPL_ERROR(ntpl_cmd, ntpl_info, status);
884  status = 0;
885  }
886 
887  return status;
888 }
889 
890 static uint32_t GetStreamNUMAs(uint32_t stream_id, int stream_numas[])
891 {
892  NtStatistics_t hStat; // Stat handle.
893  NtStatStream_t hStatStream;
894  int status; // Status variable
895 
896  for (int i = 0; i < MAX_HOSTBUFFERS; ++i)
897  stream_numas[i] = -1;
898 
899  if ((status = NT_StatOpen(&hStatStream, "StatsStream")) != NT_SUCCESS) {
901  exit(EXIT_FAILURE);
902  }
903 
904  char pktCntStr[4096];
905  memset(pktCntStr, 0, sizeof (pktCntStr));
906 
907 
908  /* Read usage data for the chosen stream ID */
909  hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
910  hStat.u.usageData_v0.streamid = (uint8_t) stream_id;
911 
912  if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
914  exit(EXIT_FAILURE);
915  }
916 
917  for (uint32_t hb_id = 0; hb_id < hStat.u.usageData_v0.data.numHostBufferUsed; ++hb_id) {
918  stream_numas[hb_id] = hStat.u.usageData_v0.data.hb[hb_id].numaNode;
919  }
920 
921  return hStat.u.usageData_v0.data.numHostBufferUsed;
922 }
923 
924 uint32_t NapatechSetupTraffic(uint32_t first_stream, uint32_t last_stream,
925  uint32_t *filter_id, uint32_t *hash_id)
926 {
927 #define PORTS_SPEC_SIZE 64
928 
929  char ports_spec[PORTS_SPEC_SIZE];
930  ConfNode *ntports;
931  bool first_iteration = true;
932  int status = 0;
933  static NtConfigStream_t hconfig;
934  char ntpl_cmd[128];
935 
936  NapatechSetHashmode(hash_id);
937 
938  /* When not using the default streams we need to parse
939  * the array of streams from the conf
940  */
941  if ((ntports = ConfGetNode("napatech.ports")) == NULL) {
942  SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.ports from Conf");
943  exit(EXIT_FAILURE);
944  }
945 
946  /* Loop through all ports in the array */
947  ConfNode *port;
948  enum CONFIG_SPECIFIER stream_spec = CONFIG_SPECIFIER_UNDEFINED;
949 
950  /* Build the NTPL command using values in the config file. */
951  TAILQ_FOREACH(port, &ntports->head, next)
952  {
953  if (port == NULL) {
955  "Couldn't Parse Port Configuration");
956  exit(EXIT_FAILURE);
957  }
958 
959  uint8_t start, end;
960  if (strncmp(port->val, "all", 3) == 0) {
961  /* check that the string in the config file is correctly specified */
962  if (stream_spec != CONFIG_SPECIFIER_UNDEFINED) {
964  "Only one Napatech port specifier type allowed.");
965  exit(EXIT_FAILURE);
966  }
967  stream_spec = CONFIG_SPECIFIER_RANGE;
968 
969  snprintf(ports_spec, sizeof(ports_spec), "all");
970  } else if (strchr(port->val, '-')) {
971  /* check that the string in the config file is correctly specified */
972  if (stream_spec != CONFIG_SPECIFIER_UNDEFINED) {
974  "Only one Napatech port specifier type allowed.");
975  exit(EXIT_FAILURE);
976  }
977  stream_spec = CONFIG_SPECIFIER_RANGE;
978 
979  char copystr[16];
980  strlcpy(copystr, port->val, sizeof(copystr));
981 
982  start = atoi(copystr);
983  end = atoi(strchr(copystr, '-') + 1);
984  snprintf(ports_spec, sizeof(ports_spec), "port == (%d..%d)", start, end);
985 
986  } else {
987  /* check that the string in the config file is correctly specified */
988  if (stream_spec == CONFIG_SPECIFIER_RANGE) {
990  "Napatech port range specifiers cannot be combined with individual stream specifiers.");
991  exit(EXIT_FAILURE);
992  }
993  stream_spec = CONFIG_SPECIFIER_INDIVIDUAL;
994 
995  /* Determine the ports to use on the NTPL assign statement*/
996  if (first_iteration) {
997  snprintf(ports_spec, sizeof(ports_spec), "port==%s", port->val);
998  first_iteration = false;
999  } else {
1000  char temp[PORTS_SPEC_SIZE];
1001  snprintf(temp, sizeof(temp), "%s,%s",ports_spec,port->val);
1002  snprintf(ports_spec, sizeof(ports_spec), "%s", temp);
1003  }
1004  }
1005  }
1006 
1007  /* Build the NTPL command */
1008  snprintf(ntpl_cmd, sizeof(ntpl_cmd), "assign[streamid=(%d..%d)] = %s",
1009  first_stream, last_stream, ports_spec);
1010 
1011  NtNtplInfo_t ntpl_info;
1012 
1013  if ((status = NT_ConfigOpen(&hconfig, "ConfigStream")) != NT_SUCCESS) {
1015  exit(EXIT_FAILURE);
1016  }
1017 
1018  if ((status = NT_NTPL(hconfig, ntpl_cmd, &ntpl_info,
1019  NT_NTPL_PARSER_VALIDATE_NORMAL)) == NT_SUCCESS) {
1020  *filter_id = ntpl_info.ntplId;
1021  status = ntpl_info.u.errorData.errCode;
1022  SCLogInfo("NTPL filter assignment \"%s\" returned filter id %4d",
1023  ntpl_cmd, *filter_id);
1024  } else {
1025  NAPATECH_NTPL_ERROR(ntpl_cmd, ntpl_info, status);
1026  status = ntpl_info.u.errorData.errCode;
1027  return false;
1028  }
1029 
1030  SCLogInfo("Host-buffer NUMA assignments: ");
1031  int numa_nodes[MAX_HOSTBUFFERS];
1032  uint32_t stream_id;
1033  for (stream_id = first_stream; stream_id < last_stream; ++stream_id) {
1034  char temp1[256];
1035  char temp2[256];
1036 
1037  uint32_t num_host_buffers = GetStreamNUMAs(stream_id, numa_nodes);
1038 
1039  snprintf(temp1, 256, " stream %d:", stream_id);
1040 
1041  for (uint32_t hb_id = 0; hb_id < num_host_buffers; ++hb_id) {
1042  snprintf(temp2, 256, "%s %d ", temp1, numa_nodes[hb_id]);
1043  snprintf(temp1, 256, "%s", temp2);
1044  }
1045 
1046  SCLogInfo("%s", temp1);
1047  }
1048 
1049  return status;
1050 }
1051 
1052 bool NapatechDeleteFilter(uint32_t filter_id)
1053 {
1054  uint32_t status = 0;
1055  static NtConfigStream_t hconfig;
1056  char ntpl_cmd[64];
1057  NtNtplInfo_t ntpl_info;
1058 
1059  /* issue an NTPL command to delete the filter */
1060  snprintf(ntpl_cmd, 64, "delete = %d", filter_id);
1061 
1062  if ((status = NT_ConfigOpen(&hconfig, "ConfigStream")) != NT_SUCCESS) {
1064  exit(EXIT_FAILURE);
1065  }
1066 
1067  if ((status = NT_NTPL(hconfig, ntpl_cmd, &ntpl_info,
1068  NT_NTPL_PARSER_VALIDATE_NORMAL)) == NT_SUCCESS) {
1069  status = ntpl_info.ntplId;
1070  SCLogInfo("Removed Napatech filter %d. ", filter_id);
1071  } else {
1072  NAPATECH_NTPL_ERROR(ntpl_cmd, ntpl_info, status);
1073  status = 0;
1074  }
1075 
1076  return status;
1077 }
1078 
1079 #endif // HAVE_NAPATECH
struct StreamCounters_ StreamCounters
bool NapatechSetupNuma(uint32_t stream, uint32_t numa)
#define SCLogDebug(...)
Definition: util-debug.h:335
#define PORTS_SPEC_SIZE
uint16_t UtilCpuGetNumProcessorsConfigured(void)
Get the number of cpus configured in the system.
Definition: util-cpu.c:58
#define TAILQ_FOREACH(var, head, field)
Definition: queue.h:350
struct HtpBodyChunk_ * next
NapatechCurrentStats NapatechGetCurrentStats(uint16_t id)
Definition: util-napatech.c:50
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: util-strlcpyu.c:43
TcpStreamCnf stream_config
Definition: stream-tcp.h:106
uint32_t NapatechSetupTraffic(uint32_t first_stream, uint32_t last_stream, uint32_t *filter_id, uint32_t *hash_id)
#define THV_RUNNING_DONE
Definition: threadvars.h:45
#define unlikely(expr)
Definition: util-optimize.h:35
int ConfGetBool(const char *name, int *val)
Retrieve a configuration value as a boolean.
Definition: conf.c:517
NapatechCurrentStats current_stats[MAX_STREAMS]
Definition: util-napatech.c:48
char * val
Definition: conf.h:34
ConfNode * ConfNodeLookupChild(const ConfNode *node, const char *name)
Lookup a child configuration node by name.
Definition: conf.c:815
int NapatechGetStreamConfig(NapatechStreamConfig stream_config[])
uint16_t StatsRegisterCounter(const char *name, struct ThreadVars_ *tv)
Registers a normal, unqualified counter.
Definition: counters.c:939
void TmThreadsSetFlag(ThreadVars *tv, uint16_t flag)
Set a thread flag.
Definition: tm-threads.c:98
#define SCCalloc(nm, a)
Definition: util-mem.h:197
void TmThreadWaitForFlag(ThreadVars *tv, uint16_t flags)
Waits until the specified flag(s) is (are) set. We don't bother if the kill flag has been set or not on...
Definition: tm-threads.c:1983
void StatsSetUI64(ThreadVars *tv, uint16_t id, uint64_t x)
Sets a value of type uint64_t to the local counter.
Definition: counters.c:185
#define THV_INIT_DONE
Definition: threadvars.h:36
#define THV_CLOSED
Definition: threadvars.h:41
#define SCLogError(err_code,...)
Macro used to log ERROR messages.
Definition: util-debug.h:294
#define NAPATECH_NTPL_ERROR(ntpl_cmd, ntpl_info, status)
Definition: util-napatech.h:60
#define HB_HIGHWATER
bool NapatechIsAutoConfigEnabled(void)
#define NAPATECH_ERROR(err_type, status)
Definition: util-napatech.h:54
int ConfGetValue(const char *name, const char **vptr)
Retrieve the value of a configuration node.
Definition: conf.c:360
#define THV_KILL
Definition: threadvars.h:39
#define SCLogWarning(err_code,...)
Macro used to log WARNING messages.
Definition: util-debug.h:281
Definition: conf.h:32
#define SCLogInfo(...)
Macro used to log INFORMATIONAL messages.
Definition: util-debug.h:254
#define MAX_HOSTBUFFERS
Definition: util-napatech.c:62
CONFIG_SPECIFIER
Definition: util-napatech.c:56
void NapatechStartStats(void)
#define StatsSyncCountersIfSignalled(tv)
Definition: counters.h:136
int StatsSetupPrivate(ThreadVars *tv)
Definition: counters.c:1201
int TmThreadsCheckFlag(ThreadVars *tv, uint16_t flag)
Check if a thread flag is set.
Definition: tm-threads.c:90
ConfNode * ConfGetNode(const char *name)
Get a ConfNode by name.
Definition: conf.c:176
#define MAX_STREAMS
bool NapatechDeleteFilter(uint32_t filter_id)
#define THV_DEINIT
Definition: threadvars.h:44
Per thread variable structure.
Definition: threadvars.h:57
TmEcode TmThreadSpawn(ThreadVars *tv)
Spawns a thread associated with the ThreadVars instance tv.
Definition: tm-threads.c:1867
ThreadVars * TmThreadCreate(const char *name, const char *inq_name, const char *inqh_name, const char *outq_name, const char *outqh_name, const char *slots, void *(*fn_p)(void *), int mucond)
Creates and returns the TV instance for a new thread.
Definition: tm-threads.c:1116