Merge branch 'postingest' into dev

This commit is contained in:
Dominik Werder
2024-06-20 15:47:03 +02:00
23 changed files with 1048 additions and 404 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.1-aa.0"
version = "0.2.1-aa.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
@@ -17,7 +17,7 @@ tokio-postgres = "0.7.10"
async-channel = "2.2.0"
futures-util = "0.3"
chrono = "0.4"
bytes = "1.5.0"
bytes = "1.6.0"
libc = "0.2"
err = { path = "../../daqbuffer/crates/err" }
netpod = { path = "../../daqbuffer/crates/netpod" }

View File

@@ -89,7 +89,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
netfetch::ca::search::ca_search(conf, &channels).await?
}
ChannelAccess::CaIngest(k) => {
info!("daqingest version {} +0003", clap::crate_version!());
info!("daqingest version {} +0004", clap::crate_version!());
let (conf, channels_config) = parse_config(k.config.into()).await?;
daqingest::daemon::run(conf, channels_config).await?
}

View File

@@ -14,6 +14,7 @@ use netfetch::conf::ChannelConfig;
use netfetch::conf::ChannelsConfig;
use netfetch::daemon_common::Channel;
use netfetch::daemon_common::DaemonEvent;
use netfetch::metrics::RoutesResources;
use netfetch::metrics::StatsSet;
use netfetch::throttletrace::ThrottleTrace;
use netpod::ttl::RetentionTime;
@@ -603,6 +604,14 @@ impl Daemon {
let connset_cmd_tx = self.connset_ctrl.sender().clone();
let ca_conn_stats = self.connset_ctrl.ca_conn_stats().clone();
let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone()));
let rres = RoutesResources::new(
self.ingest_opts.backend().into(),
self.channel_info_query_tx.clone(),
self.iqtx
.take()
.ok_or_else(|| Error::with_msg_no_trace("no iqtx available"))?,
);
let rres = Arc::new(rres);
let metrics_jh = {
let conn_set_stats = self.connset_ctrl.stats().clone();
let stats_set = StatsSet::new(
@@ -621,6 +630,7 @@ impl Daemon {
connset_cmd_tx,
stats_set,
self.metrics_shutdown_rx.clone(),
rres,
);
tokio::task::spawn(fut)
};
@@ -634,7 +644,7 @@ impl Daemon {
let (_item_tx, item_rx) = async_channel::bounded(256);
let info_worker_tx = self.channel_info_query_tx.clone();
use netfetch::metrics::postingest::process_api_query_items;
let iqtx = self.iqtx.take().unwrap();
let iqtx = self.iqtx.clone().unwrap();
let worker_fut = process_api_query_items(backend, item_rx, info_worker_tx, iqtx);
taskrun::spawn(worker_fut)
};

View File

@@ -164,6 +164,7 @@ fn table_name_from_type(scalar_type: &ScalarType, shape: &Shape) -> &'static str
ScalarType::F64 => todo!(),
ScalarType::BOOL => todo!(),
ScalarType::STRING => todo!(),
ScalarType::Enum => todo!(),
ScalarType::ChannelStatus => todo!(),
},
Shape::Wave(_) => match scalar_type {
@@ -179,6 +180,7 @@ fn table_name_from_type(scalar_type: &ScalarType, shape: &Shape) -> &'static str
ScalarType::F64 => todo!(),
ScalarType::BOOL => todo!(),
ScalarType::STRING => todo!(),
ScalarType::Enum => todo!(),
ScalarType::ChannelStatus => todo!(),
},
Shape::Image(_, _) => todo!(),

View File

