Reduce log chatter
This commit is contained in:
@@ -118,7 +118,7 @@ pub fn list_all_channels(node: &ChannelArchiver) -> Receiver<Result<ListChannelI
|
|||||||
while let Some(item) = rx.next().await {
|
while let Some(item) = rx.next().await {
|
||||||
match item {
|
match item {
|
||||||
Ok(StreamItem::Stats(item)) => {
|
Ok(StreamItem::Stats(item)) => {
|
||||||
info!("stats: {:?}", item);
|
debug!("stats: {:?}", item);
|
||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
@@ -179,9 +179,7 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
JsVal(jsval) => {
|
JsVal(jsval) => {
|
||||||
if true {
|
debug!("jsval: {}", serde_json::to_string(&jsval)?);
|
||||||
info!("jsval: {}", serde_json::to_string(&jsval)?);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -208,9 +206,7 @@ pub async fn channel_config(q: &ChannelConfigQuery, conf: &ChannelArchiver) -> R
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
JsVal(jsval) => {
|
JsVal(jsval) => {
|
||||||
if true {
|
debug!("jsval: {}", serde_json::to_string(&jsval)?);
|
||||||
info!("jsval: {}", serde_json::to_string(&jsval)?);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -285,7 +281,8 @@ mod test {
|
|||||||
let mut data_file = open_read(data_path, stats).await?;
|
let mut data_file = open_read(data_path, stats).await?;
|
||||||
let datafile_header = read_datafile_header(&mut data_file, datablock.data_header_pos(), stats).await?;
|
let datafile_header = read_datafile_header(&mut data_file, datablock.data_header_pos(), stats).await?;
|
||||||
let events = read_data_1(&mut data_file, &datafile_header, range.clone(), false, stats).await?;
|
let events = read_data_1(&mut data_file, &datafile_header, range.clone(), false, stats).await?;
|
||||||
info!("read events: {:?}", events);
|
debug!("read events: {:?}", events);
|
||||||
|
// TODO assert more
|
||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
Ok(taskrun::run(fut).unwrap())
|
Ok(taskrun::run(fut).unwrap())
|
||||||
|
|||||||
@@ -513,7 +513,7 @@ pub async fn read_data_1(
|
|||||||
evs.values.push(value);
|
evs.values.push(value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!("parsed block with {} / {} events", ntot, evs.tss.len());
|
debug!("parsed block with {} / {} events", ntot, evs.tss.len());
|
||||||
let evs = ScalarPlainEvents::Double(evs);
|
let evs = ScalarPlainEvents::Double(evs);
|
||||||
let plain = PlainEvents::Scalar(evs);
|
let plain = PlainEvents::Scalar(evs);
|
||||||
let item = EventsItem::Plain(plain);
|
let item = EventsItem::Plain(plain);
|
||||||
|
|||||||
@@ -322,9 +322,9 @@ mod test {
|
|||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use items::eventsitem::EventsItem;
|
use items::eventsitem::EventsItem;
|
||||||
use items::{LogItem, Sitemty, StatsItem, StreamItem};
|
use items::{LogItem, Sitemty, StatsItem, StreamItem};
|
||||||
|
use netpod::log::*;
|
||||||
use netpod::timeunits::SEC;
|
use netpod::timeunits::SEC;
|
||||||
use netpod::{log::*, RangeFilterStats};
|
use netpod::{Channel, NanoRange, RangeFilterStats};
|
||||||
use netpod::{Channel, NanoRange};
|
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
@@ -358,7 +358,12 @@ mod test {
|
|||||||
let filtered = RangeFilter::<_, EventsItem>::new(datablocks, range, expand);
|
let filtered = RangeFilter::<_, EventsItem>::new(datablocks, range, expand);
|
||||||
let mut stream = filtered;
|
let mut stream = filtered;
|
||||||
while let Some(block) = stream.next().await {
|
while let Some(block) = stream.next().await {
|
||||||
info!("DatablockStream yields: {:?}", block);
|
match block {
|
||||||
|
Ok(_) => {
|
||||||
|
//TODO assert more
|
||||||
|
}
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -544,7 +544,13 @@ pub async fn index_files_index_ref<P: Into<PathBuf> + Send>(
|
|||||||
let index_files_index_path = index_files_index_path.into();
|
let index_files_index_path = index_files_index_path.into();
|
||||||
let index_files_index = {
|
let index_files_index = {
|
||||||
let timed1 = Timed::new("slurp_index_bytes");
|
let timed1 = Timed::new("slurp_index_bytes");
|
||||||
let mut index_files_index = open_read(index_files_index_path, stats).await?;
|
let mut index_files_index = match open_read(index_files_index_path.clone(), stats).await {
|
||||||
|
Ok(k) => Ok(k),
|
||||||
|
Err(e) => {
|
||||||
|
warn!("can not open indexfile {:?}", index_files_index_path);
|
||||||
|
Err(e)
|
||||||
|
}
|
||||||
|
}?;
|
||||||
let mut buf = vec![0; 1024 * 1024 * 50];
|
let mut buf = vec![0; 1024 * 1024 * 50];
|
||||||
let mut ntot = 0;
|
let mut ntot = 0;
|
||||||
loop {
|
loop {
|
||||||
|
|||||||
@@ -125,7 +125,6 @@ impl IndexFileBasics {
|
|||||||
rb.fill_min(min0).await?;
|
rb.fill_min(min0).await?;
|
||||||
let buf = rb.data();
|
let buf = rb.data();
|
||||||
let entry = parse_name_hash_channel_entry(buf, self.hver.as_ref())?;
|
let entry = parse_name_hash_channel_entry(buf, self.hver.as_ref())?;
|
||||||
info!("parsed entry {:?}", entry);
|
|
||||||
pos = entry.next;
|
pos = entry.next;
|
||||||
entries.push(entry);
|
entries.push(entry);
|
||||||
}
|
}
|
||||||
@@ -1155,7 +1154,8 @@ mod test {
|
|||||||
let mut i1 = 0;
|
let mut i1 = 0;
|
||||||
let mut ts_max = Nanos::from_ns(0);
|
let mut ts_max = Nanos::from_ns(0);
|
||||||
while let Some(rec) = iter.next().await? {
|
while let Some(rec) = iter.next().await? {
|
||||||
info!("GOT RECORD: {:?} {:?}", rec.beg, rec.target);
|
// TODO assert
|
||||||
|
debug!("GOT RECORD: {:?} {:?}", rec.beg, rec.target);
|
||||||
if rec.beg <= ts_max {
|
if rec.beg <= ts_max {
|
||||||
return Err(Error::with_msg_no_trace("BAD ORDER"));
|
return Err(Error::with_msg_no_trace("BAD ORDER"));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ fn read_pb_00() -> Result<(), Error> {
|
|||||||
.join("X:2021_01.pb");
|
.join("X:2021_01.pb");
|
||||||
let f1 = tokio::fs::read(path).await?;
|
let f1 = tokio::fs::read(path).await?;
|
||||||
let mut j1 = 0;
|
let mut j1 = 0;
|
||||||
|
// TODO assert more
|
||||||
loop {
|
loop {
|
||||||
let mut i2 = usize::MAX;
|
let mut i2 = usize::MAX;
|
||||||
for (i1, &k) in f1[j1..].iter().enumerate() {
|
for (i1, &k) in f1[j1..].iter().enumerate() {
|
||||||
@@ -32,17 +33,17 @@ fn read_pb_00() -> Result<(), Error> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if i2 != usize::MAX {
|
if i2 != usize::MAX {
|
||||||
info!("got NL {} .. {}", j1, i2);
|
debug!("got NL {} .. {}", j1, i2);
|
||||||
let m = unescape_archapp_msg(&f1[j1..i2], vec![])?;
|
let m = unescape_archapp_msg(&f1[j1..i2], vec![])?;
|
||||||
if j1 == 0 {
|
if j1 == 0 {
|
||||||
let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m).unwrap();
|
let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m).unwrap();
|
||||||
info!("got payload_info: {:?}", payload_info);
|
debug!("got payload_info: {:?}", payload_info);
|
||||||
} else {
|
} else {
|
||||||
let scalar_double = crate::generated::EPICSEvent::ScalarDouble::parse_from_bytes(&m).unwrap();
|
let scalar_double = crate::generated::EPICSEvent::ScalarDouble::parse_from_bytes(&m).unwrap();
|
||||||
info!("got scalar_double: {:?}", scalar_double);
|
debug!("got scalar_double: {:?}", scalar_double);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
info!("no more packets");
|
debug!("no more packets");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
j1 = i2 + 1;
|
j1 = i2 + 1;
|
||||||
|
|||||||
@@ -23,6 +23,6 @@ impl Drop for Timed {
|
|||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
let ts2 = Instant::now();
|
let ts2 = Instant::now();
|
||||||
let dt = ts2.duration_since(self.ts1);
|
let dt = ts2.duration_since(self.ts1);
|
||||||
info!("Timed {} {:?}", self.name, dt);
|
debug!("Timed {} {:?}", self.name, dt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -110,7 +110,7 @@ where
|
|||||||
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?;
|
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?;
|
||||||
query.append_to_url(&mut url);
|
query.append_to_url(&mut url);
|
||||||
let url = url;
|
let url = url;
|
||||||
info!("get_binned_channel get {}", url);
|
debug!("get_binned_channel get {}", url);
|
||||||
let req = hyper::Request::builder()
|
let req = hyper::Request::builder()
|
||||||
.method(http::Method::GET)
|
.method(http::Method::GET)
|
||||||
.uri(url.to_string())
|
.uri(url.to_string())
|
||||||
@@ -126,7 +126,7 @@ where
|
|||||||
let res = consume_binned_response::<NTY, _>(s2).await?;
|
let res = consume_binned_response::<NTY, _>(s2).await?;
|
||||||
let t2 = chrono::Utc::now();
|
let t2 = chrono::Utc::now();
|
||||||
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
|
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
|
||||||
info!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms);
|
debug!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms);
|
||||||
if !res.is_valid() {
|
if !res.is_valid() {
|
||||||
Err(Error::with_msg(format!("invalid response: {:?}", res)))
|
Err(Error::with_msg(format!("invalid response: {:?}", res)))
|
||||||
} else if res.range_complete_count == 0 && expect_range_complete {
|
} else if res.range_complete_count == 0 && expect_range_complete {
|
||||||
@@ -186,7 +186,8 @@ where
|
|||||||
None
|
None
|
||||||
}
|
}
|
||||||
StreamItem::Stats(item) => {
|
StreamItem::Stats(item) => {
|
||||||
info!("Stats: {:?}", item);
|
// TODO collect somewhere
|
||||||
|
debug!("Stats: {:?}", item);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
StreamItem::DataItem(frame) => {
|
StreamItem::DataItem(frame) => {
|
||||||
@@ -200,10 +201,7 @@ where
|
|||||||
Streamlog::emit(&item);
|
Streamlog::emit(&item);
|
||||||
Some(Ok(StreamItem::Log(item)))
|
Some(Ok(StreamItem::Log(item)))
|
||||||
}
|
}
|
||||||
item => {
|
item => Some(Ok(item)),
|
||||||
info!("TEST GOT ITEM {:?}", item);
|
|
||||||
Some(Ok(item))
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("TEST GOT ERROR FRAME: {:?}", e);
|
error!("TEST GOT ERROR FRAME: {:?}", e);
|
||||||
@@ -253,6 +251,6 @@ where
|
|||||||
ready(g)
|
ready(g)
|
||||||
});
|
});
|
||||||
let ret = s1.await;
|
let ret = s1.await;
|
||||||
info!("BinnedResponse: {:?}", ret);
|
debug!("BinnedResponse: {:?}", ret);
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -98,7 +98,7 @@ async fn get_binned_json_common(
|
|||||||
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?;
|
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?;
|
||||||
query.append_to_url(&mut url);
|
query.append_to_url(&mut url);
|
||||||
let url = url;
|
let url = url;
|
||||||
info!("get_binned_json_common get {}", url);
|
debug!("get_binned_json_common get {}", url);
|
||||||
let req = hyper::Request::builder()
|
let req = hyper::Request::builder()
|
||||||
.method(http::Method::GET)
|
.method(http::Method::GET)
|
||||||
.uri(url.to_string())
|
.uri(url.to_string())
|
||||||
@@ -112,11 +112,11 @@ async fn get_binned_json_common(
|
|||||||
let res = hyper::body::to_bytes(res.into_body()).await?;
|
let res = hyper::body::to_bytes(res.into_body()).await?;
|
||||||
let t2 = chrono::Utc::now();
|
let t2 = chrono::Utc::now();
|
||||||
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
|
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
|
||||||
info!("get_binned_json_common DONE time {} ms", ms);
|
debug!("get_binned_json_common DONE time {} ms", ms);
|
||||||
let res = String::from_utf8_lossy(&res).to_string();
|
let res = String::from_utf8_lossy(&res).to_string();
|
||||||
//info!("get_binned_json_common res: {}", res);
|
|
||||||
let res: serde_json::Value = serde_json::from_str(res.as_str())?;
|
let res: serde_json::Value = serde_json::from_str(res.as_str())?;
|
||||||
info!(
|
// TODO assert more
|
||||||
|
debug!(
|
||||||
"result from endpoint: --------------\n{}\n--------------",
|
"result from endpoint: --------------\n{}\n--------------",
|
||||||
serde_json::to_string_pretty(&res)?
|
serde_json::to_string_pretty(&res)?
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -67,7 +67,7 @@ where
|
|||||||
let mut url = Url::parse(&format!("http://{}:{}", hp.host, hp.port))?;
|
let mut url = Url::parse(&format!("http://{}:{}", hp.host, hp.port))?;
|
||||||
query.append_to_url(&mut url);
|
query.append_to_url(&mut url);
|
||||||
let url = url;
|
let url = url;
|
||||||
info!("get_plain_events get {}", url);
|
debug!("get_plain_events get {}", url);
|
||||||
let req = hyper::Request::builder()
|
let req = hyper::Request::builder()
|
||||||
.method(http::Method::GET)
|
.method(http::Method::GET)
|
||||||
.uri(url.to_string())
|
.uri(url.to_string())
|
||||||
@@ -83,7 +83,8 @@ where
|
|||||||
let res = consume_plain_events_binary::<NTY, _>(s2).await?;
|
let res = consume_plain_events_binary::<NTY, _>(s2).await?;
|
||||||
let t2 = chrono::Utc::now();
|
let t2 = chrono::Utc::now();
|
||||||
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
|
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
|
||||||
info!("time {} ms", ms);
|
// TODO add timeout
|
||||||
|
debug!("time {} ms", ms);
|
||||||
if !res.is_valid() {
|
if !res.is_valid() {
|
||||||
Ok(res)
|
Ok(res)
|
||||||
} else {
|
} else {
|
||||||
@@ -139,7 +140,7 @@ where
|
|||||||
None
|
None
|
||||||
}
|
}
|
||||||
StreamItem::Stats(item) => {
|
StreamItem::Stats(item) => {
|
||||||
info!("Stats: {:?}", item);
|
debug!("Stats: {:?}", item);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
StreamItem::DataItem(frame) => {
|
StreamItem::DataItem(frame) => {
|
||||||
@@ -154,10 +155,7 @@ where
|
|||||||
Streamlog::emit(&item);
|
Streamlog::emit(&item);
|
||||||
Some(Ok(StreamItem::Log(item)))
|
Some(Ok(StreamItem::Log(item)))
|
||||||
}
|
}
|
||||||
item => {
|
item => Some(Ok(item)),
|
||||||
info!("TEST GOT ITEM {:?}", item);
|
|
||||||
Some(Ok(item))
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("TEST GOT ERROR FRAME: {:?}", e);
|
error!("TEST GOT ERROR FRAME: {:?}", e);
|
||||||
@@ -208,7 +206,7 @@ where
|
|||||||
ready(g)
|
ready(g)
|
||||||
});
|
});
|
||||||
let ret = s1.await;
|
let ret = s1.await;
|
||||||
info!("result: {:?}", ret);
|
debug!("result: {:?}", ret);
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -275,7 +273,7 @@ async fn get_plain_events_json(
|
|||||||
let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?;
|
let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?;
|
||||||
query.append_to_url(&mut url);
|
query.append_to_url(&mut url);
|
||||||
let url = url;
|
let url = url;
|
||||||
info!("get_plain_events get {}", url);
|
debug!("get_plain_events get {}", url);
|
||||||
let req = hyper::Request::builder()
|
let req = hyper::Request::builder()
|
||||||
.method(http::Method::GET)
|
.method(http::Method::GET)
|
||||||
.uri(url.to_string())
|
.uri(url.to_string())
|
||||||
@@ -288,10 +286,11 @@ async fn get_plain_events_json(
|
|||||||
}
|
}
|
||||||
let buf = hyper::body::to_bytes(res.into_body()).await?;
|
let buf = hyper::body::to_bytes(res.into_body()).await?;
|
||||||
let s = String::from_utf8_lossy(&buf);
|
let s = String::from_utf8_lossy(&buf);
|
||||||
let res: JsonValue = serde_json::from_str(&s)?;
|
let _res: JsonValue = serde_json::from_str(&s)?;
|
||||||
info!("GOT: {}", serde_json::to_string_pretty(&res)?);
|
// TODO assert more
|
||||||
let t2 = chrono::Utc::now();
|
let t2 = chrono::Utc::now();
|
||||||
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
|
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
|
||||||
info!("time {} ms", ms);
|
// TODO add timeout
|
||||||
|
debug!("time {} ms", ms);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -175,7 +175,7 @@ async fn get_json_common(
|
|||||||
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?;
|
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?;
|
||||||
query.append_to_url(&mut url);
|
query.append_to_url(&mut url);
|
||||||
let url = url;
|
let url = url;
|
||||||
info!("get_json_common get {}", url);
|
debug!("get_json_common get {}", url);
|
||||||
let req = hyper::Request::builder()
|
let req = hyper::Request::builder()
|
||||||
.method(http::Method::GET)
|
.method(http::Method::GET)
|
||||||
.uri(url.to_string())
|
.uri(url.to_string())
|
||||||
@@ -189,11 +189,12 @@ async fn get_json_common(
|
|||||||
let res = hyper::body::to_bytes(res.into_body()).await?;
|
let res = hyper::body::to_bytes(res.into_body()).await?;
|
||||||
let t2 = chrono::Utc::now();
|
let t2 = chrono::Utc::now();
|
||||||
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
|
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
|
||||||
info!("get_json_common DONE time {} ms", ms);
|
// TODO add timeout
|
||||||
|
debug!("get_json_common DONE time {} ms", ms);
|
||||||
let res = String::from_utf8_lossy(&res).to_string();
|
let res = String::from_utf8_lossy(&res).to_string();
|
||||||
//info!("get_json_common res: {}", res);
|
|
||||||
let res: serde_json::Value = serde_json::from_str(res.as_str())?;
|
let res: serde_json::Value = serde_json::from_str(res.as_str())?;
|
||||||
info!(
|
// TODO assert these:
|
||||||
|
debug!(
|
||||||
"result from endpoint: --------------\n{}\n--------------",
|
"result from endpoint: --------------\n{}\n--------------",
|
||||||
serde_json::to_string_pretty(&res)?
|
serde_json::to_string_pretty(&res)?
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -2,7 +2,10 @@
|
|||||||
name = "disk"
|
name = "disk"
|
||||||
version = "0.0.1-a.1"
|
version = "0.0.1-a.1"
|
||||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||||
edition = "2018"
|
edition = "2021"
|
||||||
|
|
||||||
|
[lib]
|
||||||
|
path = "src/disk.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
|||||||
@@ -82,7 +82,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec {
|
|||||||
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
||||||
let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
|
let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
|
||||||
Ok(Some(pre_range)) => {
|
Ok(Some(pre_range)) => {
|
||||||
info!("BinnedBinaryChannelExec found pre_range: {:?}", pre_range);
|
debug!("BinnedBinaryChannelExec found pre_range: {:?}", pre_range);
|
||||||
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
|
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
|
||||||
let msg = format!(
|
let msg = format!(
|
||||||
"BinnedBinaryChannelExec incompatible ranges:\npre_range: {:?}\nrange: {:?}",
|
"BinnedBinaryChannelExec incompatible ranges:\npre_range: {:?}\nrange: {:?}",
|
||||||
@@ -109,7 +109,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec {
|
|||||||
Ok(Box::pin(s) as Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>)
|
Ok(Box::pin(s) as Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>)
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
info!(
|
debug!(
|
||||||
"BinnedBinaryChannelExec no covering range for prebinned, merge from remotes instead {:?}",
|
"BinnedBinaryChannelExec no covering range for prebinned, merge from remotes instead {:?}",
|
||||||
range
|
range
|
||||||
);
|
);
|
||||||
@@ -326,7 +326,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
|
|||||||
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
||||||
let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
|
let souter = match PreBinnedPatchRange::covering_range(self.query.range().clone(), self.query.bin_count()) {
|
||||||
Ok(Some(pre_range)) => {
|
Ok(Some(pre_range)) => {
|
||||||
info!("BinnedJsonChannelExec found pre_range: {:?}", pre_range);
|
debug!("BinnedJsonChannelExec found pre_range: {:?}", pre_range);
|
||||||
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
|
if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
|
||||||
let msg = format!(
|
let msg = format!(
|
||||||
"BinnedJsonChannelExec incompatible ranges:\npre_range: {:?}\nrange: {:?}",
|
"BinnedJsonChannelExec incompatible ranges:\npre_range: {:?}\nrange: {:?}",
|
||||||
@@ -354,7 +354,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
|
|||||||
Ok(Box::pin(s) as Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>)
|
Ok(Box::pin(s) as Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>)
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
info!(
|
debug!(
|
||||||
"BinnedJsonChannelExec no covering range for prebinned, merge from remotes instead {:?}",
|
"BinnedJsonChannelExec no covering range for prebinned, merge from remotes instead {:?}",
|
||||||
range
|
range
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -342,7 +342,7 @@ impl FromUrl for BinnedQuery {
|
|||||||
.parse()
|
.parse()
|
||||||
.map_err(|e| Error::with_msg(format!("can not parse doLog {:?}", e)))?,
|
.map_err(|e| Error::with_msg(format!("can not parse doLog {:?}", e)))?,
|
||||||
};
|
};
|
||||||
info!("BinnedQuery::from_url {:?}", ret);
|
debug!("BinnedQuery::from_url {:?}", ret);
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -355,7 +355,7 @@ where
|
|||||||
Ok(item) => match item {
|
Ok(item) => match item {
|
||||||
StreamItem::Log(item) => {
|
StreamItem::Log(item) => {
|
||||||
if do_log {
|
if do_log {
|
||||||
info!("collect_plain_events_json log {:?}", item);
|
debug!("collect_plain_events_json log {:?}", item);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
StreamItem::Stats(item) => match item {
|
StreamItem::Stats(item) => match item {
|
||||||
@@ -396,7 +396,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
let ret = serde_json::to_value(collector.result()?)?;
|
let ret = serde_json::to_value(collector.result()?)?;
|
||||||
info!("Total duration: {:?}", total_duration);
|
debug!("Total duration: {:?}", total_duration);
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -247,7 +247,7 @@ async fn open_files_inner(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
let h = OpenedFileSet { timebin: tb, files: a };
|
let h = OpenedFileSet { timebin: tb, files: a };
|
||||||
info!(
|
debug!(
|
||||||
"----- open_files_inner giving OpenedFileSet with {} files",
|
"----- open_files_inner giving OpenedFileSet with {} files",
|
||||||
h.files.len()
|
h.files.len()
|
||||||
);
|
);
|
||||||
@@ -351,13 +351,13 @@ async fn open_expanded_files_inner(
|
|||||||
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
|
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
|
||||||
let w = position_file(&path, range, true, false).await?;
|
let w = position_file(&path, range, true, false).await?;
|
||||||
if w.found {
|
if w.found {
|
||||||
info!("----- open_expanded_files_inner w.found for {:?}", path);
|
debug!("----- open_expanded_files_inner w.found for {:?}", path);
|
||||||
a.push(w.file);
|
a.push(w.file);
|
||||||
found_pre = true;
|
found_pre = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let h = OpenedFileSet { timebin: tb, files: a };
|
let h = OpenedFileSet { timebin: tb, files: a };
|
||||||
info!(
|
debug!(
|
||||||
"----- open_expanded_files_inner giving OpenedFileSet with {} files",
|
"----- open_expanded_files_inner giving OpenedFileSet with {} files",
|
||||||
h.files.len()
|
h.files.len()
|
||||||
);
|
);
|
||||||
@@ -387,7 +387,8 @@ async fn open_expanded_files_inner(
|
|||||||
p1 += 1;
|
p1 += 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
info!("Could not find some event before the requested range, fall back to standard file list.");
|
// TODO emit statsfor this or log somewhere?
|
||||||
|
debug!("Could not find some event before the requested range, fall back to standard file list.");
|
||||||
// Try to locate files according to non-expand-algorithm.
|
// Try to locate files according to non-expand-algorithm.
|
||||||
open_files_inner(chtx, range, &channel_config, node).await?;
|
open_files_inner(chtx, range, &channel_config, node).await?;
|
||||||
}
|
}
|
||||||
@@ -423,7 +424,7 @@ fn expanded_file_list() {
|
|||||||
while let Some(file) = files.next().await {
|
while let Some(file) = files.next().await {
|
||||||
match file {
|
match file {
|
||||||
Ok(k) => {
|
Ok(k) => {
|
||||||
info!("opened file: {:?}", k);
|
debug!("opened file: {:?}", k);
|
||||||
paths.push(k.files);
|
paths.push(k.files);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|||||||
@@ -300,10 +300,10 @@ impl NeedMinBuffer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO remove this again
|
// TODO collect somewhere else
|
||||||
impl Drop for NeedMinBuffer {
|
impl Drop for NeedMinBuffer {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
info!("NeedMinBuffer Drop Stats:\nbuf_len_histo: {:?}", self.buf_len_histo);
|
debug!("NeedMinBuffer Drop Stats:\nbuf_len_histo: {:?}", self.buf_len_histo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -355,7 +355,8 @@ impl Stream for NeedMinBuffer {
|
|||||||
Ready(Some(Err(e.into())))
|
Ready(Some(Err(e.into())))
|
||||||
}
|
}
|
||||||
Ready(None) => {
|
Ready(None) => {
|
||||||
info!("NeedMinBuffer histo: {:?}", self.buf_len_histo);
|
// TODO collect somewhere
|
||||||
|
debug!("NeedMinBuffer histo: {:?}", self.buf_len_histo);
|
||||||
Ready(None)
|
Ready(None)
|
||||||
}
|
}
|
||||||
Pending => Pending,
|
Pending => Pending,
|
||||||
@@ -114,7 +114,7 @@ impl Stream for EventChunkerMultifile {
|
|||||||
let file = ofs.files.pop().unwrap();
|
let file = ofs.files.pop().unwrap();
|
||||||
let path = file.path;
|
let path = file.path;
|
||||||
let msg = format!("handle OFS {:?}", ofs);
|
let msg = format!("handle OFS {:?}", ofs);
|
||||||
info!("{}", msg);
|
debug!("{}", msg);
|
||||||
let item = LogItem::quick(Level::INFO, msg);
|
let item = LogItem::quick(Level::INFO, msg);
|
||||||
match file.file {
|
match file.file {
|
||||||
Some(file) => {
|
Some(file) => {
|
||||||
@@ -141,12 +141,12 @@ impl Stream for EventChunkerMultifile {
|
|||||||
Ready(Some(Ok(StreamItem::Log(item))))
|
Ready(Some(Ok(StreamItem::Log(item))))
|
||||||
} else if ofs.files.len() == 0 {
|
} else if ofs.files.len() == 0 {
|
||||||
let msg = format!("handle OFS {:?} NO FILES", ofs);
|
let msg = format!("handle OFS {:?} NO FILES", ofs);
|
||||||
info!("{}", msg);
|
debug!("{}", msg);
|
||||||
let item = LogItem::quick(Level::INFO, msg);
|
let item = LogItem::quick(Level::INFO, msg);
|
||||||
Ready(Some(Ok(StreamItem::Log(item))))
|
Ready(Some(Ok(StreamItem::Log(item))))
|
||||||
} else {
|
} else {
|
||||||
let msg = format!("handle OFS MERGED {:?}", ofs);
|
let msg = format!("handle OFS MERGED {:?}", ofs);
|
||||||
warn!("{}", msg);
|
debug!("{}", msg);
|
||||||
let item = LogItem::quick(Level::INFO, msg);
|
let item = LogItem::quick(Level::INFO, msg);
|
||||||
let mut chunkers = vec![];
|
let mut chunkers = vec![];
|
||||||
for of in ofs.files {
|
for of in ofs.files {
|
||||||
@@ -255,7 +255,8 @@ mod test {
|
|||||||
Ok(item) => match item {
|
Ok(item) => match item {
|
||||||
StreamItem::DataItem(item) => match item {
|
StreamItem::DataItem(item) => match item {
|
||||||
RangeCompletableItem::Data(item) => {
|
RangeCompletableItem::Data(item) => {
|
||||||
info!("item: {:?}", item.tss.iter().map(|x| x / MS).collect::<Vec<_>>());
|
// TODO assert more
|
||||||
|
debug!("item: {:?}", item.tss.iter().map(|x| x / MS).collect::<Vec<_>>());
|
||||||
event_count += item.tss.len();
|
event_count += item.tss.len();
|
||||||
for ts in item.tss {
|
for ts in item.tss {
|
||||||
tss.push(ts);
|
tss.push(ts);
|
||||||
@@ -280,7 +281,8 @@ mod test {
|
|||||||
end: DAY + MS * 100,
|
end: DAY + MS * 100,
|
||||||
};
|
};
|
||||||
let res = read_expanded_for_range(range, 0)?;
|
let res = read_expanded_for_range(range, 0)?;
|
||||||
info!("got {:?}", res.1);
|
// TODO assert more
|
||||||
|
debug!("got {:?}", res.1);
|
||||||
if res.0 != 3 {
|
if res.0 != 3 {
|
||||||
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
|
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,10 +46,10 @@ pub struct EventChunker {
|
|||||||
unordered_warn_count: usize,
|
unordered_warn_count: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO remove again, use it explicitly
|
|
||||||
impl Drop for EventChunker {
|
impl Drop for EventChunker {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
info!(
|
// TODO collect somewhere
|
||||||
|
debug!(
|
||||||
"EventChunker Drop Stats:\ndecomp_dt_histo: {:?}\nitem_len_emit_histo: {:?}",
|
"EventChunker Drop Stats:\ndecomp_dt_histo: {:?}\nitem_len_emit_histo: {:?}",
|
||||||
self.decomp_dt_histo, self.item_len_emit_histo
|
self.decomp_dt_histo, self.item_len_emit_histo
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -39,10 +39,10 @@ pub struct MergedStream<S, ITY> {
|
|||||||
stats_items: VecDeque<StatsItem>,
|
stats_items: VecDeque<StatsItem>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO get rid, log info explicitly.
|
|
||||||
impl<S, ITY> Drop for MergedStream<S, ITY> {
|
impl<S, ITY> Drop for MergedStream<S, ITY> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
info!(
|
// TODO collect somewhere
|
||||||
|
debug!(
|
||||||
"MergedStream Drop Stats:\nbatch_len_emit_histo: {:?}",
|
"MergedStream Drop Stats:\nbatch_len_emit_histo: {:?}",
|
||||||
self.batch_len_emit_histo
|
self.batch_len_emit_histo
|
||||||
);
|
);
|
||||||
@@ -204,7 +204,7 @@ where
|
|||||||
for ii in 0..batch.len() {
|
for ii in 0..batch.len() {
|
||||||
aa.push(batch.ts(ii));
|
aa.push(batch.ts(ii));
|
||||||
}
|
}
|
||||||
info!("MergedBlobsStream A emits {} events tss {:?}", batch.len(), aa);
|
debug!("MergedBlobsStream A emits {} events tss {:?}", batch.len(), aa);
|
||||||
};
|
};
|
||||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch)))))
|
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch)))))
|
||||||
} else {
|
} else {
|
||||||
@@ -265,7 +265,7 @@ where
|
|||||||
for ii in 0..batch.len() {
|
for ii in 0..batch.len() {
|
||||||
aa.push(batch.ts(ii));
|
aa.push(batch.ts(ii));
|
||||||
}
|
}
|
||||||
info!("MergedBlobsStream B emits {} events tss {:?}", batch.len(), aa);
|
debug!("MergedBlobsStream B emits {} events tss {:?}", batch.len(), aa);
|
||||||
};
|
};
|
||||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch)))))
|
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch)))))
|
||||||
} else {
|
} else {
|
||||||
@@ -374,9 +374,10 @@ mod test {
|
|||||||
let mut merged = MergedStream::new(inps);
|
let mut merged = MergedStream::new(inps);
|
||||||
let mut cevs = CollectedEvents { tss: vec![] };
|
let mut cevs = CollectedEvents { tss: vec![] };
|
||||||
let mut i1 = 0;
|
let mut i1 = 0;
|
||||||
|
// TODO assert more
|
||||||
while let Some(item) = merged.next().await {
|
while let Some(item) = merged.next().await {
|
||||||
if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) = item {
|
if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) = item {
|
||||||
info!("item: {:?}", item);
|
debug!("item: {:?}", item);
|
||||||
for ts in item.tss {
|
for ts in item.tss {
|
||||||
cevs.tss.push(ts);
|
cevs.tss.push(ts);
|
||||||
}
|
}
|
||||||
@@ -386,8 +387,8 @@ mod test {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!("read {} data items", i1);
|
debug!("read {} data items", i1);
|
||||||
info!("cevs: {:?}", cevs);
|
debug!("cevs: {:?}", cevs);
|
||||||
Ok(cevs)
|
Ok(cevs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ where
|
|||||||
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
|
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
|
||||||
{
|
{
|
||||||
pub fn new(evq: RawEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self {
|
pub fn new(evq: RawEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self {
|
||||||
info!("MergedFromRemotes evq {:?}", evq);
|
debug!("MergedFromRemotes evq {:?}", evq);
|
||||||
let mut tcp_establish_futs = vec![];
|
let mut tcp_establish_futs = vec![];
|
||||||
for node in &cluster.nodes {
|
for node in &cluster.nodes {
|
||||||
let f = x_processed_stream_from_node::<ENP>(evq.clone(), perf_opts.clone(), node.clone());
|
let f = x_processed_stream_from_node::<ENP>(evq.clone(), perf_opts.clone(), node.clone());
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ where
|
|||||||
<ENP as EventsNodeProcessor>::Output: Unpin + 'static,
|
<ENP as EventsNodeProcessor>::Output: Unpin + 'static,
|
||||||
Result<StreamItem<RangeCompletableItem<<ENP as EventsNodeProcessor>::Output>>, err::Error>: FrameType,
|
Result<StreamItem<RangeCompletableItem<<ENP as EventsNodeProcessor>::Output>>, err::Error>: FrameType,
|
||||||
{
|
{
|
||||||
netpod::log::info!("x_processed_stream_from_node to: {}:{}", node.host, node.port_raw);
|
netpod::log::debug!("x_processed_stream_from_node to: {}:{}", node.host, node.port_raw);
|
||||||
let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
|
let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
|
||||||
let qjs = serde_json::to_string(&query)?;
|
let qjs = serde_json::to_string(&query)?;
|
||||||
let (netin, mut netout) = net.into_split();
|
let (netin, mut netout) = net.into_split();
|
||||||
@@ -48,7 +48,7 @@ pub async fn x_processed_event_blobs_stream_from_node(
|
|||||||
perf_opts: PerfOpts,
|
perf_opts: PerfOpts,
|
||||||
node: Node,
|
node: Node,
|
||||||
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
|
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
|
||||||
netpod::log::info!(
|
netpod::log::debug!(
|
||||||
"x_processed_event_blobs_stream_from_node to: {}:{}",
|
"x_processed_event_blobs_stream_from_node to: {}:{}",
|
||||||
node.host,
|
node.host,
|
||||||
node.port_raw
|
node.port_raw
|
||||||
|
|||||||
@@ -167,10 +167,11 @@ async fn write_1() -> Result<(), Error> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
#[allow(unused)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[test]
|
//#[test]
|
||||||
fn t1() -> Result<(), Error> {
|
fn t1() -> Result<(), Error> {
|
||||||
Ok(taskrun::run(write_1()).unwrap())
|
Ok(taskrun::run(write_1()).unwrap())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,9 +10,7 @@ use hyper::{Body, Client, Request, Response};
|
|||||||
use items::{RangeCompletableItem, Sitemty, StreamItem};
|
use items::{RangeCompletableItem, Sitemty, StreamItem};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use netpod::query::RawEventsQuery;
|
use netpod::query::RawEventsQuery;
|
||||||
use netpod::{
|
use netpod::{log::*, ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, PerfOpts, Shape, APP_OCTET};
|
||||||
log::*, ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape, APP_OCTET,
|
|
||||||
};
|
|
||||||
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON};
|
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON};
|
||||||
use parse::channelconfig::{
|
use parse::channelconfig::{
|
||||||
extract_matching_config_entry, read_local_config, Config, ConfigEntry, MatchingConfigEntry,
|
extract_matching_config_entry, read_local_config, Config, ConfigEntry, MatchingConfigEntry,
|
||||||
@@ -575,7 +573,7 @@ impl DataApiPython3DataStream {
|
|||||||
if !*header_out {
|
if !*header_out {
|
||||||
let head = Api1ChannelHeader {
|
let head = Api1ChannelHeader {
|
||||||
name: channel.name.clone(),
|
name: channel.name.clone(),
|
||||||
ty: scalar_type_to_api3proto(&b.scalar_types[i1]).into(),
|
ty: b.scalar_types[i1].to_api3proto().into(),
|
||||||
byte_order: if b.be[i1] {
|
byte_order: if b.be[i1] {
|
||||||
"BIG_ENDIAN".into()
|
"BIG_ENDIAN".into()
|
||||||
} else {
|
} else {
|
||||||
@@ -688,7 +686,7 @@ impl Stream for DataApiPython3DataStream {
|
|||||||
let perf_opts = PerfOpts { inmem_bufcap: 1024 * 4 };
|
let perf_opts = PerfOpts { inmem_bufcap: 1024 * 4 };
|
||||||
// TODO is this a good to place decide this?
|
// TODO is this a good to place decide this?
|
||||||
let s = if self.node_config.node_config.cluster.is_central_storage {
|
let s = if self.node_config.node_config.cluster.is_central_storage {
|
||||||
info!("Set up central storage stream");
|
debug!("Set up central storage stream");
|
||||||
// TODO pull up this config
|
// TODO pull up this config
|
||||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
|
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
|
||||||
let s = disk::raw::conn::make_local_event_blobs_stream(
|
let s = disk::raw::conn::make_local_event_blobs_stream(
|
||||||
@@ -703,7 +701,7 @@ impl Stream for DataApiPython3DataStream {
|
|||||||
)?;
|
)?;
|
||||||
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
|
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
|
||||||
} else {
|
} else {
|
||||||
info!("Set up merged remote stream");
|
debug!("Set up merged remote stream");
|
||||||
let s = disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes::new(
|
let s = disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes::new(
|
||||||
evq,
|
evq,
|
||||||
perf_opts,
|
perf_opts,
|
||||||
@@ -769,7 +767,7 @@ impl Stream for DataApiPython3DataStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn api1_binary_events(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
pub async fn api1_binary_events(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||||
info!("api1_binary_events uri: {:?} headers: {:?}", req.uri(), req.headers());
|
debug!("api1_binary_events uri: {:?} headers: {:?}", req.uri(), req.headers());
|
||||||
let accept_def = "";
|
let accept_def = "";
|
||||||
let accept = req
|
let accept = req
|
||||||
.headers()
|
.headers()
|
||||||
@@ -784,12 +782,12 @@ pub async fn api1_binary_events(req: Request<Body>, node_config: &NodeConfigCach
|
|||||||
error!("got body_data: {:?}", String::from_utf8(body_data[..].to_vec()));
|
error!("got body_data: {:?}", String::from_utf8(body_data[..].to_vec()));
|
||||||
return Err(Error::with_msg_no_trace("can not parse query"));
|
return Err(Error::with_msg_no_trace("can not parse query"));
|
||||||
};
|
};
|
||||||
info!("got Api1Query: {:?}", qu);
|
debug!("got Api1Query: {:?}", qu);
|
||||||
let beg_date = chrono::DateTime::parse_from_rfc3339(&qu.range.start_date);
|
let beg_date = chrono::DateTime::parse_from_rfc3339(&qu.range.start_date);
|
||||||
let end_date = chrono::DateTime::parse_from_rfc3339(&qu.range.end_date);
|
let end_date = chrono::DateTime::parse_from_rfc3339(&qu.range.end_date);
|
||||||
let beg_date = beg_date?;
|
let beg_date = beg_date?;
|
||||||
let end_date = end_date?;
|
let end_date = end_date?;
|
||||||
info!("beg_date {:?} end_date {:?}", beg_date, end_date);
|
debug!("beg_date {:?} end_date {:?}", beg_date, end_date);
|
||||||
//let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
//let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
||||||
//let query = PlainEventsBinaryQuery::from_url(&url)?;
|
//let query = PlainEventsBinaryQuery::from_url(&url)?;
|
||||||
// TODO add stricter check for types, check with client.
|
// TODO add stricter check for types, check with client.
|
||||||
@@ -832,22 +830,6 @@ pub async fn api1_binary_events(req: Request<Body>, node_config: &NodeConfigCach
|
|||||||
return Ok(ret);
|
return Ok(ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn scalar_type_to_api3proto(sty: &ScalarType) -> &'static str {
|
|
||||||
match sty {
|
|
||||||
ScalarType::U8 => "uint8",
|
|
||||||
ScalarType::U16 => "uint16",
|
|
||||||
ScalarType::U32 => "uint32",
|
|
||||||
ScalarType::U64 => "uint64",
|
|
||||||
ScalarType::I8 => "int8",
|
|
||||||
ScalarType::I16 => "int16",
|
|
||||||
ScalarType::I32 => "int32",
|
|
||||||
ScalarType::I64 => "int64",
|
|
||||||
ScalarType::F32 => "float32",
|
|
||||||
ScalarType::F64 => "float64",
|
|
||||||
ScalarType::BOOL => "bool",
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn shape_to_api3proto(sh: &Option<Vec<u32>>) -> Vec<u32> {
|
fn shape_to_api3proto(sh: &Option<Vec<u32>>) -> Vec<u32> {
|
||||||
match sh {
|
match sh {
|
||||||
None => vec![],
|
None => vec![],
|
||||||
|
|||||||
@@ -50,7 +50,8 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
|
|||||||
let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?;
|
let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?;
|
||||||
let make_service = make_service_fn({
|
let make_service = make_service_fn({
|
||||||
move |conn: &AddrStream| {
|
move |conn: &AddrStream| {
|
||||||
info!("new connection from {:?}", conn.remote_addr());
|
// TODO send to logstash
|
||||||
|
debug!("new connection from {:?}", conn.remote_addr());
|
||||||
let node_config = node_config.clone();
|
let node_config = node_config.clone();
|
||||||
async move {
|
async move {
|
||||||
Ok::<_, Error>(service_fn({
|
Ok::<_, Error>(service_fn({
|
||||||
@@ -167,7 +168,8 @@ macro_rules! static_http_api1 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||||
info!("http_service_try {:?}", req.uri());
|
// TODO send to logstash
|
||||||
|
debug!("http_service_try {:?}", req.uri());
|
||||||
let uri = req.uri().clone();
|
let uri = req.uri().clone();
|
||||||
let path = uri.path();
|
let path = uri.path();
|
||||||
if path == "/api/4/node_status" {
|
if path == "/api/4/node_status" {
|
||||||
@@ -299,7 +301,6 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
|
|||||||
} else if let Some(h) = channelarchiver::BlockStream::should_handle(path) {
|
} else if let Some(h) = channelarchiver::BlockStream::should_handle(path) {
|
||||||
h.handle(req, &node_config).await
|
h.handle(req, &node_config).await
|
||||||
} else if path.starts_with("/api/1/requestStatus/") {
|
} else if path.starts_with("/api/1/requestStatus/") {
|
||||||
info!("{}", path);
|
|
||||||
Ok(response(StatusCode::OK).body(Body::from("{}"))?)
|
Ok(response(StatusCode::OK).body(Body::from("{}"))?)
|
||||||
} else if path.starts_with("/api/1/documentation/") {
|
} else if path.starts_with("/api/1/documentation/") {
|
||||||
if req.method() == Method::GET {
|
if req.method() == Method::GET {
|
||||||
@@ -424,7 +425,7 @@ async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Re
|
|||||||
let desc = format!("binned-BEG-{}-END-{}", query.range().beg / SEC, query.range().end / SEC);
|
let desc = format!("binned-BEG-{}-END-{}", query.range().beg / SEC, query.range().end / SEC);
|
||||||
let span1 = span!(Level::INFO, "httpret::binned", desc = &desc.as_str());
|
let span1 = span!(Level::INFO, "httpret::binned", desc = &desc.as_str());
|
||||||
span1.in_scope(|| {
|
span1.in_scope(|| {
|
||||||
info!("binned STARTING {:?}", query);
|
debug!("binned STARTING {:?}", query);
|
||||||
});
|
});
|
||||||
match head.headers.get(http::header::ACCEPT) {
|
match head.headers.get(http::header::ACCEPT) {
|
||||||
Some(v) if v == APP_OCTET => binned_binary(query, node_config).await,
|
Some(v) if v == APP_OCTET => binned_binary(query, node_config).await,
|
||||||
@@ -473,7 +474,7 @@ async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result
|
|||||||
);
|
);
|
||||||
let span1 = span!(Level::INFO, "httpret::prebinned", desc = &desc.as_str());
|
let span1 = span!(Level::INFO, "httpret::prebinned", desc = &desc.as_str());
|
||||||
span1.in_scope(|| {
|
span1.in_scope(|| {
|
||||||
info!("prebinned STARTING");
|
debug!("prebinned STARTING");
|
||||||
});
|
});
|
||||||
let fut = disk::binned::prebinned::pre_binned_bytes_for_http(node_config, &query).instrument(span1);
|
let fut = disk::binned::prebinned::pre_binned_bytes_for_http(node_config, &query).instrument(span1);
|
||||||
let ret = match fut.await {
|
let ret = match fut.await {
|
||||||
@@ -491,7 +492,7 @@ async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||||
info!("httpret plain_events headers: {:?}", req.headers());
|
debug!("httpret plain_events headers: {:?}", req.headers());
|
||||||
let accept_def = "";
|
let accept_def = "";
|
||||||
let accept = req
|
let accept = req
|
||||||
.headers()
|
.headers()
|
||||||
@@ -522,7 +523,7 @@ async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Res
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||||
info!("httpret plain_events_binary req: {:?}", req);
|
debug!("httpret plain_events_binary req: {:?}", req);
|
||||||
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
||||||
let query = PlainEventsBinaryQuery::from_url(&url)?;
|
let query = PlainEventsBinaryQuery::from_url(&url)?;
|
||||||
let op = disk::channelexec::PlainEvents::new(
|
let op = disk::channelexec::PlainEvents::new(
|
||||||
@@ -538,7 +539,7 @@ async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached)
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||||
info!("httpret plain_events_json req: {:?}", req);
|
debug!("httpret plain_events_json req: {:?}", req);
|
||||||
let (head, _body) = req.into_parts();
|
let (head, _body) = req.into_parts();
|
||||||
let query = PlainEventsJsonQuery::from_request_head(&head)?;
|
let query = PlainEventsJsonQuery::from_request_head(&head)?;
|
||||||
let op = disk::channelexec::PlainEventsJson::new(
|
let op = disk::channelexec::PlainEventsJson::new(
|
||||||
|
|||||||
@@ -179,7 +179,7 @@ where
|
|||||||
fn aggregator(range: NanoRange, _x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
|
fn aggregator(range: NanoRange, _x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator {
|
||||||
// TODO remove output
|
// TODO remove output
|
||||||
if range.delta() > SEC * 0 {
|
if range.delta() > SEC * 0 {
|
||||||
netpod::log::info!(
|
netpod::log::debug!(
|
||||||
"TimeBinnableType for XBinnedScalarEvents aggregator() range {:?}",
|
"TimeBinnableType for XBinnedScalarEvents aggregator() range {:?}",
|
||||||
range
|
range
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ arrayref = "0.3.6"
|
|||||||
byteorder = "1.4.3"
|
byteorder = "1.4.3"
|
||||||
futures-core = "0.3.14"
|
futures-core = "0.3.14"
|
||||||
futures-util = "0.3.14"
|
futures-util = "0.3.14"
|
||||||
|
md-5 = "0.9.1"
|
||||||
err = { path = "../err" }
|
err = { path = "../err" }
|
||||||
netpod = { path = "../netpod" }
|
netpod = { path = "../netpod" }
|
||||||
taskrun = { path = "../taskrun" }
|
taskrun = { path = "../taskrun" }
|
||||||
|
|||||||
@@ -114,4 +114,30 @@ impl NetBuf {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn put_u8(&mut self, v: u8) -> Result<(), Error> {
|
||||||
|
type T = u8;
|
||||||
|
const TS: usize = std::mem::size_of::<T>();
|
||||||
|
self.rewind_if_needed();
|
||||||
|
if self.wrcap() < TS {
|
||||||
|
return Err(Error::with_msg_no_trace("not enough space"));
|
||||||
|
} else {
|
||||||
|
self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes());
|
||||||
|
self.wp += TS;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn put_u64(&mut self, v: u64) -> Result<(), Error> {
|
||||||
|
type T = u64;
|
||||||
|
const TS: usize = std::mem::size_of::<T>();
|
||||||
|
self.rewind_if_needed();
|
||||||
|
if self.wrcap() < TS {
|
||||||
|
return Err(Error::with_msg_no_trace("not enough space"));
|
||||||
|
} else {
|
||||||
|
self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes());
|
||||||
|
self.wp += TS;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,12 +1,14 @@
|
|||||||
|
use err::Error;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
use netpod::{Cluster, Database, Node, NodeConfig, NodeConfigCached};
|
use netpod::{Cluster, Database, Node, NodeConfig, NodeConfigCached};
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::iter::FromIterator;
|
use std::iter::FromIterator;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn ca_connect_1() {
|
fn ca_connect_1() {
|
||||||
taskrun::run(async {
|
let fut = async {
|
||||||
let it = vec![(String::new(), String::new())].into_iter();
|
let it = vec![(String::new(), String::new())].into_iter();
|
||||||
let pairs = BTreeMap::from_iter(it);
|
let pairs = BTreeMap::from_iter(it);
|
||||||
let node_config = NodeConfigCached {
|
let node_config = NodeConfigCached {
|
||||||
@@ -42,9 +44,13 @@ fn ca_connect_1() {
|
|||||||
};
|
};
|
||||||
let mut rx = super::ca::ca_connect_1(pairs, &node_config).await?;
|
let mut rx = super::ca::ca_connect_1(pairs, &node_config).await?;
|
||||||
while let Some(item) = rx.next().await {
|
while let Some(item) = rx.next().await {
|
||||||
info!("got next: {:?}", item);
|
debug!("got next: {:?}", item);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok::<_, Error>(())
|
||||||
})
|
};
|
||||||
.unwrap();
|
let fut = async move {
|
||||||
|
let ret = tokio::time::timeout(Duration::from_millis(4000), fut).await??;
|
||||||
|
Ok(ret)
|
||||||
|
};
|
||||||
|
taskrun::run(fut).unwrap();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,10 +7,13 @@ use crate::netbuf::NetBuf;
|
|||||||
use crate::netbuf::RP_REW_PT;
|
use crate::netbuf::RP_REW_PT;
|
||||||
use async_channel::Receiver;
|
use async_channel::Receiver;
|
||||||
use async_channel::Sender;
|
use async_channel::Sender;
|
||||||
|
#[allow(unused)]
|
||||||
|
use bytes::BufMut;
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use futures_util::{pin_mut, StreamExt};
|
use futures_util::{pin_mut, StreamExt};
|
||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
|
use netpod::timeunits::SEC;
|
||||||
use serde_json::Value as JsVal;
|
use serde_json::Value as JsVal;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::mem;
|
use std::mem;
|
||||||
@@ -20,7 +23,8 @@ use std::time::Duration;
|
|||||||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
|
|
||||||
#[test]
|
//#[test]
|
||||||
|
#[allow(unused)]
|
||||||
fn test_listen() -> Result<(), Error> {
|
fn test_listen() -> Result<(), Error> {
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
@@ -30,7 +34,8 @@ fn test_listen() -> Result<(), Error> {
|
|||||||
taskrun::run(fut)
|
taskrun::run(fut)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
//#[test]
|
||||||
|
#[allow(unused)]
|
||||||
fn test_service() -> Result<(), Error> {
|
fn test_service() -> Result<(), Error> {
|
||||||
//use std::time::Duration;
|
//use std::time::Duration;
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
@@ -39,7 +44,7 @@ fn test_service() -> Result<(), Error> {
|
|||||||
info!("accepting...");
|
info!("accepting...");
|
||||||
let (conn, remote) = sock.accept().await?;
|
let (conn, remote) = sock.accept().await?;
|
||||||
info!("new connection from {:?}", remote);
|
info!("new connection from {:?}", remote);
|
||||||
let mut zmtp = Zmtp::new(conn);
|
let mut zmtp = Zmtp::new(conn, SocketType::PUSH);
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
while let Some(item) = zmtp.next().await {
|
while let Some(item) = zmtp.next().await {
|
||||||
info!("item from {:?} {:?}", remote, item);
|
info!("item from {:?} {:?}", remote, item);
|
||||||
@@ -61,7 +66,7 @@ pub async fn zmtp_00() -> Result<(), Error> {
|
|||||||
|
|
||||||
pub async fn zmtp_client(addr: &str) -> Result<(), Error> {
|
pub async fn zmtp_client(addr: &str) -> Result<(), Error> {
|
||||||
let conn = tokio::net::TcpStream::connect(addr).await?;
|
let conn = tokio::net::TcpStream::connect(addr).await?;
|
||||||
let mut zmtp = Zmtp::new(conn);
|
let mut zmtp = Zmtp::new(conn, SocketType::PULL);
|
||||||
let mut i1 = 0;
|
let mut i1 = 0;
|
||||||
while let Some(item) = zmtp.next().await {
|
while let Some(item) = zmtp.next().await {
|
||||||
match item {
|
match item {
|
||||||
@@ -117,11 +122,87 @@ impl ConnState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct DummyData {}
|
pub enum SocketType {
|
||||||
|
PUSH,
|
||||||
|
PULL,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct DummyData {
|
||||||
|
ts: u64,
|
||||||
|
pulse: u64,
|
||||||
|
value: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DummyData {
|
||||||
|
fn make_zmtp_msg(&self) -> Result<ZmtpMessage, Error> {
|
||||||
|
let head_b = HeadB {
|
||||||
|
htype: "bsr_d-1.1".into(),
|
||||||
|
channels: vec![ChannelDesc {
|
||||||
|
name: "TESTCHAN".into(),
|
||||||
|
ty: "int64".into(),
|
||||||
|
shape: JsVal::Array(vec![JsVal::Number(serde_json::Number::from(1))]),
|
||||||
|
encoding: "little".into(),
|
||||||
|
}],
|
||||||
|
};
|
||||||
|
let hb = serde_json::to_vec(&head_b).unwrap();
|
||||||
|
use md5::Digest;
|
||||||
|
let mut h = md5::Md5::new();
|
||||||
|
h.update(&hb);
|
||||||
|
let mut md5hex = String::with_capacity(32);
|
||||||
|
for c in h.finalize() {
|
||||||
|
use fmt::Write;
|
||||||
|
write!(&mut md5hex, "{:02x}", c).unwrap();
|
||||||
|
}
|
||||||
|
let head_a = HeadA {
|
||||||
|
htype: "bsr_m-1.1".into(),
|
||||||
|
hash: md5hex,
|
||||||
|
pulse_id: serde_json::Number::from(self.pulse),
|
||||||
|
global_timestamp: GlobalTimestamp {
|
||||||
|
sec: self.ts / SEC,
|
||||||
|
ns: self.ts % SEC,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
// TODO write directly to output buffer.
|
||||||
|
let ha = serde_json::to_vec(&head_a).unwrap();
|
||||||
|
let hf = self.value.to_le_bytes().to_vec();
|
||||||
|
let hp = [(self.ts / SEC).to_be_bytes(), (self.ts % SEC).to_be_bytes()].concat();
|
||||||
|
let mut msg = ZmtpMessage { frames: vec![] };
|
||||||
|
let fr = ZmtpFrame {
|
||||||
|
msglen: 0,
|
||||||
|
has_more: false,
|
||||||
|
is_command: false,
|
||||||
|
data: ha,
|
||||||
|
};
|
||||||
|
msg.frames.push(fr);
|
||||||
|
let fr = ZmtpFrame {
|
||||||
|
msglen: 0,
|
||||||
|
has_more: false,
|
||||||
|
is_command: false,
|
||||||
|
data: hb,
|
||||||
|
};
|
||||||
|
msg.frames.push(fr);
|
||||||
|
let fr = ZmtpFrame {
|
||||||
|
msglen: 0,
|
||||||
|
has_more: false,
|
||||||
|
is_command: false,
|
||||||
|
data: hf,
|
||||||
|
};
|
||||||
|
msg.frames.push(fr);
|
||||||
|
let fr = ZmtpFrame {
|
||||||
|
msglen: 0,
|
||||||
|
has_more: false,
|
||||||
|
is_command: false,
|
||||||
|
data: hp,
|
||||||
|
};
|
||||||
|
msg.frames.push(fr);
|
||||||
|
Ok(msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct Zmtp {
|
struct Zmtp {
|
||||||
done: bool,
|
done: bool,
|
||||||
complete: bool,
|
complete: bool,
|
||||||
|
socket_type: SocketType,
|
||||||
conn: TcpStream,
|
conn: TcpStream,
|
||||||
conn_state: ConnState,
|
conn_state: ConnState,
|
||||||
buf: NetBuf,
|
buf: NetBuf,
|
||||||
@@ -137,7 +218,7 @@ struct Zmtp {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Zmtp {
|
impl Zmtp {
|
||||||
fn new(conn: TcpStream) -> Self {
|
fn new(conn: TcpStream, socket_type: SocketType) -> Self {
|
||||||
//conn.set_send_buffer_size(1024 * 64)?;
|
//conn.set_send_buffer_size(1024 * 64)?;
|
||||||
//conn.set_recv_buffer_size(1024 * 1024 * 4)?;
|
//conn.set_recv_buffer_size(1024 * 1024 * 4)?;
|
||||||
//info!("send_buffer_size {:8}", conn.send_buffer_size()?);
|
//info!("send_buffer_size {:8}", conn.send_buffer_size()?);
|
||||||
@@ -146,6 +227,7 @@ impl Zmtp {
|
|||||||
Self {
|
Self {
|
||||||
done: false,
|
done: false,
|
||||||
complete: false,
|
complete: false,
|
||||||
|
socket_type,
|
||||||
conn,
|
conn,
|
||||||
conn_state: ConnState::InitSend,
|
conn_state: ConnState::InitSend,
|
||||||
buf: NetBuf::new(),
|
buf: NetBuf::new(),
|
||||||
@@ -192,17 +274,33 @@ impl Zmtp {
|
|||||||
}
|
}
|
||||||
ConnState::InitRecv2 => {
|
ConnState::InitRecv2 => {
|
||||||
info!("parse_item InitRecv2");
|
info!("parse_item InitRecv2");
|
||||||
|
let msgrem = self.conn_state.need_min();
|
||||||
|
let ver_min = self.buf.read_u8()?;
|
||||||
|
let msgrem = msgrem - 1;
|
||||||
|
info!("Peer minor version {}", ver_min);
|
||||||
// TODO parse greeting remainder.. sec-scheme.
|
// TODO parse greeting remainder.. sec-scheme.
|
||||||
self.buf.adv(self.conn_state.need_min())?;
|
self.buf.adv(msgrem)?;
|
||||||
self.outbuf
|
match self.socket_type {
|
||||||
.put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PULL"[..])?;
|
SocketType::PUSH => {
|
||||||
|
self.outbuf
|
||||||
|
.put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PUSH"[..])?;
|
||||||
|
}
|
||||||
|
SocketType::PULL => {
|
||||||
|
self.outbuf
|
||||||
|
.put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PULL"[..])?;
|
||||||
|
}
|
||||||
|
}
|
||||||
self.out_enable = true;
|
self.out_enable = true;
|
||||||
self.conn_state = ConnState::ReadFrameFlags;
|
self.conn_state = ConnState::ReadFrameFlags;
|
||||||
let tx = self.data_tx.clone();
|
let tx = self.data_tx.clone();
|
||||||
let fut1 = async move {
|
let fut1 = async move {
|
||||||
loop {
|
loop {
|
||||||
tokio::time::sleep(Duration::from_millis(1000)).await;
|
tokio::time::sleep(Duration::from_millis(1000)).await;
|
||||||
let dd = DummyData {};
|
let dd = DummyData {
|
||||||
|
ts: 420032002200887766,
|
||||||
|
pulse: 123123123123,
|
||||||
|
value: -777,
|
||||||
|
};
|
||||||
match tx.send(dd).await {
|
match tx.send(dd).await {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
info!("item send to channel");
|
info!("item send to channel");
|
||||||
@@ -304,6 +402,20 @@ impl ZmtpMessage {
|
|||||||
pub fn frames(&self) -> &Vec<ZmtpFrame> {
|
pub fn frames(&self) -> &Vec<ZmtpFrame> {
|
||||||
&self.frames
|
&self.frames
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn emit_to_buffer(&self, out: &mut NetBuf) -> Result<(), Error> {
|
||||||
|
let n = self.frames.len();
|
||||||
|
for (i, fr) in self.frames.iter().enumerate() {
|
||||||
|
let mut flags: u8 = 2;
|
||||||
|
if i < n - 1 {
|
||||||
|
flags |= 1;
|
||||||
|
}
|
||||||
|
out.put_u8(flags)?;
|
||||||
|
out.put_u64(fr.data().len() as u64)?;
|
||||||
|
out.put_slice(fr.data())?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ZmtpFrame {
|
pub struct ZmtpFrame {
|
||||||
@@ -349,11 +461,23 @@ enum Int<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Int<T> {
|
impl<T> Int<T> {
|
||||||
fn is_item(&self) -> bool {
|
fn item_count(&self) -> u32 {
|
||||||
if let Int::Item(_) = self {
|
if let Int::Item(_) = self {
|
||||||
true
|
1
|
||||||
} else {
|
} else {
|
||||||
false
|
0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> fmt::Debug for Int<T> {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::NoWork => write!(f, "NoWork"),
|
||||||
|
Self::Pending => write!(f, "Pending"),
|
||||||
|
Self::Empty => write!(f, "Empty"),
|
||||||
|
Self::Item(_) => write!(f, "Item"),
|
||||||
|
Self::Done => write!(f, "Done"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -375,39 +499,33 @@ impl Stream for Zmtp {
|
|||||||
return Ready(None);
|
return Ready(None);
|
||||||
}
|
}
|
||||||
loop {
|
loop {
|
||||||
let have_item = false;
|
let mut item_count = 0;
|
||||||
let serialized: Int<()> = if self.out_enable && self.outbuf.wrcap() >= RP_REW_PT {
|
let serialized: Int<Result<(), Error>> = if self.out_enable && self.outbuf.wrcap() >= RP_REW_PT {
|
||||||
match self.data_rx.poll_next_unpin(cx) {
|
match self.data_rx.poll_next_unpin(cx) {
|
||||||
Ready(Some(k)) => {
|
Ready(Some(item)) => {
|
||||||
info!("item from channel, put to output buffer");
|
let msg = item.make_zmtp_msg().unwrap();
|
||||||
let head_a = HeadA {
|
match msg.emit_to_buffer(&mut self.outbuf) {
|
||||||
htype: "bsr_m-1.1".into(),
|
Ok(_) => Int::Empty,
|
||||||
// TODO hash definition?
|
Err(e) => {
|
||||||
hash: "b9e0916effc5a8a2f1977a9eb8beea63".into(),
|
self.done = true;
|
||||||
pulse_id: serde_json::Number::from(42424242),
|
Int::Item(Err(e))
|
||||||
global_timestamp: GlobalTimestamp {
|
}
|
||||||
sec: 1636401670,
|
}
|
||||||
ns: 12920856,
|
/*let mut msgs = Vec::with_capacity(1024 * 8);
|
||||||
},
|
msgs.put_u8(1 | 2);
|
||||||
};
|
msgs.put_u64(ha.len() as u64);
|
||||||
let head_b = HeadB {
|
msgs.put_slice(&ha);
|
||||||
htype: "bsr_d-1.1".into(),
|
msgs.put_u8(1 | 2);
|
||||||
channels: vec![ChannelDesc {
|
msgs.put_u64(hb.len() as u64);
|
||||||
name: "TESTCHAN".into(),
|
msgs.put_slice(&hb);
|
||||||
ty: "int64".into(),
|
msgs.put_u8(1 | 2);
|
||||||
shape: JsVal::Array(vec![JsVal::Number(serde_json::Number::from(1))]),
|
msgs.put_u64(hf.len() as u64);
|
||||||
encoding: "little".into(),
|
msgs.put_slice(&hf);
|
||||||
}],
|
msgs.put_u8(2);
|
||||||
};
|
msgs.put_u64(hp.len() as u64);
|
||||||
let ha = serde_json::to_vec(&head_a).unwrap();
|
msgs.put_slice(&hp);
|
||||||
let hb = serde_json::to_vec(&head_b).unwrap();
|
self.outbuf.put_slice(&msgs).unwrap();
|
||||||
let hf = 23478236u64.to_le_bytes();
|
Int::Empty*/
|
||||||
let hp = 13131313u64.to_le_bytes();
|
|
||||||
self.outbuf.put_slice(&ha).unwrap();
|
|
||||||
self.outbuf.put_slice(&hb).unwrap();
|
|
||||||
self.outbuf.put_slice(&hf).unwrap();
|
|
||||||
self.outbuf.put_slice(&hp).unwrap();
|
|
||||||
Int::Empty
|
|
||||||
}
|
}
|
||||||
Ready(None) => Int::Done,
|
Ready(None) => Int::Done,
|
||||||
Pending => Int::Pending,
|
Pending => Int::Pending,
|
||||||
@@ -415,8 +533,10 @@ impl Stream for Zmtp {
|
|||||||
} else {
|
} else {
|
||||||
Int::NoWork
|
Int::NoWork
|
||||||
};
|
};
|
||||||
let have_item = have_item | serialized.is_item();
|
item_count += serialized.item_count();
|
||||||
let write = if self.outbuf.len() > 0 {
|
let write: Int<Result<(), _>> = if item_count > 0 {
|
||||||
|
Int::NoWork
|
||||||
|
} else if self.outbuf.len() > 0 {
|
||||||
let (b, w) = self.outbuf_conn();
|
let (b, w) = self.outbuf_conn();
|
||||||
pin_mut!(w);
|
pin_mut!(w);
|
||||||
match w.poll_write(cx, b) {
|
match w.poll_write(cx, b) {
|
||||||
@@ -427,17 +547,24 @@ impl Stream for Zmtp {
|
|||||||
self.outbuf.rewind_if_needed();
|
self.outbuf.rewind_if_needed();
|
||||||
Int::Empty
|
Int::Empty
|
||||||
}
|
}
|
||||||
Err(e) => Int::Item(Err::<(), _>(e)),
|
Err(e) => {
|
||||||
|
error!("advance error {:?}", e);
|
||||||
|
Int::Item(Err(e))
|
||||||
|
}
|
||||||
},
|
},
|
||||||
Err(e) => Int::Item(Err(e.into())),
|
Err(e) => {
|
||||||
|
error!("output write error {:?}", e);
|
||||||
|
Int::Item(Err(e.into()))
|
||||||
|
}
|
||||||
},
|
},
|
||||||
Pending => Int::Pending,
|
Pending => Int::Pending,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Int::NoWork
|
Int::NoWork
|
||||||
};
|
};
|
||||||
let have_item = have_item | write.is_item();
|
info!("write result: {:?} {}", write, self.outbuf.len());
|
||||||
let read: Int<Result<(), _>> = if have_item || self.inp_eof {
|
item_count += write.item_count();
|
||||||
|
let read: Int<Result<(), _>> = if item_count > 0 || self.inp_eof {
|
||||||
Int::NoWork
|
Int::NoWork
|
||||||
} else {
|
} else {
|
||||||
if self.buf.cap() < self.conn_state.need_min() {
|
if self.buf.cap() < self.conn_state.need_min() {
|
||||||
@@ -480,8 +607,8 @@ impl Stream for Zmtp {
|
|||||||
Int::NoWork
|
Int::NoWork
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let have_item = have_item | read.is_item();
|
item_count += read.item_count();
|
||||||
let parsed = if have_item || self.buf.len() < self.conn_state.need_min() {
|
let parsed = if item_count > 0 || self.buf.len() < self.conn_state.need_min() {
|
||||||
Int::NoWork
|
Int::NoWork
|
||||||
} else {
|
} else {
|
||||||
match self.parse_item() {
|
match self.parse_item() {
|
||||||
@@ -492,43 +619,63 @@ impl Stream for Zmtp {
|
|||||||
Err(e) => Int::Item(Err(e)),
|
Err(e) => Int::Item(Err(e)),
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let _have_item = have_item | parsed.is_item();
|
item_count += parsed.item_count();
|
||||||
|
let _ = item_count;
|
||||||
{
|
{
|
||||||
use Int::*;
|
use Int::*;
|
||||||
match (write, read, parsed) {
|
match (serialized, write, read, parsed) {
|
||||||
(Item(_), Item(_), _) => panic!(),
|
(NoWork | Done, NoWork | Done, NoWork | Done, NoWork | Done) => {
|
||||||
(Item(_), _, Item(_)) => panic!(),
|
|
||||||
(_, Item(_), Item(_)) => panic!(),
|
|
||||||
(NoWork | Done, NoWork | Done, NoWork | Done) => {
|
|
||||||
warn!("all NoWork or Done");
|
warn!("all NoWork or Done");
|
||||||
break Poll::Pending;
|
break Poll::Pending;
|
||||||
}
|
}
|
||||||
(_, Item(Err(e)), _) => {
|
(Item(Err(e)), _, _, _) => {
|
||||||
self.done = true;
|
self.done = true;
|
||||||
break Poll::Ready(Some(Err(e)));
|
break Poll::Ready(Some(Err(e)));
|
||||||
}
|
}
|
||||||
(_, _, Item(Err(e))) => {
|
(_, Item(Err(e)), _, _) => {
|
||||||
self.done = true;
|
self.done = true;
|
||||||
break Poll::Ready(Some(Err(e)));
|
break Poll::Ready(Some(Err(e)));
|
||||||
}
|
}
|
||||||
(Item(_), _, _) => {
|
(_, _, Item(Err(e)), _) => {
|
||||||
|
self.done = true;
|
||||||
|
break Poll::Ready(Some(Err(e)));
|
||||||
|
}
|
||||||
|
(_, _, _, Item(Err(e))) => {
|
||||||
|
self.done = true;
|
||||||
|
break Poll::Ready(Some(Err(e)));
|
||||||
|
}
|
||||||
|
(Item(_), _, _, _) => {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
(_, Item(Ok(_)), _) => {
|
(_, Item(_), _, _) => {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
(_, _, Item(Ok(item))) => {
|
(_, _, Item(_), _) => {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
(_, _, _, Item(Ok(item))) => {
|
||||||
break Poll::Ready(Some(Ok(item)));
|
break Poll::Ready(Some(Ok(item)));
|
||||||
}
|
}
|
||||||
(Empty, _, _) => continue,
|
(Empty, _, _, _) => continue,
|
||||||
(_, Empty, _) => continue,
|
(_, Empty, _, _) => continue,
|
||||||
(_, _, Empty) => continue,
|
(_, _, Empty, _) => continue,
|
||||||
|
(_, _, _, Empty) => continue,
|
||||||
#[allow(unreachable_patterns)]
|
#[allow(unreachable_patterns)]
|
||||||
(Pending, Pending | NoWork | Done, Pending | NoWork | Done) => break Poll::Pending,
|
(Pending, Pending | NoWork | Done, Pending | NoWork | Done, Pending | NoWork | Done) => {
|
||||||
|
break Poll::Pending
|
||||||
|
}
|
||||||
#[allow(unreachable_patterns)]
|
#[allow(unreachable_patterns)]
|
||||||
(Pending | NoWork | Done, Pending, Pending | NoWork | Done) => break Poll::Pending,
|
(Pending | NoWork | Done, Pending, Pending | NoWork | Done, Pending | NoWork | Done) => {
|
||||||
|
break Poll::Pending
|
||||||
|
}
|
||||||
#[allow(unreachable_patterns)]
|
#[allow(unreachable_patterns)]
|
||||||
(Pending | NoWork | Done, Pending | NoWork | Done, Pending) => break Poll::Pending,
|
(Pending | NoWork | Done, Pending | NoWork | Done, Pending, Pending | NoWork | Done) => {
|
||||||
|
break Poll::Pending
|
||||||
|
}
|
||||||
|
#[allow(unreachable_patterns)]
|
||||||
|
(Pending | NoWork | Done, Pending | NoWork | Done, Pending | NoWork | Done, Pending) => {
|
||||||
|
break Poll::Pending
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -134,6 +134,22 @@ impl ScalarType {
|
|||||||
BOOL => 0,
|
BOOL => 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn to_api3proto(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
ScalarType::U8 => "uint8",
|
||||||
|
ScalarType::U16 => "uint16",
|
||||||
|
ScalarType::U32 => "uint32",
|
||||||
|
ScalarType::U64 => "uint64",
|
||||||
|
ScalarType::I8 => "int8",
|
||||||
|
ScalarType::I16 => "int16",
|
||||||
|
ScalarType::I32 => "int32",
|
||||||
|
ScalarType::I64 => "int64",
|
||||||
|
ScalarType::F32 => "float32",
|
||||||
|
ScalarType::F64 => "float64",
|
||||||
|
ScalarType::BOOL => "bool",
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
|||||||
@@ -115,7 +115,7 @@ async fn events_conn_handler_inner_try(
|
|||||||
return Err((Error::with_msg("json parse error"), netout))?;
|
return Err((Error::with_msg("json parse error"), netout))?;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
info!("--- nodenet::conn got query -------------------\nevq {:?}", evq);
|
debug!("--- nodenet::conn got query -------------------\nevq {:?}", evq);
|
||||||
|
|
||||||
let mut p1: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> =
|
let mut p1: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> =
|
||||||
if let Some(aa) = &node_config.node.channel_archiver {
|
if let Some(aa) = &node_config.node.channel_archiver {
|
||||||
@@ -142,7 +142,6 @@ async fn events_conn_handler_inner_try(
|
|||||||
};
|
};
|
||||||
let mut buf_len_histo = HistoLog2::new(5);
|
let mut buf_len_histo = HistoLog2::new(5);
|
||||||
while let Some(item) = p1.next().await {
|
while let Some(item) = p1.next().await {
|
||||||
//info!("conn.rs encode frame typeid {:x}", item.typeid());
|
|
||||||
let item = item.make_frame();
|
let item = item.make_frame();
|
||||||
match item {
|
match item {
|
||||||
Ok(buf) => {
|
Ok(buf) => {
|
||||||
@@ -166,6 +165,6 @@ async fn events_conn_handler_inner_try(
|
|||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(e) => return Err((e, netout))?,
|
Err(e) => return Err((e, netout))?,
|
||||||
}
|
}
|
||||||
info!("events_conn_handler_inner_try buf_len_histo: {:?}", buf_len_histo);
|
debug!("events_conn_handler_inner_try buf_len_histo: {:?}", buf_len_histo);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user