Deployed on proscan, update docs

Dominik Werder
2021-06-28 16:25:27 +02:00
parent 4b2048c103
commit 246c32a1c4
9 changed files with 527 additions and 176 deletions


@@ -326,8 +326,11 @@ pub async fn update_db_with_channel_names(
db_config: &Database,
) -> Result<Receiver<Result<UpdatedDbWithChannelNames, Error>>, Error> {
let (tx, rx) = bounded(16);
let tx2 = tx.clone();
let db_config = db_config.clone();
tokio::spawn(async move {
let block1 = async move {
//return Err(Error::with_msg("some test error1"));
//tx.send(Err(Error::with_msg("some test error2"))).await?;
let dbc = crate::create_connection(&db_config).await?;
let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?;
let c1 = Arc::new(RwLock::new(0u32));
@@ -374,7 +377,19 @@ pub async fn update_db_with_channel_names(
};
tx.send(Ok(ret)).await?;
Ok::<_, Error>(())
});
};
let block2 = async move {
match block1.await {
Ok(_) => {}
Err(e) => match tx2.send(Err(e)).await {
Ok(_) => {}
Err(e) => {
error!("can not report error through channel: {:?}", e);
}
},
}
};
tokio::spawn(block2);
Ok(rx)
}
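The hunk above introduces a pattern that recurs throughout this commit: the fallible work is wrapped in one async block (block1), and a second block awaits it and forwards any error into the result channel through a cloned sender, logging if even that send fails. A minimal, self-contained sketch of that pattern (illustrative names and placeholder work, not the actual function; assumes async-channel and tokio, with err::Error replaced by a plain String):

use async_channel::{bounded, Receiver};

type Error = String; // placeholder for err::Error

// Must be called from within a tokio runtime, since it spawns a task.
fn spawn_with_error_forwarding() -> Receiver<Result<u32, Error>> {
    let (tx, rx) = bounded(16);
    let tx2 = tx.clone();
    // block1: the fallible work; errors propagate out via `?`.
    let block1 = async move {
        let value: u32 = 42; // placeholder for the real database work
        tx.send(Ok(value)).await.map_err(|e| e.to_string())?;
        Ok::<_, Error>(())
    };
    // block2: await block1 and report any error through the channel.
    let block2 = async move {
        if let Err(e) = block1.await {
            if let Err(e2) = tx2.send(Err(e)).await {
                eprintln!("can not report error through channel: {:?}", e2);
            }
        }
    };
    tokio::spawn(block2);
    rx
}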
@@ -410,84 +425,95 @@ pub async fn update_db_with_all_channel_configs(
let (tx, rx) = bounded(16);
let tx = Arc::new(tx);
let tx2 = tx.clone();
tokio::spawn(
async move {
let node_config = &node_config;
let dbc = crate::create_connection(&node_config.node_config.cluster.database).await?;
let dbc = Arc::new(dbc);
let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
let rows = dbc
.query(
"select rowid, facility, name from channels where facility = $1 order by facility, name",
&[&node_disk_ident.facility],
)
.await?;
let mut c1 = 0;
dbc.query("begin", &[]).await?;
let mut count_inserted = 0;
let mut count_updated = 0;
for row in rows {
let rowid: i64 = row.try_get(0)?;
let _facility: i64 = row.try_get(1)?;
let channel: String = row.try_get(2)?;
match update_db_with_channel_config(
node_config,
node_disk_ident,
rowid,
&channel,
dbc.clone(),
&mut count_inserted,
&mut count_updated,
)
.await
{
/*Err(Error::ChannelConfigdirNotFound { .. }) => {
warn!("can not find channel config {}", channel);
crate::delay_io_medium().await;
}*/
Err(e) => {
error!("{:?}", e);
crate::delay_io_medium().await;
}
_ => {
c1 += 1;
if c1 % 200 == 0 {
dbc.query("commit", &[]).await?;
let msg = format!(
"channel no {:6} inserted {:6} updated {:6}",
c1, count_inserted, count_updated
);
let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
tx.send(Ok(ret)).await?;
dbc.query("begin", &[]).await?;
}
crate::delay_io_short().await;
let tx3 = tx.clone();
let block1 = async move {
let node_config = &node_config;
let dbc = crate::create_connection(&node_config.node_config.cluster.database).await?;
let dbc = Arc::new(dbc);
let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
let rows = dbc
.query(
"select rowid, facility, name from channels where facility = $1 order by facility, name",
&[&node_disk_ident.facility],
)
.await?;
let mut c1 = 0;
dbc.query("begin", &[]).await?;
let mut count_inserted = 0;
let mut count_updated = 0;
for row in rows {
let rowid: i64 = row.try_get(0)?;
let _facility: i64 = row.try_get(1)?;
let channel: String = row.try_get(2)?;
match update_db_with_channel_config(
node_config,
node_disk_ident,
rowid,
&channel,
dbc.clone(),
&mut count_inserted,
&mut count_updated,
)
.await
{
Err(e) => {
error!("{:?}", e);
crate::delay_io_medium().await;
}
Ok(UpdateChannelConfigResult::NotFound) => {
warn!("can not find channel config {}", channel);
crate::delay_io_medium().await;
}
Ok(UpdateChannelConfigResult::Done) => {
c1 += 1;
if c1 % 200 == 0 {
dbc.query("commit", &[]).await?;
let msg = format!(
"channel no {:6} inserted {:6} updated {:6}",
c1, count_inserted, count_updated
);
let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
tx.send(Ok(ret)).await?;
dbc.query("begin", &[]).await?;
}
crate::delay_io_short().await;
}
}
}
dbc.query("commit", &[]).await?;
let msg = format!(
"ALL DONE channel no {:6} inserted {:6} updated {:6}",
c1, count_inserted, count_updated
);
let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
tx.send(Ok(ret)).await?;
Ok::<_, Error>(())
}
.then({
|item| async move {
match item {
Ok(_) => {}
Err(e) => {
let msg = format!("Seeing error: {:?}", e);
let ret = UpdatedDbWithAllChannelConfigs { msg, count: 0 };
tx2.send(Ok(ret)).await?;
}
}
dbc.query("commit", &[]).await?;
let msg = format!(
"ALL DONE channel no {:6} inserted {:6} updated {:6}",
c1, count_inserted, count_updated
);
let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
tx.send(Ok(ret)).await?;
Ok::<_, Error>(())
}
.then({
|item| async move {
match item {
Ok(_) => {}
Err(e) => {
let msg = format!("Seeing error: {:?}", e);
let ret = UpdatedDbWithAllChannelConfigs { msg, count: 0 };
tx2.send(Ok(ret)).await?;
}
});
let block2 = async move {
match block1.await {
Ok(_) => {}
Err(e) => match tx3.send(Err(e)).await {
Ok(_) => {}
Err(e) => {
error!("can not deliver error through channel: {:?}", e);
}
Ok::<_, Error>(())
}
}),
);
},
}
};
tokio::spawn(block2);
Ok(rx)
}
@@ -497,6 +523,11 @@ pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result<(), E
Ok(())
}
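/// Outcome of updating a single channel's config: the config file may be missing
/// on disk (`NotFound`), or it was parsed and the database updated (`Done`).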
pub enum UpdateChannelConfigResult {
NotFound,
Done,
}
/**
Parse the config of the given channel and update database.
*/
@@ -508,7 +539,7 @@ pub async fn update_db_with_channel_config(
dbc: Arc<Client>,
count_inserted: &mut usize,
count_updated: &mut usize,
) -> Result<(), Error> {
) -> Result<UpdateChannelConfigResult, Error> {
let path = node_config
.node
.data_base_path
@@ -516,7 +547,11 @@ pub async fn update_db_with_channel_config(
.join(channel)
.join("latest")
.join("00000_Config");
let meta = tokio::fs::metadata(&path).await?;
let meta = if let Ok(k) = tokio::fs::metadata(&path).await {
k
} else {
return Ok(UpdateChannelConfigResult::NotFound);
};
if meta.len() > 8 * 1024 * 1024 {
return Err(Error::with_msg("meta data too long"));
}
@@ -548,7 +583,7 @@ pub async fn update_db_with_channel_config(
};
if do_parse {
let buf = tokio::fs::read(&path).await?;
let config = parse::channelconfig::parse_config(&buf)?;
let config = parse::channelconfig::parse_config(&buf)?.1;
match config_id {
None => {
dbc.query(
@@ -573,7 +608,7 @@ pub async fn update_db_with_channel_config(
}
}
}
Ok(())
Ok(UpdateChannelConfigResult::Done)
}
pub async fn update_db_with_all_channel_datafiles(

h5out/Cargo.toml Normal file

@@ -0,0 +1,8 @@
[package]
name = "h5out"
version = "0.0.1-a.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2018"
[dependencies]
#serde = { version = "1.0", features = ["derive"] }

h5out/src/lib.rs Normal file

@@ -0,0 +1,196 @@
use io::{Cursor, Write};
use std::fs::OpenOptions;
use std::io;
use std::io::SeekFrom;
#[derive(Debug)]
struct Error {}
impl From<io::Error> for Error {
fn from(_k: io::Error) -> Self {
Self {}
}
}
struct Out {
cur: io::Cursor<Vec<u8>>,
}
impl Out {
fn new() -> Self {
Self {
cur: Cursor::new(vec![]),
}
}
fn write_u8(&mut self, k: u8) -> io::Result<usize> {
self.write(&k.to_le_bytes())
}
fn write_u16(&mut self, k: u16) -> io::Result<usize> {
self.write(&k.to_le_bytes())
}
fn write_u32(&mut self, k: u32) -> io::Result<usize> {
self.write(&k.to_le_bytes())
}
fn write_u64(&mut self, k: u64) -> io::Result<usize> {
self.write(&k.to_le_bytes())
}
}
impl io::Seek for Out {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.cur.seek(pos)
}
}
impl io::Write for Out {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.cur.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.cur.flush()
}
}
#[test]
fn emit() {
write_h5().unwrap();
}
fn write_h5() -> Result<(), Error> {
let mut out = Out::new();
write_superblock(&mut out)?;
write_local_heap(&mut out)?;
write_root_object_header(&mut out)?;
write_file(&out)?;
let mut child = std::process::Command::new("h5debug").arg("f.h5").spawn()?;
child.wait()?;
Ok(())
}
fn write_file(out: &Out) -> Result<(), Error> {
eprintln!("Write {} bytes", out.cur.get_ref().len());
let mut f = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open("f.h5")?;
f.write(out.cur.get_ref())?;
Ok(())
}
fn write_padding(out: &mut Out) -> Result<(), Error> {
let n = out.cur.get_ref().len();
let m = n % 8;
if m != 0 {
for _ in 0..(8 - m) {
out.write_u8(0)?;
}
}
Ok(())
}
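// Version-0 superblock, as written below: 8-byte signature, version bytes,
// offset/length sizes of 8 bytes each, group B-tree K values, consistency flags,
// then base / free-space / end-of-file / driver-info addresses, followed by the
// root group symbol table entry (link name offset, object header address,
// cache type, reserved word, 16-byte scratch pad).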
fn write_superblock(out: &mut Out) -> Result<(), Error> {
let super_ver = 0;
let free_ver = 0;
let root_group_ver = 0;
let shared_header_ver = 0;
let group_leaf_k = 4;
let group_int_k = 16;
let base_addr = 0;
let free_index_addr = u64::MAX;
let eof = 4242;
write_padding(out)?;
out.write(b"\x89HDF\r\n\x1a\n")?;
out.write_u8(super_ver)?;
out.write_u8(free_ver)?;
out.write_u8(root_group_ver)?;
out.write_u8(0)?;
out.write_u8(shared_header_ver)?;
out.write_u8(8)?;
out.write_u8(8)?;
out.write_u8(0)?;
out.write_u16(group_leaf_k)?;
out.write_u16(group_int_k)?;
let consistency = 0;
out.write_u32(consistency)?;
out.write_u64(base_addr)?;
out.write_u64(free_index_addr)?;
out.write_u64(eof)?;
let driver = u64::MAX;
out.write_u64(driver)?;
// root sym tab entry:
{
let link_name_off = 0;
let obj_header_addr = 1152;
let cache_type = 1;
out.write_u64(link_name_off)?;
out.write_u64(obj_header_addr)?;
out.write_u32(cache_type)?;
// reserved:
out.write_u32(0)?;
// scratch pad 16 bytes:
out.write_u64(0)?;
out.write_u64(0)?;
}
Ok(())
}
fn write_root_object_header(out: &mut Out) -> Result<(), Error> {
write_padding(out)?;
let pos0 = out.cur.get_ref().len() as u64;
eprintln!("write_root_object_header start at pos0 {}", pos0);
let ver = 1;
let nmsg = 1;
let hard_link_count = 1;
let header_msgs_data_len = 0;
out.write_u8(ver)?;
out.write_u8(0)?;
out.write_u16(nmsg)?;
out.write_u32(hard_link_count)?;
out.write_u32(header_msgs_data_len)?;
out.write_u32(0)?;
{
// Group Info
let msg_type = 0xa;
let msg_len = 128;
let flags = 0;
out.write_u32(msg_type)?;
out.write_u32(msg_len)?;
out.write_u8(flags)?;
out.write_u8(0)?;
out.write_u8(0)?;
out.write_u8(0)?;
}
Ok(())
}
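// Local heap, as written below: "HEAP" signature, version, reserved bytes,
// data segment size, free-list head offset, data segment address, then the
// 1024-byte data segment; the name "somename" is afterwards written at the
// start of that segment.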
fn write_local_heap(out: &mut Out) -> Result<(), Error> {
write_padding(out)?;
let pos0 = out.cur.get_ref().len() as u64;
eprintln!("write_local_heap start at pos0 {}", pos0);
let ver = 0;
let seg_size = 1024;
let free_list_off = u64::MAX;
let seg_addr = pos0 + 32;
out.write(b"HEAP")?;
out.write_u8(ver)?;
out.write_u8(0)?;
out.write_u8(0)?;
out.write_u8(0)?;
out.write_u64(seg_size)?;
out.write_u64(free_list_off)?;
out.write_u64(seg_addr)?;
out.write(&[0; 1024])?;
{
let h = out.cur.position();
out.cur.set_position(h - 1024);
out.write(b"somename")?;
out.cur.set_position(h);
}
Ok(())
}


@@ -513,17 +513,28 @@ pub async fn update_db_with_channel_names(
};
let res =
dbconn::scan::update_db_with_channel_names(node_config.clone(), &node_config.node_config.cluster.database)
.await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
.body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {
Ok(mut item) => {
item.push('\n');
Ok(item)
}
Err(e) => Err(e),
})))?;
Ok(ret)
.await;
match res {
Ok(res) => {
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
.body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {
Ok(mut item) => {
item.push('\n');
Ok(item)
}
Err(e) => Err(e),
})))?;
Ok(ret)
}
Err(e) => {
let p = serde_json::to_string(&e)?;
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
.body(Body::from(p))?;
Ok(res)
}
}
}
pub async fn update_db_with_channel_names_3(


@@ -14,15 +14,7 @@
<h1>Databuffer API 4 Documentation</h1>
<p>Documented here are the endpoints for databuffer API 4. The endpoints of the "original" unversioned API are documented at
<a href="https://git.psi.ch/sf_daq/ch.psi.daq.databuffer/blob/master/ch.psi.daq.queryrest/Readme.md">this location</a>.</p>
<h2>Available backends</h2>
Currently available:
<ul>
<li>sf-databuffer</li>
<li>hipa-archive</li>
<li>gls-archive</li>
</ul>
<h2>Timestamp format</h2>
@@ -39,14 +31,15 @@ Currently available:
<p>Formally: tsAbsolute = tsAnchor * 10<sup>9</sup> + tsOffMs * 10<sup>6</sup> + tsOffNs</p>
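<p>As an illustration (the values are made up): an event at tsAnchor = 1624889127 with tsOffMs = 250 and tsOffNs = 123456 corresponds to</p>
<pre>tsAbsolute = 1624889127 * 10^9 + 250 * 10^6 + 123456
           = 1624889127250123456 ns</pre>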
<p>Two reasons lead to this choice of timestamp format:</p>
<ul>
<li>JavaScript cannot represent the full nanosecond-resolution timestamps in a single numeric variable.</li>
<li>The lowest 6 digits of the nanosecond timestamp are in any case repurposed by the timing system to carry a pulse-id.</li>
</ul>
<h2>API functions</h2>
<p>Currently available functionality:</p>
<ul>
<li><a href="#list-backends">List available backends</a></li>
<li><a href="#search-channel">Search channel</a></li>
<li><a href="#query-binned">Query binned data</a></li>
<li><a href="#query-events">Query unbinned event data</a></li>
@@ -54,6 +47,27 @@ Currently available:
<a id="list-backends"></a>
<h2>List available backends</h2>
<p><strong>Method:</strong> GET</p>
<p><strong>URL:</strong> https://data-api.psi.ch/api/4/backends</p>
<p><strong>Request header:</strong> "Accept" must be "application/json"</p>
<h4>CURL example:</h4>
<pre>
curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/backends'
</pre>
<h4>Example response</h4>
<pre>{
"backends": [
"sf-databuffer",
"hipa-archive",
"gls-archive",
"proscan-archive"
]
}</pre>
<a id="search-channel"></a>
<h2>Search channel</h2>
<p><strong>Method:</strong> GET</p>
@@ -72,8 +86,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel
</pre>
<h4>Example response:</h4>
<pre>{
"channels": [
{
"name": "S10MA01-DBPM120:Y2",
@@ -94,8 +107,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel
"description": ""
}
]
}</pre>
<p>The search constraints are AND'd.</p>
@@ -152,7 +164,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channel
<h4>Finalised range</h4>
<p>If the server can determine that no more data will be added to the requested time range
then it will add the flag <strong>finalisedRange: true</strong> to the response.</p>
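<p>Illustrative fragment only (the exact layout follows the response examples below): a response for such a range would additionally carry something like</p>
<pre>{
  ...,
  "finalisedRange": true
}</pre>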
@@ -163,17 +175,17 @@ then it will add the flag <strong>finalisedRange: true</strong> to the response.
<p><strong>URL:</strong> https://data-api.psi.ch/api/4/binned</p>
<p><strong>Query parameters:</strong></p>
<ul>
<li>channelBackend (e.g. "sf-databuffer")</li>
<li>channelName (e.g. "SLAAR-LSCP4-LAS6891:CH7:1")</li>
<li>begDate (e.g. "2021-05-26T07:10:00.000Z")</li>
<li>endDate (e.g. "2021-05-26T07:16:00.000Z")</li>
<li>binCount (number of requested bins in the time dimension, e.g. "6")</li>
<li>binningScheme (optional)</li>
<ul>
<li>if not specified: the waveform is first binned down to a scalar.</li>
<li>"binningScheme=binnedX&binnedXcount=13": the waveform is first binned to 13 bins in the X dimension (the waveform dimension).</li>
<li>"binningScheme=binnedX&binnedXcount=0": the waveform is not binned in the X dimension but kept at full length.</li>
</ul>
</ul>
<p><strong>Request header:</strong> "Accept" must be "application/json"</p>
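<p>For illustration, a hypothetical request combining the example values above with X-binning into 13 bins could look as follows (the canonical example follows below):</p>
<pre>
curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/binned?channelBackend=sf-databuffer&channelName=SLAAR-LSCP4-LAS6891:CH7:1&begDate=2021-05-26T07:10:00.000Z&endDate=2021-05-26T07:16:00.000Z&binCount=6&binningScheme=binnedX&binnedXcount=13'
</pre>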
@@ -185,11 +197,11 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/binned?channel
<h4>Partial result</h4>
<p>If the requested range takes a long time to retrieve, then a partial result with at least one bin is returned.
The partial result contains the information needed to send another request for a range that
starts at the first not-yet-retrieved bin.
This information is provided by the <strong>continueAt</strong> and <strong>missingBins</strong> fields.
This enables the user agent to start presenting data to the user while updating the user interface
as new bins are received.</p>
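<p>Sketch of the intended client loop, for illustration only (assuming the returned <strong>continueAt</strong> value can be used directly as the new <strong>begDate</strong>): repeat the same request, replacing <strong>begDate</strong> with the reported <strong>continueAt</strong>, until no <strong>missingBins</strong> remain.</p>
<pre>
# illustrative only: CONTINUE_AT holds the continueAt value from the previous response
curl -H 'Accept: application/json' "https://data-api.psi.ch/api/4/binned?channelBackend=sf-databuffer&channelName=SLAAR-LSCP4-LAS6891:CH7:1&begDate=$CONTINUE_AT&endDate=2021-05-26T07:16:00.000Z&binCount=6"
</pre>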
<h4>Example response (without usage of binningScheme):</h4>
<pre>{
@@ -354,7 +366,7 @@ as new bins are received.</p>
<h4>Finalised range</h4>
<p>If the server can determine that no more data will be added to the requested time range
then it will add the flag <strong>finalisedRange: true</strong> to the response.</p>


@@ -1,12 +1,17 @@
use async_channel::{bounded, Receiver};
use bytes::{BufMut, BytesMut};
use err::Error;
use futures_util::FutureExt;
use netpod::NodeConfigCached;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[cfg(test)]
pub mod test;
pub mod zmtp;
#[derive(Debug, Serialize, Deserialize)]
pub struct Message {
cmd: u16,
@@ -21,55 +26,6 @@ pub enum FetchItem {
Message(Message),
}
#[cfg(test)]
mod test {
use futures_util::StreamExt;
use netpod::log::*;
use netpod::{Cluster, Database, Node, NodeConfig, NodeConfigCached};
use std::collections::BTreeMap;
use std::iter::FromIterator;
#[test]
fn ca_connect_1() {
taskrun::run(async {
let it = vec![(String::new(), String::new())].into_iter();
let pairs = BTreeMap::from_iter(it);
let node_config = NodeConfigCached {
node: Node {
host: "".into(),
bin_grain_kind: 0,
port: 123,
port_raw: 123,
backend: "".into(),
split: 0,
data_base_path: "".into(),
listen: "".into(),
ksprefix: "".into(),
},
node_config: NodeConfig {
name: "".into(),
cluster: Cluster {
nodes: vec![],
database: Database {
host: "".into(),
name: "".into(),
user: "".into(),
pass: "".into(),
},
},
},
ix: 0,
};
let mut rx = super::ca_connect_1(pairs, &node_config).await?;
while let Some(item) = rx.next().await {
info!("got next: {:?}", item);
}
Ok(())
})
.unwrap();
}
}
pub async fn ca_connect_1(
_pairs: BTreeMap<String, String>,
_node_config: &NodeConfigCached,

netfetch/src/test.rs Normal file

@@ -0,0 +1,56 @@
use futures_util::StreamExt;
use netpod::log::*;
use netpod::{Cluster, Database, Node, NodeConfig, NodeConfigCached};
use std::collections::BTreeMap;
use std::iter::FromIterator;
#[test]
fn ca_connect_1() {
taskrun::run(async {
let it = vec![(String::new(), String::new())].into_iter();
let pairs = BTreeMap::from_iter(it);
let node_config = NodeConfigCached {
node: Node {
host: "".into(),
bin_grain_kind: 0,
port: 123,
port_raw: 123,
backend: "".into(),
split: 0,
data_base_path: "".into(),
listen: "".into(),
ksprefix: "".into(),
},
node_config: NodeConfig {
name: "".into(),
cluster: Cluster {
nodes: vec![],
database: Database {
host: "".into(),
name: "".into(),
user: "".into(),
pass: "".into(),
},
},
},
ix: 0,
};
let mut rx = super::ca_connect_1(pairs, &node_config).await?;
while let Some(item) = rx.next().await {
info!("got next: {:?}", item);
}
Ok(())
})
.unwrap();
}
#[test]
fn zmtp_00() {
taskrun::run(async {
let it = vec![(String::new(), String::new())].into_iter();
let _pairs = BTreeMap::from_iter(it);
crate::zmtp::zmtp_00().await?;
Ok(())
})
.unwrap();
}

netfetch/src/zmtp.rs Normal file

@@ -0,0 +1,70 @@
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use netpod::log::*;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
use tokio::net::TcpStream;
pub async fn zmtp_00() -> Result<(), Error> {
// PV:BSREADCONFIG
let addr = "S10-CPPM-MOT0991:9999";
let conn = tokio::net::TcpStream::connect(addr).await?;
let mut zmtp = Zmtp::new(conn);
while let Some(ev) = zmtp.next().await {
info!("got zmtp event: {:?}", ev);
}
Ok(())
}
enum ConnState {
Init,
}
struct Zmtp {
conn: TcpStream,
conn_state: ConnState,
buf1: Vec<u8>,
need_min: usize,
}
impl Zmtp {
fn new(conn: TcpStream) -> Self {
Self {
conn,
conn_state: ConnState::Init,
buf1: vec![0; 1024],
need_min: 4,
}
}
}
#[derive(Debug)]
struct ZmtpEvent {}
impl Stream for Zmtp {
type Item = Result<ZmtpEvent, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break if let ConnState::Init = self.conn_state {
// can it be that we already have enough bytes received in the buffer?
let mut buf1 = mem::replace(&mut self.buf1, vec![]);
let mut rbuf = ReadBuf::new(&mut buf1);
let w = &mut self.conn;
pin_mut!(w);
let m1 = w.poll_read(cx, &mut rbuf);
self.buf1 = buf1;
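// Note: the poll_read result is not interpreted yet; both match arms below
// currently leave the stream Pending (greeting parsing still to be implemented).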
match m1 {
Ready(item) => Pending,
Pending => Pending,
}
} else {
Pending
};
}
}
}


@@ -1,10 +1,17 @@
use err::Error;
use std::future::Future;
use std::panic;
use std::sync::Mutex;
use tokio::task::JoinHandle;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
use crate::log::*;
pub mod log {
#[allow(unused_imports)]
pub use tracing::{debug, error, info, trace, warn};
}
pub fn run<T, F: std::future::Future<Output = Result<T, Error>>>(f: F) -> Result<T, Error> {
tracing_init();