@@ -157,8 +157,11 @@ impl Worker {
" as inp (rid, backend, channel, kind)",
")",
" select q1.rid, t.series, t.scalar_type, t.shape_dims, t.tscs, t.kind from q1",
" join series_by_channel t on t.facility = q1.backend and t.channel = q1.channel",
" and t.kind = q1.kind and t.agg_kind = 0",
" join series_by_channel t",
" on t.facility = q1.backend",
" and t.channel = q1.channel",
" and t.kind = q1.kind",
" and t.agg_kind = 0",
" order by q1.rid",
);
let qu_select = pg
@@ -224,6 +227,7 @@ impl Worker {
match self.pg.execute("commit", &[]).await {
Ok(n) => {
let dt = ts1.elapsed();
self.stats.commit_duration_ms().ingest((1e3 * dt.as_secs_f32()) as u32);
if dt > Duration::from_millis(40) {
debug!("commit {} {:.0} ms", n, dt.as_secs_f32());
}
@@ -345,15 +349,18 @@ impl Worker {
for (&rid, job) in rids.iter().zip(jobs.into_iter()) {
loop {
break if let Some(row) = &row_opt {
if row.get::<_, i32>(0) == rid {
let series = SeriesId::new(row.get::<_, i64>(1) as _);
let rid2: i32 = row.get(0);
if rid2 == rid {
let series: i64 = row.get(1);
let series = SeriesId::new(series as _);
let shape_dims: Vec<i32> = row.get(3);
let scalar_type = ScalarType::from_scylla_i32(row.get(2)).map_err(|_| Error::ScalarType)?;
let shape_dims = Shape::from_scylla_shape_dims(row.get::<_, Vec<i32>>(3).as_slice())
.map_err(|_| Error::Shape)?;
let shape_dims =
Shape::from_scylla_shape_dims(shape_dims.as_slice()).map_err(|_| Error::Shape)?;
let tscs: Vec<DateTime<Utc>> = row.get(4);
let kind: i16 = row.get(5);
let kind = SeriesKind::from_db_i16(kind).map_err(|_| Error::ScalarType)?;
if job.channel == "TEST:MEDIUM:WAVE-01024:F32:000000"
if true && job.channel == "TEST:MEDIUM:WAVE-01024:F32:000000"
|| series == SeriesId::new(1605348259462543621)
{
debug!(
@@ -361,7 +368,7 @@ impl Worker {
rid, series, scalar_type, shape_dims, tscs, kind
);
}
acc.push((rid, series, scalar_type, shape_dims, tscs));
acc.push((rid, series, kind, scalar_type, shape_dims, tscs));
row_opt = row_it.next();
continue;
}
@@ -370,7 +377,7 @@ impl Worker {
// debug!("check for {job:?}");
// TODO call decide with empty accumulator: will result in DoesntExist.
let v = std::mem::replace(&mut acc, Vec::new());
let dec = Self::decide_matching_via_db(job.scalar_type.clone(), job.shape.clone(), v)?;
let dec = Self::decide_matching_via_db(&job.scalar_type, &job.shape, v)?;
// debug!("decision {dec:?}");
result.push(FoundResult { job, status: dec });
}
@@ -378,24 +385,29 @@ impl Worker {
}
fn decide_matching_via_db(
scalar_type: ScalarType,
shape: Shape,
acc: Vec<(i32, SeriesId, ScalarType, Shape, Vec<DateTime<Utc>>)>,
scalar_type: &ScalarType,
shape: &Shape,
acc: Vec<(i32, SeriesId, SeriesKind, ScalarType, Shape, Vec<DateTime<Utc>>)>,
) -> Result<MatchingSeries, Error> {
let a2 = acc.iter().map(|x| &x.4).collect();
let a2 = acc.iter().map(|x| &x.5).collect();
Self::assert_order(a2)?;
let unfolded = Self::unfold_series_rows(acc)?;
Self::assert_varying_types(&unfolded)?;
// TODO do database cleanup and enable again
if false {
Self::assert_varying_types(&unfolded)?;
}
if let Some(last) = unfolded.last() {
if last.1 == scalar_type && last.2 == shape {
if last.2 == *scalar_type && shape_equiv(&last.3, &shape) {
Ok(MatchingSeries::Latest(last.0.clone()))
} else {
let mut ret = MatchingSeries::DoesntExist;
for e in unfolded.into_iter().rev() {
if e.1 == scalar_type && e.2 == shape {
return Ok(MatchingSeries::UsedBefore(e.0.clone()));
if e.2 == *scalar_type && shape_equiv(&e.3, &shape) {
ret = MatchingSeries::UsedBefore(e.0.clone());
break;
}
}
Ok(MatchingSeries::DoesntExist)
Ok(ret)
}
} else {
Ok(MatchingSeries::DoesntExist)
@@ -403,15 +415,15 @@ impl Worker {
}
fn unfold_series_rows(
acc: Vec<(i32, SeriesId, ScalarType, Shape, Vec<DateTime<Utc>>)>,
) -> Result<Vec<(SeriesId, ScalarType, Shape, DateTime<Utc>)>, Error> {
acc: Vec<(i32, SeriesId, SeriesKind, ScalarType, Shape, Vec<DateTime<Utc>>)>,
) -> Result<Vec<(SeriesId, SeriesKind, ScalarType, Shape, DateTime<Utc>)>, Error> {
let mut ret = Vec::new();
for g in acc.iter() {
for h in g.4.iter() {
ret.push((g.1.clone(), g.2.clone(), g.3.clone(), *h));
for h in g.5.iter() {
ret.push((g.1.clone(), g.2.clone(), g.3.clone(), g.4.clone(), *h));
}
}
ret.sort_by(|a, b| a.cmp(b));
ret.sort_by(|a, b| a.4.cmp(&b.4));
Ok(ret)
}
@@ -432,7 +444,7 @@ impl Worker {
Ok(())
}
fn assert_varying_types(v: &Vec<(SeriesId, ScalarType, Shape, DateTime<Utc>)>) -> Result<(), Error> {
fn assert_varying_types(v: &Vec<(SeriesId, SeriesKind, ScalarType, Shape, DateTime<Utc>)>) -> Result<(), Error> {
if v.len() > 1 {
let mut z_0 = &v[0].0;
let mut z_1 = &v[0].1;
@@ -471,9 +483,6 @@ impl Worker {
h
};
let x = (backend, channel, kind, scalar_type.to_scylla_i32(), shape, hasher);
if channel == "TEST:MEDIUM:WAVE-01024:F32:000000" {
debug!("INSERT {x:?}");
}
x
})
.fold(
@@ -562,6 +571,23 @@ impl Worker {
}
}
/// Structural equivalence of two shapes: `true` when both are the same
/// kind (`Scalar`, `Wave`, or `Image`), ignoring the dimension payloads
/// carried by `Wave(_)` and `Image(_, _)`.
///
/// The series-matching code (`decide_matching_via_db`) uses this so that
/// e.g. a waveform channel whose length changed still matches an earlier
/// waveform series.
fn shape_equiv(a: &Shape, b: &Shape) -> bool {
    // `matches!` replaces the verbose nested match while staying
    // exhaustive over the variant pairs we accept.
    matches!(
        (a, b),
        (Shape::Scalar, Shape::Scalar)
            | (Shape::Wave(_), Shape::Wave(_))
            | (Shape::Image(_, _), Shape::Image(_, _))
    )
}
/// Hook for feeding extra "salt" bytes into a hash computation.
pub trait HashSalter {
    // `hupd` is called with byte slices to mix into the hash; `i1` and
    // `i2` are two 16-bit inputs — presumably per-series salt words.
    // TODO(review): confirm the meaning of `i1`/`i2` at the impl sites.
    fn hupd(hupd: &mut dyn FnMut(&[u8]), i1: u16, i2: u16);
}

View File

@@ -12,6 +12,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11"
serde_yaml = "0.9.16"
ciborium = "0.2.2"
tokio-stream = { version = "0.1", features = ["fs"] }
tracing = "0.1.37"
async-channel = "2.0.0"
@@ -22,8 +23,8 @@ futures-util = "0.3"
md-5 = "0.10.5"
hex = "0.4.3"
regex = "1.8.4"
axum = "0.6.18"
http-body = "0.4"
axum = "0.7.5"
http-body = "1"
url = "2.2"
hyper = "0.14"
chrono = "0.4"

View File

@@ -165,6 +165,7 @@ pub enum ChannelConnectedInfo {
#[derive(Clone, Debug, Serialize)]
pub struct ChannelStateInfo {
pub stnow: SystemTime,
pub cssid: ChannelStatusSeriesId,
pub addr: SocketAddrV4,
pub series: Option<SeriesId>,
@@ -184,6 +185,10 @@ pub struct ChannelStateInfo {
pub item_recv_ivl_ema: Option<f32>,
pub interest_score: f32,
pub conf: ChannelConfig,
pub recv_last: SystemTime,
pub write_st_last: SystemTime,
pub write_mt_last: SystemTime,
pub write_lt_last: SystemTime,
}
mod ser_instant {
@@ -358,6 +363,7 @@ struct CreatedState {
ts_alive_last: Instant,
// Updated on monitoring, polling or when the channel config changes to reset the timeout
ts_activity_last: Instant,
st_activity_last: SystemTime,
ts_msp_last: u64,
ts_msp_grid_last: u32,
inserted_in_ts_msp: u64,
@@ -374,11 +380,15 @@ struct CreatedState {
account_emit_last: TsMs,
account_count: u64,
account_bytes: u64,
dw_st_last: SystemTime,
dw_mt_last: SystemTime,
dw_lt_last: SystemTime,
}
impl CreatedState {
fn dummy() -> Self {
let tsnow = Instant::now();
let stnow = SystemTime::now();
Self {
cssid: ChannelStatusSeriesId::new(0),
cid: Cid(0),
@@ -388,6 +398,7 @@ impl CreatedState {
ts_created: tsnow,
ts_alive_last: tsnow,
ts_activity_last: tsnow,
st_activity_last: stnow,
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: 0,
@@ -404,6 +415,9 @@ impl CreatedState {
account_emit_last: TsMs(0),
account_count: 0,
account_bytes: 0,
dw_st_last: SystemTime::UNIX_EPOCH,
dw_mt_last: SystemTime::UNIX_EPOCH,
dw_lt_last: SystemTime::UNIX_EPOCH,
}
}
}
@@ -438,7 +452,13 @@ impl ChannelConf {
}
impl ChannelState {
fn to_info(&self, cssid: ChannelStatusSeriesId, addr: SocketAddrV4, conf: ChannelConfig) -> ChannelStateInfo {
fn to_info(
&self,
cssid: ChannelStatusSeriesId,
addr: SocketAddrV4,
conf: ChannelConfig,
stnow: SystemTime,
) -> ChannelStateInfo {
let channel_connected_info = match self {
ChannelState::Init(..) => ChannelConnectedInfo::Disconnected,
ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting,
@@ -472,6 +492,19 @@ impl ChannelState {
ChannelState::Writable(s) => Some(s.channel.recv_bytes),
_ => None,
};
let (recv_last, write_st_last, write_mt_last, write_lt_last) = match self {
ChannelState::Writable(s) => {
let a = s.channel.st_activity_last;
let b = s.channel.dw_st_last;
let c = s.channel.dw_mt_last;
let d = s.channel.dw_lt_last;
(a, b, c, d)
}
_ => {
let a = SystemTime::UNIX_EPOCH;
(a, a, a, a)
}
};
let item_recv_ivl_ema = match self {
ChannelState::Writable(s) => {
let ema = s.channel.item_recv_ivl_ema.ema();
@@ -489,6 +522,7 @@ impl ChannelState {
};
let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10);
ChannelStateInfo {
stnow,
cssid,
addr,
series,
@@ -502,6 +536,10 @@ impl ChannelState {
item_recv_ivl_ema,
interest_score,
conf,
recv_last,
write_st_last,
write_mt_last,
write_lt_last,
}
}
@@ -749,7 +787,8 @@ pub enum CaConnEventValue {
pub enum EndOfStreamReason {
UnspecifiedReason,
Error(Error),
ConnectFail,
ConnectRefused,
ConnectTimeout,
OnCommand,
RemoteClosed,
IocTimeout,
@@ -910,8 +949,12 @@ impl CaConn {
fn trigger_shutdown(&mut self, reason: ShutdownReason) {
let channel_reason = match &reason {
ShutdownReason::ConnectFail => {
self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectFail);
ShutdownReason::ConnectRefused => {
self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectRefused);
ChannelStatusClosedReason::ConnectFail
}
ShutdownReason::ConnectTimeout => {
self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectTimeout);
ChannelStatusClosedReason::ConnectFail
}
ShutdownReason::IoError => {
@@ -1319,7 +1362,7 @@ impl CaConn {
// return Err(Error::with_msg_no_trace());
return Ok(());
};
// debug!("handle_event_add_res {ev:?}");
trace!("handle_event_add_res {:?}", ch_s.cssid());
match ch_s {
ChannelState::Writable(st) => {
// debug!(
@@ -1608,10 +1651,10 @@ impl CaConn {
);
crst.ts_alive_last = tsnow;
crst.ts_activity_last = tsnow;
crst.st_activity_last = stnow;
crst.item_recv_ivl_ema.tick(tsnow);
crst.recv_count += 1;
crst.recv_bytes += payload_len as u64;
let series = writer.sid();
// TODO should attach these counters already to Writable state.
let ts_local = {
let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO);
@@ -1631,7 +1674,16 @@ impl CaConn {
Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
{
let val: DataValue = value.data.into();
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?;
let ((dwst, dwmt, dwlt),) = writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?;
if dwst {
crst.dw_st_last = stnow;
}
if dwmt {
crst.dw_mt_last = stnow;
}
if dwlt {
crst.dw_lt_last = stnow;
}
}
}
if false {
@@ -1641,6 +1693,7 @@ impl CaConn {
if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) {
crst.insert_recv_ivl_last = tsnow;
let ema = crst.insert_item_ivl_ema.ema();
let _ = ema;
}
if crst.muted_before == 0 {}
crst.muted_before = 1;
@@ -1668,6 +1721,7 @@ impl CaConn {
},
CaDataScalarValue::I16(..) => match &scalar_type {
ScalarType::I16 => {}
ScalarType::Enum => {}
_ => {
error!("MISMATCH got i16 exp {:?}", scalar_type);
}
@@ -1998,7 +2052,8 @@ impl CaConn {
// TODO count this unexpected case.
}
CaMsgTy::CreateChanRes(k) => {
self.handle_create_chan_res(k, tsnow)?;
let stnow = SystemTime::now();
self.handle_create_chan_res(k, tsnow, stnow)?;
cx.waker().wake_by_ref();
}
CaMsgTy::EventAddRes(ev) => {
@@ -2095,7 +2150,12 @@ impl CaConn {
res.map_err(Into::into)
}
fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> {
fn handle_create_chan_res(
&mut self,
k: proto::CreateChanRes,
tsnow: Instant,
stnow: SystemTime,
) -> Result<(), Error> {
let cid = Cid(k.cid);
let sid = Sid(k.sid);
let conf = if let Some(x) = self.channels.get_mut(&cid) {
@@ -2132,6 +2192,7 @@ impl CaConn {
ts_created: tsnow,
ts_alive_last: tsnow,
ts_activity_last: tsnow,
st_activity_last: stnow,
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: u64::MAX,
@@ -2148,6 +2209,9 @@ impl CaConn {
account_emit_last: TsMs::from_ms_u64(0),
account_count: 0,
account_bytes: 0,
dw_st_last: SystemTime::UNIX_EPOCH,
dw_mt_last: SystemTime::UNIX_EPOCH,
dw_lt_last: SystemTime::UNIX_EPOCH,
};
*chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel });
let job = EstablishWorkerJob::new(
@@ -2218,6 +2282,7 @@ impl CaConn {
Ok(Ready(Some(())))
}
Ok(Err(e)) => {
use std::io::ErrorKind;
info!("error connect to {addr} {e}");
let addr = addr.clone();
self.iqdqs
@@ -2226,7 +2291,11 @@ impl CaConn {
addr,
status: ConnectionStatus::ConnectError,
}))?;
self.trigger_shutdown(ShutdownReason::IoError);
let reason = match e.kind() {
ErrorKind::ConnectionRefused => ShutdownReason::ConnectRefused,
_ => ShutdownReason::IoError,
};
self.trigger_shutdown(reason);
Ok(Ready(Some(())))
}
Err(e) => {
@@ -2239,7 +2308,7 @@ impl CaConn {
addr,
status: ConnectionStatus::ConnectTimeout,
}))?;
self.trigger_shutdown(ShutdownReason::IocTimeout);
self.trigger_shutdown(ShutdownReason::ConnectTimeout);
Ok(Ready(Some(())))
}
}
@@ -2372,10 +2441,11 @@ impl CaConn {
}
fn emit_channel_status(&mut self) -> Result<(), Error> {
let stnow = SystemTime::now();
let mut channel_statuses = BTreeMap::new();
for (_, conf) in self.channels.iter() {
let chst = &conf.state;
let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone());
let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone(), stnow);
channel_statuses.insert(chst.cssid(), chinfo);
}
// trace2!("{:?}", channel_statuses);
@@ -2407,21 +2477,21 @@ impl CaConn {
ChannelState::Writable(st1) => {
let ch = &mut st1.channel;
if ch.account_emit_last != msp {
ch.account_emit_last = msp;
if ch.account_count != 0 {
let series_id = ch.cssid.id();
let series = st1.writer.sid();
let count = ch.account_count as i64;
let bytes = ch.account_bytes as i64;
ch.account_count = 0;
ch.account_bytes = 0;
let item = QueryItem::Accounting(Accounting {
part: (series_id & 0xff) as i32,
part: (series.id() & 0xff) as i32,
ts: msp,
series: SeriesId::new(series_id),
series,
count,
bytes,
});
self.iqdqs.emit_status_item(item)?;
ch.account_emit_last = msp;
}
}
}
@@ -2525,8 +2595,8 @@ impl CaConn {
}
fn log_queues_summary(&self) {
self.iqdqs.log_summary();
self.iqsp.log_summary();
trace!("{}", self.iqdqs.summary());
trace!("{}", self.iqsp.summary());
}
}

View File

@@ -946,7 +946,8 @@ impl CaConnSet {
warn!("received error {addr} {e}");
self.handle_connect_fail(addr)?
}
EndOfStreamReason::ConnectFail => self.handle_connect_fail(addr)?,
EndOfStreamReason::ConnectRefused => self.handle_connect_fail(addr)?,
EndOfStreamReason::ConnectTimeout => self.handle_connect_fail(addr)?,
EndOfStreamReason::OnCommand => {
// warn!("TODO make sure no channel is in state which could trigger health timeout")
}
@@ -1103,10 +1104,12 @@ impl CaConnSet {
let mut eos_reason = None;
while let Some(item) = conn.next().await {
trace!("ca_conn_item_merge_inner item {}", item.desc_short());
if let Some(x) = eos_reason {
let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}"));
error!("{e}");
return Err(e);
if let Some(x) = &eos_reason {
// TODO enable again, should not happen.
// let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}"));
// error!("{e}");
// return Err(e);
warn!("CaConn {addr} EOS reason [{x:?}] after [{eos_reason:?}]");
}
stats.item_count.inc();
match item.value {
@@ -1497,25 +1500,29 @@ impl CaConnSet {
fn try_push_ca_conn_cmds(&mut self, cx: &mut Context) -> Result<(), Error> {
use Poll::*;
for (_, v) in self.ca_conn_ress.iter_mut() {
for (addr, v) in self.ca_conn_ress.iter_mut() {
let tx = &mut v.sender;
loop {
if false {
if v.cmd_queue.len() != 0 || tx.is_sending() {
debug!("try_push_ca_conn_cmds {:?} {:?}", v.cmd_queue.len(), tx.len());
}
}
break if tx.is_sending() {
match tx.poll_unpin(cx) {
Ready(Ok(())) => {
self.stats.try_push_ca_conn_cmds_sent.inc();
continue;
}
Ready(Err(e)) => {
error!("try_push_ca_conn_cmds {e}");
return Err(Error::with_msg_no_trace(format!("{e}")));
}
Pending => (),
Ready(Err(e)) => match e {
scywr::senderpolling::Error::NoSendInProgress => {
error!("try_push_ca_conn_cmds {e}");
return Err(Error::with_msg_no_trace(format!("{e}")));
}
scywr::senderpolling::Error::Closed(_) => {
// TODO
// Should be nothing to do here.
// The connection ended, which CaConnSet notices anyway.
// self.handle_connect_fail(addr)?;
self.stats.try_push_ca_conn_cmds_closed().inc();
}
},
Pending => {}
}
} else if let Some(item) = v.cmd_queue.pop_front() {
tx.as_mut().send_pin(item);

View File

@@ -244,7 +244,7 @@ pub enum CaDataScalarValue {
I32(i32),
F32(f32),
F64(f64),
Enum(i16),
Enum(i16, String),
String(String),
// TODO remove, CA has no bool, make new enum for other use cases.
Bool(bool),
@@ -259,7 +259,7 @@ impl From<CaDataScalarValue> for scywr::iteminsertqueue::ScalarValue {
CaDataScalarValue::I32(x) => ScalarValue::I32(x),
CaDataScalarValue::F32(x) => ScalarValue::F32(x),
CaDataScalarValue::F64(x) => ScalarValue::F64(x),
CaDataScalarValue::Enum(x) => ScalarValue::Enum(x),
CaDataScalarValue::Enum(x, y) => ScalarValue::Enum(x, y),
CaDataScalarValue::String(x) => ScalarValue::String(x),
CaDataScalarValue::Bool(x) => ScalarValue::Bool(x),
}

View File

@@ -19,6 +19,7 @@ pub struct CaIngestOpts {
backend: String,
channels: Option<PathBuf>,
api_bind: String,
udp_broadcast_bind: Option<String>,
search: Vec<String>,
#[serde(default)]
search_blacklist: Vec<String>,
@@ -53,6 +54,10 @@ impl CaIngestOpts {
self.api_bind.clone()
}
/// The configured UDP broadcast bind address, if one was given.
pub fn udp_broadcast_bind(&self) -> Option<&str> {
    // `as_deref` is the idiomatic form of `.as_ref().map(String::as_str)`.
    self.udp_broadcast_bind.as_deref()
}
/// Borrows the PostgreSQL database configuration from these options.
pub fn postgresql_config(&self) -> &Database {
    &self.postgresql
}

View File

@@ -1,4 +1,5 @@
#![allow(unused)]
pub mod ingest;
pub mod postingest;
pub mod status;
@@ -15,17 +16,21 @@ use async_channel::Sender;
use async_channel::WeakSender;
use axum::extract::Query;
use axum::http;
use axum::http::HeaderMap;
use axum::response::IntoResponse;
use axum::response::Response;
use bytes::Bytes;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::Error;
use http::Request;
use http::StatusCode;
use http_body::Body;
use log::*;
use scywr::insertqueues::InsertQueuesTx;
use scywr::iteminsertqueue::QueryItem;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use stats::CaConnSetStats;
use stats::CaConnStats;
use stats::CaConnStatsAgg;
@@ -37,12 +42,17 @@ use stats::IocFinderStats;
use stats::SeriesByChannelStats;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use taskrun::tokio;
use taskrun::tokio::net::TcpListener;
struct PublicErrorMsg(String);
@@ -59,15 +69,45 @@ impl ToPublicErrorMsg for err::Error {
}
}
/// One-shot HTTP response body holding at most one chunk of bytes.
pub struct Res123 {
    /// `Some` until the first poll of the `http_body::Body` impl
    /// consumes it; `None` afterwards (end of stream).
    content: Option<Bytes>,
}
impl http_body::Body for Res123 {
    type Data = Bytes;
    type Error = Error;

    /// Emits the buffered content as a single data frame on the first
    /// poll, then reports end-of-stream on every subsequent poll.
    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
        // `take` makes this a one-shot body: the first poll moves the
        // bytes out; later polls observe `None` and signal completion.
        let frame = self.content.take().map(|bytes| Ok(http_body::Frame::data(bytes)));
        Poll::Ready(frame)
    }
}
impl IntoResponse for PublicErrorMsg {
    // NOTE(review): this body appears to contain two generations of the
    // implementation merged together — the axum 0.6 `Full`/`BoxBody`
    // section immediately below, then commented-out experiments, then
    // the axum 0.7 `Res123`-based section. As shown it cannot be valid:
    // the expression `x` is followed by further statements. Most likely
    // only the trailing `Res123` section is the live code — confirm
    // against the repository.
    fn into_response(self) -> axum::response::Response {
        let msgbytes = self.0.as_bytes();
        let body = axum::body::Bytes::from(msgbytes.to_vec());
        // NOTE(review): `axum::body::Full` / `BoxBody` are axum 0.6
        // APIs; this change set moves the manifest to axum 0.7.5.
        let body = axum::body::Full::new(body);
        let body = body.map_err(|_| axum::Error::new(Error::from_string("error while trying to create fixed body")));
        let body = axum::body::BoxBody::new(body);
        let x = axum::response::Response::builder().status(500).body(body).unwrap();
        x
        // let body = axum::body::Bytes::from(msgbytes.to_vec());
        // let body = http_body::Frame::data(body);
        // let body = body.map_err(|_| axum::Error::new(Error::from_string("error while trying to create fixed body")));
        // let body = http_body::combinators::BoxBody::new(body);
        // let body = axum::body::Body::new(body);
        // let x = axum::response::Response::builder().status(500).body(body).unwrap();
        // return x;
        // x
        // let boddat = http_body::Empty::new();
        // Build a one-shot body from the error message and wrap it in a
        // plain 500 response.
        let res: Res123 = Res123 {
            content: Some(Bytes::from(self.0.as_bytes().to_vec())),
        };
        let bod = axum::body::Body::new(res);
        // let ret: http::Response<Bytes> = todo!();
        let ret = http::Response::builder().status(500).body(bod).unwrap();
        ret
    }
}
@@ -268,10 +308,30 @@ fn metrics(stats_set: &StatsSet) -> String {
[s1, s2, s3, s4, s5, s6, s7].join("")
}
fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, stats_set: StatsSet) -> axum::Router {
/// Shared resources handed to the HTTP route handlers (ingest endpoint).
pub struct RoutesResources {
    /// Backend name passed along when establishing series writers.
    backend: String,
    /// Sender for channel-info queries to the series-lookup worker.
    worker_tx: Sender<ChannelInfoQuery>,
    /// Insert-queue senders used to enqueue ingested items for writing.
    iqtx: InsertQueuesTx,
}
impl RoutesResources {
    /// Bundles the given resources; performs no validation.
    pub fn new(backend: String, worker_tx: Sender<ChannelInfoQuery>, iqtx: InsertQueuesTx) -> Self {
        Self {
            backend,
            worker_tx,
            iqtx,
        }
    }
}
fn make_routes(
rres: Arc<RoutesResources>,
dcom: Arc<DaemonComm>,
connset_cmd_tx: Sender<CaConnSetEvent>,
stats_set: StatsSet,
) -> axum::Router {
use axum::extract;
use axum::routing::get;
use axum::routing::put;
use axum::routing::{get, post, put};
use axum::Router;
use http::StatusCode;
@@ -290,12 +350,53 @@ fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, st
)
.route("/path3/", get(|| async { (StatusCode::OK, format!("Hello there!")) })),
)
.route(
"/daqingest/metrics",
get({
let stats_set = stats_set.clone();
|| async move { metrics(&stats_set) }
}),
.nest(
"/daqingest",
Router::new()
.fallback(|| async { axum::Json(json!({"subcommands":["channel", "metrics"]})) })
.nest(
"/metrics",
Router::new().fallback(|| async { StatusCode::NOT_FOUND }).route(
"/",
get({
let stats_set = stats_set.clone();
|| async move { metrics(&stats_set) }
}),
),
)
.nest(
"/channel",
Router::new()
.fallback(|| async { axum::Json(json!({"subcommands":["states"]})) })
.route(
"/states",
get({
let tx = connset_cmd_tx.clone();
|Query(params): Query<HashMap<String, String>>| status::channel_states(params, tx)
}),
)
.route(
"/add",
get({
let dcom = dcom.clone();
|Query(params): Query<HashMap<String, String>>| channel_add(params, dcom)
}),
),
)
.nest(
"/ingest",
Router::new().route(
"/v1",
post({
let rres = rres.clone();
move |(headers, params, body): (
HeaderMap,
Query<HashMap<String, String>>,
axum::body::Body,
)| { ingest::post_v01((headers, params, body), rres) }
}),
),
),
)
.route(
"/daqingest/metricbeat",
@@ -315,13 +416,6 @@ fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, st
|Query(params): Query<HashMap<String, String>>| find_channel(params, dcom)
}),
)
.route(
"/daqingest/channel/states",
get({
let tx = connset_cmd_tx.clone();
|Query(params): Query<HashMap<String, String>>| status::channel_states(params, tx)
}),
)
.route(
"/daqingest/private/channel/states",
get({
@@ -329,13 +423,6 @@ fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, st
|Query(params): Query<HashMap<String, String>>| private_channel_states(params, tx)
}),
)
.route(
"/daqingest/channel/add",
get({
let dcom = dcom.clone();
|Query(params): Query<HashMap<String, String>>| channel_add(params, dcom)
}),
)
.route(
"/daqingest/channel/remove",
get({
@@ -393,20 +480,18 @@ pub async fn metrics_service(
connset_cmd_tx: Sender<CaConnSetEvent>,
stats_set: StatsSet,
shutdown_signal: Receiver<u32>,
rres: Arc<RoutesResources>,
) -> Result<(), Error> {
info!("metrics service start {bind_to}");
let addr = bind_to.parse().map_err(Error::from_string)?;
let router = make_routes(dcom, connset_cmd_tx, stats_set).into_make_service();
axum::Server::bind(&addr)
.serve(router)
let addr: SocketAddr = bind_to.parse().map_err(Error::from_string)?;
let router = make_routes(rres, dcom, connset_cmd_tx, stats_set).into_make_service();
let listener = TcpListener::bind(addr).await?;
// into_make_service_with_connect_info
axum::serve(listener, router)
.with_graceful_shutdown(async move {
let _ = shutdown_signal.recv().await;
})
.await
.inspect(|x| {
info!("metrics service finished with {x:?}");
})
.map_err(Error::from_string)?;
.await?;
Ok(())
}

View File

@@ -0,0 +1,317 @@
use super::RoutesResources;
use axum::extract::FromRequest;
use axum::extract::Query;
use axum::http::HeaderMap;
use axum::Json;
use bytes::Bytes;
use core::fmt;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim0::EventsDim0NoPulse;
use items_2::eventsdim1::EventsDim1;
use items_2::eventsdim1::EventsDim1NoPulse;
use netpod::log::*;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use netpod::APP_CBOR_FRAMED;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::ArrayValue;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ScalarValue;
use serde::Deserialize;
use serieswriter::writer::SeriesWriter;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use streams::framed_bytes::FramedBytesStream;
use taskrun::tokio::time::timeout;
// use core::io::BorrowedBuf;
// Setup-phase logging for the ingest endpoint; forwards to `info!`.
// The `if true` literal is a compile-time switch: set it to `false`
// to silence these messages without touching the call sites.
#[allow(unused)]
macro_rules! debug_setup {
    ($($arg:tt)*) => {
        if true {
            info!($($arg)*);
        }
    };
}
// Per-frame input tracing; currently disabled via the `if false`
// compile-time switch (flip to `true` to emit `trace!` output).
#[allow(unused)]
macro_rules! trace_input {
    ($($arg:tt)*) => {
        if false {
            trace!($($arg)*);
        }
    };
}
// Insert-queue tracing; currently disabled via the `if false`
// compile-time switch (flip to `true` to emit `trace!` output).
#[allow(unused)]
macro_rules! trace_queues {
    ($($arg:tt)*) => {
        if false {
            trace!($($arg)*);
        }
    };
}
/// Errors raised while handling a `POST /ingest/v1` request.
#[derive(Debug, ThisError)]
pub enum Error {
    /// `content-type` header absent or not the expected framed-CBOR type.
    UnsupportedContentType,
    Logic,
    SeriesWriter(#[from] serieswriter::writer::Error),
    /// Query parameter `channelName` absent.
    MissingChannelName,
    /// Query parameter `scalarType` absent.
    MissingScalarType,
    // NOTE(review): not constructed in the code visible here — confirm
    // this variant is still in use.
    MissingShape,
    SendError,
    Decode,
    FramedBytes(#[from] streams::framed_bytes::Error),
    InsertQueues(#[from] scywr::insertqueues::Error),
    /// JSON parse failure, e.g. of the `shape` query parameter.
    Serde(#[from] serde_json::Error),
    #[error("Parse({0})")]
    Parse(String),
    /// Scalar-type/shape combination this endpoint does not accept
    /// (e.g. `BOOL`, `Enum`, `ChannelStatus` scalars).
    NotSupported,
}
/// HTTP handler for `POST /ingest/v1`.
///
/// Delegates to `post_v01_try` and converts any error into a JSON
/// object of the form `{"error": "<message>"}`, so the client always
/// receives well-formed JSON.
pub async fn post_v01(
    (headers, Query(params), body): (HeaderMap, Query<HashMap<String, String>>, axum::body::Body),
    rres: Arc<RoutesResources>,
) -> Json<serde_json::Value> {
    let res = post_v01_try(headers, params, body, rres).await;
    res.unwrap_or_else(|e| {
        Json(serde_json::json!({
            "error": e.to_string(),
        }))
    })
}
/// Handle a framed-CBOR ingest POST: validate the request, establish a series
/// writer for the requested channel, then decode each incoming frame and push
/// its events toward the insert workers.
///
/// Expects header `Content-Type: application/cbor-framed` and URL parameters
/// `channelName`, `scalarType`, and optionally `shape` (defaults to `[]`,
/// i.e. scalar). The body is consumed as a stream of length-delimited frames,
/// each carrying one CBOR events object.
///
/// Returns an empty JSON object on success; any decode, setup, or channel
/// error is propagated as `Error` (rendered by the caller `post_v01`).
async fn post_v01_try(
    headers: HeaderMap,
    params: HashMap<String, String>,
    body: axum::body::Body,
    rres: Arc<RoutesResources>,
) -> Result<Json<serde_json::Value>, Error> {
    // Only the framed-CBOR content type is accepted; a missing header, a
    // non-ASCII header value, or any other type is rejected up front.
    if let Some(ct) = headers.get("content-type") {
        if let Ok(s) = ct.to_str() {
            if s == APP_CBOR_FRAMED {
            } else {
                return Err(Error::UnsupportedContentType);
            }
        } else {
            return Err(Error::UnsupportedContentType);
        }
    } else {
        return Err(Error::UnsupportedContentType);
    };
    debug_setup!("params {:?}", params);
    let stnow = SystemTime::now();
    let worker_tx = rres.worker_tx.clone();
    let backend = rres.backend.clone();
    // Required URL parameters; `shape` is optional and defaults to scalar `[]`.
    let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.into();
    let s = params.get("scalarType").ok_or(Error::MissingScalarType)?;
    let scalar_type = ScalarType::from_variant_str(&s).map_err(|e| Error::Parse(e.to_string()))?;
    let shape: Shape = serde_json::from_str(params.get("shape").map_or("[]", |x| x.as_str()))?;
    debug_setup!("parsed scalar_type {scalar_type:?}");
    debug_setup!("parsed shape {shape:?}");
    debug_setup!(
        "establishing series writer for {:?} {:?} {:?}",
        channel,
        scalar_type,
        shape
    );
    let mut writer =
        SeriesWriter::establish(worker_tx, backend, channel, scalar_type.clone(), shape.clone(), stnow).await?;
    debug_setup!("series writer established");
    let mut iqdqs = InsertDeques::new();
    let mut iqtx = rres.iqtx.clone();
    // Wrap the raw HTTP body in the length-delimited frame decoder.
    let mut frames = FramedBytesStream::new(body.into_data_stream().map_err(|_| streams::framed_bytes::Error::Logic));
    loop {
        // Bounded wait for the next frame so the writer still gets periodic
        // ticks (housekeeping/flush opportunities) while the client is idle.
        let x = timeout(Duration::from_millis(2000), frames.try_next()).await;
        let x = match x {
            Ok(x) => x,
            Err(_) => {
                tick_writers(&mut writer, &mut iqdqs)?;
                continue;
            }
        };
        let frame = match x? {
            Some(x) => x,
            None => {
                trace!("input stream done");
                break;
            }
        };
        trace_input!("got frame len {}", frame.len());
        // All decoded events are queued on the short-term rf3 deque.
        let deque = &mut iqdqs.st_rf3_rx;
        // Dispatch on (shape, scalar type) to the matching typed decoder;
        // unsupported combinations are rejected explicitly.
        match &shape {
            Shape::Scalar => match &scalar_type {
                ScalarType::U8 => {
                    evpush_dim0::<u8, _>(&frame, deque, &mut writer, |x| {
                        DataValue::Scalar(ScalarValue::U8(x as _))
                    })?;
                }
                ScalarType::U16 => {
                    evpush_dim0::<u16, _>(&frame, deque, &mut writer, |x| {
                        DataValue::Scalar(ScalarValue::U16(x as _))
                    })?;
                }
                ScalarType::U32 => {
                    evpush_dim0::<u32, _>(&frame, deque, &mut writer, |x| {
                        DataValue::Scalar(ScalarValue::U32(x as _))
                    })?;
                }
                ScalarType::U64 => {
                    evpush_dim0::<u64, _>(&frame, deque, &mut writer, |x| {
                        DataValue::Scalar(ScalarValue::U64(x as _))
                    })?;
                }
                ScalarType::I8 => {
                    evpush_dim0::<i8, _>(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I8(x)))?;
                }
                ScalarType::I16 => {
                    evpush_dim0::<i16, _>(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I16(x)))?;
                }
                ScalarType::I32 => {
                    evpush_dim0::<i32, _>(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I32(x)))?;
                }
                ScalarType::I64 => {
                    evpush_dim0::<i64, _>(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I64(x)))?;
                }
                ScalarType::F32 => {
                    evpush_dim0::<f32, _>(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::F32(x)))?;
                }
                ScalarType::F64 => {
                    evpush_dim0::<f64, _>(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::F64(x)))?;
                }
                ScalarType::BOOL => return Err(Error::NotSupported),
                ScalarType::STRING => {
                    evpush_dim0::<String, _>(&frame, deque, &mut writer, |x| {
                        DataValue::Scalar(ScalarValue::String(x))
                    })?;
                }
                ScalarType::Enum => return Err(Error::NotSupported),
                ScalarType::ChannelStatus => return Err(Error::NotSupported),
            },
            Shape::Wave(_) => match &scalar_type {
                ScalarType::U8 => {
                    evpush_dim1::<u8, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U8(x)))?;
                }
                ScalarType::U16 => {
                    evpush_dim1::<u16, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U16(x)))?;
                }
                ScalarType::U32 => {
                    evpush_dim1::<u32, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U32(x)))?;
                }
                ScalarType::U64 => {
                    evpush_dim1::<u64, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U64(x)))?;
                }
                ScalarType::I8 => {
                    evpush_dim1::<i8, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I8(x)))?;
                }
                ScalarType::I16 => {
                    evpush_dim1::<i16, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I16(x)))?;
                }
                ScalarType::I32 => {
                    evpush_dim1::<i32, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I32(x)))?;
                }
                ScalarType::I64 => {
                    evpush_dim1::<i64, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I64(x)))?;
                }
                ScalarType::F32 => {
                    evpush_dim1::<f32, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::F32(x)))?;
                }
                ScalarType::F64 => {
                    evpush_dim1::<f64, _>(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::F64(x)))?;
                }
                ScalarType::BOOL => return Err(Error::NotSupported),
                ScalarType::STRING => return Err(Error::NotSupported),
                ScalarType::Enum => return Err(Error::NotSupported),
                ScalarType::ChannelStatus => return Err(Error::NotSupported),
            },
            Shape::Image(_, _) => return Err(Error::NotSupported),
        }
        // Forward everything queued for this frame, then tick the writer so
        // it can emit any follow-up items before the next frame arrives.
        trace_queues!("frame send_all begin {} {}", iqdqs.summary(), iqtx.summary());
        iqtx.send_all(&mut iqdqs).await?;
        trace_queues!("frame send_all done {} {}", iqdqs.summary(), iqtx.summary());
        tick_writers(&mut writer, &mut iqdqs)?;
        trace_queues!("frame tick_writers done {} {}", iqdqs.summary(), iqtx.summary());
    }
    // Input exhausted: drain what is still queued, then let the writer finish.
    trace_queues!("after send_all begin {} {}", iqdqs.summary(), iqtx.summary());
    iqtx.send_all(&mut iqdqs).await?;
    trace_queues!("after send_all done {} {}", iqdqs.summary(), iqtx.summary());
    finish_writers(&mut writer, &mut iqdqs)?;
    trace_queues!("after finish_writers done {} {}", iqdqs.summary(), iqtx.summary());
    let ret = Json(serde_json::json!({}));
    Ok(ret)
}
fn evpush_dim0<T, F1>(
frame: &Bytes,
deque: &mut VecDeque<QueryItem>,
writer: &mut SeriesWriter,
f1: F1,
) -> Result<(), Error>
where
T: for<'a> Deserialize<'a> + fmt::Debug + Clone,
F1: Fn(T) -> DataValue,
{
let evs: EventsDim0NoPulse<T> = ciborium::de::from_reader(Cursor::new(frame))
.map_err(|e| {
error!("cbor decode error {e}");
})
.map_err(|_| Error::Decode)?;
let evs: EventsDim0<T> = evs.into();
trace_input!("see events {:?}", evs);
for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?;
}
Ok(())
}
fn evpush_dim1<T, F1>(
frame: &Bytes,
deque: &mut VecDeque<QueryItem>,
writer: &mut SeriesWriter,
f1: F1,
) -> Result<(), Error>
where
T: for<'a> Deserialize<'a> + fmt::Debug + Clone,
F1: Fn(Vec<T>) -> DataValue,
{
let evs: EventsDim1NoPulse<T> = ciborium::de::from_reader(Cursor::new(frame))
.map_err(|e| {
error!("cbor decode error {e}");
})
.map_err(|_| Error::Decode)?;
let evs: EventsDim1<T> = evs.into();
trace_input!("see events {:?}", evs);
for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?;
}
Ok(())
}
/// Give the series writer a housekeeping tick, letting it emit any pending
/// items into the short-term (st_rf3) insert deque.
fn tick_writers(writer: &mut SeriesWriter, deque: &mut InsertDeques) -> Result<(), Error> {
    writer.tick(&mut deque.st_rf3_rx)?;
    Ok(())
}
/// Finalize the series writer once the input stream is exhausted.
///
/// NOTE(review): currently identical to `tick_writers` — it only performs one
/// more tick. Presumably a dedicated flush/finish on `SeriesWriter` is
/// intended here; confirm whether final buffered state needs an explicit
/// flush beyond a tick.
fn finish_writers(writer: &mut SeriesWriter, deque: &mut InsertDeques) -> Result<(), Error> {
    writer.tick(&mut deque.st_rf3_rx)?;
    Ok(())
}

View File

@@ -7,6 +7,7 @@ use serde::Serialize;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::SystemTime;
#[derive(Debug, Serialize)]
pub struct ChannelStates {
@@ -20,6 +21,20 @@ struct ChannelState {
archiving_configuration: ChannelConfig,
recv_count: u64,
recv_bytes: u64,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
recv_last: SystemTime,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
write_st_last: SystemTime,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
write_mt_last: SystemTime,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
write_lt_last: SystemTime,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
updated: SystemTime,
}
fn system_time_epoch(x: &SystemTime) -> bool {
*x == SystemTime::UNIX_EPOCH
}
#[derive(Debug, Serialize)]
@@ -62,6 +77,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}
@@ -72,6 +92,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}
@@ -85,6 +110,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}
@@ -98,6 +128,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}
@@ -111,6 +146,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}
@@ -128,6 +168,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count,
recv_bytes,
recv_last: st6.recv_last,
write_st_last: st6.write_st_last,
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
};
states.channels.insert(k, chst);
}
@@ -138,6 +183,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count,
recv_bytes,
recv_last: st6.recv_last,
write_st_last: st6.write_st_last,
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
};
states.channels.insert(k, chst);
}
@@ -148,6 +198,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count,
recv_bytes,
recv_last: st6.recv_last,
write_st_last: st6.write_st_last,
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
};
states.channels.insert(k, chst);
}
@@ -158,6 +213,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count,
recv_bytes,
recv_last: st6.recv_last,
write_st_last: st6.write_st_last,
write_mt_last: st6.write_mt_last,
write_lt_last: st6.write_lt_last,
updated: st6.stnow,
};
states.channels.insert(k, chst);
}
@@ -174,6 +234,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}
@@ -184,6 +249,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}
@@ -194,6 +264,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender<CaConnSe
archiving_configuration: st1.config,
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
};
states.channels.insert(k, chst);
}

46
postingest.md Normal file
View File

@@ -0,0 +1,46 @@
# HTTP POST Ingest
Example:
```
Method: POST
Url: http://sf-ingest-mg-01.psi.ch:9009/daqingest/ingest/v1?channelName=MY:DEVICE:POS&shape=[]&scalarType=f32
Headers: Content-Type: application/cbor-framed
```
The body must be a stream of length-delimited frames, where the payload of each frame is
a CBOR object.
The HTTP body of the request then looks like this:
```txt
[CBOR-frame]
[CBOR-frame]
[CBOR-frame]
... etc
```
where each `[CBOR-frame]` looks like:
```txt
[length N of the following CBOR object: uint32 little-endian]
[reserved: 12 bytes of zero-padding]
[CBOR object: N bytes]
[padding: P zero-bytes, 0 <= P <= 7, such that (N + P) mod 8 = 0]
```
Each CBOR object must contain the timestamps (integer nanoseconds) and the values (depending on type), e.g.:
```json
{
"tss": [1712100002000000000, 1712100003000000000, 1712100004000000000],
"values": [5.6, 7.8, 8.1]
}
```
## Shape of data
The `shape` URL parameter indicates whether the data is scalar or 1-dimensional,
for example `shape=[]` indicates a scalar and `shape=[4096]` indicates an array
with 4096 elements.
The shape nowadays only distinguishes between scalar and 1-dimensional, but the actual length of
the array dimension may vary from event to event and is therefore not meaningful.
Still, it doesn't hurt to pass the "typical" size of array data as parameter.

View File

@@ -21,11 +21,10 @@ to the most basic linux system libraries.
```yml
# Address to bind the HTTP API to, for runtime control and Prometheus metrics scrape:
api_bind: "0.0.0.0:3011"
# The hostname to send to channel access peers as our own hostname:
local_epics_hostname: sf-daqsync-02.psi.ch
api_bind: 0.0.0.0:3011
# The backend name to use for the channels handled by this daqingest instance:
backend: scylla
channels: directory-name-with-channel-config-files
# Addresses to use for channel access search:
search:
- "172.26.0.255"
@@ -35,19 +34,30 @@ search:
postgresql:
host: postgresql-host
port: 5432
user: database-username
user: the-username
pass: the-password
name: the-database-name
scylla:
name: the-database
scylla_st:
keyspace: backend_st
hosts:
- "sf-nube-11:19042"
- "sf-nube-12:19042"
- "sf-nube-13:19042"
- "sf-nube-14:19042"
keyspace: ks1
channels:
- "SOME-CHANNEL:1"
- "OTHER-CHANNEL:2"
- sf-nube-11:19042
- sf-nube-12:19042
- sf-nube-13:19042
- sf-nube-14:19042
scylla_mt:
keyspace: backend_mt
hosts:
- sf-nube-11:19042
- sf-nube-12:19042
- sf-nube-13:19042
- sf-nube-14:19042
scylla_lt:
keyspace: backend_lt
hosts:
- sf-nube-11:19042
- sf-nube-12:19042
- sf-nube-13:19042
- sf-nube-14:19042
```
@@ -61,3 +71,8 @@ as configured by the `api_bind` parameter.
```txt
http://<api_bind>/daqingest/channel/state?name=[...]
```
# HTTP POST ingest
It is possible to [ingest](postingest.md) data via the `api_bind` socket address.

View File

@@ -2,9 +2,11 @@ use crate::iteminsertqueue::QueryItem;
use crate::senderpolling::SenderPolling;
use async_channel::Receiver;
use async_channel::Sender;
use core::fmt;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use pin_project::pin_project;
use std::collections::VecDeque;
use std::pin::Pin;
@@ -12,6 +14,8 @@ use std::pin::Pin;
#[derive(Debug, ThisError)]
pub enum Error {
QueuePush,
#[error("ChannelSend({0}, {1})")]
ChannelSend(RetentionTime, u8),
}
#[derive(Clone)]
@@ -24,22 +28,72 @@ pub struct InsertQueuesTx {
impl InsertQueuesTx {
/// Send all accumulated batches
pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), ()> {
pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
// Send each buffer down the corresponding channel
let item = core::mem::replace(&mut iqdqs.st_rf1_rx, VecDeque::new());
self.st_rf1_tx.send(item).await.map_err(|_| ())?;
let item = core::mem::replace(&mut iqdqs.st_rf3_rx, VecDeque::new());
self.st_rf3_tx.send(item).await.map_err(|_| ())?;
let item = core::mem::replace(&mut iqdqs.mt_rf3_rx, VecDeque::new());
self.mt_rf3_tx.send(item).await.map_err(|_| ())?;
let item = core::mem::replace(&mut iqdqs.lt_rf3_rx, VecDeque::new());
self.lt_rf3_tx.send(item).await.map_err(|_| ())?;
if false {
let item = core::mem::replace(&mut iqdqs.st_rf1_rx, VecDeque::new());
self.st_rf1_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Short, 1))?;
}
{
let item = core::mem::replace(&mut iqdqs.st_rf3_rx, VecDeque::new());
self.st_rf3_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Short, 3))?;
}
{
let item = core::mem::replace(&mut iqdqs.mt_rf3_rx, VecDeque::new());
self.mt_rf3_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Medium, 3))?;
}
{
let item = core::mem::replace(&mut iqdqs.lt_rf3_rx, VecDeque::new());
self.lt_rf3_tx
.send(item)
.await
.map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?;
}
Ok(())
}
pub fn clone2(&self) -> Self {
self.clone()
}
pub fn summary(&self) -> InsertQueuesTxSummary {
InsertQueuesTxSummary { obj: self }
}
}
pub struct InsertQueuesTxSummary<'a> {
obj: &'a InsertQueuesTx,
}
impl<'a> fmt::Display for InsertQueuesTxSummary<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let obj = self.obj;
write!(
fmt,
"InsertQueuesTx {{ st_rf1_tx: {} {} {}, st_rf3_tx: {} {} {}, mt_rf3_tx: {} {} {}, lt_rf3_tx: {} {} {} }}",
obj.st_rf1_tx.is_closed(),
obj.st_rf1_tx.is_full(),
obj.st_rf1_tx.len(),
obj.st_rf3_tx.is_closed(),
obj.st_rf3_tx.is_full(),
obj.st_rf3_tx.len(),
obj.mt_rf3_tx.is_closed(),
obj.mt_rf3_tx.is_full(),
obj.mt_rf3_tx.len(),
obj.lt_rf3_tx.is_closed(),
obj.lt_rf3_tx.is_full(),
obj.lt_rf3_tx.len(),
)
}
}
#[derive(Clone)]
@@ -72,7 +126,6 @@ impl InsertDeques {
self.st_rf1_rx.len() + self.st_rf3_rx.len() + self.mt_rf3_rx.len() + self.lt_rf3_rx.len()
}
///
pub fn clear(&mut self) {
self.st_rf1_rx.clear();
self.st_rf3_rx.clear();
@@ -80,14 +133,8 @@ impl InsertDeques {
self.lt_rf3_rx.clear();
}
pub fn log_summary(&self) {
let summ = InsertDequesSummary {
st_rf1_len: self.st_rf1_rx.len(),
st_rf3_len: self.st_rf3_rx.len(),
mt_rf3_len: self.mt_rf3_rx.len(),
lt_rf3_len: self.lt_rf3_rx.len(),
};
info!("{summ:?}");
pub fn summary(&self) -> InsertDequesSummary {
InsertDequesSummary { obj: self }
}
// Should be used only for connection and channel status items.
@@ -98,13 +145,22 @@ impl InsertDeques {
}
}
#[derive(Debug)]
#[allow(unused)]
struct InsertDequesSummary {
st_rf1_len: usize,
st_rf3_len: usize,
mt_rf3_len: usize,
lt_rf3_len: usize,
pub struct InsertDequesSummary<'a> {
obj: &'a InsertDeques,
}
impl<'a> fmt::Display for InsertDequesSummary<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let obj = self.obj;
write!(
fmt,
"InsertDeques {{ st_rf1_len: {}, st_rf3_len: {}, mt_rf3_len: {}, lt_rf3_len: {} }}",
obj.st_rf1_rx.len(),
obj.st_rf3_rx.len(),
obj.mt_rf3_rx.len(),
obj.lt_rf3_rx.len()
)
}
}
#[pin_project]
@@ -156,22 +212,29 @@ impl InsertSenderPolling {
unsafe { self.map_unchecked_mut(|x| &mut x.st_rf1_sp) }
}
pub fn log_summary(&self) {
let summ = InsertSenderPollingSummary {
st_rf1_idle: self.st_rf1_sp.is_idle(),
st_rf3_idle: self.st_rf3_sp.is_idle(),
mt_rf3_idle: self.mt_rf3_sp.is_idle(),
lt_rf3_idle: self.lt_rf3_sp.is_idle(),
};
info!("{summ:?}");
pub fn summary(&self) -> InsertSenderPollingSummary {
InsertSenderPollingSummary { obj: self }
}
}
#[derive(Debug)]
#[allow(unused)]
struct InsertSenderPollingSummary {
st_rf1_idle: bool,
st_rf3_idle: bool,
mt_rf3_idle: bool,
lt_rf3_idle: bool,
pub struct InsertSenderPollingSummary<'a> {
obj: &'a InsertSenderPolling,
}
impl<'a> fmt::Display for InsertSenderPollingSummary<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let obj = self.obj;
write!(
fmt,
"InsertSenderPolling {{ st_rf1_idle_len: {:?} {:?}, st_rf3_idle_len: {:?} {:?}, mt_rf3_idle_len: {:?} {:?}, lt_rf3_idle_len: {:?} {:?} }}",
obj.st_rf1_sp.is_idle(),
obj.st_rf1_sp.len(),
obj.st_rf3_sp.is_idle(),
obj.st_rf3_sp.len(),
obj.mt_rf3_sp.is_idle(),
obj.mt_rf3_sp.len(),
obj.lt_rf3_sp.is_idle(),
obj.lt_rf3_sp.len(),
)
}
}

