Factor out match arms

This commit is contained in:
Dominik Werder
2022-05-09 13:25:41 +02:00
parent c6b2756d4a
commit a964e49aa6
17 changed files with 1039 additions and 441 deletions

View File

@@ -8,4 +8,9 @@ rustflags = [
#"-C", "inline-threshold=1000",
#"-Z", "time-passes=yes",
#"-Z", "time-llvm-passes=yes",
"--cfg", "tokio_unstable",
]
rustdocflags = [
"--cfg", "tokio_unstable"
]

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
/target
/Cargo.lock
/tmpdoc

View File

@@ -11,4 +11,4 @@ codegen-units = 32
incremental = true
[patch.crates-io]
tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" }
#tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" }

View File

@@ -21,7 +21,7 @@ pub fn main() -> Result<(), Error> {
}
SubCmd::ChannelAccess(k) => match k {
ChannelAccess::CaChannel(_) => todo!(),
ChannelAccess::CaSearch(k) => netfetch::ca::ca_search_2(k.into()).await?,
ChannelAccess::CaSearch(k) => netfetch::ca::ca_search(k.into()).await?,
ChannelAccess::CaConfig(k) => netfetch::ca::ca_connect(k.into()).await?,
},
}

View File

@@ -99,6 +99,7 @@ impl From<CaChannel> for CaConnectOpts {
max_simul: 113,
timeout: 2000,
abort_after_search: 0,
pg_pass: "".into(),
}
}
}

View File

