Revive postingest

This commit is contained in:
Dominik Werder
2024-06-20 00:34:48 +02:00
parent ebc623436e
commit 995defaff3
18 changed files with 508 additions and 124 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.1-aa.0"
version = "0.2.1-aa.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
@@ -17,7 +17,7 @@ tokio-postgres = "0.7.10"
async-channel = "2.2.0"
futures-util = "0.3"
chrono = "0.4"
bytes = "1.5.0"
bytes = "1.6.0"
libc = "0.2"
err = { path = "../../daqbuffer/crates/err" }
netpod = { path = "../../daqbuffer/crates/netpod" }

View File

@@ -89,7 +89,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
netfetch::ca::search::ca_search(conf, &channels).await?
}
ChannelAccess::CaIngest(k) => {
info!("daqingest version {} +0003", clap::crate_version!());
info!("daqingest version {} +0004", clap::crate_version!());
let (conf, channels_config) = parse_config(k.config.into()).await?;
daqingest::daemon::run(conf, channels_config).await?
}

View File

@@ -14,6 +14,7 @@ use netfetch::conf::ChannelConfig;
use netfetch::conf::ChannelsConfig;
use netfetch::daemon_common::Channel;
use netfetch::daemon_common::DaemonEvent;
use netfetch::metrics::RoutesResources;
use netfetch::metrics::StatsSet;
use netfetch::throttletrace::ThrottleTrace;
use netpod::ttl::RetentionTime;
@@ -603,6 +604,14 @@ impl Daemon {
let connset_cmd_tx = self.connset_ctrl.sender().clone();
let ca_conn_stats = self.connset_ctrl.ca_conn_stats().clone();
let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone()));
let rres = RoutesResources::new(
self.ingest_opts.backend().into(),
self.channel_info_query_tx.clone(),
self.iqtx
.take()
.ok_or_else(|| Error::with_msg_no_trace("no iqtx available"))?,
);
let rres = Arc::new(rres);
let metrics_jh = {
let conn_set_stats = self.connset_ctrl.stats().clone();
let stats_set = StatsSet::new(
@@ -621,6 +630,7 @@ impl Daemon {
connset_cmd_tx,
stats_set,
self.metrics_shutdown_rx.clone(),
rres,
);
tokio::task::spawn(fut)
};
@@ -634,7 +644,7 @@ impl Daemon {
let (_item_tx, item_rx) = async_channel::bounded(256);
let info_worker_tx = self.channel_info_query_tx.clone();
use netfetch::metrics::postingest::process_api_query_items;
let iqtx = self.iqtx.take().unwrap();
let iqtx = self.iqtx.clone().unwrap();
let worker_fut = process_api_query_items(backend, item_rx, info_worker_tx, iqtx);
taskrun::spawn(worker_fut)
};

View File

@@ -164,6 +164,7 @@ fn table_name_from_type(scalar_type: &ScalarType, shape: &Shape) -> &'static str
ScalarType::F64 => todo!(),
ScalarType::BOOL => todo!(),
ScalarType::STRING => todo!(),
ScalarType::Enum => todo!(),
ScalarType::ChannelStatus => todo!(),
},
Shape::Wave(_) => match scalar_type {
@@ -179,6 +180,7 @@ fn table_name_from_type(scalar_type: &ScalarType, shape: &Shape) -> &'static str
ScalarType::F64 => todo!(),
ScalarType::BOOL => todo!(),
ScalarType::STRING => todo!(),
ScalarType::Enum => todo!(),
ScalarType::ChannelStatus => todo!(),
},
Shape::Image(_, _) => todo!(),

View File

