WIP insert

This commit is contained in:
Dominik Werder
2022-05-04 15:50:45 +02:00
parent 2d7d8f0bbd
commit b206bd0eb0
8 changed files with 524 additions and 119 deletions

View File

@@ -2,7 +2,7 @@ use crate::zmtp::ZmtpMessage;
use err::Error;
#[allow(unused)]
use log::*;
use netpod::{ByteOrder, ScalarType, Shape};
use netpod::{AggKind, ByteOrder, ScalarType, Shape};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsVal;
@@ -42,6 +42,54 @@ fn bsread_encoding_default() -> String {
"little".into()
}
/// Compression scheme applied to a bsread channel payload.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum CompressionKind {
    Lz4,
    // LZ4 with bitshuffle pre-filtering.
    BitshuffleLz4,
}
/// Decoded, strongly typed form of a bsread `ChannelDesc`.
///
/// Built via `TryFrom<&ChannelDesc>`; also constructed manually in the CA
/// connection path to feed series-id registration.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChannelDescDecoded {
    pub name: String,
    pub scalar_type: ScalarType,
    pub shape: Shape,
    pub byte_order: ByteOrder,
    // `None` means the payload is uncompressed.
    pub compression: Option<CompressionKind>,
    pub agg_kind: AggKind,
}
impl TryFrom<&ChannelDesc> for ChannelDescDecoded {
    type Error = Error;