@@ -12,7 +12,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11"
serde_yaml = "0.8.23"
tokio = { version = "1.7", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
tokio = { version = "1.18.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] }
tokio-stream = { version = "0.1", features = ["fs"]}
async-channel = "1.6"
bytes = "1.0"
@@ -22,6 +22,7 @@ futures-core = "0.3"
futures-util = "0.3"
#pin-project-lite = "0.2"
scylla = "0.4"
tokio-postgres = "0.7.6"
md-5 = "0.9"
hex = "0.4"
libc = "0.2"

View File

@@ -4,18 +4,18 @@ pub mod store;
use self::conn::FindIocStream;
use self::store::DataStore;
use crate::zmtp::ErrConv;
use conn::CaConn;
use err::Error;
use futures_util::StreamExt;
use log::*;
use netpod::Database;
use scylla::batch::Consistency;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
@@ -32,6 +32,7 @@ struct ChannelConfig {
timeout: Option<u64>,
#[serde(default)]
abort_after_search: u32,
pg_pass: String,
}
pub struct ListenFromFileOpts {
@@ -68,6 +69,7 @@ pub async fn parse_config(config: PathBuf) -> Result<CaConnectOpts, Error> {
max_simul: conf.max_simul.unwrap_or(113),
timeout: conf.timeout.unwrap_or(2000),
abort_after_search: conf.abort_after_search,
pg_pass: conf.pg_pass,
})
}
@@ -79,6 +81,7 @@ pub struct CaConnectOpts {
pub max_simul: usize,
pub timeout: u64,
pub abort_after_search: u32,
pub pg_pass: String,
}
async fn resolve_address(addr_str: &str) -> Result<SocketAddrV4, Error> {
@@ -112,20 +115,42 @@ async fn resolve_address(addr_str: &str) -> Result<SocketAddrV4, Error> {
Ok(ac)
}
pub async fn ca_search_2(opts: ListenFromFileOpts) -> Result<(), Error> {
pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
let facility = "scylla";
let opts = parse_config(opts.config).await?;
let scy = scylla::SessionBuilder::new()
.known_node("sf-nube-11:19042")
.default_consistency(Consistency::Quorum)
.use_keyspace("ks1", true)
.build()
let d = Database {
name: "daqbuffer".into(),
host: "sf-nube-11".into(),
user: "daqbuffer".into(),
pass: opts.pg_pass.clone(),
};
let (pg_client, pg_conn) = tokio_postgres::connect(
&format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name),
tokio_postgres::tls::NoTls,
)
.await
.unwrap();
// TODO join pg_conn in the end:
tokio::spawn(pg_conn);
let pg_client = Arc::new(pg_client);
let qu_insert = {
const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT;
pg_client
.prepare_typed(
"insert into ioc_by_channel (facility, channel, searchaddr, addr) values ($1, $2, $3, $4)",
&[TEXT, TEXT, TEXT, TEXT],
)
.await
.unwrap()
};
let qu_select = pg_client
.prepare("select addr from ioc_by_channel where facility = $1 and channel = $2 and searchaddr = $3")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu = scy
.prepare("insert into ioc_by_channel (facility, channel, searchaddr, addr) values (?, ?, ?, ?)")
.unwrap();
let qu_update = pg_client
.prepare("update ioc_by_channel set addr = $4 where facility = $1 and channel = $2 and searchaddr = $3")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
.unwrap();
let mut addrs = vec![];
for s in &opts.search {
let x = resolve_address(s).await?;
@@ -135,12 +160,14 @@ pub async fn ca_search_2(opts: ListenFromFileOpts) -> Result<(), Error> {
for ch in &opts.channels {
finder.push(ch.into());
}
let deadline = tokio::time::Instant::now()
.checked_add(Duration::from_millis(100000000))
.unwrap();
let mut i1 = 0;
let mut ts_last = Instant::now();
loop {
let k = tokio::time::timeout_at(deadline, finder.next()).await;
let ts_now = Instant::now();
if ts_now.duration_since(ts_last) >= Duration::from_millis(1000) {
ts_last = ts_now;
info!("{}", finder.quick_state());
}
let k = tokio::time::timeout(Duration::from_millis(200), finder.next()).await;
let item = match k {
Ok(Some(k)) => k,
Ok(None) => {
@@ -148,36 +175,39 @@ pub async fn ca_search_2(opts: ListenFromFileOpts) -> Result<(), Error> {
break;
}
Err(_) => {
warn!("timed out");
break;
continue;
}
};
let item = match item {
Ok(k) => k,
Err(e) => {
error!("ca_search_2 {e:?}");
error!("ca_search {e:?}");
continue;
}
};
for item in item {
scy.execute(
&qu,
(
facility,
&item.channel,
item.src.to_string(),
item.addr.map(|x| x.to_string()),
),
)
.await
.err_conv()?;
let searchaddr = item.src.to_string();
let addr = item.addr.map(|x| x.to_string()).unwrap_or(String::new());
let rows = pg_client
.query(&qu_select, &[&facility, &item.channel, &searchaddr])
.await
.unwrap();
if rows.is_empty() {
pg_client
.execute(&qu_insert, &[&facility, &item.channel, &searchaddr, &addr])
.await
.unwrap();
} else {
let addr2: &str = rows[0].get(0);
if addr2 != addr {
pg_client
.execute(&qu_update, &[&facility, &item.channel, &searchaddr, &addr])
.await
.unwrap();
}
}
}
tokio::time::sleep(Duration::from_millis(1)).await;
i1 += 1;
if i1 > 500 {
i1 = 0;
info!("{}", finder.quick_state());
}
}
Ok(())
}
@@ -185,6 +215,21 @@ pub async fn ca_search_2(opts: ListenFromFileOpts) -> Result<(), Error> {
pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let facility = "scylla";
let opts = parse_config(opts.config).await?;
let d = Database {
name: "daqbuffer".into(),
host: "sf-nube-11".into(),
user: "daqbuffer".into(),
pass: opts.pg_pass.clone(),
};
let (pg_client, pg_conn) = tokio_postgres::connect(
&format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name),
tokio_postgres::tls::NoTls,
)
.await
.unwrap();
// TODO allow clean shutdown on ctrl-c and join the pg_conn in the end:
tokio::spawn(pg_conn);
let pg_client = Arc::new(pg_client);
let scy = scylla::SessionBuilder::new()
.known_node("sf-nube-11:19042")
.default_consistency(Consistency::Quorum)
@@ -194,22 +239,28 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let scy = Arc::new(scy);
info!("FIND IOCS");
let qu_find_addr = scy
.prepare("select addr from ioc_by_channel where facility = ? and channel = ?")
let qu_find_addr = pg_client
.prepare("select addr from ioc_by_channel where facility = $1 and channel = $2")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let mut channels_by_host = BTreeMap::new();
for (ix, ch) in opts.channels.iter().enumerate() {
let res = scy
.execute(&qu_find_addr, (facility, ch))
let rows = pg_client
.query(&qu_find_addr, &[&facility, ch])
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
if res.rows_num().unwrap() == 0 {
if rows.is_empty() {
error!("can not find address of channel {}", ch);
} else {
let (addr,) = res.first_row_typed::<(String,)>().unwrap();
let addr: SocketAddrV4 = addr.parse().unwrap();
if ix % 500 == 0 {
let addr: &str = rows[0].get(0);
let addr: SocketAddrV4 = match addr.parse() {
Ok(k) => k,
Err(e) => {
error!("can not parse {addr:?} {e:?}");
continue;
}
};
if ix % 1 == 0 {
info!("{} {} {:?}", ix, ch, addr);
}
if !channels_by_host.contains_key(&addr) {
@@ -223,14 +274,18 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
return Ok(());
}
info!("CONNECT TO HOSTS");
let data_store = Arc::new(DataStore::new(scy.clone()).await?);
let data_store = Arc::new(DataStore::new(pg_client, scy.clone()).await?);
let mut conn_jhs = vec![];
for (host, channels) in channels_by_host {
if false && host.ip() != &"172.26.24.76".parse::<Ipv4Addr>().unwrap() {
continue;
}
let data_store = data_store.clone();
let conn_block = async move {
info!("Create TCP connection to {:?}", (host.ip(), host.port()));
let tcp = TcpStream::connect((host.ip().clone(), host.port())).await?;
let mut conn = CaConn::new(tcp, data_store.clone());
let addr = SocketAddrV4::new(host.ip().clone(), host.port());
let tcp = TcpStream::connect(addr).await?;
let mut conn = CaConn::new(tcp, addr, data_store.clone());
for c in channels {
conn.channel_add(c);
}

View File

@@ -3,6 +3,7 @@ use super::store::DataStore;
use crate::bsread::ChannelDescDecoded;
use crate::ca::proto::{CreateChan, EventAdd, HeadInfo, ReadNotify};
use crate::series::{Existence, SeriesId};
use crate::store::ScyInsertFut;
use err::Error;
use futures_util::stream::FuturesUnordered;
use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt};
@@ -20,7 +21,8 @@ use std::time::{Duration, Instant, SystemTime};
use tokio::io::unix::AsyncFd;
use tokio::net::TcpStream;
const INSERT_FUTS_MAX: usize = 10;
const INSERT_FUTS_MAX: usize = 200;
const TABLE_SERIES_MOD: u32 = 2;
#[derive(Debug)]
enum ChannelError {
@@ -78,12 +80,206 @@ impl IdStore {
}
fn next(&mut self) -> u32 {
let ret = self.next;
self.next += 1;
let ret = self.next;
ret
}
}
// TODO test that errors are properly forwarded.
// Generates a free function `$fname` that enqueues the Scylla insert futures for one
// scalar sample of type `$valty`, using the prepared statement `$qu_insert` from DataStore.
// When `ts_msp_changed` is true it additionally inserts the series- and ts_msp-index rows
// before the value row (chained via and_then so errors short-circuit).
// NOTE(review): when the queue is full the sample is silently dropped (see warn below).
// TODO test that errors are properly forwarded.
macro_rules! insert_scalar_impl {
($fname:ident, $valty:ty, $qu_insert:ident) => {
fn $fname(
data_store: Arc<DataStore>,
// TODO maybe use a newtype?
futs_queue: &mut FuturesUnordered<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
series: SeriesId,
ts_msp: u64,
ts_lsp: u64,
val: $valty,
ts_msp_changed: bool,
st: Option<ScalarType>,
sh: Option<Shape>,
) {
// Pulse id is not available from the CA event here; stored as 0.
let pulse = 0 as u64;
// Bind parameters for the value-row insert (i64 casts match the Scylla schema).
let params = (
series.id() as i64,
ts_msp as i64,
ts_lsp as i64,
pulse as i64,
val,
);
let fut3 = ScyInsertFut::new(data_store.scy.clone(), data_store.$qu_insert.clone(), params);
let fut = if ts_msp_changed {
// New most-significant-timestamp partition: register the series metadata row.
let fut1 = ScyInsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_series.clone(),
(
// Series table is sharded by series id modulo TABLE_SERIES_MOD.
(series.id() as u32 % TABLE_SERIES_MOD) as i32,
series.id() as i64,
ts_msp as i64,
st.map(|x| x.to_scylla_i32()),
sh.map(|x| x.to_scylla_vec()),
),
);
// Record the new ts_msp for this series.
let fut2 = ScyInsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_ts_msp.clone(),
(series.id() as i64, ts_msp as i64),
);
// Run series-row, then ts_msp-row, then the value insert, in order.
Box::pin(fut1.and_then(move |_| fut2).and_then(move |_| fut3)) as _
} else {
Box::pin(fut3) as _
};
// Backpressure: drop the sample instead of growing the queue unboundedly.
if futs_queue.len() >= INSERT_FUTS_MAX {
warn!("can not keep up");
// TODO count these events, this means dataloss.
} else {
futs_queue.push(fut);
}
}
};
}
// TODO test that errors are properly forwarded.
// Array-valued counterpart of `insert_scalar_impl`: generates `$fname` which enqueues the
// Scylla insert futures for one waveform sample (`Vec<$valty>`) via prepared statement
// `$qu_insert`. On a ts_msp change it also writes the series- and ts_msp-index rows first.
// NOTE(review): structure is duplicated from insert_scalar_impl except for the value type;
// kept separate presumably because the macros differ only in `val: Vec<$valty>`.
// TODO test that errors are properly forwarded.
macro_rules! insert_array_impl {
($fname:ident, $valty:ty, $qu_insert:ident) => {
fn $fname(
data_store: Arc<DataStore>,
// TODO maybe use a newtype?
futs_queue: &mut FuturesUnordered<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
series: SeriesId,
ts_msp: u64,
ts_lsp: u64,
val: Vec<$valty>,
ts_msp_changed: bool,
st: Option<ScalarType>,
sh: Option<Shape>,
) {
// Pulse id is not available from the CA event here; stored as 0.
let pulse = 0 as u64;
let params = (
series.id() as i64,
ts_msp as i64,
ts_lsp as i64,
pulse as i64,
val,
);
let fut3 = ScyInsertFut::new(data_store.scy.clone(), data_store.$qu_insert.clone(), params);
let fut = if ts_msp_changed {
// New ts_msp partition: write the series metadata row first.
let fut1 = ScyInsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_series.clone(),
(
(series.id() as u32 % TABLE_SERIES_MOD) as i32,
series.id() as i64,
ts_msp as i64,
st.map(|x| x.to_scylla_i32()),
sh.map(|x| x.to_scylla_vec()),
),
);
let fut2 = ScyInsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_ts_msp.clone(),
(series.id() as i64, ts_msp as i64),
);
// Index rows before the value row; errors short-circuit the chain.
Box::pin(fut1.and_then(move |_| fut2).and_then(move |_| fut3)) as _
} else {
Box::pin(fut3) as _
};
// Backpressure: drop the sample instead of growing the queue unboundedly.
if futs_queue.len() >= INSERT_FUTS_MAX {
warn!("can not keep up");
// TODO count these events, this means dataloss.
} else {
futs_queue.push(fut);
}
}
};
}
insert_scalar_impl!(insert_scalar_i8, i8, qu_insert_scalar_i8);
insert_scalar_impl!(insert_scalar_i16, i16, qu_insert_scalar_i16);
insert_scalar_impl!(insert_scalar_i32, i32, qu_insert_scalar_i32);
insert_scalar_impl!(insert_scalar_f32, f32, qu_insert_scalar_f32);
insert_scalar_impl!(insert_scalar_f64, f64, qu_insert_scalar_f64);
insert_scalar_impl!(insert_scalar_string, String, qu_insert_scalar_string);
insert_array_impl!(insert_array_f32, f32, qu_insert_array_f32);
insert_array_impl!(insert_array_f64, f64, qu_insert_array_f64);
// Dispatch helper for scalar CA values: checks that the channel is Created, that its shape
// is Scalar, and that its scalar type matches the expected variant `$stv`; only then calls
// the generated insert function `$insf` with the unpacked common tuple `$comm`.
// Mismatches are logged and the sample is discarded.
macro_rules! match_scalar_value_insert {
($stv:ident, $insf:ident, $val:expr, $comm:expr) => {{
// `$comm` bundles the state shared by all match arms in handle_event_add_res.
let (data_store, futs_queue, ch_s, series, ts_msp, ts_lsp, ts_msp_changed, channel_scalar_type, channel_shape) =
$comm;
match ch_s {
ChannelState::Created(st) => match st.shape {
Shape::Scalar => match st.scalar_type {
ScalarType::$stv => $insf(
data_store,
futs_queue,
series,
ts_msp,
ts_lsp,
$val,
ts_msp_changed,
channel_scalar_type,
channel_shape,
),
_ => {
// Received a scalar of a different type than the channel declared.
error!("unexpected value type insf {:?}", stringify!($insf));
}
},
_ => {
// Received a scalar value for a non-scalar (waveform) channel.
error!(
"unexpected value shape insf {:?} st.shape {:?}",
stringify!($insf),
st.shape
);
}
},
_ => {
// Value arrived before the channel finished its Create handshake.
error!("got value but channel not created insf {:?}", stringify!($insf));
}
}
}};
}
// Dispatch helper for array (waveform) CA values: mirrors match_scalar_value_insert but
// requires the channel shape to be Shape::Wave(_) before calling the generated array
// insert function `$insf`. Mismatches are logged and the sample is discarded.
macro_rules! match_array_value_insert {
($stv:ident, $insf:ident, $val:expr, $comm:expr) => {{
// `$comm` bundles the state shared by all match arms in handle_event_add_res.
let (data_store, futs_queue, ch_s, series, ts_msp, ts_lsp, ts_msp_changed, channel_scalar_type, channel_shape) =
$comm;
match ch_s {
ChannelState::Created(st) => match st.shape {
Shape::Wave(_) => match st.scalar_type {
ScalarType::$stv => $insf(
data_store,
futs_queue,
series,
ts_msp,
ts_lsp,
$val,
ts_msp_changed,
channel_scalar_type,
channel_shape,
),
_ => {
// Received an array of a different element type than declared.
error!("unexpected value type insf {:?}", stringify!($insf));
}
},
_ => {
// Received an array value for a channel that is not a waveform.
error!(
"unexpected value shape insf {:?} st.shape {:?}",
stringify!($insf),
st.shape
);
}
},
_ => {
// Value arrived before the channel finished its Create handshake.
error!("got value but channel not created insf {:?}", stringify!($insf));
}
}
}};
}
pub struct CaConn {
state: CaConnState,
proto: CaProto,
@@ -93,6 +289,7 @@ pub struct CaConn {
channels: BTreeMap<u32, ChannelState>,
cid_by_name: BTreeMap<String, u32>,
cid_by_subid: BTreeMap<u32, u32>,
ts_msp_last_by_series: BTreeMap<SeriesId, u64>,
name_by_cid: BTreeMap<u32, String>,
poll_count: usize,
data_store: Arc<DataStore>,
@@ -100,24 +297,27 @@ pub struct CaConn {
Pin<Box<dyn Future<Output = Result<(u32, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>,
>,
value_insert_futs: FuturesUnordered<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
remote_addr_dbg: SocketAddrV4,
}
impl CaConn {
pub fn new(tcp: TcpStream, data_store: Arc<DataStore>) -> Self {
pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4, data_store: Arc<DataStore>) -> Self {
Self {
state: CaConnState::Init,
proto: CaProto::new(tcp),
proto: CaProto::new(tcp, remote_addr_dbg),
cid_store: IdStore::new(),
ioid_store: IdStore::new(),
subid_store: IdStore::new(),
channels: BTreeMap::new(),
cid_by_name: BTreeMap::new(),
cid_by_subid: BTreeMap::new(),
ts_msp_last_by_series: BTreeMap::new(),
name_by_cid: BTreeMap::new(),
poll_count: 0,
data_store,
fut_get_series: FuturesUnordered::new(),
value_insert_futs: FuturesUnordered::new(),
remote_addr_dbg,
}
}
@@ -144,6 +344,75 @@ impl CaConn {
self.name_by_cid.get(&cid).map(|x| x.as_str())
}
// Drives the queued value-insert futures until they are all drained or the next one
// is Pending. Completed results (Ok or Err) are discarded here.
// NOTE(review): errors from individual inserts are silently dropped by the `_ => {}`
// arm — presumably acceptable best-effort, but worth confirming.
fn handle_insert_futs(&mut self, cx: &mut Context) -> Result<(), Error> {
use Poll::*;
while self.value_insert_futs.len() > 0 {
match self.value_insert_futs.poll_next_unpin(cx) {
// Nothing ready yet; try again on the next poll.
Pending => break,
_ => {}
}
}
Ok(())
}
// Drains completed series-id lookups. For each resolved SeriesId it allocates a
// subscription id, sends an EventAdd (monitor) request to the peer, and moves the
// channel into the Created/AddingEvent state.
// Returns Err only if the CA data type or element count cannot be mapped.
fn handle_get_series_futs(&mut self, cx: &mut Context) -> Result<(), Error> {
use Poll::*;
while self.fut_get_series.len() > 0 {
match self.fut_get_series.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
info!("Have SeriesId {k:?}");
// Tuple layout: (cid, sid, data_type, data_count, Existence<SeriesId>).
let cid = k.0;
let sid = k.1;
let data_type = k.2;
let data_count = k.3;
// Created vs Existing makes no difference from here on.
let series = match k.4 {
Existence::Created(k) => k,
Existence::Existing(k) => k,
};
// Allocate a subscription id and remember which channel it belongs to.
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
let name = self.name_by_cid(cid).unwrap().to_string();
// Ask the IOC to start monitoring this channel.
let msg = CaMsg {
ty: CaMsgTy::EventAdd(EventAdd {
sid,
data_type,
data_count,
subid,
}),
};
self.proto.push_out(msg);
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Created(CreatedState {
cid,
sid,
// TODO handle error better! Transition channel to Error state?
scalar_type: ScalarType::from_ca_id(data_type)?,
shape: Shape::from_ca_count(data_count)?,
ts_created: Instant::now(),
state: MonitoringState::AddingEvent(series),
});
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
// NOTE(review): `_cd` is constructed but unused here — looks like leftover
// from the series-id registration path; candidate for removal.
let _cd = ChannelDescDecoded {
name: name.to_string(),
scalar_type,
shape,
agg_kind: netpod::AggKind::Plain,
// TODO these play no role in series id:
byte_order: netpod::ByteOrder::LE,
compression: None,
};
// Re-schedule so the queued EventAdd gets flushed promptly.
cx.waker().wake_by_ref();
}
Ready(Some(Err(e))) => error!("series error: {e:?}"),
Ready(None) => {}
Pending => break,
}
}
Ok(())
}
fn handle_event_add_res(&mut self, ev: proto::EventAddRes) {
// TODO handle subid-not-found which can also be peer error:
let cid = *self.cid_by_subid.get(&ev.subid).unwrap();
@@ -152,13 +421,16 @@ impl CaConn {
// TODO handle not-found error:
let mut series_2 = None;
let ch_s = self.channels.get_mut(&cid).unwrap();
let mut channel_scalar_type = None;
let mut channel_shape = None;
match ch_s {
ChannelState::Created(st) => {
channel_scalar_type = Some(st.scalar_type.clone());
channel_shape = Some(st.shape.clone());
match st.state {
MonitoringState::AddingEvent(ref series) => {
let series = series.clone();
series_2 = Some(series.clone());
info!("Confirmation {name} is subscribed.");
// TODO get ts from faster common source:
st.state = MonitoringState::Evented(
series,
@@ -182,68 +454,237 @@ impl CaConn {
}
}
{
let series = series_2.unwrap();
let series = match series_2 {
Some(k) => k,
None => {
error!("handle_event_add_res but no series");
// TODO allow return Result
return;
}
};
// TODO where to actually get the time from?
let ts = SystemTime::now();
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
// TODO decide on better msp/lsp: random offset!
// As long as one writer is active, the msp is arbitrary.
let ts = epoch.as_secs() * 1000000000 + epoch.subsec_nanos() as u64;
let ts_msp = ts / (30 * SEC) * (30 * SEC);
let ts_msp = ts / (60 * SEC) * (60 * SEC);
let ts_lsp = ts - ts_msp;
let ts_msp_changed = if let Some(ts_msp_cur) = self.ts_msp_last_by_series.get_mut(&series) {
if ts_msp != *ts_msp_cur {
*ts_msp_cur = ts_msp;
true
} else {
false
}
} else {
self.ts_msp_last_by_series.insert(series.clone(), ts_msp);
true
};
// TODO make sure that I only accept types I expect.
use crate::ca::proto::CaDataScalarValue::*;
use crate::ca::proto::CaDataValue::*;
let data_store = self.data_store.clone();
let futs_queue = &mut self.value_insert_futs;
let comm = (
data_store,
futs_queue,
ch_s,
series,
ts_msp,
ts_lsp,
ts_msp_changed,
channel_scalar_type,
channel_shape,
);
match ev.value {
Scalar(v) => match v {
F64(val) => match ch_s {
ChannelState::Created(st) => match st.shape {
Shape::Scalar => match st.scalar_type {
ScalarType::F64 => self.insert_scalar_f64(series, ts_msp, ts_lsp, val),
_ => {
error!("unexpected value type");
}
},
_ => {
error!("unexpected value shape");
}
},
_ => {
error!("got value but channel not created");
}
},
_ => {}
I8(val) => match_scalar_value_insert!(I8, insert_scalar_i8, val, comm),
I16(val) => match_scalar_value_insert!(I16, insert_scalar_i16, val, comm),
I32(val) => match_scalar_value_insert!(I32, insert_scalar_i32, val, comm),
F32(val) => match_scalar_value_insert!(F32, insert_scalar_f32, val, comm),
F64(val) => match_scalar_value_insert!(F64, insert_scalar_f64, val, comm),
String(val) => match_scalar_value_insert!(STRING, insert_scalar_string, val, comm),
_ => {
warn!("can not handle Scalar {:?}", v);
}
},
Array => {}
Array(v) => {
use crate::ca::proto::CaDataArrayValue::*;
match v {
F32(val) => match_array_value_insert!(F32, insert_array_f32, val, comm),
F64(val) => match_array_value_insert!(F64, insert_array_f64, val, comm),
_ => {
warn!("can not handle Array ty {} n {}", ev.data_type, ev.data_count);
}
}
}
}
}
}
fn insert_scalar_f64(&mut self, series: SeriesId, ts_msp: u64, ts_lsp: u64, val: f64) {
let pulse = 0 as u64;
let y = unsafe { &*(self as *const CaConn) };
let fut1 = y
.data_store
.scy
.execute(&y.data_store.qu_insert_ts_msp, (series.id() as i64, ts_msp as i64))
.map(|_| Ok::<_, Error>(()))
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")));
let fut2 = y
.data_store
.scy
.execute(
&y.data_store.qu_insert_scalar_f64,
(series.id() as i64, ts_msp as i64, ts_lsp as i64, pulse as i64, val),
)
.map(|_| Ok::<_, Error>(()))
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")));
let fut = fut1.and_then(move |_a| fut2);
if self.value_insert_futs.len() > INSERT_FUTS_MAX {
warn!("can not keep up");
} else {
self.value_insert_futs.push(Box::pin(fut) as _);
// Polls the CA protocol while in Listen state, waiting for the peer's version handshake.
// Return value drives the caller's state machine:
//   Some(poll) — yield `poll` from the Stream impl as-is;
//   None       — state changed (PeerReady or Done), caller should `continue` its loop.
fn handle_conn_listen(&mut self, cx: &mut Context) -> Option<Poll<Option<Result<(), Error>>>> {
use Poll::*;
match self.proto.poll_next_unpin(cx) {
Ready(Some(k)) => match k {
Ok(k) => match k {
CaItem::Empty => {
info!("CaItem::Empty");
Some(Ready(Some(Ok(()))))
}
CaItem::Msg(msg) => match msg.ty {
CaMsgTy::VersionRes(n) => {
// Only protocol versions 12 and 13 are considered compatible here.
if n < 12 || n > 13 {
error!("See some unexpected version {n} channel search may not work.");
Some(Ready(Some(Ok(()))))
} else {
info!("Received peer version {n}");
// Handshake done; advance to PeerReady and let the caller re-poll.
self.state = CaConnState::PeerReady;
None
}
}
k => {
warn!("Got some other unhandled message: {k:?}");
Some(Ready(Some(Ok(()))))
}
},
},
Err(e) => {
// Protocol error is logged but reported upward as Ok(()) — connection
// stays in Listen. NOTE(review): confirm this is intended.
error!("got error item from CaProto {e:?}");
Some(Ready(Some(Ok(()))))
}
},
Ready(None) => {
// Peer closed the stream; terminate this connection's state machine.
warn!("CaProto is done {:?}", self.remote_addr_dbg);
self.state = CaConnState::Done;
None
}
Pending => Some(Pending),
}
}
// For every channel still in Init state, queues a CreateChan request into `msgs_tmp`
// and transitions the channel to Creating. Errors if a cid has no known name.
fn check_channels_state_init(&mut self, msgs_tmp: &mut Vec<CaMsg>) -> Result<(), Error> {
// Collect keys first because the loop body needs `self` mutably.
// TODO profile, efficient enough?
let keys: Vec<u32> = self.channels.keys().map(|x| *x).collect();
for cid in keys {
match self.channels.get_mut(&cid).unwrap() {
ChannelState::Init => {
let name = self
.name_by_cid(cid)
.ok_or_else(|| Error::with_msg_no_trace("name for cid not known"));
let name = match name {
Ok(k) => k,
Err(e) => return Err(e),
};
info!("Sending CreateChan for {}", name);
let msg = CaMsg {
ty: CaMsgTy::CreateChan(CreateChan {
cid,
channel: name.into(),
}),
};
msgs_tmp.push(msg);
// TODO handle not-found error:
// Re-borrow after the immutable name lookup above.
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Creating {
cid,
ts_beg: Instant::now(),
};
}
// Channels in any other state are left alone.
_ => {}
}
}
Ok(())
}
// PeerReady-state driver: flushes CreateChan requests for Init channels, then polls the
// protocol and handles SearchRes / CreateChanRes / EventAddRes messages. On CreateChanRes
// it records the channel's type/shape and spawns a series-id lookup future.
// Returns the Poll value the Stream impl should yield.
fn handle_peer_ready(&mut self, cx: &mut Context) -> Poll<Option<Result<(), Error>>> {
use Poll::*;
// TODO unify with Listen state where protocol gets polled as well.
let mut msgs_tmp = vec![];
self.check_channels_state_init(&mut msgs_tmp)?;
let mut do_wake_again = false;
if msgs_tmp.len() > 0 {
info!("msgs_tmp.len() {}", msgs_tmp.len());
// New outgoing messages were queued: make sure we get polled again to flush them.
do_wake_again = true;
}
// TODO be careful to not overload outgoing message queue.
for msg in msgs_tmp {
self.proto.push_out(msg);
}
let res = match self.proto.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
match k {
CaItem::Msg(k) => {
match k.ty {
CaMsgTy::SearchRes(k) => {
// Address arrives as a big-endian u32 plus TCP port; log only.
let a = k.addr.to_be_bytes();
let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port);
info!("Search result indicates server address: {addr}");
}
CaMsgTy::CreateChanRes(k) => {
// TODO handle cid-not-found which can also indicate peer error.
let cid = k.cid;
let sid = k.sid;
// TODO handle error:
let name = self.name_by_cid(cid).unwrap().to_string();
info!("CreateChanRes {name:?}");
let scalar_type = ScalarType::from_ca_id(k.data_type)?;
let shape = Shape::from_ca_count(k.data_count)?;
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Created(CreatedState {
cid,
sid,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
ts_created: Instant::now(),
state: MonitoringState::FetchSeriesId,
});
// TODO handle error in different way. Should most likely not abort.
let cd = ChannelDescDecoded {
name: name.to_string(),
scalar_type,
shape,
agg_kind: netpod::AggKind::Plain,
// TODO these play no role in series id:
byte_order: netpod::ByteOrder::LE,
compression: None,
};
// NOTE(review): unsafe aliasing cast to borrow data_store while
// `self` is mutably borrowed — a borrow-checker workaround, not a
// proven-sound pattern. Cloning the Arc<DataStore> before the match
// would avoid the unsafe entirely; worth fixing.
let y = unsafe { &*(&self as &Self as *const CaConn) };
let fut = y
.data_store
.chan_reg
.get_series_id(cd)
.map_ok(move |series| (cid, k.sid, k.data_type, k.data_count, series));
// TODO throttle execution rate:
self.fut_get_series.push(Box::pin(fut) as _);
do_wake_again = true;
}
CaMsgTy::EventAddRes(k) => Self::handle_event_add_res(self, k),
_ => {}
}
}
_ => {}
}
Ready(Some(Ok(())))
}
Ready(Some(Err(e))) => {
error!("CaProto yields error: {e:?}");
Ready(Some(Err(e)))
}
Ready(None) => {
// Peer closed the stream; terminate this connection's state machine.
warn!("CaProto is done");
self.state = CaConnState::Done;
Ready(Some(Ok(())))
}
Pending => Pending,
};
if do_wake_again {
info!("do_wake_again");
cx.waker().wake_by_ref();
}
res
}
}
impl Stream for CaConn {
@@ -252,70 +693,9 @@ impl Stream for CaConn {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
self.poll_count += 1;
if false && self.poll_count > 3000 {
error!("TODO CaConn reached poll_count limit");
return Ready(None);
}
loop {
while self.value_insert_futs.len() > 0 {
match self.fut_get_series.poll_next_unpin(cx) {
Pending => break,
_ => {}
}
}
while self.fut_get_series.len() > 0 {
match self.fut_get_series.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
info!("Have SeriesId {k:?}");
let cid = k.0;
let sid = k.1;
let data_type = k.2;
let data_count = k.3;
let series = match k.4 {
Existence::Created(k) => k,
Existence::Existing(k) => k,
};
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
let name = self.name_by_cid(cid).unwrap().to_string();
let msg = CaMsg {
ty: CaMsgTy::EventAdd(EventAdd {
sid,
data_type,
data_count,
subid,
}),
};
self.proto.push_out(msg);
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Created(CreatedState {
cid,
sid,
// TODO handle error better! Transition channel to Error state?
scalar_type: ScalarType::from_ca_id(data_type)?,
shape: Shape::from_ca_count(data_count)?,
ts_created: Instant::now(),
state: MonitoringState::AddingEvent(series),
});
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
let _cd = ChannelDescDecoded {
name: name.to_string(),
scalar_type,
shape,
agg_kind: netpod::AggKind::Plain,
// TODO these play no role in series id:
byte_order: netpod::ByteOrder::LE,
compression: None,
};
cx.waker().wake_by_ref();
}
Ready(Some(Err(e))) => error!("series error: {e:?}"),
Ready(None) => {}
Pending => break,
}
}
self.handle_insert_futs(cx)?;
self.handle_get_series_futs(cx)?;
break match &self.state {
CaConnState::Init => {
let msg = CaMsg { ty: CaMsgTy::Version };
@@ -329,157 +709,11 @@ impl Stream for CaConn {
self.state = CaConnState::Listen;
continue;
}
CaConnState::Listen => match self.proto.poll_next_unpin(cx) {
Ready(Some(k)) => match k {
Ok(k) => match k {
CaItem::Empty => {
info!("CaItem::Empty");
Ready(Some(Ok(())))
}
CaItem::Msg(msg) => match msg.ty {
CaMsgTy::VersionRes(n) => {
if n < 12 || n > 13 {
error!("See some unexpected version {n} channel search may not work.");
Ready(Some(Ok(())))
} else {
info!("Received peer version {n}");
self.state = CaConnState::PeerReady;
continue;
}
}
k => {
warn!("Got some other unhandled message: {k:?}");
Ready(Some(Ok(())))
}
},
},
Err(e) => {
error!("got error item from CaProto {e:?}");
Ready(Some(Ok(())))
}
},
Ready(None) => {
warn!("CaProto is done");
self.state = CaConnState::Done;
continue;
}
Pending => Pending,
CaConnState::Listen => match self.handle_conn_listen(cx) {
Some(k) => k,
None => continue,
},
CaConnState::PeerReady => {
// TODO unify with Listen state where protocol gets polled as well.
let mut msgs_tmp = vec![];
// TODO profile, efficient enough?
let keys: Vec<u32> = self.channels.keys().map(|x| *x).collect();
for cid in keys {
match self.channels.get_mut(&cid).unwrap() {
ChannelState::Init => {
let name = self
.name_by_cid(cid)
.ok_or_else(|| Error::with_msg_no_trace("name for cid not known"));
let name = match name {
Ok(k) => k,
Err(e) => return Ready(Some(Err(e))),
};
info!("Sending CreateChan for {}", name);
let msg = CaMsg {
ty: CaMsgTy::CreateChan(CreateChan {
cid,
channel: name.into(),
}),
};
msgs_tmp.push(msg);
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Creating {
cid,
ts_beg: Instant::now(),
};
}
_ => {}
}
}
let mut do_wake_again = false;
if msgs_tmp.len() > 0 {
info!("msgs_tmp.len() {}", msgs_tmp.len());
do_wake_again = true;
}
// TODO be careful to not overload outgoing message queue.
for msg in msgs_tmp {
self.proto.push_out(msg);
}
let res = match self.proto.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
match k {
CaItem::Msg(k) => {
match k.ty {
CaMsgTy::SearchRes(k) => {
let a = k.addr.to_be_bytes();
let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port);
info!("Search result indicates server address: {addr}");
}
CaMsgTy::CreateChanRes(k) => {
// TODO handle cid-not-found which can also indicate peer error.
let cid = k.cid;
let sid = k.sid;
// TODO handle error:
let name = self.name_by_cid(cid).unwrap().to_string();
info!("CreateChanRes {name:?}");
let scalar_type = ScalarType::from_ca_id(k.data_type)?;
let shape = Shape::from_ca_count(k.data_count)?;
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Created(CreatedState {
cid,
sid,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
ts_created: Instant::now(),
state: MonitoringState::FetchSeriesId,
});
// TODO handle error in different way. Should most likely not abort.
let cd = ChannelDescDecoded {
name: name.to_string(),
scalar_type,
shape,
agg_kind: netpod::AggKind::Plain,
// TODO these play no role in series id:
byte_order: netpod::ByteOrder::LE,
compression: None,
};
let y = unsafe { &*(&self as &Self as *const CaConn) };
let fut =
y.data_store.chan_reg.get_series_id(cd).map_ok(move |series| {
(cid, k.sid, k.data_type, k.data_count, series)
});
// TODO throttle execution rate:
self.fut_get_series.push(Box::pin(fut) as _);
do_wake_again = true;
}
CaMsgTy::EventAddRes(k) => Self::handle_event_add_res(&mut self, k),
_ => {}
}
}
_ => {}
}
Ready(Some(Ok(())))
}
Ready(Some(Err(e))) => {
error!("CaProto yields error: {e:?}");
Ready(Some(Err(e)))
}
Ready(None) => {
warn!("CaProto is done");
self.state = CaConnState::Done;
Ready(Some(Ok(())))
}
Pending => Pending,
};
if do_wake_again {
info!("do_wake_again");
cx.waker().wake_by_ref();
}
res
}
CaConnState::PeerReady => self.handle_peer_ready(cx),
CaConnState::Done => Ready(None),
};
}
@@ -499,6 +733,7 @@ impl Drop for SockBox {
}
}
// TODO should be able to get away with non-atomic counters.
static BATCH_ID: AtomicUsize = AtomicUsize::new(0);
static SEARCH_ID2: AtomicUsize = AtomicUsize::new(0);
@@ -509,7 +744,6 @@ struct BatchId(u32);
struct SearchId(u32);
struct SearchBatch {
id: BatchId,
ts_beg: Instant,
tgts: VecDeque<usize>,
channels: Vec<String>,
@@ -547,7 +781,6 @@ pub struct FindIocStream {
impl FindIocStream {
pub fn new(tgts: Vec<SocketAddrV4>) -> Self {
info!("FindIocStream tgts {tgts:?}");
let sock = unsafe { Self::create_socket() }.unwrap();
let afd = AsyncFd::new(sock.0).unwrap();
Self {
@@ -669,7 +902,6 @@ impl FindIocStream {
sin_zero: [0; 8],
};
let addr_len = std::mem::size_of::<libc::sockaddr_in>();
//info!("sendto {ip:?} {} n {}", port, buf.len());
let ec = libc::sendto(
sock,
&buf[0] as *const _ as _,
@@ -756,7 +988,6 @@ impl FindIocStream {
nb.adv(hi.payload())?;
msgs.push(msg);
}
//info!("received {} msgs {:?}", msgs.len(), msgs);
let mut res = vec![];
for msg in msgs.iter() {
match &msg.ty {
@@ -793,7 +1024,6 @@ impl FindIocStream {
fn create_in_flight(&mut self) {
let bid = BATCH_ID.fetch_add(1, Ordering::AcqRel);
let bid = BatchId(bid as u32);
//info!("create_in_flight {bid:?}");
let mut sids = vec![];
let mut chs = vec![];
while chs.len() < self.channels_per_batch && self.channels_input.len() > 0 {
@@ -804,7 +1034,6 @@ impl FindIocStream {
chs.push(self.channels_input.pop_front().unwrap());
}
let batch = SearchBatch {
id: bid.clone(),
ts_beg: Instant::now(),
channels: chs,
tgts: self.tgts.iter().enumerate().map(|x| x.0).collect(),
@@ -855,7 +1084,6 @@ impl FindIocStream {
all_done = false;
}
if all_done {
//info!("all searches done for {bid:?}");
self.bids_all_done.insert(bid.clone(), ());
self.in_flight.remove(bid);
}
@@ -888,9 +1116,10 @@ impl FindIocStream {
let mut chns = vec![];
for (bid, batch) in &mut self.in_flight {
if now.duration_since(batch.ts_beg) > self.batch_run_max {
self.bids_timed_out.insert(bid.clone(), ());
for (i2, sid) in batch.sids.iter().enumerate() {
if batch.done.contains(sid) == false {
warn!("Timeout: {bid:?} {}", batch.channels[i2]);
debug!("Timeout: {bid:?} {}", batch.channels[i2]);
}
sids.push(sid.clone());
chns.push(batch.channels[i2].clone());
@@ -1014,7 +1243,6 @@ impl Stream for FindIocStream {
}
Pending => {
g.clear_ready();
//warn!("socket seemed ready for read, but is not");
continue;
}
},

View File

@@ -3,6 +3,7 @@ use err::Error;
use futures_util::{pin_mut, Stream};
use log::*;
use std::collections::VecDeque;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
@@ -104,10 +105,16 @@ pub enum CaDataScalarValue {
String(String),
}
#[derive(Clone, Debug)]
/// Decoded waveform (array) payload of a Channel Access event.
/// One variant per supported element type; only f32/f64 arrays are decoded so far.
pub enum CaDataArrayValue {
    F32(Vec<f32>),
    F64(Vec<f64>),
}
#[derive(Clone, Debug)]
pub enum CaDataValue {
Scalar(CaDataScalarValue),
Array,
Array(CaDataArrayValue),
}
impl CaScalarType {
@@ -151,18 +158,18 @@ impl CaMsgTy {
match self {
Version => 0,
VersionRes(_) => 0,
ClientName => 20,
ClientNameRes(_) => 20,
HostName => 21,
Search(_) => 6,
SearchRes(_) => 6,
CreateChan(_) => 18,
CreateChanRes(_) => 18,
AccessRightsRes(_) => 22,
EventAdd(_) => 1,
EventAddRes(_) => 1,
ReadNotify(_) => 15,
ReadNotifyRes(_) => 15,
ClientName => 0x14,
ClientNameRes(_) => 0x14,
HostName => 0x15,
Search(_) => 0x06,
SearchRes(_) => 0x06,
CreateChan(_) => 0x12,
CreateChanRes(_) => 0x12,
AccessRightsRes(_) => 0x16,
EventAdd(_) => 0x01,
EventAddRes(_) => 0x01,
ReadNotify(_) => 0x0f,
ReadNotifyRes(_) => 0x0f,
}
}
@@ -175,9 +182,9 @@ impl CaMsgTy {
match self {
Version => 0,
VersionRes(_) => 0,
ClientName => 8,
ClientName => 0x10,
ClientNameRes(x) => (x.name.len() + 1 + 7) / 8 * 8,
HostName => 8,
HostName => 0x18,
Search(x) => (x.channel.len() + 1 + 7) / 8 * 8,
SearchRes(_) => 8,
CreateChan(x) => (x.channel.len() + 1 + 7) / 8 * 8,
@@ -199,7 +206,7 @@ impl CaMsgTy {
fn data_type(&self) -> u16 {
use CaMsgTy::*;
match self {
Version => CA_PROTO_VERSION,
Version => 0,
VersionRes(n) => *n,
ClientName => 0,
ClientNameRes(_) => 0,
@@ -222,7 +229,7 @@ impl CaMsgTy {
fn data_count(&self) -> u16 {
use CaMsgTy::*;
match self {
Version => 0,
Version => CA_PROTO_VERSION,
VersionRes(_) => 0,
ClientName => 0,
ClientNameRes(_) => 0,
@@ -285,8 +292,11 @@ impl CaMsgTy {
Version => {}
VersionRes(_) => {}
ClientName => {
// TODO allow variable client name. Null-extend always to 8 byte align.
buf.copy_from_slice(b"SA10\0\0\0\0");
// TODO allow variable client name.
let s = "werder_d".as_bytes();
let n = s.len();
buf.fill(0);
buf[..n].copy_from_slice(s);
}
ClientNameRes(_) => {
error!("should not attempt to write ClientNameRes");
@@ -294,7 +304,10 @@ impl CaMsgTy {
}
HostName => {
// TODO allow variable host name. Null-extend always to 8 byte align.
buf.copy_from_slice(b"SA10\0\0\0\0");
let s = "sf-nube-11.psi.ch".as_bytes();
let n = s.len();
buf.fill(0);
buf[..n].copy_from_slice(s);
}
Search(e) => {
for x in &mut buf[..] {
@@ -346,7 +359,7 @@ impl CaMsg {
}
fn place_into(&self, buf: &mut [u8]) {
info!("place_into given {} bytes buffer", buf.len());
//info!("place_into given {} bytes buffer", buf.len());
if self.ty.payload_len() > 0x4000 - 16 {
error!("TODO emit for larger payloads");
panic!();
@@ -433,43 +446,116 @@ impl CaMsg {
}
}
1 => {
use netpod::Shape;
let ca_st = CaScalarType::from_ca_u16(hi.data_type)?;
let value = match ca_st {
CaScalarType::F64 => {
if payload.len() < 2 {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for enum {}",
payload.len()
)));
}
let v = f64::from_be_bytes(payload.try_into()?);
CaDataValue::Scalar(CaDataScalarValue::F64(v))
}
CaScalarType::Enum => {
if payload.len() < 2 {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for enum {}",
payload.len()
)));
}
let v = i16::from_be_bytes(payload[..2].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::I16(v))
}
CaScalarType::String => {
let mut ixn = payload.len();
for (i, &c) in payload.iter().enumerate() {
if c == 0 {
ixn = i;
break;
let ca_sh = Shape::from_ca_count(hi.data_count)?;
let value = match ca_sh {
Shape::Scalar => match ca_st {
CaScalarType::I32 => {
type ST = i32;
const STL: usize = std::mem::size_of::<ST>();
if payload.len() < STL {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for i32 {}",
payload.len()
)));
}
let v = ST::from_be_bytes(payload[..STL].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::I32(v))
}
//info!("try to read string from payload len {} ixn {}", payload.len(), ixn);
let v = String::from_utf8_lossy(&payload[..ixn]);
CaDataValue::Scalar(CaDataScalarValue::String(v.into()))
}
_ => {
warn!("TODO handle {ca_st:?}");
return Err(Error::with_msg_no_trace(format!("can not yet handle type {ca_st:?}")));
CaScalarType::F32 => {
type ST = f32;
const STL: usize = std::mem::size_of::<ST>();
if payload.len() < STL {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for f32 {}",
payload.len()
)));
}
let v = ST::from_be_bytes(payload[..STL].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::F32(v))
}
CaScalarType::F64 => {
type ST = f64;
const STL: usize = std::mem::size_of::<ST>();
if payload.len() < STL {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for f64 {}",
payload.len()
)));
}
let v = ST::from_be_bytes(payload[..STL].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::F64(v))
}
CaScalarType::Enum => {
type ST = i16;
const STL: usize = std::mem::size_of::<ST>();
if payload.len() < STL {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for i16 {}",
payload.len()
)));
}
let v = ST::from_be_bytes(payload[..STL].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::I16(v))
}
CaScalarType::String => {
// TODO constrain string length to the CA `data_count`.
let mut ixn = payload.len();
for (i, &c) in payload.iter().enumerate() {
if c == 0 {
ixn = i;
break;
}
}
//info!("try to read string from payload len {} ixn {}", payload.len(), ixn);
let v = String::from_utf8_lossy(&payload[..ixn]);
CaDataValue::Scalar(CaDataScalarValue::String(v.into()))
}
_ => {
warn!("TODO handle {ca_st:?}");
return Err(Error::with_msg_no_trace(format!(
"can not yet handle type scalar {ca_st:?}"
)));
}
},
Shape::Wave(n) => match ca_st {
CaScalarType::F32 => {
type ST = f32;
const STL: usize = std::mem::size_of::<ST>();
let nn = (n as usize).min(payload.len() / STL);
let mut a = Vec::with_capacity(nn);
let mut bb = &payload[..];
for _ in 0..nn {
let v = ST::from_be_bytes(bb[..STL].try_into()?);
bb = &bb[STL..];
a.push(v);
}
CaDataValue::Array(CaDataArrayValue::F32(a))
}
CaScalarType::F64 => {
type ST = f64;
const STL: usize = std::mem::size_of::<ST>();
let nn = (n as usize).min(payload.len() / STL);
let mut a = Vec::with_capacity(nn);
let mut bb = &payload[..];
for _ in 0..nn {
let v = ST::from_be_bytes(bb[..STL].try_into()?);
bb = &bb[STL..];
a.push(v);
}
CaDataValue::Array(CaDataArrayValue::F64(a))
}
_ => {
warn!("TODO handle {ca_st:?}");
return Err(Error::with_msg_no_trace(format!(
"can not yet handle type array {ca_st:?}"
)));
}
},
Shape::Image(_, _) => {
error!("Can not get Image from CA");
err::todoval()
}
};
let d = EventAddRes {
@@ -560,6 +646,7 @@ impl HeadInfo {
}
}
#[derive(Debug)]
enum CaState {
StdHead,
ExtHead(HeadInfo),
@@ -581,6 +668,7 @@ impl CaState {
pub struct CaProto {
tcp: TcpStream,
remote_addr_dbg: SocketAddrV4,
state: CaState,
buf: NetBuf,
outbuf: NetBuf,
@@ -588,9 +676,10 @@ pub struct CaProto {
}
impl CaProto {
pub fn new(tcp: TcpStream) -> Self {
pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4) -> Self {
Self {
tcp,
remote_addr_dbg,
state: CaState::StdHead,
buf: NetBuf::new(1024 * 128),
outbuf: NetBuf::new(1024 * 128),
@@ -612,7 +701,6 @@ impl CaProto {
fn out_msg_buf(&mut self) -> Option<(&CaMsg, &mut [u8])> {
if let Some(item) = self.out.front() {
info!("attempt to serialize outgoing message msg {:?}", item);
if let Ok(buf) = self.outbuf.write_buf(item.len()) {
Some((item, buf))
} else {
@@ -630,16 +718,13 @@ impl CaProto {
pin_mut!(w);
match w.poll_write(cx, b) {
Ready(k) => match k {
Ok(k) => {
info!("sent {} bytes {:?}", k, &self.outbuf.data()[..k]);
match self.outbuf.adv(k) {
Ok(()) => Ready(Ok(())),
Err(e) => {
error!("advance error {:?}", e);
Ready(Err(e))
}
Ok(k) => match self.outbuf.adv(k) {
Ok(()) => Ready(Ok(())),
Err(e) => {
error!("advance error {:?}", e);
Ready(Err(e))
}
}
},
Err(e) => {
error!("output write error {:?}", e);
Ready(Err(e.into()))
@@ -651,9 +736,6 @@ impl CaProto {
fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Option<Poll<CaItem>>, Error> {
use Poll::*;
if self.out.len() != 0 || self.outbuf.len() != 0 {
info!("loop_body out {} outbuf {}", self.out.len(), self.outbuf.len());
}
let output_res_1: Option<Poll<()>> = 'll1: loop {
if self.out.len() == 0 {
break None;
@@ -702,7 +784,12 @@ impl CaProto {
Ok(()) => {
let nf = rbuf.filled().len();
if nf == 0 {
info!("EOF");
info!(
"EOF peer {:?} {:?} {:?}",
self.tcp.peer_addr(),
self.remote_addr_dbg,
self.state
);
// TODO may need another state, if not yet done when input is EOF.
self.state = CaState::Done;
Ok(Some(Ready(CaItem::empty())))
@@ -750,8 +837,8 @@ impl CaProto {
break match &self.state {
CaState::StdHead => {
let hi = HeadInfo::from_netbuf(&mut self.buf)?;
if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 40 {
warn!("StdHead {hi:?}");
if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 2800 {
warn!("StdHead sees {hi:?}");
}
if hi.payload_size == 0xffff && hi.data_count == 0 {
self.state = CaState::ExtHead(hi);
@@ -797,17 +884,19 @@ impl Stream for CaProto {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if let CaState::Done = self.state {
return Ready(None);
} else {
loop {
break match Self::loop_body(self.as_mut(), cx) {
loop {
break if let CaState::Done = self.state {
Ready(None)
} else {
let k = Self::loop_body(self.as_mut(), cx);
match k {
Ok(Some(Ready(k))) => Ready(Some(Ok(k))),
Ok(Some(Pending)) => Pending,
Ok(None) => continue,
Err(e) => Ready(Some(Err(e))),
};
}
}
};
}
}
}

View File

@@ -5,6 +5,7 @@ use err::Error;
use scylla::prepared_statement::PreparedStatement;
use scylla::Session as ScySession;
use std::sync::Arc;
use tokio_postgres::Client as PgClient;
pub struct RegisterJob {
desc: ChannelDescDecoded,
@@ -23,42 +24,100 @@ pub struct RegisterChannel {
pub struct ChannelRegistry {
scy: Arc<ScySession>,
pg_client: Arc<PgClient>,
}
impl ChannelRegistry {
pub fn new(scy: Arc<ScySession>) -> Self {
Self { scy }
pub fn new(pg_client: Arc<PgClient>, scy: Arc<ScySession>) -> Self {
Self { pg_client, scy }
}
pub async fn get_series_id(&self, cd: ChannelDescDecoded) -> Result<Existence<SeriesId>, Error> {
crate::series::get_series_id(&self.scy, &cd).await
crate::series::get_series_id(&self.pg_client, &cd).await
}
}
/// Shared handle to the Scylla session together with the prepared insert
/// statements used on the hot path, plus the channel/series registry.
/// All members are `Arc` so the store can be shared across connection tasks.
pub struct DataStore {
    pub scy: Arc<ScySession>,
    // Prepared statements, one per target table, so the hot insert path
    // never re-prepares queries.
    pub qu_insert_series: Arc<PreparedStatement>,
    pub qu_insert_ts_msp: Arc<PreparedStatement>,
    pub qu_insert_scalar_i8: Arc<PreparedStatement>,
    pub qu_insert_scalar_i16: Arc<PreparedStatement>,
    pub qu_insert_scalar_i32: Arc<PreparedStatement>,
    pub qu_insert_scalar_f32: Arc<PreparedStatement>,
    pub qu_insert_scalar_f64: Arc<PreparedStatement>,
    pub qu_insert_scalar_string: Arc<PreparedStatement>,
    pub qu_insert_array_f32: Arc<PreparedStatement>,
    pub qu_insert_array_f64: Arc<PreparedStatement>,
    // Registry used to map channel descriptions to series ids.
    pub chan_reg: Arc<ChannelRegistry>,
}
impl DataStore {
pub async fn new(scy: Arc<ScySession>) -> Result<Self, Error> {
pub async fn new(pg_client: Arc<PgClient>, scy: Arc<ScySession>) -> Result<Self, Error> {
let q = scy
.prepare("insert into series (part, series, ts_msp, scalar_type, shape_dims) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_series = Arc::new(q);
let q = scy
.prepare("insert into ts_msp (series, ts_msp) values (?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_ts_msp = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_i8 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_i16 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_i32 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_f32 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_f64 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_string = Arc::new(q);
// array
let q = scy
.prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_f32 = Arc::new(q);
let q = scy
.prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_f64 = Arc::new(q);
let ret = Self {
chan_reg: Arc::new(ChannelRegistry::new(scy.clone())),
chan_reg: Arc::new(ChannelRegistry::new(pg_client, scy.clone())),
scy,
qu_insert_series,
qu_insert_ts_msp,
qu_insert_scalar_i8,
qu_insert_scalar_i16,
qu_insert_scalar_i32,
qu_insert_scalar_f32,
qu_insert_scalar_f64,
qu_insert_scalar_string,
qu_insert_array_f32,
qu_insert_array_f64,
};
Ok(ret)
}

View File

@@ -1,4 +1,4 @@
use crate::zmtp::ErrConv;
use crate::errconv::ErrConv;
use crate::zmtp::{CommonQueries, ZmtpFrame};
use err::Error;
use futures_core::Future;

43
netfetch/src/errconv.rs Normal file
View File

@@ -0,0 +1,43 @@
use err::Error;
use scylla::transport::errors::QueryError;
use scylla::transport::query_result::{FirstRowError, RowsExpectedError};
/// Convert results carrying foreign error types (Scylla, tokio-postgres)
/// into the crate-wide `err::Error`.
pub trait ErrConv<T> {
    fn err_conv(self) -> Result<T, Error>;
}
impl<T> ErrConv<T> for Result<T, QueryError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, RowsExpectedError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, FirstRowError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, tokio_postgres::Error> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}

View File

@@ -1,8 +1,10 @@
pub mod bsread;
pub mod ca;
pub mod channelwriter;
pub mod errconv;
pub mod netbuf;
pub mod series;
pub mod store;
#[cfg(test)]
pub mod test;
pub mod zmtp;

View File

@@ -1,10 +1,11 @@
use crate::bsread::ChannelDescDecoded;
use crate::zmtp::ErrConv;
use crate::errconv::ErrConv;
use err::Error;
#[allow(unused)]
use log::*;
use scylla::Session as ScySession;
use std::time::Duration;
use tokio_postgres::Client as PgClient;
#[derive(Clone, Debug)]
pub enum Existence<T> {
@@ -12,7 +13,7 @@ pub enum Existence<T> {
Existing(T),
}
#[derive(Clone, Debug)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct SeriesId(u64);
impl SeriesId {
@@ -22,7 +23,9 @@ impl SeriesId {
}
// TODO don't need byte_order or compression from ChannelDescDecoded for channel registration.
pub async fn get_series_id(scy: &ScySession, cd: &ChannelDescDecoded) -> Result<Existence<SeriesId>, Error> {
pub async fn get_series_id_scylla(scy: &ScySession, cd: &ChannelDescDecoded) -> Result<Existence<SeriesId>, Error> {
err::todo();
// TODO do not use, LWT in Scylla is currently buggy.
let facility = "scylla";
let channel_name = &cd.name;
let scalar_type = cd.scalar_type.to_scylla_i32();
@@ -88,3 +91,64 @@ pub async fn get_series_id(scy: &ScySession, cd: &ChannelDescDecoded) -> Result<
Ok(Existence::Existing(SeriesId(series)))
}
}
// TODO don't need byte_order or compression from ChannelDescDecoded for channel registration.
pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Result<Existence<SeriesId>, Error> {
let facility = "scylla";
let channel_name = &cd.name;
let scalar_type = cd.scalar_type.to_scylla_i32();
let shape = cd.shape.to_scylla_vec();
let res = pg_client
.query(
"select series from series_by_channel where facility = $1 and channel = $2 and scalar_type = $3 and shape_dims = $4 and agg_kind = 0",
&[&facility, channel_name, &scalar_type, &shape],
)
.await
.err_conv()?;
let mut all = vec![];
for row in res {
let series: i64 = row.get(0);
let series = series as u64;
all.push(series);
}
let rn = all.len();
if rn == 0 {
use md5::Digest;
let mut h = md5::Md5::new();
h.update(facility.as_bytes());
h.update(channel_name.as_bytes());
h.update(format!("{:?} {:?}", scalar_type, shape).as_bytes());
let f = h.finalize();
let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
for _ in 0..2000 {
if series > i64::MAX as u64 {
series = 0;
}
let res = pg_client
.execute(
concat!(
"insert into series_by_channel",
" (series, facility, channel, scalar_type, shape_dims, agg_kind)",
" values ($1, $2, $3, $4, $5, 0)"
),
&[&(series as i64), &facility, channel_name, &scalar_type, &shape],
)
.await
.unwrap();
if res == 1 {
let series = Existence::Created(SeriesId(series));
return Ok(series);
} else {
error!("tried to insert but series exists...");
}
tokio::time::sleep(Duration::from_millis(20)).await;
series += 1;
}
Err(Error::with_msg_no_trace(format!("get_series_id can not create and insert series id {facility:?} {channel_name:?} {scalar_type:?} {shape:?}")))
} else {
let series = all[0] as u64;
let series = Existence::Existing(SeriesId(series));
debug!("get_series_id {facility:?} {channel_name:?} {scalar_type:?} {shape:?} {series:?}");
Ok(series)
}
}

0
netfetch/src/stats.rs Normal file
View File

80
netfetch/src/store.rs Normal file
View File

@@ -0,0 +1,80 @@
use crate::errconv::ErrConv;
use err::Error;
use futures_util::{Future, FutureExt};
use log::*;
use scylla::frame::value::ValueList;
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::{QueryResult, Session as ScySession};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
/// Future wrapping a single prepared-statement insert into Scylla,
/// with timing bookkeeping used for error diagnostics.
pub struct ScyInsertFut {
    // Kept alive so the raw references captured by `fut` in `new` stay valid.
    #[allow(unused)]
    scy: Arc<ScySession>,
    // Kept alive for the same reason as `scy`.
    #[allow(unused)]
    query: Arc<PreparedStatement>,
    // The in-flight execute() future; borrows `scy` and `query` (see `new`).
    fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send>>,
    // Number of times this future has been polled.
    polled: usize,
    // When the future was constructed.
    ts_create: Instant,
    // When the future was first polled (set on first poll).
    ts_poll_first: Instant,
}
impl ScyInsertFut {
    const NAME: &'static str = "ScyInsertFut";

    /// Start an insert of `values` via `query` on session `scy`.
    ///
    /// The returned future owns the `Arc`s and a boxed `execute` future that
    /// borrows from them, forming a self-referential struct.
    pub fn new<V>(scy: Arc<ScySession>, query: Arc<PreparedStatement>, values: V) -> Self
    where
        V: ValueList + Send + 'static,
    {
        // SAFETY(review): these casts launder the lifetime of the Arc contents
        // to 'static so `execute`'s future can be boxed and stored alongside
        // them. Sound only because `scy` and `query` are moved into the same
        // struct below and Arc contents have stable addresses — the struct
        // field order / drop order must keep them alive while `fut` exists.
        let scy_ref: &ScySession = unsafe { &*(scy.as_ref() as &_ as *const _) };
        let query_ref = unsafe { &*(query.as_ref() as &_ as *const _) };
        let fut = scy_ref.execute(query_ref, values);
        let fut = Box::pin(fut) as _;
        let tsnow = Instant::now();
        Self {
            scy,
            query,
            fut,
            polled: 0,
            // `ts_poll_first` is provisional here; overwritten on first poll.
            ts_create: tsnow,
            ts_poll_first: tsnow,
        }
    }
}
impl Future for ScyInsertFut {
    type Output = Result<(), Error>;

    /// Drive the inner insert; on failure, log how long the insert sat between
    /// creation, first poll, and the error before converting it.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        // Stamp the first poll for latency diagnostics.
        if self.polled == 0 {
            self.ts_poll_first = Instant::now();
        }
        self.polled += 1;
        match self.fut.poll_unpin(cx) {
            Ready(Ok(_res)) => Ready(Ok(())),
            Ready(Err(e)) => {
                let tsnow = Instant::now();
                let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3;
                let dt_poll_first = tsnow.duration_since(self.ts_poll_first).as_secs_f32() * 1e3;
                error!(
                    "{} polled {} dt_created {:6.2} ms dt_poll_first {:6.2} ms",
                    Self::NAME,
                    self.polled,
                    dt_created,
                    dt_poll_first
                );
                error!("{} done Err {:?}", Self::NAME, e);
                Ready(Err(e).err_conv())
            }
            Pending => Pending,
        }
    }
}

View File

@@ -1,6 +1,7 @@
use crate::bsread::{BsreadMessage, ChannelDescDecoded, Parser};
use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB};
use crate::channelwriter::{ChannelWriter, ChannelWriterAll};
use crate::errconv::ErrConv;
use crate::netbuf::NetBuf;
use async_channel::{Receiver, Sender};
#[allow(unused)]
@@ -12,8 +13,6 @@ use log::*;
use netpod::timeunits::*;
use scylla::batch::{Batch, BatchType, Consistency};
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::transport::query_result::{FirstRowError, RowsExpectedError};
use scylla::{Session as ScySession, SessionBuilder};
use serde_json::Value as JsVal;
use stats::CheckEvery;
@@ -28,37 +27,6 @@ use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
/// Convert results carrying foreign error types into the crate-wide `err::Error`.
pub trait ErrConv<T> {
    fn err_conv(self) -> Result<T, Error>;
}
impl<T> ErrConv<T> for Result<T, QueryError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, RowsExpectedError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, FirstRowError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
#[allow(unused)]
fn test_listen() -> Result<(), Error> {
use std::time::Duration;
@@ -398,6 +366,8 @@ impl BsreadClient {
let shape_dims = cd.shape.to_scylla_vec();
self.channel_writers.insert(series, Box::new(cw));
if !self.opts.skip_insert {
error!("TODO use PGSQL and existing function instead.");
err::todo();
// TODO insert correct facility name
self.scy
.query(