@@ -157,8 +157,11 @@ impl Worker {
" as inp (rid, backend, channel, kind)",
")",
" select q1.rid, t.series, t.scalar_type, t.shape_dims, t.tscs, t.kind from q1",
" join series_by_channel t on t.facility = q1.backend and t.channel = q1.channel",
" and t.kind = q1.kind and t.agg_kind = 0",
" join series_by_channel t",
" on t.facility = q1.backend",
" and t.channel = q1.channel",
" and t.kind = q1.kind",
" and t.agg_kind = 0",
" order by q1.rid",
);
let qu_select = pg
@@ -224,6 +227,7 @@ impl Worker {
match self.pg.execute("commit", &[]).await {
Ok(n) => {
let dt = ts1.elapsed();
self.stats.commit_duration_ms().ingest((1e3 * dt.as_secs_f32()) as u32);
if dt > Duration::from_millis(40) {
debug!("commit {} {:.0} ms", n, dt.as_secs_f32());
}
@@ -345,15 +349,18 @@ impl Worker {
for (&rid, job) in rids.iter().zip(jobs.into_iter()) {
loop {
break if let Some(row) = &row_opt {
if row.get::<_, i32>(0) == rid {
let series = SeriesId::new(row.get::<_, i64>(1) as _);
let rid2: i32 = row.get(0);
if rid2 == rid {
let series: i64 = row.get(1);
let series = SeriesId::new(series as _);
let shape_dims: Vec<i32> = row.get(3);
let scalar_type = ScalarType::from_scylla_i32(row.get(2)).map_err(|_| Error::ScalarType)?;
let shape_dims = Shape::from_scylla_shape_dims(row.get::<_, Vec<i32>>(3).as_slice())
.map_err(|_| Error::Shape)?;
let shape_dims =
Shape::from_scylla_shape_dims(shape_dims.as_slice()).map_err(|_| Error::Shape)?;
let tscs: Vec<DateTime<Utc>> = row.get(4);
let kind: i16 = row.get(5);
let kind = SeriesKind::from_db_i16(kind).map_err(|_| Error::ScalarType)?;
if job.channel == "TEST:MEDIUM:WAVE-01024:F32:000000"
if true && job.channel == "TEST:MEDIUM:WAVE-01024:F32:000000"
|| series == SeriesId::new(1605348259462543621)
{
debug!(
@@ -361,7 +368,7 @@ impl Worker {
rid, series, scalar_type, shape_dims, tscs, kind
);
}
acc.push((rid, series, scalar_type, shape_dims, tscs));
acc.push((rid, series, kind, scalar_type, shape_dims, tscs));
row_opt = row_it.next();
continue;
}
@@ -370,7 +377,7 @@ impl Worker {
// debug!("check for {job:?}");
// TODO call decide with empty accumulator: will result in DoesntExist.
let v = std::mem::replace(&mut acc, Vec::new());
let dec = Self::decide_matching_via_db(job.scalar_type.clone(), job.shape.clone(), v)?;
let dec = Self::decide_matching_via_db(&job.scalar_type, &job.shape, v)?;
// debug!("decision {dec:?}");
result.push(FoundResult { job, status: dec });
}
@@ -378,24 +385,29 @@ impl Worker {
}
fn decide_matching_via_db(
scalar_type: ScalarType,
shape: Shape,
acc: Vec<(i32, SeriesId, ScalarType, Shape, Vec<DateTime<Utc>>)>,
scalar_type: &ScalarType,
shape: &Shape,
acc: Vec<(i32, SeriesId, SeriesKind, ScalarType, Shape, Vec<DateTime<Utc>>)>,
) -> Result<MatchingSeries, Error> {
let a2 = acc.iter().map(|x| &x.4).collect();
let a2 = acc.iter().map(|x| &x.5).collect();
Self::assert_order(a2)?;
let unfolded = Self::unfold_series_rows(acc)?;
Self::assert_varying_types(&unfolded)?;
// TODO do database cleanup and enable again
if false {
Self::assert_varying_types(&unfolded)?;
}
if let Some(last) = unfolded.last() {
if last.1 == scalar_type && last.2 == shape {
if last.2 == *scalar_type && shape_equiv(&last.3, &shape) {
Ok(MatchingSeries::Latest(last.0.clone()))
} else {
let mut ret = MatchingSeries::DoesntExist;
for e in unfolded.into_iter().rev() {
if e.1 == scalar_type && e.2 == shape {
return Ok(MatchingSeries::UsedBefore(e.0.clone()));
if e.2 == *scalar_type && shape_equiv(&e.3, &shape) {
ret = MatchingSeries::UsedBefore(e.0.clone());
break;
}
}
Ok(MatchingSeries::DoesntExist)
Ok(ret)
}
} else {
Ok(MatchingSeries::DoesntExist)
@@ -403,15 +415,15 @@ impl Worker {
}
fn unfold_series_rows(
acc: Vec<(i32, SeriesId, ScalarType, Shape, Vec<DateTime<Utc>>)>,
) -> Result<Vec<(SeriesId, ScalarType, Shape, DateTime<Utc>)>, Error> {
acc: Vec<(i32, SeriesId, SeriesKind, ScalarType, Shape, Vec<DateTime<Utc>>)>,
) -> Result<Vec<(SeriesId, SeriesKind, ScalarType, Shape, DateTime<Utc>)>, Error> {
let mut ret = Vec::new();
for g in acc.iter() {
for h in g.4.iter() {
ret.push((g.1.clone(), g.2.clone(), g.3.clone(), *h));
for h in g.5.iter() {
ret.push((g.1.clone(), g.2.clone(), g.3.clone(), g.4.clone(), *h));
}
}
ret.sort_by(|a, b| a.cmp(b));
ret.sort_by(|a, b| a.4.cmp(&b.4));
Ok(ret)
}
@@ -432,7 +444,7 @@ impl Worker {
Ok(())
}
fn assert_varying_types(v: &Vec<(SeriesId, ScalarType, Shape, DateTime<Utc>)>) -> Result<(), Error> {
fn assert_varying_types(v: &Vec<(SeriesId, SeriesKind, ScalarType, Shape, DateTime<Utc>)>) -> Result<(), Error> {
if v.len() > 1 {
let mut z_0 = &v[0].0;
let mut z_1 = &v[0].1;
@@ -471,9 +483,6 @@ impl Worker {
h
};
let x = (backend, channel, kind, scalar_type.to_scylla_i32(), shape, hasher);
if channel == "TEST:MEDIUM:WAVE-01024:F32:000000" {
debug!("INSERT {x:?}");
}
x
})
.fold(
@@ -562,6 +571,23 @@ impl Worker {
}
}
fn shape_equiv(a: &Shape, b: &Shape) -> bool {
match a {
Shape::Scalar => match b {
Shape::Scalar => true,
_ => false,
},
Shape::Wave(_) => match b {
Shape::Wave(_) => true,
_ => false,
},
Shape::Image(_, _) => match b {
Shape::Image(_, _) => true,
_ => false,
},
}
}
pub trait HashSalter {
fn hupd(hupd: &mut dyn FnMut(&[u8]), i1: u16, i2: u16);
}

View File

@@ -12,6 +12,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11"
serde_yaml = "0.9.16"
ciborium = "0.2.2"
tokio-stream = { version = "0.1", features = ["fs"] }
tracing = "0.1.37"
async-channel = "2.0.0"
@@ -22,8 +23,8 @@ futures-util = "0.3"
md-5 = "0.10.5"
hex = "0.4.3"
regex = "1.8.4"
axum = "0.6.18"
http-body = "0.4"
axum = "0.7.5"
http-body = "1"
url = "2.2"
hyper = "0.14"
chrono = "0.4"

View File

@@ -165,6 +165,7 @@ pub enum ChannelConnectedInfo {
#[derive(Clone, Debug, Serialize)]
pub struct ChannelStateInfo {
pub stnow: SystemTime,
pub cssid: ChannelStatusSeriesId,
pub addr: SocketAddrV4,
pub series: Option<SeriesId>,
@@ -184,6 +185,10 @@ pub struct ChannelStateInfo {
pub item_recv_ivl_ema: Option<f32>,
pub interest_score: f32,
pub conf: ChannelConfig,
pub recv_last: SystemTime,
pub write_st_last: SystemTime,
pub write_mt_last: SystemTime,
pub write_lt_last: SystemTime,
}
mod ser_instant {
@@ -358,6 +363,7 @@ struct CreatedState {
ts_alive_last: Instant,
// Updated on monitoring, polling or when the channel config changes to reset the timeout
ts_activity_last: Instant,
st_activity_last: SystemTime,
ts_msp_last: u64,
ts_msp_grid_last: u32,
inserted_in_ts_msp: u64,
@@ -374,11 +380,15 @@ struct CreatedState {
account_emit_last: TsMs,
account_count: u64,
account_bytes: u64,
dw_st_last: SystemTime,
dw_mt_last: SystemTime,
dw_lt_last: SystemTime,
}
impl CreatedState {
fn dummy() -> Self {
let tsnow = Instant::now();
let stnow = SystemTime::now();
Self {
cssid: ChannelStatusSeriesId::new(0),
cid: Cid(0),
@@ -388,6 +398,7 @@ impl CreatedState {
ts_created: tsnow,
ts_alive_last: tsnow,
ts_activity_last: tsnow,
st_activity_last: stnow,
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: 0,
@@ -404,6 +415,9 @@ impl CreatedState {
account_emit_last: TsMs(0),
account_count: 0,
account_bytes: 0,
dw_st_last: SystemTime::UNIX_EPOCH,
dw_mt_last: SystemTime::UNIX_EPOCH,
dw_lt_last: SystemTime::UNIX_EPOCH,
}
}
}
@@ -438,7 +452,13 @@ impl ChannelConf {
}
impl ChannelState {
fn to_info(&self, cssid: ChannelStatusSeriesId, addr: SocketAddrV4, conf: ChannelConfig) -> ChannelStateInfo {
fn to_info(
&self,
cssid: ChannelStatusSeriesId,
addr: SocketAddrV4,
conf: ChannelConfig,
stnow: SystemTime,
) -> ChannelStateInfo {
let channel_connected_info = match self {
ChannelState::Init(..) => ChannelConnectedInfo::Disconnected,
ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting,
@@ -472,6 +492,19 @@ impl ChannelState {
ChannelState::Writable(s) => Some(s.channel.recv_bytes),
_ => None,
};
let (recv_last, write_st_last, write_mt_last, write_lt_last) = match self {
ChannelState::Writable(s) => {
let a = s.channel.st_activity_last;
let b = s.channel.dw_st_last;
let c = s.channel.dw_mt_last;
let d = s.channel.dw_lt_last;
(a, b, c, d)
}
_ => {
let a = SystemTime::UNIX_EPOCH;
(a, a, a, a)
}
};
let item_recv_ivl_ema = match self {
ChannelState::Writable(s) => {
let ema = s.channel.item_recv_ivl_ema.ema();
@@ -489,6 +522,7 @@ impl ChannelState {
};
let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10);
ChannelStateInfo {
stnow,
cssid,
addr,
series,
@@ -502,6 +536,10 @@ impl ChannelState {
item_recv_ivl_ema,
interest_score,
conf,
recv_last,
write_st_last,
write_mt_last,
write_lt_last,
}
}
@@ -749,7 +787,8 @@ pub enum CaConnEventValue {
pub enum EndOfStreamReason {
UnspecifiedReason,
Error(Error),
ConnectFail,
ConnectRefused,
ConnectTimeout,
OnCommand,
RemoteClosed,
IocTimeout,
@@ -910,8 +949,12 @@ impl CaConn {
fn trigger_shutdown(&mut self, reason: ShutdownReason) {
let channel_reason = match &reason {
ShutdownReason::ConnectFail => {
self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectFail);
ShutdownReason::ConnectRefused => {
self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectRefused);
ChannelStatusClosedReason::ConnectFail
}
ShutdownReason::ConnectTimeout => {
self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectTimeout);
ChannelStatusClosedReason::ConnectFail
}
ShutdownReason::IoError => {
@@ -1319,7 +1362,7 @@ impl CaConn {
// return Err(Error::with_msg_no_trace());
return Ok(());
};
// debug!("handle_event_add_res {ev:?}");
trace!("handle_event_add_res {:?}", ch_s.cssid());
match ch_s {
ChannelState::Writable(st) => {
// debug!(
@@ -1608,10 +1651,10 @@ impl CaConn {
);
crst.ts_alive_last = tsnow;
crst.ts_activity_last = tsnow;
crst.st_activity_last = stnow;
crst.item_recv_ivl_ema.tick(tsnow);
crst.recv_count += 1;
crst.recv_bytes += payload_len as u64;
let series = writer.sid();
// TODO should attach these counters already to Writable state.
let ts_local = {
let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO);
@@ -1631,7 +1674,16 @@ impl CaConn {
Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
{
let val: DataValue = value.data.into();
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?;
let ((dwst, dwmt, dwlt),) = writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?;
if dwst {
crst.dw_st_last = stnow;
}
if dwmt {
crst.dw_mt_last = stnow;
}
if dwlt {
crst.dw_lt_last = stnow;
}
}
}
if false {
@@ -1641,6 +1693,7 @@ impl CaConn {
if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) {
crst.insert_recv_ivl_last = tsnow;
let ema = crst.insert_item_ivl_ema.ema();
let _ = ema;
}
if crst.muted_before == 0 {}
crst.muted_before = 1;
@@ -1668,6 +1721,7 @@ impl CaConn {
},
CaDataScalarValue::I16(..) => match &scalar_type {
ScalarType::I16 => {}
ScalarType::Enum => {}
_ => {
error!("MISMATCH got i16 exp {:?}", scalar_type);
}
@@ -1998,7 +2052,8 @@ impl CaConn {
// TODO count this unexpected case.
}
CaMsgTy::CreateChanRes(k) => {
self.handle_create_chan_res(k, tsnow)?;
let stnow = SystemTime::now();
self.handle_create_chan_res(k, tsnow, stnow)?;
cx.waker().wake_by_ref();
}
CaMsgTy::EventAddRes(ev) => {
@@ -2095,7 +2150,12 @@ impl CaConn {
res.map_err(Into::into)
}
fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> {
fn handle_create_chan_res(
&mut self,
k: proto::CreateChanRes,
tsnow: Instant,
stnow: SystemTime,
) -> Result<(), Error> {
let cid = Cid(k.cid);
let sid = Sid(k.sid);
let conf = if let Some(x) = self.channels.get_mut(&cid) {
@@ -2132,6 +2192,7 @@ impl CaConn {
ts_created: tsnow,
ts_alive_last: tsnow,
ts_activity_last: tsnow,
st_activity_last: stnow,
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: u64::MAX,
@@ -2148,6 +2209,9 @@ impl CaConn {
account_emit_last: TsMs::from_ms_u64(0),
account_count: 0,
account_bytes: 0,
dw_st_last: SystemTime::UNIX_EPOCH,
dw_mt_last: SystemTime::UNIX_EPOCH,
dw_lt_last: SystemTime::UNIX_EPOCH,
};
*chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel });
let job = EstablishWorkerJob::new(
@@ -2218,6 +2282,7 @@ impl CaConn {
Ok(Ready(Some(())))
}
Ok(Err(e)) => {
use std::io::ErrorKind;
info!("error connect to {addr} {e}");
let addr = addr.clone();
self.iqdqs
@@ -2226,7 +2291,11 @@ impl CaConn {
addr,
status: ConnectionStatus::ConnectError,
}))?;
self.trigger_shutdown(ShutdownReason::IoError);
let reason = match e.kind() {
ErrorKind::ConnectionRefused => ShutdownReason::ConnectRefused,
_ => ShutdownReason::IoError,
};
self.trigger_shutdown(reason);
Ok(Ready(Some(())))
}
Err(e) => {
@@ -2239,7 +2308,7 @@ impl CaConn {
addr,
status: ConnectionStatus::ConnectTimeout,
}))?;
self.trigger_shutdown(ShutdownReason::IocTimeout);
self.trigger_shutdown(ShutdownReason::ConnectTimeout);
Ok(Ready(Some(())))
}
}
@@ -2372,10 +2441,11 @@ impl CaConn {
}
fn emit_channel_status(&mut self) -> Result<(), Error> {
let stnow = SystemTime::now();
let mut channel_statuses = BTreeMap::new();
for (_, conf) in self.channels.iter() {
let chst = &conf.state;
let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone());
let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone(), stnow);
channel_statuses.insert(chst.cssid(), chinfo);
}
// trace2!("{:?}", channel_statuses);
@@ -2407,21 +2477,21 @@ impl CaConn {
ChannelState::Writable(st1) => {
let ch = &mut st1.channel;
if ch.account_emit_last != msp {
ch.account_emit_last = msp;
if ch.account_count != 0 {
let series_id = ch.cssid.id();
let series = st1.writer.sid();
let count = ch.account_count as i64;
let bytes = ch.account_bytes as i64;
ch.account_count = 0;
ch.account_bytes = 0;
let item = QueryItem::Accounting(Accounting {
part: (series_id & 0xff) as i32,
part: (series.id() & 0xff) as i32,
ts: msp,
series: SeriesId::new(series_id),
series,
count,
bytes,
});
self.iqdqs.emit_status_item(item)?;
ch.account_emit_last = msp;
}
}
}

View File

@@ -946,7 +946,8 @@ impl CaConnSet {
warn!("received error {addr} {e}");
self.handle_connect_fail(addr)?
}
EndOfStreamReason::ConnectFail => self.handle_connect_fail(addr)?,
EndOfStreamReason::ConnectRefused => self.handle_connect_fail(addr)?,
EndOfStreamReason::ConnectTimeout => self.handle_connect_fail(addr)?,
EndOfStreamReason::OnCommand => {
// warn!("TODO make sure no channel is in state which could trigger health timeout")
}
@@ -1103,10 +1104,12 @@ impl CaConnSet {
let mut eos_reason = None;
while let Some(item) = conn.next().await {
trace!("ca_conn_item_merge_inner item {}", item.desc_short());
if let Some(x) = eos_reason {
let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}"));
error!("{e}");
return Err(e);
if let Some(x) = &eos_reason {
// TODO enable again, should not happen.
// let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}"));
// error!("{e}");
// return Err(e);
warn!("CaConn {addr} EOS reason [{x:?}] after [{eos_reason:?}]");
}
stats.item_count.inc();
match item.value {
@@ -1497,25 +1500,29 @@ impl CaConnSet {
fn try_push_ca_conn_cmds(&mut self, cx: &mut Context) -> Result<(), Error> {
use Poll::*;
for (_, v) in self.ca_conn_ress.iter_mut() {
for (addr, v) in self.ca_conn_ress.iter_mut() {
let tx = &mut v.sender;
loop {
if false {
if v.cmd_queue.len() != 0 || tx.is_sending() {
debug!("try_push_ca_conn_cmds {:?} {:?}", v.cmd_queue.len(), tx.len());
}
}
break if tx.is_sending() {
match tx.poll_unpin(cx) {
Ready(Ok(())) => {
self.stats.try_push_ca_conn_cmds_sent.inc();
continue;
}
Ready(Err(e)) => {
error!("try_push_ca_conn_cmds {e}");
return Err(Error::with_msg_no_trace(format!("{e}")));
}
Pending => (),
Ready(Err(e)) => match e {
scywr::senderpolling::Error::NoSendInProgress => {
error!("try_push_ca_conn_cmds {e}");
return Err(Error::with_msg_no_trace(format!("{e}")));
}
scywr::senderpolling::Error::Closed(_) => {
// TODO
// Should be nothing to do here.
// The connection ended, which CaConnSet notices anyway.
// self.handle_connect_fail(addr)?;
self.stats.try_push_ca_conn_cmds_closed().inc();
}
},
Pending => {}
}
} else if let Some(item) = v.cmd_queue.pop_front() {
tx.as_mut().send_pin(item);

View File

@@ -244,7 +244,7 @@ pub enum CaDataScalarValue {
I32(i32),
F32(f32),
F64(f64),
Enum(i16),
Enum(i16, String),
String(String),
// TODO remove, CA has no bool, make new enum for other use cases.
Bool(bool),
@@ -259,7 +259,7 @@ impl From<CaDataScalarValue> for scywr::iteminsertqueue::ScalarValue {
CaDataScalarValue::I32(x) => ScalarValue::I32(x),
CaDataScalarValue::F32(x) => ScalarValue::F32(x),
CaDataScalarValue::F64(x) => ScalarValue::F64(x),
CaDataScalarValue::Enum(x) => ScalarValue::Enum(x),
CaDataScalarValue::Enum(x, y) => ScalarValue::Enum(x, y),
CaDataScalarValue::String(x) => ScalarValue::String(x),
CaDataScalarValue::Bool(x) => ScalarValue::Bool(x),
}

View File

@@ -19,6 +19,7 @@ pub struct CaIngestOpts {
backend: String,
channels: Option<PathBuf>,
api_bind: String,
udp_broadcast_bind: Option<String>,
search: Vec<String>,
#[serde(default)]
search_blacklist: Vec<String>,
@@ -53,6 +54,10 @@ impl CaIngestOpts {
self.api_bind.clone()
}
pub fn udp_broadcast_bind(&self) -> Option<&str> {
self.udp_broadcast_bind.as_ref().map(String::as_str)
}
pub fn postgresql_config(&self) -> &Database {
&self.postgresql
}

View File

@@ -1,4 +1,5 @@
#![allow(unused)]
pub mod ingest;
pub mod postingest;
pub mod status;
@@ -18,14 +19,17 @@ use axum::http;
use axum::response::IntoResponse;
use axum::response::Response;
use bytes::Bytes;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::Error;
use http::Request;
use http::StatusCode;
use http_body::Body;
use log::*;
use scywr::insertqueues::InsertQueuesTx;
use scywr::iteminsertqueue::QueryItem;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use stats::CaConnSetStats;
use stats::CaConnStats;
use stats::CaConnStatsAgg;
@@ -37,12 +41,17 @@ use stats::IocFinderStats;
use stats::SeriesByChannelStats;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use taskrun::tokio;
use taskrun::tokio::net::TcpListener;
struct PublicErrorMsg(String);
@@ -59,15 +68,45 @@ impl ToPublicErrorMsg for err::Error {
}
}
pub struct Res123 {
content: Option<Bytes>,
}
impl http_body::Body for Res123 {
type Data = Bytes;
type Error = Error;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
use Poll::*;
match self.content.take() {
Some(x) => Ready(Some(Ok(http_body::Frame::data(x)))),
None => Ready(None),
}
}
}
impl IntoResponse for PublicErrorMsg {
fn into_response(self) -> axum::response::Response {
let msgbytes = self.0.as_bytes();
let body = axum::body::Bytes::from(msgbytes.to_vec());
let body = axum::body::Full::new(body);
let body = body.map_err(|_| axum::Error::new(Error::from_string("error while trying to create fixed body")));
let body = axum::body::BoxBody::new(body);
let x = axum::response::Response::builder().status(500).body(body).unwrap();
x
// let body = axum::body::Bytes::from(msgbytes.to_vec());
// let body = http_body::Frame::data(body);
// let body = body.map_err(|_| axum::Error::new(Error::from_string("error while trying to create fixed body")));
// let body = http_body::combinators::BoxBody::new(body);
// let body = axum::body::Body::new(body);
// let x = axum::response::Response::builder().status(500).body(body).unwrap();
// return x;
// x
// let boddat = http_body::Empty::new();
let res: Res123 = Res123 {
content: Some(Bytes::from(self.0.as_bytes().to_vec())),
};
let bod = axum::body::Body::new(res);
// let ret: http::Response<Bytes> = todo!();
let ret = http::Response::builder().status(500).body(bod).unwrap();
ret
}
}
@@ -268,10 +307,30 @@ fn metrics(stats_set: &StatsSet) -> String {
[s1, s2, s3, s4, s5, s6, s7].join("")
}
fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, stats_set: StatsSet) -> axum::Router {
pub struct RoutesResources {
backend: String,
worker_tx: Sender<ChannelInfoQuery>,
iqtx: InsertQueuesTx,
}
impl RoutesResources {
pub fn new(backend: String, worker_tx: Sender<ChannelInfoQuery>, iqtx: InsertQueuesTx) -> Self {
Self {
backend,
worker_tx,
iqtx,
}
}
}
fn make_routes(
rres: Arc<RoutesResources>,
dcom: Arc<DaemonComm>,
connset_cmd_tx: Sender<CaConnSetEvent>,
stats_set: StatsSet,
) -> axum::Router {
use axum::extract;
use axum::routing::get;
use axum::routing::put;
use axum::routing::{get, post, put};
use axum::Router;
use http::StatusCode;
@@ -290,12 +349,51 @@ fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, st
)
.route("/path3/", get(|| async { (StatusCode::OK, format!("Hello there!")) })),
)
.route(
"/daqingest/metrics",
get({
let stats_set = stats_set.clone();
|| async move { metrics(&stats_set) }
}),
.nest(
"/daqingest",
Router::new()
.fallback(|| async { axum::Json(json!({"subcommands":["channel", "metrics"]})) })
.nest(
"/metrics",
Router::new().fallback(|| async { StatusCode::NOT_FOUND }).route(
"/",
get({
let stats_set = stats_set.clone();
|| async move { metrics(&stats_set) }
}),
),
)
.nest(
"/channel",
Router::new()
.fallback(|| async { axum::Json(json!({"subcommands":["states"]})) })
.route(
"/states",
get({
let tx = connset_cmd_tx.clone();
|Query(params): Query<HashMap<String, String>>| status::channel_states(params, tx)
}),
)
.route(
"/add",
get({
let dcom = dcom.clone();
|Query(params): Query<HashMap<String, String>>| channel_add(params, dcom)
}),
),
)
.nest(
"/ingest",
Router::new().route(
"/v1",
post({
let rres = rres.clone();
move |(params, body): (Query<HashMap<String, String>>, axum::body::Body)| {
ingest::post_v01((params, body), rres)
}
}),
),
),
)
.route(
"/daqingest/metricbeat",
@@ -315,13 +413,6 @@ fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, st
|Query(params): Query<HashMap<String, String>>| find_channel(params, dcom)
}),
)
.route(
"/daqingest/channel/states",
get({
let tx = connset_cmd_tx.clone();
|Query(params): Query<HashMap<String, String>>| status::channel_states(params, tx)
}),
)
.route(
"/daqingest/private/channel/states",
get({
@@ -329,13 +420,6 @@ fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, st
|Query(params): Query<HashMap<String, String>>| private_channel_states(params, tx)
}),
)
.route(
"/daqingest/channel/add",
get({
let dcom = dcom.clone();
|Query(params): Query<HashMap<String, String>>| channel_add(params, dcom)
}),
)
.route(
"/daqingest/channel/remove",
get({
@@ -393,20 +477,18 @@ pub async fn metrics_service(
connset_cmd_tx: Sender<CaConnSetEvent>,
stats_set: StatsSet,
shutdown_signal: Receiver<u32>,
rres: Arc<RoutesResources>,
) -> Result<(), Error> {
info!("metrics service start {bind_to}");
let addr = bind_to.parse().map_err(Error::from_string)?;
let router = make_routes(dcom, connset_cmd_tx, stats_set).into_make_service();
axum::Server::bind(&addr)
.serve(router)
let addr: SocketAddr = bind_to.parse().map_err(Error::from_string)?;
let router = make_routes(rres, dcom, connset_cmd_tx, stats_set).into_make_service();
let listener = TcpListener::bind(addr).await?;
// into_make_service_with_connect_info
axum::serve(listener, router)
.with_graceful_shutdown(async move {
let _ = shutdown_signal.recv().await;
})
.await
.inspect(|x| {
info!("metrics service finished with {x:?}");
})
.map_err(Error::from_string)?;
.await?;
Ok(())
}

View File

@@ -0,0 +1,105 @@
use super::RoutesResources;
use axum::extract::FromRequest;
use axum::extract::Query;
use axum::Json;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_2::eventsdim0::EventsDim0;
use netpod::log::*;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ScalarValue;
use serieswriter::writer::SeriesWriter;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Cursor;
use std::sync::Arc;
use std::time::SystemTime;
use streams::framed_bytes::FramedBytesStream;
// use core::io::BorrowedBuf;
#[derive(Debug, ThisError)]
pub enum Error {
Logic,
SeriesWriter(#[from] serieswriter::writer::Error),
MissingChannelName,
SendError,
Decode,
FramedBytes(#[from] streams::framed_bytes::Error),
}
struct BodyRead {}
pub async fn post_v01(
(Query(params), body): (Query<HashMap<String, String>>, axum::body::Body),
rres: Arc<RoutesResources>,
) -> Json<serde_json::Value> {
match post_v01_try(params, body, rres).await {
Ok(k) => k,
Err(e) => Json(serde_json::Value::String(e.to_string())),
}
}
async fn post_v01_try(
params: HashMap<String, String>,
body: axum::body::Body,
rres: Arc<RoutesResources>,
) -> Result<Json<serde_json::Value>, Error> {
info!("params {:?}", params);
let stnow = SystemTime::now();
let worker_tx = rres.worker_tx.clone();
let backend = rres.backend.clone();
let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.into();
let scalar_type = ScalarType::I16;
let shape = Shape::Scalar;
info!("establishing...");
let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type, shape, stnow).await?;
let mut iqdqs = InsertDeques::new();
let mut iqtx = rres.iqtx.clone();
// iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?;
// let deque = &mut iqdqs.st_rf3_rx;
let mut frames = FramedBytesStream::new(body.into_data_stream().map_err(|_| streams::framed_bytes::Error::Logic));
while let Some(frame) = frames.try_next().await? {
info!("got frame len {}", frame.len());
let evs: EventsDim0<i16> = ciborium::de::from_reader(Cursor::new(frame)).map_err(|_| Error::Decode)?;
info!("see events {:?}", evs);
let deque = &mut iqdqs.st_rf3_rx;
for (i, (&ts, &val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
info!("ev {:6} {:20} {:20}", i, ts, val);
let val = DataValue::Scalar(ScalarValue::I16(val));
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?;
}
iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?;
}
let deque = &mut iqdqs.st_rf3_rx;
finish_writers(vec![&mut writer], deque)?;
iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?;
let ret = Json(serde_json::json!({
"result": true,
}));
Ok(ret)
}
fn tick_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
for sw in sws {
sw.tick(deque)?;
}
Ok(())
}
fn finish_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
for sw in sws {
sw.tick(deque)?;
}
Ok(())
}

View File

@@ -7,6 +7,7 @@ use serde::Serialize;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::SystemTime;
#[derive(Debug, Serialize)]
pub struct ChannelStates {
@@ -20,6 +21,20 @@ struct ChannelState {
archiving_configuration: ChannelConfig,
recv_count: u64,
recv_bytes: u64,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
recv_last: SystemTime,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
write_st_last: SystemTime,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
write_mt_last: SystemTime,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
write_lt_last: SystemTime,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
updated: SystemTime,
}
fn system_time_epoch(x: &SystemTime) -> bool {
*x == SystemTime::UNIX_EPOCH
}
#[derive(Debug, Serialize)]
@@ -62,6 +77,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}
@@ -72,6 +92,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}
@@ -85,6 +110,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}
@@ -98,6 +128,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}
@@ -111,6 +146,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}
@@ -128,6 +168,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count,
recv_bytes,
recv_last: st6.recv_last,
write_st_last: st6.write_st_last,
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
};
states.channels.insert(k, chst);
}
@@ -138,6 +183,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count,
recv_bytes,
recv_last: st6.recv_last,
write_st_last: st6.write_st_last,
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
};
states.channels.insert(k, chst);
}
@@ -148,6 +198,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count,
recv_bytes,
recv_last: st6.recv_last,
write_st_last: st6.write_st_last,
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
};
states.channels.insert(k, chst);
}
@@ -158,6 +213,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count,
recv_bytes,
recv_last: st6.recv_last,
write_st_last: st6.write_st_last,
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
};
states.channels.insert(k, chst);
}
@@ -174,6 +234,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}
@@ -184,6 +249,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}
@@ -194,6 +264,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}

View File

@@ -54,7 +54,7 @@ pub enum ScalarValue {
I64(i64),
F32(f32),
F64(f64),
Enum(i16),
Enum(i16, String),
String(String),
Bool(bool),
}
@@ -68,7 +68,7 @@ impl ScalarValue {
ScalarValue::I64(_) => 8,
ScalarValue::F32(_) => 4,
ScalarValue::F64(_) => 8,
ScalarValue::Enum(_) => 2,
ScalarValue::Enum(_, y) => 2 + y.len() as u32,
ScalarValue::String(x) => x.len() as u32,
ScalarValue::Bool(_) => 1,
}
@@ -82,7 +82,7 @@ impl ScalarValue {
ScalarValue::I64(x) => x.to_string(),
ScalarValue::F32(x) => x.to_string(),
ScalarValue::F64(x) => x.to_string(),
ScalarValue::Enum(x) => x.to_string(),
ScalarValue::Enum(x, y) => format!("({}, {})", x, y),
ScalarValue::String(x) => x.to_string(),
ScalarValue::Bool(x) => x.to_string(),
}
@@ -233,7 +233,7 @@ impl DataValue {
ScalarValue::I64(_) => ScalarType::I64,
ScalarValue::F32(_) => ScalarType::F32,
ScalarValue::F64(_) => ScalarType::F64,
ScalarValue::Enum(_) => ScalarType::U16,
ScalarValue::Enum(_, _) => ScalarType::Enum,
ScalarValue::String(_) => ScalarType::STRING,
ScalarValue::Bool(_) => ScalarType::BOOL,
},
@@ -471,7 +471,8 @@ impl ChannelStatus {
#[derive(Debug, Clone)]
pub enum ShutdownReason {
ConnectFail,
ConnectRefused,
ConnectTimeout,
IoError,
ShutdownCommand,
InternalError,
@@ -747,7 +748,7 @@ pub async fn insert_item(
match val {
I8(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i8, &data_store).await?,
I16(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?,
Enum(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?,
Enum(a, b) => insert_scalar_gen(par, a, &data_store.qu_insert_scalar_i16, &data_store).await?,
I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, &data_store).await?,
I64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i64, &data_store).await?,
F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, &data_store).await?,
@@ -824,7 +825,7 @@ pub fn insert_item_fut(
I64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i64.clone(), scy),
F32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f32.clone(), scy),
F64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f64.clone(), scy),
Enum(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy),
Enum(a, b) => insert_scalar_gen_fut(par, a, data_store.qu_insert_scalar_i16.clone(), scy),
String(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_string.clone(), scy),
Bool(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_bool.clone(), scy),
}

View File

@@ -19,7 +19,7 @@ pub struct PatchCollect {
impl PatchCollect {
pub fn new(bin_len: TsNano, bin_count: u64) -> Self {
Self {
patch_len: TsNano(bin_len.0 * bin_count),
patch_len: TsNano::from_ns(bin_len.ns() * bin_count),
bin_len,
bin_count,
coll: None,
@@ -68,13 +68,13 @@ impl PatchCollect {
for (i2, (ts1, ts2)) in ts1s.iter().zip(ts2s).enumerate() {
info!("EDGE {}", ts1 / SEC);
if self.locked {
if ts2 % self.patch_len.0 == 0 {
if ts2 % self.patch_len.ns() == 0 {
info!("FOUND PATCH EDGE-END at {}", ts2 / SEC);
i3 = i2 + 1;
emit = true;
}
} else {
if ts1 % self.patch_len.0 == 0 {
if ts1 % self.patch_len.ns() == 0 {
info!("FOUND PATCH EDGE-BEG at {}", ts1 / SEC);
self.locked = true;
i3 = i2;

View File

@@ -124,9 +124,9 @@ impl RtWriter {
ts_local: TsNano,
val: DataValue,
iqdqs: &mut InsertDeques,
) -> Result<(), Error> {
) -> Result<((bool, bool, bool),), Error> {
let sid = self.sid;
Self::write_inner(
let (did_write_st,) = Self::write_inner(
"ST",
self.min_quiets.st,
&mut self.state_st,
@@ -136,7 +136,7 @@ impl RtWriter {
val.clone(),
sid,
)?;
Self::write_inner(
let (did_write_mt,) = Self::write_inner(
"MT",
self.min_quiets.mt,
&mut self.state_mt,
@@ -146,7 +146,7 @@ impl RtWriter {
val.clone(),
sid,
)?;
Self::write_inner(
let (did_write_lt,) = Self::write_inner(
"LT",
self.min_quiets.lt,
&mut self.state_lt,
@@ -156,7 +156,7 @@ impl RtWriter {
val.clone(),
sid,
)?;
Ok(())
Ok(((did_write_st, did_write_mt, did_write_lt),))
}
fn write_inner(
@@ -168,7 +168,7 @@ impl RtWriter {
ts_local: TsNano,
val: DataValue,
sid: SeriesId,
) -> Result<(), Error> {
) -> Result<(bool,), Error> {
// Decide whether we want to write.
// Use the IOC time for the decision whether to write.
// But use the ingest local time as the primary index.
@@ -200,7 +200,7 @@ impl RtWriter {
});
state.writer.write(ts_ioc, ts_local, val.clone(), deque)?;
}
Ok(())
Ok((do_write,))
}
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {

View File

@@ -432,7 +432,7 @@ fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque<Query
let bin_len_sec = (pc.bin_len().ns() / MS);
let bin_count = pc.bin_count();
let off = ts0 / pc.patch_len().0;
let off = ts0 / pc.patch_len().ns();
let off_msp = off / 1000;
let off_lsp = off % 1000;
// let item = TimeBinSimpleF32 {

View File

@@ -409,7 +409,6 @@ stats_proc::stats_struct!((
ca_conn_eos_unexpected,
response_tx_fail,
try_push_ca_conn_cmds_sent,
try_push_ca_conn_cmds_full,
try_push_ca_conn_cmds_closed,
logic_error,
logic_issue,
@@ -451,6 +450,7 @@ stats_proc::stats_struct!((
name(SeriesByChannelStats),
prefix(seriesbychannel),
counters(res_tx_fail, res_tx_timeout, recv_batch, recv_items,),
histolog2s(commit_duration_ms),
),
stats_struct(
name(InsertWorkerStats),