From 246c32a1c422b8b11312dbd505dab91cc36596c0 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Mon, 28 Jun 2021 16:25:27 +0200
Subject: [PATCH] Deployed on proscan, update docs
---
dbconn/src/scan.rs | 191 ++++++++++++++----------
h5out/Cargo.toml | 8 +
h5out/src/lib.rs | 196 +++++++++++++++++++++++++
httpret/src/lib.rs | 33 +++--
httpret/static/documentation/api4.html | 78 +++++-----
netfetch/src/lib.rs | 58 +-------
netfetch/src/test.rs | 56 +++++++
netfetch/src/zmtp.rs | 70 +++++++++
taskrun/src/lib.rs | 13 +-
9 files changed, 527 insertions(+), 176 deletions(-)
create mode 100644 h5out/Cargo.toml
create mode 100644 h5out/src/lib.rs
create mode 100644 netfetch/src/test.rs
create mode 100644 netfetch/src/zmtp.rs
diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs
index 58cfb7e..c3dc20e 100644
--- a/dbconn/src/scan.rs
+++ b/dbconn/src/scan.rs
@@ -326,8 +326,11 @@ pub async fn update_db_with_channel_names(
db_config: &Database,
) -> Result>, Error> {
let (tx, rx) = bounded(16);
+ let tx2 = tx.clone();
let db_config = db_config.clone();
- tokio::spawn(async move {
+ let block1 = async move {
+ //return Err(Error::with_msg("some test error1"));
+ //tx.send(Err(Error::with_msg("some test error2"))).await?;
let dbc = crate::create_connection(&db_config).await?;
let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?;
let c1 = Arc::new(RwLock::new(0u32));
@@ -374,7 +377,19 @@ pub async fn update_db_with_channel_names(
};
tx.send(Ok(ret)).await?;
Ok::<_, Error>(())
- });
+ };
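+ // block2 awaits block1 and forwards any error into the channel, so the
+ // consumer of rx still receives it instead of the spawned task failing silently.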
+ let block2 = async move {
+ match block1.await {
+ Ok(_) => {}
+ Err(e) => match tx2.send(Err(e)).await {
+ Ok(_) => {}
+ Err(e) => {
+ error!("can not report error through channel: {:?}", e);
+ }
+ },
+ }
+ };
+ tokio::spawn(block2);
Ok(rx)
}
@@ -410,84 +425,95 @@ pub async fn update_db_with_all_channel_configs(
let (tx, rx) = bounded(16);
let tx = Arc::new(tx);
let tx2 = tx.clone();
- tokio::spawn(
- async move {
- let node_config = &node_config;
- let dbc = crate::create_connection(&node_config.node_config.cluster.database).await?;
- let dbc = Arc::new(dbc);
- let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
- let rows = dbc
- .query(
- "select rowid, facility, name from channels where facility = $1 order by facility, name",
- &[&node_disk_ident.facility],
- )
- .await?;
- let mut c1 = 0;
- dbc.query("begin", &[]).await?;
- let mut count_inserted = 0;
- let mut count_updated = 0;
- for row in rows {
- let rowid: i64 = row.try_get(0)?;
- let _facility: i64 = row.try_get(1)?;
- let channel: String = row.try_get(2)?;
- match update_db_with_channel_config(
- node_config,
- node_disk_ident,
- rowid,
- &channel,
- dbc.clone(),
- &mut count_inserted,
- &mut count_updated,
- )
- .await
- {
- /*Err(Error::ChannelConfigdirNotFound { .. }) => {
- warn!("can not find channel config {}", channel);
- crate::delay_io_medium().await;
- }*/
- Err(e) => {
- error!("{:?}", e);
- crate::delay_io_medium().await;
- }
- _ => {
- c1 += 1;
- if c1 % 200 == 0 {
- dbc.query("commit", &[]).await?;
- let msg = format!(
- "channel no {:6} inserted {:6} updated {:6}",
- c1, count_inserted, count_updated
- );
- let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
- tx.send(Ok(ret)).await?;
- dbc.query("begin", &[]).await?;
- }
- crate::delay_io_short().await;
+ let tx3 = tx.clone();
+ let block1 = async move {
+ let node_config = &node_config;
+ let dbc = crate::create_connection(&node_config.node_config.cluster.database).await?;
+ let dbc = Arc::new(dbc);
+ let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
+ let rows = dbc
+ .query(
+ "select rowid, facility, name from channels where facility = $1 order by facility, name",
+ &[&node_disk_ident.facility],
+ )
+ .await?;
+ let mut c1 = 0;
+ dbc.query("begin", &[]).await?;
+ let mut count_inserted = 0;
+ let mut count_updated = 0;
+ for row in rows {
+ let rowid: i64 = row.try_get(0)?;
+ let _facility: i64 = row.try_get(1)?;
+ let channel: String = row.try_get(2)?;
+ match update_db_with_channel_config(
+ node_config,
+ node_disk_ident,
+ rowid,
+ &channel,
+ dbc.clone(),
+ &mut count_inserted,
+ &mut count_updated,
+ )
+ .await
+ {
+ Err(e) => {
+ error!("{:?}", e);
+ crate::delay_io_medium().await;
+ }
+ Ok(UpdateChannelConfigResult::NotFound) => {
+ warn!("can not find channel config {}", channel);
+ crate::delay_io_medium().await;
+ }
+ Ok(UpdateChannelConfigResult::Done) => {
+ c1 += 1;
+ if c1 % 200 == 0 {
+ dbc.query("commit", &[]).await?;
+ let msg = format!(
+ "channel no {:6} inserted {:6} updated {:6}",
+ c1, count_inserted, count_updated
+ );
+ let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
+ tx.send(Ok(ret)).await?;
+ dbc.query("begin", &[]).await?;
}
+ crate::delay_io_short().await;
+ }
+ }
+ }
+ dbc.query("commit", &[]).await?;
+ let msg = format!(
+ "ALL DONE channel no {:6} inserted {:6} updated {:6}",
+ c1, count_inserted, count_updated
+ );
+ let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
+ tx.send(Ok(ret)).await?;
+ Ok::<_, Error>(())
+ }
+ .then({
+ |item| async move {
+ match item {
+ Ok(_) => {}
+ Err(e) => {
+ let msg = format!("Seeing error: {:?}", e);
+ let ret = UpdatedDbWithAllChannelConfigs { msg, count: 0 };
+ tx2.send(Ok(ret)).await?;
}
}
- dbc.query("commit", &[]).await?;
- let msg = format!(
- "ALL DONE channel no {:6} inserted {:6} updated {:6}",
- c1, count_inserted, count_updated
- );
- let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
- tx.send(Ok(ret)).await?;
Ok::<_, Error>(())
}
- .then({
- |item| async move {
- match item {
- Ok(_) => {}
- Err(e) => {
- let msg = format!("Seeing error: {:?}", e);
- let ret = UpdatedDbWithAllChannelConfigs { msg, count: 0 };
- tx2.send(Ok(ret)).await?;
- }
+ });
+ let block2 = async move {
+ match block1.await {
+ Ok(_) => {}
+ Err(e) => match tx3.send(Err(e)).await {
+ Ok(_) => {}
+ Err(e) => {
+ error!("can not deliver error through channel: {:?}", e);
}
- Ok::<_, Error>(())
- }
- }),
- );
+ },
+ }
+ };
+ tokio::spawn(block2);
Ok(rx)
}
@@ -497,6 +523,11 @@ pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result<(), E
Ok(())
}
+pub enum UpdateChannelConfigResult {
+ NotFound,
+ Done,
+}
+
/**
Parse the config of the given channel and update database.
*/
@@ -508,7 +539,7 @@ pub async fn update_db_with_channel_config(
dbc: Arc,
count_inserted: &mut usize,
count_updated: &mut usize,
-) -> Result<(), Error> {
+) -> Result<UpdateChannelConfigResult, Error> {
let path = node_config
.node
.data_base_path
@@ -516,7 +547,11 @@ pub async fn update_db_with_channel_config(
.join(channel)
.join("latest")
.join("00000_Config");
- let meta = tokio::fs::metadata(&path).await?;
+ let meta = if let Ok(k) = tokio::fs::metadata(&path).await {
+ k
+ } else {
+ return Ok(UpdateChannelConfigResult::NotFound);
+ };
if meta.len() > 8 * 1024 * 1024 {
return Err(Error::with_msg("meta data too long"));
}
@@ -548,7 +583,7 @@ pub async fn update_db_with_channel_config(
};
if do_parse {
let buf = tokio::fs::read(&path).await?;
- let config = parse::channelconfig::parse_config(&buf)?;
+ let config = parse::channelconfig::parse_config(&buf)?.1;
match config_id {
None => {
dbc.query(
@@ -573,7 +608,7 @@ pub async fn update_db_with_channel_config(
}
}
}
- Ok(())
+ Ok(UpdateChannelConfigResult::Done)
}
pub async fn update_db_with_all_channel_datafiles(
diff --git a/h5out/Cargo.toml b/h5out/Cargo.toml
new file mode 100644
index 0000000..34b9ac4
--- /dev/null
+++ b/h5out/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "h5out"
+version = "0.0.1-a.0"
+authors = ["Dominik Werder "]
+edition = "2018"
+
+[dependencies]
+#serde = { version = "1.0", features = ["derive"] }
diff --git a/h5out/src/lib.rs b/h5out/src/lib.rs
new file mode 100644
index 0000000..8daf193
--- /dev/null
+++ b/h5out/src/lib.rs
@@ -0,0 +1,196 @@
+use io::{Cursor, Write};
+use std::fs::OpenOptions;
+use std::io;
+use std::io::SeekFrom;
+
+#[derive(Debug)]
+struct Error {}
+
+impl From<io::Error> for Error {
+ fn from(_k: io::Error) -> Self {
+ Self {}
+ }
+}
+
+struct Out {
+ cur: io::Cursor<Vec<u8>>,
+}
+
+impl Out {
+ fn new() -> Self {
+ Self {
+ cur: Cursor::new(vec![]),
+ }
+ }
+
+ fn write_u8(&mut self, k: u8) -> io::Result<usize> {
+ self.write(&k.to_le_bytes())
+ }
+
+ fn write_u16(&mut self, k: u16) -> io::Result<usize> {
+ self.write(&k.to_le_bytes())
+ }
+
+ fn write_u32(&mut self, k: u32) -> io::Result<usize> {
+ self.write(&k.to_le_bytes())
+ }
+
+ fn write_u64(&mut self, k: u64) -> io::Result<usize> {
+ self.write(&k.to_le_bytes())
+ }
+}
+
+impl io::Seek for Out {
+ fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
+ self.cur.seek(pos)
+ }
+}
+
+impl io::Write for Out {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.cur.write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.cur.flush()
+ }
+}
+
+#[test]
+fn emit() {
+ write_h5().unwrap();
+}
+
+fn write_h5() -> Result<(), Error> {
+ let mut out = Out::new();
+ write_superblock(&mut out)?;
+ write_local_heap(&mut out)?;
+ write_root_object_header(&mut out)?;
+ write_file(&out)?;
+ let mut child = std::process::Command::new("h5debug").arg("f.h5").spawn()?;
+ child.wait()?;
+ Ok(())
+}
+
+fn write_file(out: &Out) -> Result<(), Error> {
+ eprintln!("Write {} bytes", out.cur.get_ref().len());
+ let mut f = OpenOptions::new()
+ .write(true)
+ .truncate(true)
+ .create(true)
+ .open("f.h5")?;
+ f.write(out.cur.get_ref())?;
+ Ok(())
+}
+
+fn write_padding(out: &mut Out) -> Result<(), Error> {
+ let n = out.cur.get_ref().len();
+ let m = n % 8;
+ if m != 0 {
+ for _ in 0..(8 - m) {
+ out.write_u8(0)?;
+ }
+ }
+ Ok(())
+}
+
+fn write_superblock(out: &mut Out) -> Result<(), Error> {
+ let super_ver = 0;
+ let free_ver = 0;
+ let root_group_ver = 0;
+ let shared_header_ver = 0;
+ let group_leaf_k = 4;
+ let group_int_k = 16;
+ let base_addr = 0;
+ let free_index_addr = u64::MAX;
+ let eof = 4242;
+ write_padding(out)?;
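+ // Field order follows the HDF5 version-0 superblock: format signature,
+ // superblock / free-space / root-group / shared-header version bytes,
+ // sizes of offsets and lengths (8 bytes each), group B-tree K values,
+ // file consistency flags, the base / free-space / end-of-file / driver
+ // addresses, and finally the root group symbol table entry.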
+ out.write(b"\x89HDF\r\n\x1a\n")?;
+ out.write_u8(super_ver)?;
+ out.write_u8(free_ver)?;
+ out.write_u8(root_group_ver)?;
+ out.write_u8(0)?;
+ out.write_u8(shared_header_ver)?;
+ out.write_u8(8)?;
+ out.write_u8(8)?;
+ out.write_u8(0)?;
+ out.write_u16(group_leaf_k)?;
+ out.write_u16(group_int_k)?;
+ let consistency = 0;
+ out.write_u32(consistency)?;
+ out.write_u64(base_addr)?;
+ out.write_u64(free_index_addr)?;
+ out.write_u64(eof)?;
+ let driver = u64::MAX;
+ out.write_u64(driver)?;
+ // root sym tab entry:
+ {
+ let link_name_off = 0;
+ let obj_header_addr = 1152;
+ let cache_type = 1;
+ out.write_u64(link_name_off)?;
+ out.write_u64(obj_header_addr)?;
+ out.write_u32(cache_type)?;
+ // reserved:
+ out.write_u32(0)?;
+ // scratch pad 16 bytes:
+ out.write_u64(0)?;
+ out.write_u64(0)?;
+ }
+ Ok(())
+}
+
+fn write_root_object_header(out: &mut Out) -> Result<(), Error> {
+ write_padding(out)?;
+ let pos0 = out.cur.get_ref().len() as u64;
+ eprintln!("write_root_object_header start at pos0 {}", pos0);
+ let ver = 1;
+ let nmsg = 1;
+ let hard_link_count = 1;
+ let header_msgs_data_len = 0;
+ out.write_u8(ver)?;
+ out.write_u8(0)?;
+ out.write_u16(nmsg)?;
+ out.write_u32(hard_link_count)?;
+ out.write_u32(header_msgs_data_len)?;
+ out.write_u32(0)?;
+ {
+ // Group Info
+ let msg_type = 0xa;
+ let msg_len = 128;
+ let flags = 0;
+ out.write_u32(msg_type)?;
+ out.write_u32(msg_len)?;
+ out.write_u8(flags)?;
+ out.write_u8(0)?;
+ out.write_u8(0)?;
+ out.write_u8(0)?;
+ }
+ Ok(())
+}
+
+fn write_local_heap(out: &mut Out) -> Result<(), Error> {
+ write_padding(out)?;
+ let pos0 = out.cur.get_ref().len() as u64;
+ eprintln!("write_local_heap start at pos0 {}", pos0);
+ let ver = 0;
+ let seg_size = 1024;
+ let free_list_off = u64::MAX;
+ let seg_addr = pos0 + 32;
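+ // The heap data segment starts directly after this 32-byte local heap header
+ // ("HEAP" signature, version, reserved bytes and three u64 fields).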
+ out.write(b"HEAP")?;
+ out.write_u8(ver)?;
+ out.write_u8(0)?;
+ out.write_u8(0)?;
+ out.write_u8(0)?;
+ out.write_u64(seg_size)?;
+ out.write_u64(free_list_off)?;
+ out.write_u64(seg_addr)?;
+ out.write(&[0; 1024])?;
+ {
+ let h = out.cur.position();
+ out.cur.set_position(h - 1024);
+ out.write(b"somename")?;
+ out.cur.set_position(h);
+ }
+ Ok(())
+}
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 90d7917..ba9c748 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -513,17 +513,28 @@ pub async fn update_db_with_channel_names(
};
let res =
dbconn::scan::update_db_with_channel_names(node_config.clone(), &node_config.node_config.cluster.database)
- .await?;
- let ret = response(StatusCode::OK)
- .header(http::header::CONTENT_TYPE, APP_JSON_LINES)
- .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {
- Ok(mut item) => {
- item.push('\n');
- Ok(item)
- }
- Err(e) => Err(e),
- })))?;
- Ok(ret)
+ .await;
+ match res {
+ Ok(res) => {
+ let ret = response(StatusCode::OK)
+ .header(http::header::CONTENT_TYPE, APP_JSON_LINES)
+ .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {
+ Ok(mut item) => {
+ item.push('\n');
+ Ok(item)
+ }
+ Err(e) => Err(e),
+ })))?;
+ Ok(ret)
+ }
+ Err(e) => {
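+ // On failure, still answer 200 but return the serialized error as the
+ // JSON body so the client can see what went wrong.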
+ let p = serde_json::to_string(&e)?;
+ let res = response(StatusCode::OK)
+ .header(http::header::CONTENT_TYPE, APP_JSON_LINES)
+ .body(Body::from(p))?;
+ Ok(res)
+ }
+ }
}
pub async fn update_db_with_channel_names_3(
diff --git a/httpret/static/documentation/api4.html b/httpret/static/documentation/api4.html
index 0389819..d709589 100644
--- a/httpret/static/documentation/api4.html
+++ b/httpret/static/documentation/api4.html
@@ -14,15 +14,7 @@
Databuffer API 4 Documentation
Documented here are the endpoints for databuffer API 4. The endpoints of the "original" unversioned API are documented at
-this location .
-
-Available backends
-Currently available:
-
- sf-databuffer
- hipa-archive
- gls-archive
-
+ this location .
Timestamp format
@@ -39,14 +31,15 @@ Currently available:
Formally: tsAbsolute = tsAnchor * 10^9 + tsOffMs * 10^6 + tsOffNs
Two reasons lead to this choice of timestamp format:
- Javascript can not represent the full nanosecond-resolution timestamps in a single numeric variable.
- The lowest 6 digits of the nanosecond timestamp are anyway abused by the timing system to emit a pulse-id.
+ Javascript can not represent the full nanosecond-resolution timestamps in a single numeric variable.
+ The lowest 6 digits of the nanosecond timestamp are anyway abused by the timing system to emit a pulse-id.
API functions
Currently available functionality:
+curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/backends'
+
+Example response
+{
+ "backends": [
+ "sf-databuffer",
+ "hipa-archive",
+ "gls-archive",
+ "proscan-archive"
+ ]
+}
+
+
+
Search channel
Method: GET
@@ -72,8 +86,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel
Example response:
-
-{
+{
"channels": [
{
"name": "S10MA01-DBPM120:Y2",
@@ -94,8 +107,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel
"description": ""
}
]
-}
-
+}
The search constraints are AND'd.
@@ -152,7 +164,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channel
Finalised range
If the server can determine that no more data will be added to the requested time range
-then it will add the flag finalisedRange: true to the response.
+ then it will add the flag finalisedRange: true to the response.
@@ -163,17 +175,17 @@ then it will add the flag finalisedRange: true to the response.
URL: https://data-api.psi.ch/api/4/binned
Query parameters:
- channelBackend (e.g. "sf-databuffer")
- channelName (e.g. "SLAAR-LSCP4-LAS6891:CH7:1")
- begDate (e.g. "2021-05-26T07:10:00.000Z")
- endDate (e.g. "2021-05-26T07:16:00.000Z")
- binCount (number of requested bins in time-dimension, e.g. "6")
- binningScheme (optional)
-
- if not specified: waveform gets first binned to a scalar.
- "binningScheme=binnedX&binnedXcount=13": waveform gets first binned to 13 bins in X-dimension (waveform-dimension).
- "binningScheme=binnedX&binnedXcount=0": waveform is not binned in X-dimension but kept at full length.
-
+ channelBackend (e.g. "sf-databuffer")
+ channelName (e.g. "SLAAR-LSCP4-LAS6891:CH7:1")
+ begDate (e.g. "2021-05-26T07:10:00.000Z")
+ endDate (e.g. "2021-05-26T07:16:00.000Z")
+ binCount (number of requested bins in time-dimension, e.g. "6")
+ binningScheme (optional)
+
+ if not specified: waveform gets first binned to a scalar.
+ "binningScheme=binnedX&binnedXcount=13": waveform gets first binned to 13 bins in X-dimension (waveform-dimension).
+ "binningScheme=binnedX&binnedXcount=0": waveform is not binned in X-dimension but kept at full length.
+
Request header: "Accept" must be "application/json"
@@ -185,11 +197,11 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/binned?channel
Partial result
If the requested range takes longer to retrieve, then a partial result with at least one bin is returned.
-The partial result will contain the necessary information to send another request with a range that
-starts with the first not-yet-retrieved bin.
-This information is provided by the continueAt and missingBins fields.
-This enables the user agent to start the presentation to the user while updating the user interface
-as new bins are received.
+ The partial result will contain the necessary information to send another request with a range that
+ starts with the first not-yet-retrieved bin.
+ This information is provided by the continueAt and missingBins fields.
+ This enables the user agent to start the presentation to the user while updating the user interface
+ as new bins are received.
Example response (without usage of binningScheme):
{
@@ -354,7 +366,7 @@ as new bins are received.
Finalised range
If the server can determine that no more data will be added to the requested time range
-then it will add the flag finalisedRange: true to the response.
+ then it will add the flag finalisedRange: true to the response.
diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs
index 8fcc6e1..5e3e952 100644
--- a/netfetch/src/lib.rs
+++ b/netfetch/src/lib.rs
@@ -1,12 +1,17 @@
use async_channel::{bounded, Receiver};
use bytes::{BufMut, BytesMut};
-use err::Error;
use futures_util::FutureExt;
-use netpod::NodeConfigCached;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use err::Error;
+use netpod::NodeConfigCached;
+
+#[cfg(test)]
+pub mod test;
+pub mod zmtp;
+
#[derive(Debug, Serialize, Deserialize)]
pub struct Message {
cmd: u16,
@@ -21,55 +26,6 @@ pub enum FetchItem {
Message(Message),
}
-#[cfg(test)]
-mod test {
- use futures_util::StreamExt;
- use netpod::log::*;
- use netpod::{Cluster, Database, Node, NodeConfig, NodeConfigCached};
- use std::collections::BTreeMap;
- use std::iter::FromIterator;
-
- #[test]
- fn ca_connect_1() {
- taskrun::run(async {
- let it = vec![(String::new(), String::new())].into_iter();
- let pairs = BTreeMap::from_iter(it);
- let node_config = NodeConfigCached {
- node: Node {
- host: "".into(),
- bin_grain_kind: 0,
- port: 123,
- port_raw: 123,
- backend: "".into(),
- split: 0,
- data_base_path: "".into(),
- listen: "".into(),
- ksprefix: "".into(),
- },
- node_config: NodeConfig {
- name: "".into(),
- cluster: Cluster {
- nodes: vec![],
- database: Database {
- host: "".into(),
- name: "".into(),
- user: "".into(),
- pass: "".into(),
- },
- },
- },
- ix: 0,
- };
- let mut rx = super::ca_connect_1(pairs, &node_config).await?;
- while let Some(item) = rx.next().await {
- info!("got next: {:?}", item);
- }
- Ok(())
- })
- .unwrap();
- }
-}
-
pub async fn ca_connect_1(
_pairs: BTreeMap<String, String>,
_node_config: &NodeConfigCached,
diff --git a/netfetch/src/test.rs b/netfetch/src/test.rs
new file mode 100644
index 0000000..7a8198c
--- /dev/null
+++ b/netfetch/src/test.rs
@@ -0,0 +1,56 @@
+use futures_util::StreamExt;
+use netpod::log::*;
+use netpod::{Cluster, Database, Node, NodeConfig, NodeConfigCached};
+use std::collections::BTreeMap;
+use std::iter::FromIterator;
+
+#[test]
+fn ca_connect_1() {
+ taskrun::run(async {
+ let it = vec![(String::new(), String::new())].into_iter();
+ let pairs = BTreeMap::from_iter(it);
+ let node_config = NodeConfigCached {
+ node: Node {
+ host: "".into(),
+ bin_grain_kind: 0,
+ port: 123,
+ port_raw: 123,
+ backend: "".into(),
+ split: 0,
+ data_base_path: "".into(),
+ listen: "".into(),
+ ksprefix: "".into(),
+ },
+ node_config: NodeConfig {
+ name: "".into(),
+ cluster: Cluster {
+ nodes: vec![],
+ database: Database {
+ host: "".into(),
+ name: "".into(),
+ user: "".into(),
+ pass: "".into(),
+ },
+ },
+ },
+ ix: 0,
+ };
+ let mut rx = super::ca_connect_1(pairs, &node_config).await?;
+ while let Some(item) = rx.next().await {
+ info!("got next: {:?}", item);
+ }
+ Ok(())
+ })
+ .unwrap();
+}
+
+#[test]
+fn zmtp_00() {
+ taskrun::run(async {
+ let it = vec![(String::new(), String::new())].into_iter();
+ let _pairs = BTreeMap::from_iter(it);
+ crate::zmtp::zmtp_00().await?;
+ Ok(())
+ })
+ .unwrap();
+}
diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs
new file mode 100644
index 0000000..8ba9785
--- /dev/null
+++ b/netfetch/src/zmtp.rs
@@ -0,0 +1,70 @@
+use err::Error;
+use futures_core::Stream;
+use futures_util::{pin_mut, StreamExt};
+use netpod::log::*;
+use std::mem;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncRead, ReadBuf};
+use tokio::net::TcpStream;
+
+pub async fn zmtp_00() -> Result<(), Error> {
+ // PV:BSREADCONFIG
+ let addr = "S10-CPPM-MOT0991:9999";
+ let conn = tokio::net::TcpStream::connect(addr).await?;
+ let mut zmtp = Zmtp::new(conn);
+ while let Some(ev) = zmtp.next().await {
+ info!("got zmtp event: {:?}", ev);
+ }
+ Ok(())
+}
+
+enum ConnState {
+ Init,
+}
+
+struct Zmtp {
+ conn: TcpStream,
+ conn_state: ConnState,
+ buf1: Vec<u8>,
+ need_min: usize,
+}
+
+impl Zmtp {
+ fn new(conn: TcpStream) -> Self {
+ Self {
+ conn,
+ conn_state: ConnState::Init,
+ buf1: vec![0; 1024],
+ need_min: 4,
+ }
+ }
+}
+
+#[derive(Debug)]
+struct ZmtpEvent {}
+
+impl Stream for Zmtp {
+ type Item = Result<ZmtpEvent, Error>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+ use Poll::*;
+ loop {
+ break if let ConnState::Init = self.conn_state {
+ // can it be that we already have enough bytes received in the buffer?
+ let mut buf1 = mem::replace(&mut self.buf1, vec![]);
+ let mut rbuf = ReadBuf::new(&mut buf1);
+ let w = &mut self.conn;
+ pin_mut!(w);
+ let m1 = w.poll_read(cx, &mut rbuf);
+ self.buf1 = buf1;
+ match m1 {
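+ // Greeting and frame parsing is not implemented yet: any bytes read are
+ // currently discarded and both arms return Pending.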
+ Ready(_item) => Pending,
+ Pending => Pending,
+ }
+ } else {
+ Pending
+ };
+ }
+ }
+}
diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs
index 3176708..9818f35 100644
--- a/taskrun/src/lib.rs
+++ b/taskrun/src/lib.rs
@@ -1,10 +1,17 @@
-use err::Error;
use std::future::Future;
use std::panic;
use std::sync::Mutex;
+
use tokio::task::JoinHandle;
-#[allow(unused_imports)]
-use tracing::{debug, error, info, trace, warn};
+
+use err::Error;
+
+use crate::log::*;
+
+pub mod log {
+ #[allow(unused_imports)]
+ pub use tracing::{debug, error, info, trace, warn};
+}
pub fn run<T, F: Future<Output = Result<T, Error>>>(f: F) -> Result<T, Error> {
tracing_init();