This commit is contained in:
Dominik Werder
2024-01-12 20:09:09 +01:00
parent 0b4a5c0a34
commit bbc2855767
7 changed files with 504 additions and 258 deletions
+223 -107
View File
@@ -176,6 +176,9 @@ struct Cid(pub u32);
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Subid(pub u32);
// Server-assigned channel id: newtype over the raw u32 `sid` field delivered
// in proto::CreateChanRes, so it cannot be confused with Cid or Subid values.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Sid(pub u32);
#[derive(Clone, Debug)]
enum ChannelError {
CreateChanFail(ChannelStatusSeriesId),
@@ -199,11 +202,7 @@ enum MonitoringState {
struct CreatedState {
cssid: ChannelStatusSeriesId,
cid: Cid,
sid: u32,
data_type: u16,
data_count: u32,
scalar_type: ScalarType,
shape: Shape,
sid: Sid,
#[allow(unused)]
ts_created: Instant,
ts_alive_last: Instant,
@@ -224,11 +223,7 @@ impl Default for CreatedState {
Self {
cssid: ChannelStatusSeriesId::new(123123),
cid: Cid(123123),
sid: 123123,
data_type: 4242,
data_count: 42,
scalar_type: ScalarType::U8,
shape: Shape::Scalar,
sid: Sid(123123),
ts_created: Instant::now(),
ts_alive_last: Instant::now(),
state: MonitoringState::FetchSeriesId,
@@ -579,6 +574,7 @@ pub struct CaConn {
cid_by_subid: HashMap<Subid, Cid>,
name_by_cid: HashMap<Cid, String>,
channel_status_emit_last: Instant,
tick_last_writer: Instant,
init_state_count: u64,
insert_item_queue: VecDeque<QueryItem>,
remote_addr_dbg: SocketAddrV4,
@@ -605,6 +601,7 @@ pub struct CaConn {
writer_establish_tx: Pin<Box<SenderPolling<EstablishWorkerJob>>>,
writer_tx: Sender<(JobId, Result<SeriesWriter, serieswriter::writer::Error>)>,
writer_rx: Pin<Box<Receiver<(JobId, Result<SeriesWriter, serieswriter::writer::Error>)>>>,
tmp_ts_poll: SystemTime,
}
impl Drop for CaConn {
@@ -625,13 +622,14 @@ impl CaConn {
ca_proto_stats: Arc<CaProtoStats>,
writer_establish_tx: Sender<EstablishWorkerJob>,
) -> Self {
let tsnow = Instant::now();
let (writer_tx, writer_rx) = async_channel::bounded(32);
let (cq_tx, cq_rx) = async_channel::bounded(32);
let mut rng = stats::xoshiro_from_time();
Self {
opts,
backend,
state: CaConnState::Unconnected(Instant::now()),
state: CaConnState::Unconnected(tsnow),
ticker: Self::new_self_ticker(),
proto: None,
cid_store: CidStore::new_from_time(),
@@ -641,7 +639,8 @@ impl CaConn {
cid_by_name: BTreeMap::new(),
cid_by_subid: HashMap::new(),
name_by_cid: HashMap::new(),
channel_status_emit_last: Instant::now(),
channel_status_emit_last: tsnow,
tick_last_writer: tsnow,
insert_item_queue: VecDeque::new(),
remote_addr_dbg,
local_epics_hostname,
@@ -653,8 +652,8 @@ impl CaConn {
conn_backoff_beg: 0.02,
inserts_counter: 0,
extra_inserts_conf: ExtraInsertsConf::new(),
ioc_ping_last: Instant::now(),
ioc_ping_next: Instant::now() + Self::ioc_ping_ivl_rng(&mut rng),
ioc_ping_last: tsnow,
ioc_ping_next: tsnow + Self::ioc_ping_ivl_rng(&mut rng),
ioc_ping_start: None,
storage_insert_sender: Box::pin(SenderPolling::new(storage_insert_tx)),
ca_conn_event_out_queue: VecDeque::new(),
@@ -667,6 +666,7 @@ impl CaConn {
writer_establish_tx: Box::pin(SenderPolling::new(writer_establish_tx)),
writer_tx,
writer_rx: Box::pin(writer_rx),
tmp_ts_poll: SystemTime::now(),
}
}
@@ -716,7 +716,7 @@ impl CaConn {
let addr = self.remote_addr_dbg.clone();
self.insert_item_queue
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: SystemTime::now(),
ts: self.tmp_ts_poll,
addr,
// TODO map to appropriate status
status: ConnectionStatus::Closing,
@@ -925,7 +925,7 @@ impl CaConn {
self.stats.get_series_id_ok.inc();
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: SystemTime::now(),
ts: self.tmp_ts_poll,
cssid: st2.cssid.clone(),
status: ChannelStatus::Opened,
});
@@ -942,7 +942,7 @@ impl CaConn {
let data_type_asked = data_type + 14;
debug!("send out EventAdd for {cid:?}");
let ty = CaMsgTy::EventAdd(EventAdd {
sid: st2.sid,
sid: st2.sid.0,
data_type: data_type_asked,
data_count: wr.shape().to_ca_count()? as _,
subid: subid.0,
@@ -1077,7 +1077,7 @@ impl CaConn {
ChannelState::Writable(st2) => {
let cssid = st2.created.cssid.clone();
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: SystemTime::now(),
ts: self.tmp_ts_poll,
cssid: cssid.clone(),
status: ChannelStatus::Closed(channel_reason.clone()),
});
@@ -1144,7 +1144,7 @@ impl CaConn {
}
fn emit_channel_info_insert_items(&mut self) -> Result<(), Error> {
let timenow = SystemTime::now();
let timenow = self.tmp_ts_poll;
for (_, st) in &mut self.channels {
match st {
ChannelState::Init(_cssid) => {
@@ -1209,14 +1209,12 @@ impl CaConn {
// return Err(Error::with_msg_no_trace());
return Ok(());
};
debug!("handle_event_add_res {ev:?}");
// debug!("handle_event_add_res {ev:?}");
match ch_s {
ChannelState::Writable(st) => {
let created = &mut st.created;
created.ts_alive_last = tsnow;
created.item_recv_ivl_ema.tick(tsnow);
let scalar_type = st.writer.scalar_type().clone();
let shape = st.writer.shape().clone();
let series = match &mut created.state {
MonitoringState::AddingEvent(series) => {
let series = series.clone();
@@ -1246,7 +1244,7 @@ impl CaConn {
st2.recv_bytes += ev.payload_len as u64;
}
let ts_local = {
let ts = SystemTime::now();
let ts = self.tmp_ts_poll;
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
};
@@ -1262,39 +1260,8 @@ impl CaConn {
let ivl_min = (self.insert_ivl_min_mus as f32) * 1e-6;
let dt = (ivl_min - ema).max(0.) / em.k();
created.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64);
let ts_msp_last = created.ts_msp_last;
}
#[cfg(DISABLED)]
match &ev.value.data {
CaDataValue::Scalar(x) => match &x {
proto::CaDataScalarValue::F32(..) => match &scalar_type {
ScalarType::F32 => {}
_ => {
error!("MISMATCH got f32 exp {:?}", scalar_type);
}
},
proto::CaDataScalarValue::F64(..) => match &scalar_type {
ScalarType::F64 => {}
_ => {
error!("MISMATCH got f64 exp {:?}", scalar_type);
}
},
proto::CaDataScalarValue::I16(..) => match &scalar_type {
ScalarType::I16 => {}
_ => {
error!("MISMATCH got i16 exp {:?}", scalar_type);
}
},
proto::CaDataScalarValue::I32(..) => match &scalar_type {
ScalarType::I32 => {}
_ => {
error!("MISMATCH got i32 exp {:?}", scalar_type);
}
},
_ => {}
},
_ => {}
}
Self::check_ev_value_data(&ev.value.data, st.writer.scalar_type())?;
{
let val: DataValue = ev.value.data.into();
st.writer
@@ -1341,6 +1308,42 @@ impl CaConn {
Ok(())
}
// Consistency check between the scalar type of an incoming CA event payload
// and the scalar type the writer expects for this channel.
// Only the F32/F64/I16/I32 scalar variants are compared; every other scalar
// variant and all non-scalar payloads are accepted without a check.
// NOTE(review): a mismatch is only reported via error! logging — the function
// always returns Ok(()), it never propagates the mismatch as an Err.
fn check_ev_value_data(data: &proto::CaDataValue, scalar_type: &ScalarType) -> Result<(), Error> {
use crate::ca::proto::CaDataScalarValue;
use crate::ca::proto::CaDataValue;
match data {
CaDataValue::Scalar(x) => match &x {
CaDataScalarValue::F32(..) => match &scalar_type {
ScalarType::F32 => {}
_ => {
error!("MISMATCH got f32 exp {:?}", scalar_type);
}
},
CaDataScalarValue::F64(..) => match &scalar_type {
ScalarType::F64 => {}
_ => {
error!("MISMATCH got f64 exp {:?}", scalar_type);
}
},
CaDataScalarValue::I16(..) => match &scalar_type {
ScalarType::I16 => {}
_ => {
error!("MISMATCH got i16 exp {:?}", scalar_type);
}
},
CaDataScalarValue::I32(..) => match &scalar_type {
ScalarType::I32 => {}
_ => {
error!("MISMATCH got i32 exp {:?}", scalar_type);
}
},
// Unchecked scalar variants (e.g. strings, enums) pass through.
_ => {}
},
// Non-scalar (array/waveform) payloads are not type-checked here.
_ => {}
}
Ok(())
}
/*
Acts more like a stream? Can be:
Pending
@@ -1444,6 +1447,7 @@ impl CaConn {
.time_check_channels_state_init
.add((ts2.duration_since(ts1) * MS as u32).as_secs());
ts1 = ts2;
let _ = ts1;
let tsnow = Instant::now();
let proto = if let Some(x) = self.proto.as_mut() {
x
@@ -1562,7 +1566,7 @@ impl CaConn {
fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> {
// TODO handle cid-not-found which can also indicate peer error.
let cid = Cid(k.cid);
let sid = k.sid;
let sid = Sid(k.sid);
let name = if let Some(x) = self.name_by_cid(cid) {
x.to_string()
} else {
@@ -1589,10 +1593,6 @@ impl CaConn {
cssid,
cid,
sid,
data_type: k.data_type,
data_count: k.data_count,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
ts_created: tsnow,
ts_alive_last: tsnow,
state: MonitoringState::FetchSeriesId,
@@ -1604,7 +1604,7 @@ impl CaConn {
insert_recv_ivl_last: tsnow,
insert_next_earliest: tsnow,
muted_before: 0,
info_store_msp_last: info_store_msp_from_time(SystemTime::now()),
info_store_msp_last: info_store_msp_from_time(self.tmp_ts_poll),
};
*ch_s = ChannelState::MakingSeriesWriter(created_state);
let name = self
@@ -1618,6 +1618,7 @@ impl CaConn {
scalar_type,
shape,
self.writer_tx.clone(),
self.tmp_ts_poll,
);
self.writer_establish_qu.push_back(job);
Ok(())
@@ -1655,7 +1656,7 @@ impl CaConn {
let addr = addr.clone();
self.insert_item_queue
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: SystemTime::now(),
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::Established,
}));
@@ -1676,7 +1677,7 @@ impl CaConn {
let addr = addr.clone();
self.insert_item_queue.push_back(QueryItem::ConnectionStatus(
ConnectionStatusItem {
ts: SystemTime::now(),
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectError,
},
@@ -1697,7 +1698,7 @@ impl CaConn {
let addr = addr.clone();
self.insert_item_queue.push_back(QueryItem::ConnectionStatus(
ConnectionStatusItem {
ts: SystemTime::now(),
ts: self.tmp_ts_poll,
addr,
status: ConnectionStatus::ConnectTimeout,
},
@@ -1806,11 +1807,11 @@ impl CaConn {
}
}
fn handle_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<()>, Error> {
fn poll_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<()>, Error> {
use Poll::*;
match self.ticker.poll_unpin(cx) {
Ready(()) => {
match self.as_mut().handle_own_ticker_tick(cx) {
match self.as_mut().handle_own_ticker(cx) {
Ok(_) => {
if !self.is_shutdown() {
self.ticker = Self::new_self_ticker();
@@ -1830,13 +1831,18 @@ impl CaConn {
}
}
fn handle_own_ticker_tick(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
fn handle_own_ticker(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
// debug!("tick CaConn {}", self.remote_addr_dbg);
let tsnow = Instant::now();
// TODO add some random variation
if self.channel_status_emit_last + Duration::from_millis(3000) <= tsnow {
self.channel_status_emit_last = tsnow;
self.emit_channel_status()?;
}
if self.tick_last_writer + Duration::from_millis(2000) <= tsnow {
self.tick_last_writer = tsnow;
self.tick_writers()?;
}
match &self.state {
CaConnState::Unconnected(_) => {}
CaConnState::Connecting(since, _, _) => {
@@ -1878,6 +1884,17 @@ impl CaConn {
Ok(())
}
// Periodic maintenance for all channels that are in the Writable state:
// give each channel's SeriesWriter a chance to flush completed time bins
// into the insert item queue. Invoked from the own-ticker handler, rate
// limited by tick_last_writer (~every 2 s).
// Returns the first writer error, converted into a trace-less Error.
fn tick_writers(&mut self) -> Result<(), Error> {
    // `_` instead of a named key: the channel key is not needed here and a
    // named binding would trigger an unused-variable warning.
    for (_, st) in &mut self.channels {
        if let ChannelState::Writable(st2) = st {
            st2.writer
                .tick(&mut self.insert_item_queue)
                .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
        }
    }
    Ok(())
}
fn check_ticker_connecting_timeout(&mut self, since: Instant) -> Result<(), Error> {
Ok(())
}
@@ -1896,6 +1913,7 @@ impl CaConn {
use Poll::*;
let (qu, sd, stats) = Self::storage_queue_vars(&mut self);
{
// TODO use stats histogram type to test the native prometheus histogram feature
let n = qu.len();
if n >= 128 {
stats.storage_queue_above_128().inc();
@@ -1965,35 +1983,110 @@ impl CaConn {
fn attempt_flush_writer_establish(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
if self.is_shutdown() {
Ok(Ready(None))
} else {
let sd = self.writer_establish_tx.as_mut();
if !sd.has_sender() {
return Err(Error::with_msg_no_trace(
"attempt_flush_channel_info_query no more sender",
));
let sd = self.writer_establish_tx.as_mut();
if !sd.has_sender() {
return Err(Error::with_msg_no_trace(
"attempt_flush_channel_info_query no more sender",
));
}
if sd.is_idle() {
if let Some(item) = self.writer_establish_qu.pop_front() {
trace3!("send EstablishWorkerJob");
let sd = self.writer_establish_tx.as_mut();
sd.send_pin(item);
}
if sd.is_idle() {
if let Some(item) = self.writer_establish_qu.pop_front() {
trace3!("send EstablishWorkerJob");
let sd = self.writer_establish_tx.as_mut();
sd.send_pin(item);
}
let sd = &mut self.writer_establish_tx;
if sd.is_sending() {
match sd.poll_unpin(cx) {
Ready(Ok(())) => {
debug!("flushed writer establish job");
Ok(Ready(Some(())))
}
Ready(Err(_)) => Err(Error::with_msg_no_trace(
"attempt_flush_channel_info_query can not send into channel",
)),
Pending => Ok(Pending),
}
let sd = &mut self.writer_establish_tx;
if sd.is_sending() {
match sd.poll_unpin(cx) {
Ready(Ok(())) => Ok(Ready(Some(()))),
Ready(Err(_)) => Err(Error::with_msg_no_trace(
"attempt_flush_channel_info_query can not send into channel",
)),
Pending => Ok(Pending),
} else {
Ok(Ready(None))
}
}
fn attempt_flush_queue<T, Q, FB>(
qu: &mut VecDeque<T>,
sp: &mut Pin<Box<SenderPolling<Q>>>,
qu_to_si: FB,
loop_max: u32,
cx: &mut Context,
) -> Result<Poll<Option<()>>, Error>
where
Q: Unpin,
FB: Fn(&mut VecDeque<T>) -> Option<Q>,
{
use Poll::*;
let mut have_progress = false;
let mut i = 0;
loop {
i += 1;
if i > loop_max {
break;
}
if !sp.has_sender() {
return Err(Error::with_msg_no_trace("attempt_flush_queue no sender"));
}
if sp.is_idle() {
if let Some(item) = qu_to_si(qu) {
sp.as_mut().send_pin(item);
} else {
}
// TODO maybe use a generic function which produces the next
// item from a queue: can be a batch!
// if let Some(item) = qu.pop_front() {
// // let sd = self.writer_establish_tx.as_mut();
// // sp.as_mut().send_pin(item);
// } else {
// // break;
// }
}
// let sd = &mut self.writer_establish_tx;
if sp.is_sending() {
match sp.poll_unpin(cx) {
Ready(Ok(())) => {
have_progress = true;
}
Ready(Err(e)) => {
let e = Error::with_msg_no_trace(format!("attempt_flush_queue {e}"));
return Err(e);
}
Pending => {
return Ok(Pending);
}
}
} else {
Ok(Ready(None))
let e = Error::with_msg_no_trace(format!("attempt_flush_queue not sending"));
return Err(e);
}
}
if have_progress {
Ok(Ready(Some(())))
} else {
Ok(Ready(None))
}
}
// Queue-draining strategy that forwards one element at a time.
// Returns the oldest queued item, or None when the queue is empty.
fn send_individual<T>(qu: &mut VecDeque<T>) -> Option<T> {
    if qu.is_empty() {
        None
    } else {
        qu.pop_front()
    }
}
// Queue-draining strategy that forwards items in batches of at most N.
// Removes up to N items from the front of `qu` and returns them as a new
// VecDeque; returns None when the queue is empty.
fn send_batched<const N: usize, T>(qu: &mut VecDeque<T>) -> Option<VecDeque<T>> {
    let take = qu.len().min(N);
    if take > 0 {
        Some(qu.drain(..take).collect())
    } else {
        None
    }
}
}
@@ -2002,6 +2095,7 @@ impl Stream for CaConn {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
self.tmp_ts_poll = SystemTime::now();
let poll_ts1 = Instant::now();
self.stats.poll_count().inc();
self.stats.poll_fn_begin().inc();
@@ -2027,9 +2121,7 @@ impl Stream for CaConn {
break Ready(Some(Ok(item)));
}
let lts2 = Instant::now();
match self.as_mut().handle_own_ticker(cx) {
match self.as_mut().poll_own_ticker(cx) {
Ok(Ready(())) => {
have_progress = true;
}
@@ -2039,28 +2131,52 @@ impl Stream for CaConn {
Err(e) => break Ready(Some(Err(e))),
}
match self.as_mut().attempt_flush_storage_queue(cx) {
Ok(Ready(Some(()))) => {
have_progress = true;
if !self.is_shutdown() {
fn abc(
obj: &mut CaConn,
) -> (
&mut VecDeque<QueryItem>,
&mut Pin<Box<SenderPolling<VecDeque<QueryItem>>>>,
) {
(&mut obj.insert_item_queue, &mut obj.storage_insert_sender)
}
Ok(Ready(None)) => {}
Ok(Pending) => {
have_pending = true;
let (qu, sp) = abc(self.as_mut().get_mut());
match Self::attempt_flush_queue(qu, sp, Self::send_batched::<32, _>, 32, cx) {
Ok(Ready(Some(()))) => {
have_progress = true;
}
Ok(Ready(None)) => {}
Ok(Pending) => {
have_pending = true;
}
Err(e) => break Ready(Some(Err(e))),
}
Err(e) => break Ready(Some(Err(e))),
// match self.as_mut().attempt_flush_storage_queue(cx) {
// Ok(Ready(Some(()))) => {
// have_progress = true;
// }
// Ok(Ready(None)) => {}
// Ok(Pending) => {
// have_pending = true;
// }
// Err(e) => break Ready(Some(Err(e))),
// }
}
let lts3 = Instant::now();
match self.as_mut().attempt_flush_writer_establish(cx) {
Ok(Ready(Some(()))) => {
have_progress = true;
if !self.is_shutdown() {
match self.as_mut().attempt_flush_writer_establish(cx) {
Ok(Ready(Some(()))) => {
have_progress = true;
}
Ok(Ready(None)) => {}
Ok(Pending) => {
have_pending = true;
}
Err(e) => break Ready(Some(Err(e))),
}
Ok(Ready(None)) => {}
Ok(Pending) => {
have_pending = true;
}
Err(e) => break Ready(Some(Err(e))),
}
let lts2 = Instant::now();
+51 -27
View File
@@ -9,6 +9,7 @@ use crate::iteminsertqueue::ConnectionStatusItem;
use crate::iteminsertqueue::InsertFut;
use crate::iteminsertqueue::InsertItem;
use crate::iteminsertqueue::QueryItem;
use crate::iteminsertqueue::TimeBinSimpleF32;
use crate::store::DataStore;
use async_channel::Receiver;
use async_channel::Sender;
@@ -289,34 +290,9 @@ async fn worker(
}
}
}
QueryItem::TimeBinPatchSimpleF32(item) => {
QueryItem::TimeBinSimpleF32(item) => {
info!("have time bin patch to insert: {item:?}");
let params = (
item.series.id() as i64,
item.bin_len_sec as i32,
item.bin_count as i32,
item.off_msp as i32,
item.off_lsp as i32,
item.counts,
item.mins,
item.maxs,
item.avgs,
ttls.binned.as_secs() as i32,
);
let qres = data_store
.scy
.execute(&data_store.qu_insert_binned_scalar_f32_v01, params)
.await;
match qres {
Ok(_) => {
stats.inserted_binned().inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
back_off_sleep(&mut backoff).await;
}
}
return Err(Error::with_msg_no_trace("TODO insert item old path"));
}
}
}
@@ -365,8 +341,12 @@ async fn worker_streamed(
stats.inserted_channel_status().inc();
insert_channel_status_fut(item, &ttls, &data_store, stats.clone())
}
QueryItem::TimeBinSimpleF32(item) => {
prepare_timebin_insert_futs(item, &ttls, &data_store, &stats, tsnow_u64)
}
_ => {
// TODO
debug!("TODO insert item {item:?}");
SmallVec::new()
}
};
@@ -465,3 +445,47 @@ fn prepare_query_insert_futs(
futs
}
// Build the insert future(s) for one finished scalar-f32 time bin.
// Maps a TimeBinSimpleF32 item onto the parameter tuple of the prepared
// statement qu_insert_binned_scalar_f32_v02 and wraps it in a single
// InsertFut; the SmallVec return type matches the other prepare_* helpers.
fn prepare_timebin_insert_futs(
item: TimeBinSimpleF32,
ttls: &Ttls,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow_u64: u64,
) -> SmallVec<[InsertFut; 4]> {
// debug!("have time bin patch to insert: {item:?}");
// Parameter order must match the column list of the prepared statement:
// (series, bin_len_ms, ts_msp, off, count, min, max, avg) using ttl ?
let params = (
item.series.id() as i64,
item.bin_len_ms,
item.ts_msp,
item.off,
item.count,
item.min,
item.max,
item.avg,
ttls.binned.as_secs() as i32,
);
// TODO would be better to count inserts only on completed insert
stats.inserted_binned().inc();
let fut = InsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_binned_scalar_f32_v02.clone(),
params,
tsnow_u64,
stats.clone(),
);
let futs = smallvec![fut];
// TODO match on the query result:
// match qres {
// Ok(_) => {
// backoff = backoff_0;
// }
// Err(e) => {
// stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
// back_off_sleep(&mut backoff).await;
// }
// }
futs
}
+9 -10
View File
@@ -332,16 +332,15 @@ pub struct ChannelInfoItem {
}
#[derive(Debug)]
pub struct TimeBinPatchSimpleF32 {
pub struct TimeBinSimpleF32 {
pub series: SeriesId,
pub bin_len_sec: u32,
pub bin_count: u32,
pub off_msp: u32,
pub off_lsp: u32,
pub counts: Vec<i64>,
pub mins: Vec<f32>,
pub maxs: Vec<f32>,
pub avgs: Vec<f32>,
pub bin_len_ms: i32,
pub ts_msp: i64,
pub off: i32,
pub count: i64,
pub min: f32,
pub max: f32,
pub avg: f32,
}
#[derive(Debug)]
@@ -352,7 +351,7 @@ pub enum QueryItem {
Mute(MuteItem),
Ivl(IvlItem),
ChannelInfo(ChannelInfoItem),
TimeBinPatchSimpleF32(TimeBinPatchSimpleF32),
TimeBinSimpleF32(TimeBinSimpleF32),
}
struct InsParCom {
+10 -11
View File
@@ -489,20 +489,19 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Er
}
{
let tab = GenTwcsTab::new(
"binned_scalar_f32_v01",
"binned_scalar_f32",
&[
("series", "bigint"),
("bin_len_sec", "int"),
("bin_count", "int"),
("off_msp", "int"),
("off_lsp", "int"),
("counts", "frozen<list<bigint>>"),
("mins", "frozen<list<float>>"),
("maxs", "frozen<list<float>>"),
("avgs", "frozen<list<float>>"),
("bin_len_ms", "int"),
("ts_msp", "bigint"),
("off", "int"),
("count", "bigint"),
("min", "float"),
("max", "float"),
("avg", "float"),
],
["series", "bin_len_sec", "bin_count", "off_msp"],
["off_lsp"],
["series", "bin_len_ms", "ts_msp"],
["off"],
ddays(30),
ddays(4),
);
+6 -6
View File
@@ -40,7 +40,7 @@ pub struct DataStore {
pub qu_insert_channel_status: Arc<PreparedStatement>,
pub qu_insert_channel_status_by_ts_msp: Arc<PreparedStatement>,
pub qu_insert_channel_ping: Arc<PreparedStatement>,
pub qu_insert_binned_scalar_f32_v01: Arc<PreparedStatement>,
pub qu_insert_binned_scalar_f32_v02: Arc<PreparedStatement>,
}
impl DataStore {
@@ -163,12 +163,12 @@ impl DataStore {
let qu_insert_channel_ping = Arc::new(q);
let cql = concat!(
"insert into binned_scalar_f32_v01 (",
"series, bin_len_sec, bin_count, off_msp, off_lsp, counts, mins, maxs, avgs)",
" values (?, ?, ?, ?, ?, ?, ?, ?, ?) using ttl ?"
"insert into binned_scalar_f32 (",
"series, bin_len_ms, ts_msp, off, count, min, max, avg)",
" values (?, ?, ?, ?, ?, ?, ?, ?) using ttl ?"
);
let q = scy.prepare(cql).await?;
let qu_insert_binned_scalar_f32_v01 = Arc::new(q);
let qu_insert_binned_scalar_f32_v02 = Arc::new(q);
let ret = Self {
scy,
qu_insert_ts_msp,
@@ -194,7 +194,7 @@ impl DataStore {
qu_insert_channel_status,
qu_insert_channel_status_by_ts_msp,
qu_insert_channel_ping,
qu_insert_binned_scalar_f32_v01,
qu_insert_binned_scalar_f32_v02,
};
Ok(ret)
}
+180 -83
View File
@@ -8,9 +8,12 @@ use items_0::Appendable;
use items_0::Empty;
use items_0::Events;
use items_0::Resettable;
use items_0::WithLen;
use items_2::binsdim0::BinsDim0;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim0::EventsDim0TimeBinner;
use netpod::log::*;
use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
@@ -20,7 +23,7 @@ use netpod::TsNano;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::GetValHelp;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::TimeBinPatchSimpleF32;
use scywr::iteminsertqueue::TimeBinSimpleF32;
use series::SeriesId;
use std::any;
use std::any::Any;
@@ -51,6 +54,7 @@ struct TickParams<'a> {
tb: &'a mut Box<dyn TimeBinner>,
pc: &'a mut PatchCollect,
iiq: &'a mut VecDeque<QueryItem>,
next_coarse: Option<&'a mut EventsDim0TimeBinner<f32>>,
}
pub struct PushFnParams<'a> {
@@ -63,11 +67,13 @@ pub struct PushFnParams<'a> {
pub struct ConnTimeBin {
did_setup: bool,
series: SeriesId,
bin_len: TsNano,
next_coarse: Option<Box<EventsDim0TimeBinner<f32>>>,
patch_collect: PatchCollect,
events_binner: Option<Box<dyn TimeBinner>>,
acc: Box<dyn Any + Send>,
push_fn: Box<dyn Fn(PushFnParams) -> Result<(), Error> + Send>,
tick_fn: Box<dyn Fn(TickParams) -> Result<(), Error> + Send>,
events_binner: Option<Box<dyn TimeBinner>>,
patch_collect: PatchCollect,
}
impl fmt::Debug for ConnTimeBin {
@@ -85,28 +91,49 @@ impl fmt::Debug for ConnTimeBin {
}
impl ConnTimeBin {
pub fn empty() -> Self {
pub fn empty(series: SeriesId, bin_len: TsNano) -> Self {
let do_time_weight = true;
#[cfg(DISABLED)]
let next_coarse = if bin_len.ns() < SEC * 60 {
type ST = f32;
let brange = BinnedRange {
bin_len: TsNano::from_ns(SEC * 60),
bin_off: todo!(),
bin_cnt: todo!(),
};
let binned_range = BinnedRangeEnum::Time(brange);
let tb = EventsDim0TimeBinner::<ST>::new(binned_range, do_time_weight).unwrap();
Some(tb)
} else if bin_len.ns() < SEC * 60 * 2 {
todo!()
} else if bin_len.ns() < SEC * 60 * 10 {
todo!()
} else {
None
}
.map(Box::new);
Self {
patch_collect: PatchCollect::new(bin_len.clone(), 1),
did_setup: false,
series: SeriesId::new(0),
series,
bin_len,
next_coarse: None,
events_binner: None,
acc: Box::new(()),
push_fn: Box::new(push::<i32>),
tick_fn: Box::new(tick::<i32>),
events_binner: None,
patch_collect: PatchCollect::new(TsNano(SEC * 60), 1),
}
}
pub fn setup_for(&mut self, series: SeriesId, scalar_type: &ScalarType, shape: &Shape) -> Result<(), Error> {
pub fn setup_for(&mut self, scalar_type: &ScalarType, shape: &Shape, tsnow: SystemTime) -> Result<(), Error> {
use ScalarType::*;
self.series = series;
let tsnow = SystemTime::now();
// TODO should not take a system time here:
let bin_len = &self.bin_len;
let ts0 = SEC * tsnow.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
let bin_len = self.patch_collect.bin_len();
let range1 = BinnedRange {
bin_off: ts0 / bin_len.ns(),
bin_cnt: u64::MAX / bin_len.ns() - 10,
bin_len,
bin_len: bin_len.clone(),
};
let binrange = BinnedRangeEnum::Time(range1);
//info!("binrange {binrange:?}");
@@ -195,7 +222,7 @@ impl ConnTimeBin {
pub fn push(&mut self, ts: TsNano, val: &DataValue) -> Result<(), Error> {
if !self.did_setup {
//return Err(Error::with_msg_no_trace("ConnTimeBin not yet set up"));
// TODO record as logic error
return Ok(());
}
let (f, acc) = (&self.push_fn, &mut self.acc);
@@ -219,43 +246,12 @@ impl ConnTimeBin {
tb: self.events_binner.as_mut().unwrap(),
pc: &mut self.patch_collect,
iiq: insert_item_queue,
next_coarse: self.next_coarse.as_mut().map(|x| x.as_mut()),
};
f(params)
}
}
fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque<QueryItem>) -> Result<(), Error> {
for item in pc.take_outq() {
if let Some(k) = item.as_any_ref().downcast_ref::<BinsDim0<f32>>() {
let ts0 = if let Some(x) = k.ts1s.front() {
*x
} else {
return Err(Error::PatchWithoutBins);
};
let off = ts0 / pc.patch_len().0;
let off_msp = off / 1000;
let off_lsp = off % 1000;
let item = TimeBinPatchSimpleF32 {
series: series.clone(),
bin_len_sec: (pc.bin_len().ns() / SEC) as u32,
bin_count: pc.bin_count() as u32,
off_msp: off_msp as u32,
off_lsp: off_lsp as u32,
counts: k.counts.iter().map(|x| *x as i64).collect(),
mins: k.mins.iter().map(|x| *x).collect(),
maxs: k.maxs.iter().map(|x| *x).collect(),
avgs: k.avgs.iter().map(|x| *x).collect(),
};
let item = QueryItem::TimeBinPatchSimpleF32(item);
iiq.push_back(item);
} else {
error!("unexpected container!");
return Err(Error::PatchUnexpectedContainer);
}
}
Ok(())
}
fn push<STY>(params: PushFnParams) -> Result<(), Error>
where
STY: ScalarOps,
@@ -266,6 +262,7 @@ where
let v = match GetValHelp::<STY>::get(params.val) {
Ok(x) => x,
Err(e) => {
// TODO throttle the error
let msg = format!(
"GetValHelp mismatch: series {:?} STY {} data {:?} {e}",
sid,
@@ -291,51 +288,41 @@ fn tick<STY>(params: TickParams) -> Result<(), Error>
where
STY: ScalarOps,
{
use items_0::WithLen;
let acc = params.acc;
let tb = params.tb;
let pc = params.pc;
// let pc = params.pc;
let iiq = params.iiq;
let next = params.next_coarse;
if let Some(c) = acc.downcast_mut::<EventsDim0<STY>>() {
if c.len() >= 1 {
//info!("push events len {}", c.len());
tb.ingest(c);
c.reset();
if tb.bins_ready_count() >= 1 {
info!("store bins len {}", tb.bins_ready_count());
if let Some(mut bins) = tb.bins_ready() {
//info!("store bins {bins:?}");
let mut bins = bins.to_simple_bins_f32();
pc.ingest(bins.as_mut())?;
if pc.outq_len() != 0 {
store_patch(params.series.clone(), pc, iiq)?;
for item in pc.take_outq() {
if let Some(k) = item.as_any_ref().downcast_ref::<BinsDim0<f32>>() {
// TODO
//let off_msp =
let item = TimeBinPatchSimpleF32 {
series: params.series.clone(),
bin_len_sec: (pc.bin_len().ns() / SEC) as u32,
bin_count: pc.bin_count() as u32,
off_msp: 0,
off_lsp: 0,
counts: k.counts.iter().map(|x| *x as i64).collect(),
mins: k.mins.iter().map(|x| *x).collect(),
maxs: k.maxs.iter().map(|x| *x).collect(),
avgs: k.avgs.iter().map(|x| *x).collect(),
};
let item = QueryItem::TimeBinPatchSimpleF32(item);
iiq.push_back(item);
} else {
error!("unexpected container!");
}
}
}
Ok(())
} else {
error!("have bins but none returned");
Err(Error::HaveBinsButNoneReturned)
}
let nbins = tb.bins_ready_count();
if nbins >= 1 {
info!("store bins len {} {:?}", nbins, params.series);
store_bins(params.series.clone(), tb, iiq, next)?;
// if let Some(mut bins) = tb.bins_ready() {
// //info!("store bins {bins:?}");
// let mut bins = bins.to_simple_bins_f32();
// TODO;
// pc.ingest(bins.as_mut())?;
// let noutq = pc.outq_len();
// info!("noutq {noutq}");
// if noutq != 0 {
// store_patch(params.series.clone(), pc, iiq)?;
// Ok(())
// } else {
// warn!("pc outq len zero");
// Ok(())
// }
// } else {
// error!("have bins but none returned");
// Err(Error::HaveBinsButNoneReturned)
// }
Ok(())
} else {
Ok(())
}
@@ -348,3 +335,113 @@ where
Ok(())
}
}
// Drain the ready bins out of the time binner and convert each bin into a
// TimeBinSimpleF32 QueryItem pushed onto the insert item queue `iiq`.
// Errors: PatchWithoutBins if the ready container is empty,
// PatchUnexpectedContainer if it is not BinsDim0<f32>,
// HaveBinsButNoneReturned if bins_ready() yields None despite a prior
// bins_ready_count() >= 1 at the call site.
// NOTE(review): the `next` coarse-resolution binner is accepted but not yet
// fed (its ingest call is commented out) — confirm this is intentional.
fn store_bins(
series: SeriesId,
tb: &mut Box<dyn TimeBinner>,
iiq: &mut VecDeque<QueryItem>,
next: Option<&mut EventsDim0TimeBinner<f32>>,
) -> Result<(), Error> {
if let Some(mut bins) = tb.bins_ready() {
let bins = bins.to_simple_bins_f32();
if let Some(k) = bins.as_any_ref().downcast_ref::<BinsDim0<f32>>() {
if k.len() == 0 {
return Err(Error::PatchWithoutBins);
} else {
// Walk the per-bin columns of BinsDim0 in lockstep:
// (ts1, ts2, count, min, max, avg) for each ready bin.
for (((((&ts1, &ts2), &count), &min), &max), &avg) in k
.ts1s
.iter()
.zip(k.ts2s.iter())
.zip(k.counts.iter())
.zip(k.mins.iter())
.zip(k.maxs.iter())
.zip(k.avgs.iter())
{
// TODO the inner must be of BinsDim0<f32> type so we feed also count, min, max, etc.
if let Some(next) = &next {
// next.ingest();
}
// TODO this must depend on the data type: waveforms need smaller batches
let bins_per_msp = 10000;
// Partitioning scheme: group `bins_per_msp` consecutive bins
// into one ts_msp partition; `off` is the bin index within it.
let ts1ms = ts1 / MS;
let ts2ms = ts2 / MS;
let bin_len_ms = ts2ms - ts1ms;
let h = bins_per_msp * bin_len_ms;
let ts_msp = ts1ms / h * h;
let off = (ts1ms - ts_msp) / bin_len_ms;
let item = TimeBinSimpleF32 {
series: series.clone(),
bin_len_ms: bin_len_ms as i32,
ts_msp: ts_msp as i64,
off: off as i32,
count: count as i64,
min,
max,
avg,
};
let item = QueryItem::TimeBinSimpleF32(item);
debug!("push item B ts1ms {ts1ms} bin_len_ms {bin_len_ms} ts_msp {ts_msp} off {off}");
iiq.push_back(item);
}
}
} else {
error!("unexpected container!");
return Err(Error::PatchUnexpectedContainer);
}
// TODO feed also the next patch collector for the next coarse resolution.
// pc.ingest(bins.as_mut())?;
// let noutq = pc.outq_len();
// info!("noutq {noutq}");
// if noutq != 0 {
// store_patch(params.series.clone(), pc, iiq)?;
// Ok(())
// } else {
// warn!("pc outq len zero");
// Ok(())
// }
Ok(())
} else {
error!("have bins but none returned");
Err(Error::HaveBinsButNoneReturned)
}
}
// Legacy patch-based storage path, currently stubbed out: it still validates
// that each outq item is a BinsDim0<f32> (and that the patch is non-empty),
// but the actual QueryItem construction and push are commented out, so no
// items reach `iiq`. Kept, per the TODO below, because the patch collector is
// still wanted for storing the coarser resolutions.
fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque<QueryItem>) -> Result<(), Error> {
// TODO
// I probably still want to keep the "patchcollect" because I want to store also the next
// resolutions.
// But I need to emit each bin as they come.
for item in pc.take_outq() {
if let Some(k) = item.as_any_ref().downcast_ref::<BinsDim0<f32>>() {
let ts0 = if let Some(x) = k.ts1s.front() {
*x
} else {
return Err(Error::PatchWithoutBins);
};
// TODO insert each bin individually
// NOTE(review): divides by MS but the name says sec — confirm the
// intended unit before re-enabling the insert below.
let bin_len_sec = (pc.bin_len().ns() / MS);
let bin_count = pc.bin_count();
let off = ts0 / pc.patch_len().0;
let off_msp = off / 1000;
let off_lsp = off % 1000;
// let item = TimeBinSimpleF32 {
// };
// let item = QueryItem::TimeBinSimpleF32(item);
// warn!(
// "push item B bin_len_sec {bin_len_sec} bin_count {bin_count} off_msp {off_msp} off_lsp {off_lsp}"
// );
// iiq.push_back(item);
} else {
error!("unexpected container!");
return Err(Error::PatchUnexpectedContainer);
}
}
Ok(())
}
+25 -14
View File
@@ -5,12 +5,9 @@ use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use log::*;
use netpod::timeunits::DAY;
use netpod::timeunits::HOUR;
use netpod::timeunits::SEC;
use netpod::Database;
use netpod::ScalarType;
use netpod::ScyllaConfig;
use netpod::Shape;
use netpod::TsNano;
use netpod::TS_MSP_GRID_SPACING;
@@ -23,7 +20,7 @@ use series::ChannelStatusSeriesId;
use series::SeriesId;
use stats::SeriesByChannelStats;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::SystemTime;
#[derive(Debug, ThisError)]
pub enum Error {
@@ -72,6 +69,7 @@ impl SeriesWriter {
channel: String,
scalar_type: ScalarType,
shape: Shape,
tsnow: SystemTime,
) -> Result<Self, Error> {
let (tx, rx) = async_channel::bounded(1);
let item = ChannelInfoQuery {
@@ -95,8 +93,8 @@ impl SeriesWriter {
worker_tx.send(item).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let sid = res.series.into_inner();
let mut binner = ConnTimeBin::empty();
binner.setup_for(sid.clone(), &scalar_type, &shape)?;
let mut binner = ConnTimeBin::empty(sid.clone(), TsNano::from_ns(SEC * 10));
binner.setup_for(&scalar_type, &shape, tsnow)?;
let res = Self {
cssid,
sid,
@@ -130,21 +128,19 @@ impl SeriesWriter {
val: DataValue,
item_qu: &mut VecDeque<QueryItem>,
) -> Result<(), Error> {
// TODO check for compatibility of the given data..
// TODO compute the binned data here as well and flush completed bins if needed.
self.binner.push(ts.clone(), &val)?;
// TODO decide on better msp/lsp: random offset!
// As long as one writer is active, the msp is arbitrary.
// TODO need to choose this better?
let div = SEC * 10;
// Maximum resolution of the ts msp:
let msp_res_max = SEC * 10;
let (ts_msp, ts_msp_changed) = match self.ts_msp_last.clone() {
Some(ts_msp_last) => {
if self.inserted_in_current_msp >= self.msp_max_entries || ts_msp_last.clone().add_ns(HOUR) <= ts {
let ts_msp = ts.clone().div(div).mul(div);
let ts_msp = ts.clone().div(msp_res_max).mul(msp_res_max);
if ts_msp == ts_msp_last {
(ts_msp, false)
} else {
@@ -158,7 +154,7 @@ impl SeriesWriter {
}
}
None => {
let ts_msp = ts.clone().div(div).mul(div);
let ts_msp = ts.clone().div(msp_res_max).mul(msp_res_max);
self.ts_msp_last = Some(ts_msp.clone());
self.inserted_in_current_msp = 1;
(ts_msp, true)
@@ -191,6 +187,11 @@ impl SeriesWriter {
item_qu.push_back(QueryItem::Insert(item));
Ok(())
}
pub fn tick(&mut self, iiq: &mut VecDeque<QueryItem>) -> Result<(), Error> {
self.binner.tick(iiq)?;
Ok(())
}
}
pub struct JobId(pub u64);
@@ -207,12 +208,15 @@ impl EstablishWriterWorker {
async fn work(self) {
while let Ok(item) = self.jobrx.recv().await {
// TODO
debug!("got job");
let res = SeriesWriter::establish(
self.worker_tx.clone(),
item.backend,
item.channel,
item.scalar_type,
item.shape,
item.tsnow,
)
.await;
if item.restx.send((item.job_id, res)).await.is_err() {
@@ -229,6 +233,7 @@ pub struct EstablishWorkerJob {
scalar_type: ScalarType,
shape: Shape,
restx: Sender<(JobId, Result<SeriesWriter, Error>)>,
tsnow: SystemTime,
}
impl EstablishWorkerJob {
@@ -239,6 +244,7 @@ impl EstablishWorkerJob {
scalar_type: ScalarType,
shape: Shape,
restx: Sender<(JobId, Result<SeriesWriter, Error>)>,
tsnow: SystemTime,
) -> Self {
Self {
job_id,
@@ -247,6 +253,7 @@ impl EstablishWorkerJob {
scalar_type,
shape,
restx,
tsnow,
}
}
}
@@ -262,6 +269,9 @@ pub fn start_writer_establish_worker(
#[test]
fn write_00() {
use netpod::Database;
use scywr::session::ScyllaConfig;
use std::sync::Arc;
let fut = async {
let dbconf = &Database {
name: "daqbuffer".into(),
@@ -285,12 +295,13 @@ fn write_00() {
let channel = "chn-test-00";
let scalar_type = ScalarType::I16;
let shape = Shape::Scalar;
let mut writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape).await?;
let tsnow = SystemTime::now();
let mut writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape, tsnow).await?;
eprintln!("{writer:?}");
let mut item_queue = VecDeque::new();
let item_qu = &mut item_queue;
for i in 0..10 {
let ts = TsNano::from_ns(DAY + SEC * i);
let ts = TsNano::from_ns(HOUR * 24 + SEC * i);
let ts_local = ts.clone();
let val = DataValue::Scalar(scywr::iteminsertqueue::ScalarValue::I16(i as _));
writer.write(ts, ts_local, val, item_qu)?;