1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
use crate::element::*;
use crate::error::Error;
use common::{MsgReceiver, MsgType, Pipeline};
use serde::Deserialize;
use serde_with::{serde_as, DurationMilliSecondsWithFrac};
use std::io::Write;
use std::time::{Duration, Instant};
/// Pipeline element that forwards every message unchanged while keeping
/// running statistics about the traffic it has seen.
///
/// The counters are cumulative: they are incremented for every received
/// message and are not reset when a report is printed.
pub struct StatFilterElement {
    /// Configuration supplied at construction time (reporting interval).
    conf: StatFilterElementConf,
    /// Instant of construction or of the most recent report, whichever is later.
    before: Instant,
    /// Number of messages received since construction.
    count: usize,
    /// Sum of the byte lengths of all messages received since construction.
    total_msg_size: usize,
}
/// Deserializable configuration for [`StatFilterElement`].
///
/// Unknown keys are rejected (`deny_unknown_fields`).
#[serde_as]
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct StatFilterElementConf {
    /// Minimum time between two statistics reports.
    ///
    /// Deserialized through `DurationMilliSecondsWithFrac<f64>`, i.e. the
    /// configured number is a (possibly fractional) count of milliseconds.
    /// The key `duration_ms` is accepted as an alias.
    #[serde_as(as = "DurationMilliSecondsWithFrac<f64>")]
    #[serde(alias = "duration_ms")]
    pub interval_ms: Duration,
}
// `stat-filter`: a one-in / one-out element that copies each message from
// receive port 0 to output buffer 0 and periodically prints cumulative
// message statistics to stderr.
impl ElementBuildable for StatFilterElement {
    type Config = StatFilterElementConf;

    const NAME: &'static str = "stat-filter";
    const RECV_PORTS: Port = 1;
    const SEND_PORTS: Port = 1;

    /// Declares a single receive port that accepts `MsgType::any()`.
    fn acceptable_msg_types() -> Vec<Vec<MsgType>> {
        let port0 = vec![MsgType::any()];
        vec![port0]
    }

    /// Builds the element with zeroed counters; the reporting clock starts
    /// at the moment of construction.
    fn new(conf: Self::Config) -> Result<Self, Error> {
        let element = Self {
            conf,
            count: 0,
            total_msg_size: 0,
            before: Instant::now(),
        };
        Ok(element)
    }

    /// Receives one message from port 0, updates the statistics, optionally
    /// prints a report, and copies the message bytes into output buffer 0.
    ///
    /// A report is printed to stderr when strictly more than the configured
    /// interval has elapsed since the previous report (or since
    /// construction). The counters are not reset by a report.
    ///
    /// # Errors
    ///
    /// Propagates any error from `receiver.recv(0)` and from writing the
    /// message bytes into the output buffer. The counters are already
    /// updated when a write error is returned.
    fn next(&mut self, pipeline: &mut Pipeline, receiver: &mut MsgReceiver) -> ElementResult {
        let incoming = receiver.recv(0)?;
        let payload = incoming.as_bytes();

        // Account for the message before anything else can fail.
        self.count += 1;
        self.total_msg_size += payload.len();

        // Report only once the interval has been strictly exceeded, then
        // restart the clock from the same instant used for the comparison.
        let tick = Instant::now();
        if self.conf.interval_ms < tick.duration_since(self.before) {
            eprintln!(
                "count = {}, total_msg_size = {}",
                self.count, self.total_msg_size
            );
            self.before = tick;
        }

        // Forward the message unchanged.
        let mut out = pipeline.msg_buf(0);
        out.write_all(payload)?;

        // NOTE(review): `MsgBuf` presumably tells the caller that the output
        // buffer now holds a message to send — confirm against `ElementValue`.
        Ok(ElementValue::MsgBuf)
    }
}