View File

@@ -3,7 +3,6 @@ use crate::iteminsertqueue::insert_channel_status;
use crate::iteminsertqueue::insert_channel_status_fut;
use crate::iteminsertqueue::insert_connection_status;
use crate::iteminsertqueue::insert_connection_status_fut;
use crate::iteminsertqueue::insert_item;
use crate::iteminsertqueue::insert_item_fut;
use crate::iteminsertqueue::insert_msp_fut;
use crate::iteminsertqueue::Accounting;
@@ -35,7 +34,7 @@ use tokio::task::JoinHandle;
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => {
if true {
if false {
trace!($($arg)*);
}
};
@@ -44,7 +43,7 @@ macro_rules! trace2 {
#[allow(unused)]
macro_rules! trace3 {
($($arg:tt)*) => {
if true {
if false {
trace!($($arg)*);
}
};
@@ -53,7 +52,16 @@ macro_rules! trace3 {
#[allow(unused)]
macro_rules! trace_item_execute {
($($arg:tt)*) => {
if true {
if false {
trace!($($arg)*);
}
};
}
#[allow(unused)]
macro_rules! debug_setup {
($($arg:tt)*) => {
if false {
debug!($($arg)*);
}
};
@@ -181,86 +189,6 @@ pub async fn spawn_scylla_insert_workers_dummy(
Ok(jhs)
}
#[allow(unused)]
async fn worker_unused(
worker_ix: usize,
item_inp: Receiver<QueryItem>,
insert_worker_opts: Arc<InsertWorkerOpts>,
data_store: Arc<DataStore>,
stats: Arc<InsertWorkerStats>,
) -> Result<(), Error> {
stats.worker_start().inc();
insert_worker_opts
.insert_workers_running
.fetch_add(1, atomic::Ordering::AcqRel);
let backoff_0 = Duration::from_millis(10);
let mut backoff = backoff_0.clone();
let mut i1 = 0;
loop {
let item = if let Ok(item) = item_inp.recv().await {
stats.item_recv.inc();
item
} else {
break;
};
match item {
QueryItem::ConnectionStatus(item) => match insert_connection_status(item, &data_store).await {
Ok(_) => {
stats.inserted_connection_status().inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
},
QueryItem::ChannelStatus(item) => match insert_channel_status(item, &data_store).await {
Ok(_) => {
stats.inserted_channel_status().inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
},
QueryItem::Insert(item) => {
let tsnow = TsMs::from_system_time(SystemTime::now());
let item_ts_net = item.ts_net.clone();
let dt = tsnow.to_u64().saturating_sub(item_ts_net.to_u64()) as u32;
stats.item_lat_net_worker().ingest(dt);
let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
let do_insert = i1 % 1000 < insert_frac;
match insert_item(item, &data_store, do_insert, &stats).await {
Ok(_) => {
stats.inserted_values().inc();
let tsnow = TsMs::from_system_time(SystemTime::now());
let dt = tsnow.to_u64().saturating_sub(item_ts_net.to_u64()) as u32;
stats.item_lat_net_store().ingest(dt);
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
}
i1 += 1;
}
QueryItem::TimeBinSimpleF32(item) => {
info!("have time bin patch to insert: {item:?}");
return Err(Error::with_msg_no_trace("TODO insert item old path"));
}
QueryItem::Accounting(..) => {}
}
}
stats.worker_finish().inc();
insert_worker_opts
.insert_workers_running
.fetch_sub(1, atomic::Ordering::AcqRel);
trace2!("insert worker {worker_ix} done");
Ok(())
}
async fn worker_streamed(
worker_ix: usize,
concurrency: usize,
@@ -269,7 +197,7 @@ async fn worker_streamed(
data_store: Option<Arc<DataStore>>,
stats: Arc<InsertWorkerStats>,
) -> Result<(), Error> {
trace!("worker_streamed begin");
debug_setup!("worker_streamed begin");
stats.worker_start().inc();
insert_worker_opts
.insert_workers_running
@@ -290,7 +218,9 @@ async fn worker_streamed(
// })
.buffer_unordered(concurrency);
let mut stream = Box::pin(stream);
debug_setup!("waiting for item");
while let Some(item) = stream.next().await {
trace_item_execute!("see item");
match item {
Ok(_) => {
stats.inserted_values().inc();
@@ -321,7 +251,7 @@ async fn worker_streamed(
insert_worker_opts
.insert_workers_running
.fetch_sub(1, atomic::Ordering::AcqRel);
trace2!("insert worker {worker_ix} done");
debug_setup!("insert worker {worker_ix} done");
Ok(())
}
@@ -386,7 +316,7 @@ fn inspect_items(
trace_item_execute!("execute {worker_name} TimeBinSimpleF32");
}
QueryItem::Accounting(x) => {
if x.series.id() & 0x7f == 77 {
if x.series.id() & 0x7f == 200 {
debug!("execute {worker_name} Accounting {item:?}");
} else {
trace_item_execute!("execute {worker_name} Accounting {item:?}");

View File

@@ -48,13 +48,17 @@ pub enum Error {
#[derive(Clone, Debug, PartialEq)]
pub enum ScalarValue {
U8(u8),
U16(u16),
U32(u32),
U64(u64),
I8(i8),
I16(i16),
I32(i32),
I64(i64),
F32(f32),
F64(f64),
Enum(i16),
Enum(i16, String),
String(String),
Bool(bool),
}
@@ -62,13 +66,17 @@ pub enum ScalarValue {
impl ScalarValue {
pub fn byte_size(&self) -> u32 {
match self {
ScalarValue::U8(_) => 1,
ScalarValue::U16(_) => 1,
ScalarValue::U32(_) => 1,
ScalarValue::U64(_) => 1,
ScalarValue::I8(_) => 1,
ScalarValue::I16(_) => 2,
ScalarValue::I32(_) => 4,
ScalarValue::I64(_) => 8,
ScalarValue::F32(_) => 4,
ScalarValue::F64(_) => 8,
ScalarValue::Enum(_) => 2,
ScalarValue::Enum(_, y) => 2 + y.len() as u32,
ScalarValue::String(x) => x.len() as u32,
ScalarValue::Bool(_) => 1,
}
@@ -76,13 +84,17 @@ impl ScalarValue {
pub fn string_short(&self) -> String {
match self {
ScalarValue::U8(x) => x.to_string(),
ScalarValue::U16(x) => x.to_string(),
ScalarValue::U32(x) => x.to_string(),
ScalarValue::U64(x) => x.to_string(),
ScalarValue::I8(x) => x.to_string(),
ScalarValue::I16(x) => x.to_string(),
ScalarValue::I32(x) => x.to_string(),
ScalarValue::I64(x) => x.to_string(),
ScalarValue::F32(x) => x.to_string(),
ScalarValue::F64(x) => x.to_string(),
ScalarValue::Enum(x) => x.to_string(),
ScalarValue::Enum(x, y) => format!("({}, {})", x, y),
ScalarValue::String(x) => x.to_string(),
ScalarValue::Bool(x) => x.to_string(),
}
@@ -91,9 +103,14 @@ impl ScalarValue {
#[derive(Clone, Debug, PartialEq)]
pub enum ArrayValue {
U8(Vec<u8>),
U16(Vec<u16>),
U32(Vec<u32>),
U64(Vec<u64>),
I8(Vec<i8>),
I16(Vec<i16>),
I32(Vec<i32>),
I64(Vec<i64>),
F32(Vec<f32>),
F64(Vec<f64>),
Bool(Vec<bool>),
@@ -103,9 +120,14 @@ impl ArrayValue {
pub fn len(&self) -> usize {
use ArrayValue::*;
match self {
U8(a) => a.len(),
U16(a) => a.len(),
U32(a) => a.len(),
U64(a) => a.len(),
I8(a) => a.len(),
I16(a) => a.len(),
I32(a) => a.len(),
I64(a) => a.len(),
F32(a) => a.len(),
F64(a) => a.len(),
Bool(a) => a.len(),
@@ -115,9 +137,14 @@ impl ArrayValue {
pub fn byte_size(&self) -> u32 {
use ArrayValue::*;
match self {
U8(a) => 1 * a.len() as u32,
U16(a) => 2 * a.len() as u32,
U32(a) => 4 * a.len() as u32,
U64(a) => 8 * a.len() as u32,
I8(a) => 1 * a.len() as u32,
I16(a) => 2 * a.len() as u32,
I32(a) => 4 * a.len() as u32,
I64(a) => 8 * a.len() as u32,
F32(a) => 4 * a.len() as u32,
F64(a) => 8 * a.len() as u32,
Bool(a) => 1 * a.len() as u32,
@@ -127,6 +154,50 @@ impl ArrayValue {
pub fn to_binary_blob(&self) -> Vec<u8> {
use ArrayValue::*;
match self {
U8(a) => {
let n = self.byte_size();
let mut blob = Vec::with_capacity(32 + n as usize);
for _ in 0..4 {
blob.put_u64_le(0);
}
for &x in a {
blob.put_u8(x);
}
blob
}
U16(a) => {
let n = self.byte_size();
let mut blob = Vec::with_capacity(32 + n as usize);
for _ in 0..4 {
blob.put_u64_le(0);
}
for &x in a {
blob.put_u16_le(x);
}
blob
}
U32(a) => {
let n = self.byte_size();
let mut blob = Vec::with_capacity(32 + n as usize);
for _ in 0..4 {
blob.put_u64_le(0);
}
for &x in a {
blob.put_u32_le(x);
}
blob
}
U64(a) => {
let n = self.byte_size();
let mut blob = Vec::with_capacity(32 + n as usize);
for _ in 0..4 {
blob.put_u64_le(0);
}
for &x in a {
blob.put_u64_le(x);
}
blob
}
I8(a) => {
let n = self.byte_size();
let mut blob = Vec::with_capacity(32 + n as usize);
@@ -160,6 +231,17 @@ impl ArrayValue {
}
blob
}
I64(a) => {
let n = self.byte_size();
let mut blob = Vec::with_capacity(32 + n as usize);
for _ in 0..4 {
blob.put_u64_le(0);
}
for &x in a {
blob.put_i64_le(x);
}
blob
}
F32(a) => {
let n = self.byte_size();
let mut blob = Vec::with_capacity(32 + n as usize);
@@ -200,9 +282,14 @@ impl ArrayValue {
pub fn string_short(&self) -> String {
use ArrayValue::*;
match self {
U8(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
U16(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
U32(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
U64(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
I8(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
I16(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
I32(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
I64(x) => format!("{}", x.get(0).map_or(0, |x| *x)),
F32(x) => format!("{}", x.get(0).map_or(0., |x| *x)),
F64(x) => format!("{}", x.get(0).map_or(0., |x| *x)),
Bool(x) => format!("{}", x.get(0).map_or(false, |x| *x)),
@@ -227,20 +314,29 @@ impl DataValue {
pub fn scalar_type(&self) -> ScalarType {
match self {
DataValue::Scalar(x) => match x {
ScalarValue::U8(_) => ScalarType::U8,
ScalarValue::U16(_) => ScalarType::U16,
ScalarValue::U32(_) => ScalarType::U32,
ScalarValue::U64(_) => ScalarType::U64,
ScalarValue::I8(_) => ScalarType::I8,
ScalarValue::I16(_) => ScalarType::I16,
ScalarValue::I32(_) => ScalarType::I32,
ScalarValue::I64(_) => ScalarType::I64,
ScalarValue::F32(_) => ScalarType::F32,
ScalarValue::F64(_) => ScalarType::F64,
ScalarValue::Enum(_) => ScalarType::U16,
ScalarValue::Enum(_, _) => ScalarType::Enum,
ScalarValue::String(_) => ScalarType::STRING,
ScalarValue::Bool(_) => ScalarType::BOOL,
},
DataValue::Array(x) => match x {
ArrayValue::U8(_) => ScalarType::U8,
ArrayValue::U16(_) => ScalarType::U16,
ArrayValue::U32(_) => ScalarType::U32,
ArrayValue::U64(_) => ScalarType::U64,
ArrayValue::I8(_) => ScalarType::I8,
ArrayValue::I16(_) => ScalarType::I16,
ArrayValue::I32(_) => ScalarType::I32,
ArrayValue::I64(_) => ScalarType::I64,
ArrayValue::F32(_) => ScalarType::F32,
ArrayValue::F64(_) => ScalarType::F64,
ArrayValue::Bool(_) => ScalarType::BOOL,
@@ -471,7 +567,8 @@ impl ChannelStatus {
#[derive(Debug, Clone)]
pub enum ShutdownReason {
ConnectFail,
ConnectRefused,
ConnectTimeout,
IoError,
ShutdownCommand,
InternalError,
@@ -646,143 +743,6 @@ impl Future for InsertFut {
}
}
async fn insert_scalar_gen<ST>(
par: InsParCom,
val: ST,
qu: &PreparedStatement,
data_store: &DataStore,
) -> Result<(), Error>
where
ST: Value + SerializeCql,
{
let params = (
par.series.to_i64(),
par.ts_msp.to_i64(),
par.ts_lsp.to_i64(),
par.ts_alt_1.ns() as i64,
par.pulse as i64,
val,
);
if par.do_insert {
let y = data_store.scy.execute(qu, params).await;
match y {
Ok(_) => Ok(()),
Err(e) => match e {
QueryError::TimeoutError => Err(Error::DbTimeout),
// TODO use `msg`
QueryError::DbError(e, _msg) => match e {
DbError::Overloaded => Err(Error::DbOverload),
_ => Err(e.into()),
},
_ => Err(e.into()),
},
}
} else {
Ok(())
}
}
async fn insert_array_gen<ST>(
par: InsParCom,
val: Vec<ST>,
qu: &PreparedStatement,
data_store: &DataStore,
) -> Result<(), Error>
where
ST: Value + SerializeCql,
{
if par.do_insert {
let params = (
par.series.to_i64(),
par.ts_msp.to_i64(),
par.ts_lsp.to_i64(),
par.ts_alt_1.ns() as i64,
par.pulse as i64,
val,
);
let y = data_store.scy.execute(qu, params).await;
match y {
Ok(_) => Ok(()),
Err(e) => match e {
QueryError::TimeoutError => Err(Error::DbTimeout),
// TODO use `msg`
QueryError::DbError(e, _msg) => match e {
DbError::Overloaded => Err(Error::DbOverload),
_ => Err(e.into()),
},
_ => Err(e.into()),
},
}
} else {
Ok(())
}
}
// TODO currently not in use, anything to merge?
/// Insert a single `InsertItem` into the event tables.
///
/// If the item requests it (`msp_bump`), first registers a new
/// "most significant part" timestamp partition for the series.
/// Then dispatches on the payload type to the matching prepared
/// insert statement. Increments `stats` counters on success.
///
/// NOTE(review): the array path is unfinished — it hits `err::todo()`
/// before any insert is attempted.
pub async fn insert_item(
    item: InsertItem,
    data_store: &DataStore,
    do_insert: bool,
    stats: &Arc<InsertWorkerStats>,
) -> Result<(), Error> {
    if item.msp_bump {
        // Register the new MSP partition row before inserting events into it.
        let params = (item.series.id() as i64, item.ts_msp.to_i64());
        data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?;
        stats.inserts_msp().inc();
    }
    // Common per-insert parameters are identical for scalar and array
    // payloads, so build them once instead of duplicating the literal in
    // each match arm. (`item.val` is moved by the match below; reading the
    // other fields here first is a legal partial use.)
    let par = InsParCom {
        series: item.series,
        ts_msp: item.ts_msp,
        ts_lsp: item.ts_lsp,
        ts_net: item.ts_net,
        ts_alt_1: item.ts_alt_1,
        pulse: item.pulse,
        do_insert,
        stats: stats.clone(),
    };
    use DataValue::*;
    match item.val {
        Scalar(val) => {
            use ScalarValue::*;
            match val {
                I8(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i8, data_store).await?,
                I16(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, data_store).await?,
                // Enums are stored in the i16 table; they share its representation.
                Enum(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, data_store).await?,
                I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, data_store).await?,
                I64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i64, data_store).await?,
                F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, data_store).await?,
                F64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f64, data_store).await?,
                String(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_string, data_store).await?,
                Bool(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_bool, data_store).await?,
            }
        }
        Array(val) => {
            // Array inserts are not implemented yet: fail loudly here rather
            // than write possibly-wrong data. The match below shows the
            // intended dispatch once this path is finished.
            err::todo();
            use ArrayValue::*;
            match val {
                I8(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i8, data_store).await?,
                I16(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i16, data_store).await?,
                I32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i32, data_store).await?,
                F32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f32, data_store).await?,
                F64(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f64, data_store).await?,
                Bool(val) => insert_array_gen(par, val, &data_store.qu_insert_array_bool, data_store).await?,
            }
        }
    }
    stats.inserts_value().inc();
    Ok(())
}
pub fn insert_msp_fut(
series: SeriesId,
ts_msp: TsMs,
@@ -818,13 +778,17 @@ pub fn insert_item_fut(
};
use ScalarValue::*;
match val {
U8(val) => insert_scalar_gen_fut(par, val as i8, data_store.qu_insert_scalar_u8.clone(), scy),
U16(val) => insert_scalar_gen_fut(par, val as i16, data_store.qu_insert_scalar_u16.clone(), scy),
U32(val) => insert_scalar_gen_fut(par, val as i32, data_store.qu_insert_scalar_u32.clone(), scy),
U64(val) => insert_scalar_gen_fut(par, val as i64, data_store.qu_insert_scalar_u64.clone(), scy),
I8(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i8.clone(), scy),
I16(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy),
I32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i32.clone(), scy),
I64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i64.clone(), scy),
F32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f32.clone(), scy),
F64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f64.clone(), scy),
Enum(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy),
Enum(a, b) => insert_scalar_gen_fut(par, a, data_store.qu_insert_scalar_i16.clone(), scy),
String(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_string.clone(), scy),
Bool(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_bool.clone(), scy),
}
@@ -844,9 +808,14 @@ pub fn insert_item_fut(
let blob = val.to_binary_blob();
#[allow(unused)]
match val {
U8(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u8.clone(), scy),
U16(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u16.clone(), scy),
U32(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u32.clone(), scy),
U64(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u64.clone(), scy),
I8(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i8.clone(), scy),
I16(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i16.clone(), scy),
I32(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i32.clone(), scy),
I64(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i64.clone(), scy),
F32(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_f32.clone(), scy),
F64(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_f64.clone(), scy),
Bool(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_bool.clone(), scy),

View File

@@ -20,6 +20,10 @@ pub struct DataStore {
pub rett: RetentionTime,
pub scy: Arc<ScySession>,
pub qu_insert_ts_msp: Arc<PreparedStatement>,
pub qu_insert_scalar_u8: Arc<PreparedStatement>,
pub qu_insert_scalar_u16: Arc<PreparedStatement>,
pub qu_insert_scalar_u32: Arc<PreparedStatement>,
pub qu_insert_scalar_u64: Arc<PreparedStatement>,
pub qu_insert_scalar_i8: Arc<PreparedStatement>,
pub qu_insert_scalar_i16: Arc<PreparedStatement>,
pub qu_insert_scalar_i32: Arc<PreparedStatement>,
@@ -28,6 +32,10 @@ pub struct DataStore {
pub qu_insert_scalar_f64: Arc<PreparedStatement>,
pub qu_insert_scalar_bool: Arc<PreparedStatement>,
pub qu_insert_scalar_string: Arc<PreparedStatement>,
pub qu_insert_array_u8: Arc<PreparedStatement>,
pub qu_insert_array_u16: Arc<PreparedStatement>,
pub qu_insert_array_u32: Arc<PreparedStatement>,
pub qu_insert_array_u64: Arc<PreparedStatement>,
pub qu_insert_array_i8: Arc<PreparedStatement>,
pub qu_insert_array_i16: Arc<PreparedStatement>,
pub qu_insert_array_i32: Arc<PreparedStatement>,
@@ -100,6 +108,10 @@ impl DataStore {
.await?;
let qu_insert_ts_msp = Arc::new(q);
let qu_insert_scalar_u8 = prep_qu_ins_a!("events_scalar_u8", rett, scy);
let qu_insert_scalar_u16 = prep_qu_ins_a!("events_scalar_u16", rett, scy);
let qu_insert_scalar_u32 = prep_qu_ins_a!("events_scalar_u32", rett, scy);
let qu_insert_scalar_u64 = prep_qu_ins_a!("events_scalar_u64", rett, scy);
let qu_insert_scalar_i8 = prep_qu_ins_a!("events_scalar_i8", rett, scy);
let qu_insert_scalar_i16 = prep_qu_ins_a!("events_scalar_i16", rett, scy);
let qu_insert_scalar_i32 = prep_qu_ins_a!("events_scalar_i32", rett, scy);
@@ -109,7 +121,10 @@ impl DataStore {
let qu_insert_scalar_bool = prep_qu_ins_a!("events_scalar_bool", rett, scy);
let qu_insert_scalar_string = prep_qu_ins_a!("events_scalar_string", rett, scy);
// array
let qu_insert_array_u8 = prep_qu_ins_b!("events_array_u8", rett, scy);
let qu_insert_array_u16 = prep_qu_ins_b!("events_array_u16", rett, scy);
let qu_insert_array_u32 = prep_qu_ins_b!("events_array_u32", rett, scy);
let qu_insert_array_u64 = prep_qu_ins_b!("events_array_u64", rett, scy);
let qu_insert_array_i8 = prep_qu_ins_b!("events_array_i8", rett, scy);
let qu_insert_array_i16 = prep_qu_ins_b!("events_array_i16", rett, scy);
let qu_insert_array_i32 = prep_qu_ins_b!("events_array_i32", rett, scy);
@@ -172,6 +187,10 @@ impl DataStore {
rett,
scy,
qu_insert_ts_msp,
qu_insert_scalar_u8,
qu_insert_scalar_u16,
qu_insert_scalar_u32,
qu_insert_scalar_u64,
qu_insert_scalar_i8,
qu_insert_scalar_i16,
qu_insert_scalar_i32,
@@ -180,6 +199,10 @@ impl DataStore {
qu_insert_scalar_f64,
qu_insert_scalar_bool,
qu_insert_scalar_string,
qu_insert_array_u8,
qu_insert_array_u16,
qu_insert_array_u32,
qu_insert_array_u64,
qu_insert_array_i8,
qu_insert_array_i16,
qu_insert_array_i32,

View File

@@ -19,7 +19,7 @@ pub struct PatchCollect {
impl PatchCollect {
pub fn new(bin_len: TsNano, bin_count: u64) -> Self {
Self {
patch_len: TsNano(bin_len.0 * bin_count),
patch_len: TsNano::from_ns(bin_len.ns() * bin_count),
bin_len,
bin_count,
coll: None,
@@ -68,13 +68,13 @@ impl PatchCollect {
for (i2, (ts1, ts2)) in ts1s.iter().zip(ts2s).enumerate() {
info!("EDGE {}", ts1 / SEC);
if self.locked {
if ts2 % self.patch_len.0 == 0 {
if ts2 % self.patch_len.ns() == 0 {
info!("FOUND PATCH EDGE-END at {}", ts2 / SEC);
i3 = i2 + 1;
emit = true;
}
} else {
if ts1 % self.patch_len.0 == 0 {
if ts1 % self.patch_len.ns() == 0 {
info!("FOUND PATCH EDGE-BEG at {}", ts1 / SEC);
self.locked = true;
i3 = i2;

View File

@@ -124,9 +124,9 @@ impl RtWriter {
ts_local: TsNano,
val: DataValue,
iqdqs: &mut InsertDeques,
) -> Result<(), Error> {
) -> Result<((bool, bool, bool),), Error> {
let sid = self.sid;
Self::write_inner(
let (did_write_st,) = Self::write_inner(
"ST",
self.min_quiets.st,
&mut self.state_st,
@@ -136,7 +136,7 @@ impl RtWriter {
val.clone(),
sid,
)?;
Self::write_inner(
let (did_write_mt,) = Self::write_inner(
"MT",
self.min_quiets.mt,
&mut self.state_mt,
@@ -146,7 +146,7 @@ impl RtWriter {
val.clone(),
sid,
)?;
Self::write_inner(
let (did_write_lt,) = Self::write_inner(
"LT",
self.min_quiets.lt,
&mut self.state_lt,
@@ -156,7 +156,7 @@ impl RtWriter {
val.clone(),
sid,
)?;
Ok(())
Ok(((did_write_st, did_write_mt, did_write_lt),))
}
fn write_inner(
@@ -168,7 +168,7 @@ impl RtWriter {
ts_local: TsNano,
val: DataValue,
sid: SeriesId,
) -> Result<(), Error> {
) -> Result<(bool,), Error> {
// Decide whether we want to write.
// Use the IOC time for the decision whether to write.
// But use the ingest local time as the primary index.
@@ -200,7 +200,7 @@ impl RtWriter {
});
state.writer.write(ts_ioc, ts_local, val.clone(), deque)?;
}
Ok(())
Ok((do_write,))
}
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {

View File

@@ -432,7 +432,7 @@ fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque<Query
let bin_len_sec = (pc.bin_len().ns() / MS);
let bin_count = pc.bin_count();
let off = ts0 / pc.patch_len().0;
let off = ts0 / pc.patch_len().ns();
let off_msp = off / 1000;
let off_lsp = off % 1000;
// let item = TimeBinSimpleF32 {

View File

@@ -409,7 +409,6 @@ stats_proc::stats_struct!((
ca_conn_eos_unexpected,
response_tx_fail,
try_push_ca_conn_cmds_sent,
try_push_ca_conn_cmds_full,
try_push_ca_conn_cmds_closed,
logic_error,
logic_issue,
@@ -451,6 +450,7 @@ stats_proc::stats_struct!((
name(SeriesByChannelStats),
prefix(seriesbychannel),
counters(res_tx_fail, res_tx_timeout, recv_batch, recv_items,),
histolog2s(commit_duration_ms),
),
stats_struct(
name(InsertWorkerStats),