    /// Decodes the raw (stringly-typed) bsread channel description into the
    /// typed form. Fails on unknown scalar type, shape, or compression kind.
    fn try_from(cd: &ChannelDesc) -> Result<Self, Self::Error> {
        // "none" and an absent field both mean no compression; any other
        // unrecognized value is rejected.
        let compression = match cd.compression.as_deref() {
            None | Some("none") => None,
            Some("lz4") => Some(CompressionKind::Lz4),
            Some("bitshuffle_lz4") => Some(CompressionKind::BitshuffleLz4),
            Some(k) => {
                return Err(Error::with_msg_no_trace(format!(
                    "can not understand bsread compression kind: {k:?}"
                )))
            }
        };
        // Unknown encodings fall back to little endian, matching the
        // original behavior (only "big" selects BE).
        let byte_order = match cd.encoding.as_str() {
            "big" => ByteOrder::BE,
            _ => ByteOrder::LE,
        };
        Ok(ChannelDescDecoded {
            name: cd.name.clone(),
            scalar_type: ScalarType::from_bsread_str(&cd.ty)?,
            shape: Shape::from_bsread_jsval(&cd.shape)?,
            compression,
            byte_order,
            agg_kind: AggKind::Plain,
        })
    }
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HeadA {
pub htype: String,

View File

@@ -1,5 +1,6 @@
pub mod conn;
pub mod proto;
pub mod store;
use conn::{CaConn, FindIoc};
use err::Error;
@@ -13,6 +14,7 @@ use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, VecDeque};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
@@ -20,6 +22,8 @@ use tokio::net::TcpStream;
use tokio::task::JoinError;
use tokio::time::error::Elapsed;
use self::store::{ChannelRegistry, DataStore};
#[derive(Debug, Serialize, Deserialize)]
struct ChannelConfig {
channels: Vec<String>,
@@ -229,9 +233,12 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
for (host, channels) in &channels_by_host {
info!("Have: {:?} {:?}", host, channels.len());
}
let nil = None::<i8>;
for ch in &opts.channels {
if !channels_set.contains_key(ch) {
error!("Could not locate {ch:?}");
scy.execute(&qu, (ch, "", nil))
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
}
}
Ok(())
@@ -246,6 +253,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
.build()
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let scy = Arc::new(scy);
let qu_find_addr = scy
.prepare("select addr from ioc_by_channel where channel = ?")
.await
@@ -274,12 +282,14 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
if opts.abort_after_search == 1 {
return Ok(());
}
let data_store = Arc::new(DataStore::new(scy.clone()).await?);
let mut conn_jhs = vec![];
for (host, channels) in channels_by_host {
let data_store = data_store.clone();
let conn_block = async move {
info!("Create TCP connection to {:?}", (host.ip(), host.port()));
let tcp = TcpStream::connect((host.ip().clone(), host.port())).await?;
let mut conn = CaConn::new(tcp);
let mut conn = CaConn::new(tcp, data_store.clone());
for c in channels {
conn.channel_add(c);
}

View File

@@ -1,15 +1,22 @@
use super::proto::{CaItem, CaMsg, CaMsgTy, CaProto};
use super::store::DataStore;
use crate::bsread::ChannelDescDecoded;
use crate::ca::proto::{CreateChan, EventAdd, HeadInfo, ReadNotify};
use crate::series::{Existence, SeriesId};
use err::Error;
use futures_util::{Future, FutureExt, Stream, StreamExt};
use futures_util::stream::FuturesUnordered;
use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt};
use libc::c_int;
use log::*;
use netpod::timeunits::SEC;
use netpod::{ScalarType, Shape};
use std::collections::BTreeMap;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::time::{Duration, Instant, SystemTime};
use tokio::io::unix::AsyncFd;
use tokio::net::TcpStream;
@@ -25,8 +32,10 @@ struct EventedState {
#[derive(Debug)]
enum MonitoringState {
AddingEvent,
Evented(EventedState),
FetchSeriesId,
AddingEvent(SeriesId),
Evented(SeriesId, EventedState),
// TODO we also want to read while staying subscribed:
Reading,
Read,
Muted,
@@ -36,6 +45,8 @@ enum MonitoringState {
struct CreatedState {
cid: u32,
sid: u32,
scalar_type: ScalarType,
shape: Shape,
ts_created: Instant,
state: MonitoringState,
}
@@ -82,10 +93,15 @@ pub struct CaConn {
cid_by_subid: BTreeMap<u32, u32>,
name_by_cid: BTreeMap<u32, String>,
poll_count: usize,
data_store: Arc<DataStore>,
fut_get_series: FuturesUnordered<
Pin<Box<dyn Future<Output = Result<(u32, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>,
>,
value_insert_futs: FuturesUnordered<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
}
impl CaConn {
pub fn new(tcp: TcpStream) -> Self {
pub fn new(tcp: TcpStream, data_store: Arc<DataStore>) -> Self {
Self {
state: CaConnState::Init,
proto: CaProto::new(tcp),
@@ -97,6 +113,9 @@ impl CaConn {
cid_by_subid: BTreeMap::new(),
name_by_cid: BTreeMap::new(),
poll_count: 0,
data_store,
fut_get_series: FuturesUnordered::new(),
value_insert_futs: FuturesUnordered::new(),
}
}
@@ -135,6 +154,65 @@ impl Stream for CaConn {
return Ready(None);
}
loop {
while self.value_insert_futs.len() > 0 {
match self.fut_get_series.poll_next_unpin(cx) {
Pending => break,
_ => {}
}
}
while self.fut_get_series.len() > 0 {
match self.fut_get_series.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
info!("Have SeriesId {k:?}");
let cid = k.0;
let sid = k.1;
let data_type = k.2;
let data_count = k.3;
let series = match k.4 {
Existence::Created(k) => k,
Existence::Existing(k) => k,
};
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
let name = self.name_by_cid(cid).unwrap().to_string();
let msg = CaMsg {
ty: CaMsgTy::EventAdd(EventAdd {
sid,
data_type,
data_count,
subid,
}),
};
self.proto.push_out(msg);
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Created(CreatedState {
cid,
sid,
// TODO handle error better! Transition channel to Error state?
scalar_type: ScalarType::from_ca_id(data_type)?,
shape: Shape::from_ca_count(data_count)?,
ts_created: Instant::now(),
state: MonitoringState::AddingEvent(series),
});
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
let _cd = ChannelDescDecoded {
name: name.to_string(),
scalar_type,
shape,
agg_kind: netpod::AggKind::Plain,
// TODO these play no role in series id:
byte_order: netpod::ByteOrder::LE,
compression: None,
};
cx.waker().wake_by_ref();
}
Ready(Some(Err(e))) => error!("series error: {e:?}"),
Ready(None) => {}
Pending => break,
}
}
break match &self.state {
CaConnState::Init => {
let msg = CaMsg { ty: CaMsgTy::Version };
@@ -229,75 +307,169 @@ impl Stream for CaConn {
let res = match self.proto.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
match k {
CaItem::Msg(k) => match k.ty {
CaMsgTy::SearchRes(k) => {
let a = k.addr.to_be_bytes();
let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port);
info!("Search result indicates server address: {addr}");
}
CaMsgTy::CreateChanRes(k) => {
// TODO handle cid-not-found which can also indicate peer error.
let cid = k.cid;
let name = self.name_by_cid(cid);
info!("Channel created for {name:?} now register for events");
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
let msg = CaMsg {
ty: CaMsgTy::EventAdd(EventAdd {
sid: k.sid,
data_type: k.data_type,
data_count: k.data_count,
subid,
}),
};
self.proto.push_out(msg);
do_wake_again = true;
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&k.cid).unwrap();
*ch_s = ChannelState::Created(CreatedState {
cid: k.cid,
sid: k.sid,
ts_created: Instant::now(),
state: MonitoringState::AddingEvent,
});
info!(
"Channel is created cid {} sid {} name {}",
k.cid, k.sid, self.name_by_cid[&k.cid]
);
}
CaMsgTy::EventAddRes(k) => {
// TODO handle subid-not-found which can also be peer error:
let cid = *self.cid_by_subid.get(&k.subid).unwrap();
// TODO get rid of the string clone when I don't want the log output any longer:
let name: String = self.name_by_cid(cid).unwrap().into();
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
match ch_s {
ChannelState::Created(st) => {
match st.state {
MonitoringState::AddingEvent => {
info!("Confirmation {name} is subscribed.");
// TODO get ts from faster common source:
st.state = MonitoringState::Evented(EventedState {
ts_last: Instant::now(),
});
}
MonitoringState::Evented(ref mut st) => {
// TODO get ts from faster common source:
st.ts_last = Instant::now();
}
_ => {
warn!("bad state? not always, could be late message.");
CaItem::Msg(k) => {
match k.ty {
CaMsgTy::SearchRes(k) => {
let a = k.addr.to_be_bytes();
let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port);
info!("Search result indicates server address: {addr}");
}
CaMsgTy::CreateChanRes(k) => {
// TODO handle cid-not-found which can also indicate peer error.
let cid = k.cid;
let sid = k.sid;
// TODO handle error:
let name = self.name_by_cid(cid).unwrap().to_string();
info!("CreateChanRes {name:?}");
let scalar_type = ScalarType::from_ca_id(k.data_type)?;
let shape = Shape::from_ca_count(k.data_count)?;
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Created(CreatedState {
cid,
sid,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
ts_created: Instant::now(),
state: MonitoringState::FetchSeriesId,
});
// TODO handle error in different way. Should most likely not abort.
let cd = ChannelDescDecoded {
name: name.to_string(),
scalar_type,
shape,
agg_kind: netpod::AggKind::Plain,
// TODO these play no role in series id:
byte_order: netpod::ByteOrder::LE,
compression: None,
};
let y = self.as_ref();
let y = unsafe { Pin::into_inner_unchecked(y) };
let y = unsafe { &*(y as *const CaConn) };
let fut =
y.data_store.chan_reg.get_series_id(cd).map_ok(move |series| {
(cid, k.sid, k.data_type, k.data_count, series)
});
// TODO throttle execution rate:
self.fut_get_series.push(Box::pin(fut) as _);
do_wake_again = true;
}
CaMsgTy::EventAddRes(k) => {
// TODO handle subid-not-found which can also be peer error:
let cid = *self.cid_by_subid.get(&k.subid).unwrap();
// TODO get rid of the string clone when I don't want the log output any longer:
let name: String = self.name_by_cid(cid).unwrap().into();
// TODO handle not-found error:
let mut series_2 = None;
let ch_s = self.channels.get_mut(&cid).unwrap();
match ch_s {
ChannelState::Created(st) => {
match st.state {
MonitoringState::AddingEvent(ref series) => {
let series = series.clone();
series_2 = Some(series.clone());
info!("Confirmation {name} is subscribed.");
// TODO get ts from faster common source:
st.state = MonitoringState::Evented(
series,
EventedState {
ts_last: Instant::now(),
},
);
}
MonitoringState::Evented(ref series, ref mut st) => {
series_2 = Some(series.clone());
// TODO get ts from faster common source:
st.ts_last = Instant::now();
}
_ => {
error!(
"unexpected state: EventAddRes while having {ch_s:?}"
);
}
}
}
_ => {
error!("unexpected state: EventAddRes while having {ch_s:?}");
}
}
_ => {
warn!("unexpected state: EventAddRes while having {ch_s:?}");
{
let series = series_2.unwrap();
// TODO where to actually get the time from?
let ts = SystemTime::now();
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
let ts = epoch.as_secs() * 1000000000 + epoch.subsec_nanos() as u64;
let ts_msp = ts / (30 * SEC) * (30 * SEC);
let ts_lsp = ts - ts_msp;
// TODO make sure that I only accept types I expect.
use crate::ca::proto::CaDataScalarValue::*;
use crate::ca::proto::CaDataValue::*;
match k.value {
Scalar(v) => {
match v {
F64(val) => {
match ch_s {
ChannelState::Created(st) => {
match st.shape {
Shape::Scalar => {
match st.scalar_type {
ScalarType::F64 => {
// TODO get msp/lsp: cap at some count. only one writer!!
let y = self.as_ref();
let y = unsafe {
Pin::into_inner_unchecked(y)
};
let y = unsafe {
&*(y as *const CaConn)
};
let fut1 = y.data_store.scy.execute(
&y.data_store.qu_insert_ts_msp,
(ts_msp as i64, ts_lsp as i64),
).map(|_| Ok::<_, Error>(()))
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")));
let fut2 = y.data_store.scy.execute(
&y.data_store.qu_insert_scalar_f64,
(series.id() as i64, ts_msp as i64, ts_lsp as i64, 0i64, val),
).map(|_| Ok::<_, Error>(()))
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")));
let fut = fut1
.and_then(move |a| fut2);
if self.value_insert_futs.len()
> 10
{
warn!("can not keep up");
} else {
self.value_insert_futs
.push(
Box::pin(fut) as _
);
}
}
_ => {
error!("unexpected value type");
}
}
}
_ => {
error!("unexpected value shape");
}
}
}
_ => {
error!("got value but channel not created");
}
}
}
_ => {}
}
}
Array => {}
}
}
}
_ => {}
}
_ => {}
},
}
_ => {}
}
Ready(Some(Ok(())))

View File

@@ -63,6 +63,7 @@ pub struct EventAddRes {
pub data_count: u16,
pub status: u32,
pub subid: u32,
pub value: CaDataValue,
}
#[derive(Debug)]
@@ -92,6 +93,23 @@ enum CaScalarType {
String,
}
/// A single scalar value decoded from a Channel Access payload.
#[derive(Clone, Debug)]
pub enum CaDataScalarValue {
    I8(i8),
    I16(i16),
    I32(i32),
    F32(f32),
    F64(f64),
    // CA enum values are transported as 16-bit integers.
    Enum(i16),
    String(String),
}
/// Decoded Channel Access event value.
#[derive(Clone, Debug)]
pub enum CaDataValue {
    Scalar(CaDataScalarValue),
    // NOTE(review): array payloads are recognized but carry no data yet — WIP.
    Array,
}
impl CaScalarType {
fn from_ca_u16(k: u16) -> Result<Self, Error> {
use CaScalarType::*;
@@ -416,7 +434,7 @@ impl CaMsg {
}
1 => {
let ca_st = CaScalarType::from_ca_u16(hi.data_type)?;
match ca_st {
let value = match ca_st {
CaScalarType::F64 => {
if payload.len() < 2 {
return Err(Error::with_msg_no_trace(format!(
@@ -425,7 +443,7 @@ impl CaMsg {
)));
}
let v = f64::from_be_bytes(payload.try_into()?);
info!("f64: {v}");
CaDataValue::Scalar(CaDataScalarValue::F64(v))
}
CaScalarType::Enum => {
if payload.len() < 2 {
@@ -434,8 +452,8 @@ impl CaMsg {
payload.len()
)));
}
let v = u16::from_be_bytes(payload[..2].try_into()?);
info!("enum payload: {v}");
let v = i16::from_be_bytes(payload[..2].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::I16(v))
}
CaScalarType::String => {
let mut ixn = payload.len();
@@ -447,17 +465,19 @@ impl CaMsg {
}
//info!("try to read string from payload len {} ixn {}", payload.len(), ixn);
let v = String::from_utf8_lossy(&payload[..ixn]);
info!("String payload: {v}");
CaDataValue::Scalar(CaDataScalarValue::String(v.into()))
}
_ => {
warn!("TODO handle {ca_st:?}");
return Err(Error::with_msg_no_trace(format!("can not yet handle type {ca_st:?}")));
}
}
};
let d = EventAddRes {
data_type: hi.data_type,
data_count: hi.data_count,
status: hi.param1,
subid: hi.param2,
value,
};
CaMsg {
ty: CaMsgTy::EventAddRes(d),

65
netfetch/src/ca/store.rs Normal file
View File

@@ -0,0 +1,65 @@
use crate::bsread::ChannelDescDecoded;
use crate::series::{Existence, SeriesId};
use async_channel::{Receiver, Sender};
use err::Error;
use scylla::prepared_statement::PreparedStatement;
use scylla::Session as ScySession;
use std::sync::Arc;
/// A pending channel-registration request carrying the decoded channel
/// description that identifies the series to register.
pub struct RegisterJob {
    // NOTE(review): stored but not yet read anywhere visible in this chunk — WIP.
    desc: ChannelDescDecoded,
}

impl RegisterJob {
    /// Wraps a decoded channel description into a registration job.
    pub fn new(desc: ChannelDescDecoded) -> Self {
        Self { desc }
    }
}
/// Sender/receiver pair for queueing `RegisterJob`s to a registration worker.
///
/// NOTE(review): not constructed or consumed anywhere visible in this chunk — WIP scaffolding.
pub struct RegisterChannel {
    tx: Sender<RegisterJob>,
    rx: Receiver<RegisterJob>,
}
/// Resolves channel descriptions to series ids using the shared Scylla session.
pub struct ChannelRegistry {
    scy: Arc<ScySession>,
}

impl ChannelRegistry {
    /// Creates a registry backed by the given Scylla session.
    pub fn new(scy: Arc<ScySession>) -> Self {
        Self { scy }
    }

    /// Looks up — or creates — the series id for the given channel
    /// description by delegating to `crate::series::get_series_id`.
    pub async fn get_series_id(&self, cd: ChannelDescDecoded) -> Result<Existence<SeriesId>, Error> {
        crate::series::get_series_id(&self.scy, &cd).await
    }
}
/// Shared handles for writing event data to Scylla: the session, the
/// prepared insert statements, and the channel registry.
pub struct DataStore {
    pub scy: Arc<ScySession>,
    pub qu_insert_ts_msp: Arc<PreparedStatement>,
    pub qu_insert_scalar_f64: Arc<PreparedStatement>,
    pub chan_reg: Arc<ChannelRegistry>,
}

impl DataStore {
    /// Prepares both insert statements on the given session and bundles them
    /// with a fresh `ChannelRegistry`. Any preparation failure aborts construction.
    pub async fn new(scy: Arc<ScySession>) -> Result<Self, Error> {
        let qu_insert_ts_msp = scy
            .prepare("insert into ts_msp (series, ts_msp) values (?, ?)")
            .await
            .map(Arc::new)
            .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
        let qu_insert_scalar_f64 = scy
            .prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
            .await
            .map(Arc::new)
            .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
        let chan_reg = Arc::new(ChannelRegistry::new(scy.clone()));
        Ok(Self {
            scy,
            qu_insert_ts_msp,
            qu_insert_scalar_f64,
            chan_reg,
        })
    }
}

View File

@@ -2,6 +2,7 @@ pub mod bsread;
pub mod ca;
pub mod channelwriter;
pub mod netbuf;
pub mod series;
#[cfg(test)]
pub mod test;
pub mod zmtp;

87
netfetch/src/series.rs Normal file
View File

@@ -0,0 +1,87 @@
use crate::bsread::ChannelDescDecoded;
use crate::zmtp::ErrConv;
use err::Error;
#[allow(unused)]
use log::*;
use scylla::Session as ScySession;
use std::time::Duration;
/// Marks whether a looked-up value was freshly created or already present.
#[derive(Clone, Debug)]
pub enum Existence<T> {
    Created(T),
    Existing(T),
}

/// Numeric identifier of a time series as stored in `series_by_channel`.
#[derive(Clone, Debug)]
pub struct SeriesId(u64);

impl SeriesId {
    /// Returns the raw numeric id.
    pub fn id(&self) -> u64 {
        self.0
    }
}
// TODO don't need byte_order or compression from ChannelDescDecoded for channel registration.
/// Looks up — or creates — the series id for a channel in `series_by_channel`.
///
/// Returns `Existing` when a matching row with null `agg_kind` is already
/// present; otherwise derives a candidate id from an md5 of the channel
/// identity and inserts it with a conditional insert (`if not exists`),
/// retrying with incremented ids on collision.
pub async fn get_series_id(scy: &ScySession, cd: &ChannelDescDecoded) -> Result<Existence<SeriesId>, Error> {
    // NOTE(review): facility is hard-coded — presumably a placeholder; confirm.
    let facility = "scylla";
    let channel_name = &cd.name;
    let scalar_type = cd.scalar_type.to_scylla_i32();
    let shape = cd.shape.to_scylla_vec();
    info!("get_series_id {facility:?} {channel_name:?} {scalar_type:?} {shape:?}");
    let res = scy
        .query(
            "select series, agg_kind from series_by_channel where facility = ? and channel_name = ? and scalar_type = ? and shape_dims = ?",
            (facility, channel_name, &scalar_type, &shape),
        )
        .await
        .err_conv()?;
    // Keep only rows with null agg_kind: the plain (unaggregated) series.
    let mut all = vec![];
    for row in res.rows_typed_or_empty::<(i64, Option<String>)>() {
        match row {
            Ok(k) => {
                if k.1.is_none() {
                    all.push(k.0);
                }
            }
            Err(e) => return Err(Error::with_msg_no_trace(format!("{e:?}"))),
        }
    }
    info!("all: {all:?}");
    let rn = all.len();
    if rn == 0 {
        // No series yet: derive a deterministic candidate id from the channel identity.
        use md5::Digest;
        let mut h = md5::Md5::new();
        h.update(facility.as_bytes());
        h.update(channel_name.as_bytes());
        // NOTE(review): hashes the Debug form of the scylla-encoded values, not
        // the ScalarType/Shape themselves — changing those encodings changes ids.
        h.update(format!("{:?}", scalar_type).as_bytes());
        h.update(format!("{:?}", shape).as_bytes());
        let f = h.finalize();
        let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
        // Conditional insert; on collision ([applied] == false) retry with the
        // next id. Bounded to 2000 attempts with a 20 ms pause between tries.
        for _ in 0..2000 {
            let res = scy
                .query(
                    concat!(
                        "insert into series_by_channel",
                        " (facility, channel_name, scalar_type, shape_dims, agg_kind, series)",
                        " values (?, ?, ?, ?, null, ?) if not exists"
                    ),
                    (facility, channel_name, &scalar_type, &shape, series as i64),
                )
                .await
                .err_conv()?;
            // First column of a conditional-insert result is the "[applied]" boolean.
            let row = res.first_row().err_conv()?;
            if row.columns[0].as_ref().unwrap().as_boolean().unwrap() {
                return Ok(Existence::Created(SeriesId(series)));
            } else {
                error!("tried to insert but series exists...");
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
            series += 1;
        }
        Err(Error::with_msg_no_trace(format!("can not create and insert series id")))
    } else {
        // At least one existing plain series; take the first.
        // NOTE(review): rn > 1 would indicate duplicate rows — currently ignored.
        let series = all[0] as u64;
        info!("series: {:?}", series);
        Ok(Existence::Existing(SeriesId(series)))
    }
}

View File

@@ -1,4 +1,4 @@
use crate::bsread::{BsreadMessage, Parser};
use crate::bsread::{BsreadMessage, ChannelDescDecoded, Parser};
use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB};
use crate::channelwriter::{ChannelWriter, ChannelWriterAll};
use crate::netbuf::NetBuf;
@@ -10,10 +10,10 @@ use futures_core::{Future, Stream};
use futures_util::{pin_mut, FutureExt, StreamExt};
use log::*;
use netpod::timeunits::*;
use netpod::{ByteOrder, ScalarType, Shape};
use scylla::batch::{Batch, BatchType, Consistency};
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::transport::query_result::{FirstRowError, RowsExpectedError};
use scylla::{Session as ScySession, SessionBuilder};
use serde_json::Value as JsVal;
use stats::CheckEvery;
@@ -41,6 +41,24 @@ impl<T> ErrConv<T> for Result<T, QueryError> {
}
}
impl<T> ErrConv<T> for Result<T, RowsExpectedError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, FirstRowError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
#[allow(unused)]
fn test_listen() -> Result<(), Error> {
use std::time::Duration;
@@ -72,7 +90,7 @@ fn test_service() -> Result<(), Error> {
taskrun::run(fut)
}
pub fn get_series_id(chn: &ChannelDesc) -> u64 {
pub fn __get_series_id(chn: &ChannelDesc) -> u64 {
// TODO use a more stable format (with ScalarType, Shape) as hash input.
// TODO do not depend at all on the mapping, instead look it up on demand and cache.
use md5::Digest;
@@ -84,6 +102,11 @@ pub fn get_series_id(chn: &ChannelDesc) -> u64 {
u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap())
}
/// Placeholder for the zmtp-path series-id lookup; not implemented yet.
///
/// NOTE(review): `err::todoval()` is a todo marker — presumably it panics or
/// aborts at runtime; this function must not be called until implemented.
pub async fn get_series_id(scy: &ScySession, chn: &ChannelDescDecoded) -> Result<u64, Error> {
    error!("TODO get_series_id");
    err::todoval()
}
pub struct CommonQueries {
pub qu1: PreparedStatement,
pub qu2: PreparedStatement,
@@ -231,9 +254,11 @@ impl BsreadClient {
if dh_md5_last.is_empty() {
info!("data header hash {}", bm.head_b_md5);
dh_md5_last = bm.head_b_md5.clone();
let scy = self.scy.clone();
for chn in &head_b.channels {
info!("Setup writer for {}", chn.name);
match self.setup_channel_writers(chn).await {
let cd: ChannelDescDecoded = chn.try_into()?;
match self.setup_channel_writers(&scy, &cd).await {
Ok(_) => {}
Err(e) => {
warn!("can not set up writer for {} {e:?}", chn.name);
@@ -283,13 +308,16 @@ impl BsreadClient {
.len()
.min(self.opts.process_channel_count_limit.unwrap_or(4000))
{
// TODO skip decoding if header unchanged.
let chn = &head_b.channels[i1];
let chd: ChannelDescDecoded = chn.try_into()?;
let fr = &msg.frames[2 + 2 * i1];
// TODO refactor to make correctness evident.
if i1 >= series_ids.len() {
series_ids.resize(head_b.channels.len(), (0u8, 0u64));
}
if series_ids[i1].0 == 0 {
let series = get_series_id(chn);
let series = get_series_id(&self.scy, &chd).await?;
series_ids[i1].0 = 1;
series_ids[i1].1 = series;
}
@@ -349,38 +377,32 @@ impl BsreadClient {
Ok(())
}
async fn setup_channel_writers(&mut self, chn: &ChannelDesc) -> Result<(), Error> {
let series = get_series_id(chn);
let has_comp = match &chn.compression {
Some(s) => s != "none",
None => false,
};
async fn setup_channel_writers(&mut self, scy: &ScySession, cd: &ChannelDescDecoded) -> Result<(), Error> {
let series = get_series_id(scy, cd).await?;
let has_comp = cd.compression.is_some();
if has_comp {
warn!("Compression not yet supported [{}]", chn.name);
warn!("Compression not yet supported [{}]", cd.name);
return Ok(());
}
let scalar_type = ScalarType::from_bsread_str(&chn.ty)?;
let shape = Shape::from_bsread_jsval(&chn.shape)?;
let byte_order = ByteOrder::from_bsread_str(&chn.encoding)?;
let trunc = self.opts.array_truncate.unwrap_or(64);
let cw = ChannelWriterAll::new(
series,
self.common_queries.clone(),
self.scy.clone(),
scalar_type.clone(),
shape.clone(),
byte_order.clone(),
cd.scalar_type.clone(),
cd.shape.clone(),
cd.byte_order.clone(),
trunc,
self.opts.skip_insert,
)?;
let shape_dims = shape.to_scylla_vec();
let shape_dims = cd.shape.to_scylla_vec();
self.channel_writers.insert(series, Box::new(cw));
if !self.opts.skip_insert {
// TODO insert correct facility name
self.scy
.query(
"insert into series_by_channel (facility, channel_name, series, scalar_type, shape_dims) values (?, ?, ?, ?, ?)",
("scylla", &chn.name, series as i64, scalar_type.to_scylla_i32(), &shape_dims),
"insert into series_by_channel (facility, channel_name, series, scalar_type, shape_dims) values (?, ?, ?, ?, ?) if not exists",
("scylla", &cd.name, series as i64, cd.scalar_type.to_scylla_i32(), &shape_dims),
)
.await
.err_conv()?;
@@ -655,28 +677,8 @@ impl BsreadDumper {
let mut bytes_payload = 0u64;
for i1 in 0..head_b.channels.len() {
let chn = &head_b.channels[i1];
let _cd: ChannelDescDecoded = chn.try_into()?;
let fr = &msg.frames[2 + 2 * i1];
let _series = get_series_id(chn);
if chn.ty == "string" {
info!("string channel: {} {:?}", chn.name, chn.shape);
if let Ok(shape) = Shape::from_bsread_jsval(&chn.shape) {
if let Ok(_bo) = ByteOrder::from_bsread_str(&chn.encoding) {
match &shape {
Shape::Scalar => {
info!("scalar string...");
let s = String::from_utf8_lossy(fr.data());
info!("STRING: {s:?}");
}
_ => {
warn!(
"non-scalar string channels not yet implemented {}",
chn.name
);
}
}
}
}
}
bytes_payload += fr.data().len() as u64;
}
info!("zmtp message ts {ts} pulse {pulse} bytes_payload {bytes_payload}");