diff --git a/daqbuffer/Cargo.toml b/daqbuffer/Cargo.toml index abf28fc..c388a87 100644 --- a/daqbuffer/Cargo.toml +++ b/daqbuffer/Cargo.toml @@ -30,4 +30,3 @@ netpod = { path = "../netpod" } #httpret = { path = "../httpret" } disk = { path = "../disk" } daqbufp2 = { path = "../daqbufp2" } -netfetch = { path = "../netfetch" } diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index 8f85ab3..116d0f3 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -92,9 +92,6 @@ async fn go() -> Result<(), Error> { SubCmd::GenerateTestData => { disk::gen::gen_test_data().await?; } - SubCmd::Zmtp(zmtp) => { - netfetch::zmtp::zmtp_client(&zmtp.addr).await?; - } SubCmd::Logappend(k) => { let jh = tokio::task::spawn_blocking(move || { taskrun::append::append(&k.dir, std::io::stdin()).unwrap(); diff --git a/daqbuffer/src/cli.rs b/daqbuffer/src/cli.rs index 93b9572..182c58e 100644 --- a/daqbuffer/src/cli.rs +++ b/daqbuffer/src/cli.rs @@ -15,7 +15,6 @@ pub enum SubCmd { Proxy(Proxy), Client(Client), GenerateTestData, - Zmtp(Zmtp), Logappend(Logappend), Test, } @@ -74,12 +73,6 @@ pub struct BinnedClient { pub disk_stats_every_kb: u32, } -#[derive(Debug, Parser)] -pub struct Zmtp { - #[clap(long)] - pub addr: String, -} - #[derive(Debug, Parser)] pub struct Logappend { #[clap(long)] diff --git a/err/src/lib.rs b/err/src/lib.rs index 4a43ce0..1b8f6c1 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -208,6 +208,16 @@ where } } +pub trait ToErr { + fn to_err(self) -> Error; +} + +impl From for Error { + fn from(k: T) -> Self { + k.to_err() + } +} + impl From for Error { fn from(k: PublicError) -> Self { Self { diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 621b1db..8ecbd78 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -29,7 +29,6 @@ dbconn = { path = "../dbconn" } disk = { path = "../disk" } items = { path = "../items" } parse = { path = "../parse" } -netfetch = { path = "../netfetch" } archapp_wrap = { path = "../archapp_wrap" } nodenet = { path = "../nodenet" } commonio = { path = "../commonio" } diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 764dd42..72b8424 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -205,8 +205,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> if req.method() == Method::GET { let ret = serde_json::json!({ "data_api_version": { - "major": 4, - "minor": 0, + "major": 4u32, + "minor": 0u32, }, }); Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?) @@ -279,12 +279,6 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } - } else if path == "/api/4/ca_connect_1" { - if req.method() == Method::GET { - Ok(ca_connect_1(req, &node_config).await?) - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) - } } else if path == "/api/4/archapp/files/scan/msgs" { if req.method() == Method::GET { Ok(archapp_scan_files(req, &node_config).await?) @@ -922,22 +916,6 @@ pub fn status_board() -> Result, Error> { } } -pub async fn ca_connect_1(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - let url = Url::parse(&format!("dummy:{}", req.uri()))?; - let pairs = get_url_query_pairs(&url); - let res = netfetch::ca::ca_connect_1(pairs, node_config).await?; - let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, APP_JSON_LINES) - .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) { - Ok(mut item) => { - item.push('\n'); - Ok(item) - } - Err(e) => Err(e), - })))?; - Ok(ret) -} - pub async fn archapp_scan_files(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let pairs = get_url_query_pairs(&url); diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 602906b..d5e11ed 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -454,7 +454,52 @@ pub async fn proxy_api1_single_backend_query( _req: Request, _proxy_config: &ProxyConfig, ) -> Result, Error> { - panic!() + // TODO + /* + if let Some(back) = proxy_config.backends_event_download.first() { + let is_tls = req + .uri() + .scheme() + .ok_or_else(|| Error::with_msg_no_trace("no uri scheme"))? + == &http::uri::Scheme::HTTPS; + let bld = Request::builder().method(req.method()); + let bld = bld.uri(req.uri()); + // TODO to proxy events over multiple backends, we also have to concat results from different backends. + // TODO Carry on needed headers (but should not simply append all) + for (k, v) in req.headers() { + bld.header(k, v); + } + { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + let mut hasher = DefaultHasher::new(); + proxy_config.name.hash(&mut hasher); + let mid = hasher.finish(); + bld.header(format!("proxy-mark-{mid:0x}"), proxy_config.name); + } + let body_data = hyper::body::to_bytes(req.into_body()).await?; + let reqout = bld.body(Body::from(body_data))?; + let resfut = { + use hyper::Client; + if is_tls { + let https = HttpsConnector::new(); + let client = Client::builder().build::<_, Body>(https); + let req = client.request(reqout); + let req = Box::pin(req) as Pin, hyper::Error>> + Send>>; + req + } else { + let client = Client::new(); + let req = client.request(reqout); + let req = Box::pin(req) as _; + req + } + }; + resfut.timeout(); + } else { + Err(Error::with_msg_no_trace(format!("no api1 event backend configured"))) + } + */ + todo!() } pub async fn proxy_single_backend_query( diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml deleted file mode 100644 index 251d5a2..0000000 --- a/netfetch/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "netfetch" -version = "0.0.1-a.0" -authors = ["Dominik Werder "] -edition = "2021" - -[lib] -path = "src/netfetch.rs" - -[dependencies] -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -serde_cbor = "0.11.1" -tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } -tokio-stream = {version = "0.1.5", features = ["fs"]} -async-channel = "1.6" -bytes = "1.0.1" -arrayref = "0.3.6" -byteorder = "1.4.3" -futures-core = "0.3.14" -futures-util = "0.3.14" -md-5 = "0.9.1" -err = { path = "../err" } -netpod = { path = "../netpod" } -taskrun = { path = "../taskrun" } diff --git a/netfetch/src/bsread.rs b/netfetch/src/bsread.rs deleted file mode 100644 index 59acdea..0000000 --- a/netfetch/src/bsread.rs +++ /dev/null @@ -1,100 +0,0 @@ -use crate::zmtp::ZmtpMessage; -use err::Error; -#[allow(unused)] -use netpod::log::*; -use netpod::ByteOrder; -use netpod::ScalarType; -use netpod::Shape; -use serde::{Deserialize, Serialize}; -use serde_json::Value as JsVal; -use std::fmt; - -// TODO -pub struct ParseError { - pub err: Error, - pub msg: ZmtpMessage, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct GlobalTimestamp { - pub sec: u64, - pub ns: u64, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct ChannelDesc { - pub name: String, - #[serde(rename = "type")] - pub ty: String, - pub shape: JsVal, - pub encoding: String, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct HeadA { - pub htype: String, - pub hash: String, - pub pulse_id: serde_json::Number, - pub global_timestamp: GlobalTimestamp, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct HeadB { - pub htype: String, - pub channels: Vec, -} - -#[derive(Debug)] -pub struct BsreadMessage { - pub head_a: HeadA, - pub head_b: HeadB, - pub values: Vec>, -} - -pub fn parse_zmtp_message(msg: &ZmtpMessage) -> Result { - if msg.frames().len() < 3 { - return Err(Error::with_msg_no_trace("not enough frames for bsread")); - } - let head_a: HeadA = serde_json::from_slice(&msg.frames()[0].data())?; - let head_b: HeadB = serde_json::from_slice(&msg.frames()[1].data())?; - let mut values = vec![]; - if msg.frames().len() == head_b.channels.len() + 3 { - for (ch, fr) in head_b.channels.iter().zip(msg.frames()[2..].iter()) { - let sty = ScalarType::from_bsread_str(ch.ty.as_str())?; - let bo = ByteOrder::from_bsread_str(&ch.encoding)?; - let shape = Shape::from_bsread_jsval(&ch.shape)?; - match sty { - ScalarType::I64 => match &bo { - ByteOrder::LE => match &shape { - Shape::Scalar => { - assert_eq!(fr.data().len(), 8); - let v = i64::from_le_bytes(fr.data().try_into()?); - values.push(Box::new(v) as _); - } - Shape::Wave(_) => {} - Shape::Image(_, _) => {} - }, - _ => {} - }, - _ => {} - } - } - } - { - let fr = &msg.frames()[msg.frames().len() - 1]; - if fr.data().len() == 8 { - let pulse = u64::from_le_bytes(fr.data().try_into()?); - info!("pulse {}", pulse); - } - } - let ret = BsreadMessage { head_a, head_b, values }; - Ok(ret) -} - -pub struct BsreadCollector {} - -impl BsreadCollector { - pub fn new>(_addr: S) -> Self { - err::todoval() - } -} diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs deleted file mode 100644 index 44c073d..0000000 --- a/netfetch/src/ca.rs +++ /dev/null @@ -1,85 +0,0 @@ -use async_channel::{bounded, Receiver}; -use bytes::{BufMut, BytesMut}; -use err::{ErrStr, Error}; -use futures_util::FutureExt; -use netpod::NodeConfigCached; -use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; - -#[derive(Debug, Serialize, Deserialize)] -pub struct Message { - cmd: u16, - payload_len: u16, - type_type: u16, - data_len: u16, -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum FetchItem { - Log(String), - Message(Message), -} - -pub async fn ca_connect_1( - _pairs: BTreeMap, - _node_config: &NodeConfigCached, -) -> Result>, Error> { - let (tx, rx) = bounded(16); - let tx2 = tx.clone(); - tokio::task::spawn( - async move { - let mut conn = tokio::net::TcpStream::connect("S30CB06-CVME-LLRF2.psi.ch:5064").await?; - let (mut inp, mut out) = conn.split(); - tx.send(Ok(FetchItem::Log(format!("connected")))).await.errstr()?; - let mut buf = [0; 64]; - - let mut b2 = BytesMut::with_capacity(128); - b2.put_u16(0x00); - b2.put_u16(0); - b2.put_u16(0); - b2.put_u16(0xb); - b2.put_u32(0); - b2.put_u32(0); - out.write_all(&b2).await?; - tx.send(Ok(FetchItem::Log(format!("written")))).await.errstr()?; - let n1 = inp.read(&mut buf).await?; - tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf)))) - .await - .errstr()?; - - // Search to get cid: - let chn = b"SATCB01-DBPM220:Y2"; - b2.clear(); - b2.put_u16(0x06); - b2.put_u16((16 + chn.len()) as u16); - b2.put_u16(0x00); - b2.put_u16(0x0b); - b2.put_u32(0x71803472); - b2.put_u32(0x71803472); - b2.put_slice(chn); - out.write_all(&b2).await?; - tx.send(Ok(FetchItem::Log(format!("written")))).await.errstr()?; - let n1 = inp.read(&mut buf).await?; - tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf)))) - .await - .errstr()?; - - Ok::<_, Error>(()) - } - .then({ - move |item| async move { - match item { - Ok(_) => {} - Err(e) => { - tx2.send(Ok(FetchItem::Log(format!("Seeing error: {:?}", e)))) - .await - .errstr()?; - } - } - Ok::<_, Error>(()) - } - }), - ); - Ok(rx) -} diff --git a/netfetch/src/netbuf.rs b/netfetch/src/netbuf.rs deleted file mode 100644 index 1eca692..0000000 --- a/netfetch/src/netbuf.rs +++ /dev/null @@ -1,143 +0,0 @@ -use err::Error; -use tokio::io::ReadBuf; - -pub const BUFCAP: usize = 1024 * 128; -pub const RP_REW_PT: usize = 1024 * 64; - -pub struct NetBuf { - buf: Vec, - wp: usize, - rp: usize, -} - -impl NetBuf { - pub fn new() -> Self { - Self { - buf: vec![0; BUFCAP], - wp: 0, - rp: 0, - } - } - - pub fn len(&self) -> usize { - self.wp - self.rp - } - - pub fn cap(&self) -> usize { - self.buf.len() - } - - pub fn wrcap(&self) -> usize { - self.buf.len() - self.wp - } - - pub fn data(&self) -> &[u8] { - &self.buf[self.rp..self.wp] - } - - pub fn adv(&mut self, x: usize) -> Result<(), Error> { - if self.len() < x { - return Err(Error::with_msg_no_trace("not enough bytes")); - } else { - self.rp += x; - Ok(()) - } - } - - pub fn wpadv(&mut self, x: usize) -> Result<(), Error> { - if self.wrcap() < x { - return Err(Error::with_msg_no_trace("not enough space")); - } else { - self.wp += x; - Ok(()) - } - } - - pub fn read_u8(&mut self) -> Result { - type T = u8; - const TS: usize = std::mem::size_of::(); - if self.len() < TS { - return Err(Error::with_msg_no_trace("not enough bytes")); - } else { - let val = self.buf[self.rp]; - self.rp += TS; - Ok(val) - } - } - - pub fn read_u64(&mut self) -> Result { - type T = u64; - const TS: usize = std::mem::size_of::(); - if self.len() < TS { - return Err(Error::with_msg_no_trace("not enough bytes")); - } else { - let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?); - self.rp += TS; - Ok(val) - } - } - - pub fn read_bytes(&mut self, n: usize) -> Result<&[u8], Error> { - if self.len() < n { - return Err(Error::with_msg_no_trace("not enough bytes")); - } else { - let val = self.buf[self.rp..self.rp + n].as_ref(); - self.rp += n; - Ok(val) - } - } - - pub fn read_buf_for_fill(&mut self) -> ReadBuf { - self.rewind_if_needed(); - let read_buf = ReadBuf::new(&mut self.buf[self.wp..]); - read_buf - } - - pub fn rewind_if_needed(&mut self) { - if self.rp != 0 && self.rp == self.wp { - self.rp = 0; - self.wp = 0; - } else if self.rp > RP_REW_PT { - self.buf.copy_within(self.rp..self.wp, 0); - self.wp -= self.rp; - self.rp = 0; - } - } - - pub fn put_slice(&mut self, buf: &[u8]) -> Result<(), Error> { - self.rewind_if_needed(); - if self.wrcap() < buf.len() { - return Err(Error::with_msg_no_trace("not enough space")); - } else { - self.buf[self.wp..self.wp + buf.len()].copy_from_slice(buf); - self.wp += buf.len(); - Ok(()) - } - } - - pub fn put_u8(&mut self, v: u8) -> Result<(), Error> { - type T = u8; - const TS: usize = std::mem::size_of::(); - self.rewind_if_needed(); - if self.wrcap() < TS { - return Err(Error::with_msg_no_trace("not enough space")); - } else { - self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); - self.wp += TS; - Ok(()) - } - } - - pub fn put_u64(&mut self, v: u64) -> Result<(), Error> { - type T = u64; - const TS: usize = std::mem::size_of::(); - self.rewind_if_needed(); - if self.wrcap() < TS { - return Err(Error::with_msg_no_trace("not enough space")); - } else { - self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes()); - self.wp += TS; - Ok(()) - } - } -} diff --git a/netfetch/src/netfetch.rs b/netfetch/src/netfetch.rs deleted file mode 100644 index 983e103..0000000 --- a/netfetch/src/netfetch.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub mod bsread; -pub mod ca; -pub mod netbuf; -#[cfg(test)] -pub mod test; -pub mod zmtp; diff --git a/netfetch/src/test.rs b/netfetch/src/test.rs deleted file mode 100644 index 09b4e32..0000000 --- a/netfetch/src/test.rs +++ /dev/null @@ -1,60 +0,0 @@ -use err::Error; -use futures_util::StreamExt; -use netpod::{log::*, SfDatabuffer}; -use netpod::{Cluster, Database, Node, NodeConfig, NodeConfigCached}; -use std::collections::BTreeMap; -use std::iter::FromIterator; -use std::time::Duration; - -#[test] -fn ca_connect_1() { - let fut = async { - let it = vec![(String::new(), String::new())].into_iter(); - let pairs = BTreeMap::from_iter(it); - let node_config = NodeConfigCached { - node: Node { - host: "".into(), - port: 123, - port_raw: 123, - cache_base_path: "".into(), - listen: "".into(), - sf_databuffer: Some(SfDatabuffer { - data_base_path: "".into(), - ksprefix: "".into(), - splits: None, - }), - archiver_appliance: None, - channel_archiver: None, - }, - node_config: NodeConfig { - name: "".into(), - cluster: Cluster { - backend: "".into(), - nodes: vec![], - database: Database { - host: "".into(), - name: "".into(), - user: "".into(), - pass: "".into(), - }, - run_map_pulse_task: false, - is_central_storage: false, - file_io_buffer_size: Default::default(), - }, - }, - ix: 0, - }; - let mut rx = super::ca::ca_connect_1(pairs, &node_config).await?; - while let Some(item) = rx.next().await { - debug!("got next: {:?}", item); - } - Ok::<_, Error>(()) - }; - let fut = async move { - let ret = tokio::time::timeout(Duration::from_millis(4000), fut) - .await - .map_err(Error::from_string)??; - Ok(ret) - }; - taskrun::run(fut).unwrap(); -} diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs deleted file mode 100644 index c3e9c69..0000000 --- a/netfetch/src/zmtp.rs +++ /dev/null @@ -1,683 +0,0 @@ -use crate::bsread::parse_zmtp_message; -use crate::bsread::ChannelDesc; -use crate::bsread::GlobalTimestamp; -use crate::bsread::HeadA; -use crate::bsread::HeadB; -use crate::netbuf::NetBuf; -use crate::netbuf::RP_REW_PT; -use async_channel::Receiver; -use async_channel::Sender; -#[allow(unused)] -use bytes::BufMut; -use err::Error; -use futures_core::Stream; -use futures_util::{pin_mut, StreamExt}; -use netpod::log::*; -use netpod::timeunits::SEC; -use serde_json::Value as JsVal; -use std::fmt; -use std::mem; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio::net::TcpStream; - -//#[test] -#[allow(unused)] -fn test_listen() -> Result<(), Error> { - use std::time::Duration; - let fut = async move { - let _ = tokio::time::timeout(Duration::from_millis(16000), zmtp_client("camtest:9999")).await; - Ok::<_, Error>(()) - }; - taskrun::run(fut) -} - -//#[test] -#[allow(unused)] -fn test_service() -> Result<(), Error> { - //use std::time::Duration; - let fut = async move { - let sock = tokio::net::TcpListener::bind("0.0.0.0:9999").await?; - loop { - info!("accepting..."); - let (conn, remote) = sock.accept().await?; - info!("new connection from {:?}", remote); - let mut zmtp = Zmtp::new(conn, SocketType::PUSH); - let fut = async move { - while let Some(item) = zmtp.next().await { - info!("item from {:?} {:?}", remote, item); - } - Ok::<_, Error>(()) - }; - taskrun::spawn(fut); - } - //Ok::<_, Error>(()) - }; - taskrun::run(fut) -} - -pub async fn zmtp_00() -> Result<(), Error> { - let addr = "S10-CPPM-MOT0991:9999"; - zmtp_client(addr).await?; - Ok(()) -} - -pub async fn zmtp_client(addr: &str) -> Result<(), Error> { - let conn = tokio::net::TcpStream::connect(addr).await?; - let mut zmtp = Zmtp::new(conn, SocketType::PULL); - let mut i1 = 0; - while let Some(item) = zmtp.next().await { - match item { - Ok(ev) => match ev { - ZmtpEvent::ZmtpMessage(msg) => { - info!("Message frames: {}", msg.frames.len()); - match parse_zmtp_message(&msg) { - Ok(msg) => info!("{:?}", msg), - Err(e) => { - error!("{}", e); - for frame in &msg.frames { - info!("Frame: {:?}", frame); - } - } - } - } - }, - Err(e) => { - error!("{}", e); - return Err(e); - } - } - i1 += 1; - if i1 > 100 { - break; - } - } - Ok(()) -} - -enum ConnState { - InitSend, - InitRecv1, - InitRecv2, - ReadFrameFlags, - ReadFrameShort, - ReadFrameLong, - ReadFrameBody(usize), -} - -impl ConnState { - fn need_min(&self) -> usize { - use ConnState::*; - match self { - InitSend => 0, - InitRecv1 => 11, - InitRecv2 => 53, - ReadFrameFlags => 1, - ReadFrameShort => 1, - ReadFrameLong => 8, - ReadFrameBody(msglen) => *msglen, - } - } -} - -pub enum SocketType { - PUSH, - PULL, -} - -struct DummyData { - ts: u64, - pulse: u64, - value: i64, -} - -impl DummyData { - fn make_zmtp_msg(&self) -> Result { - let head_b = HeadB { - htype: "bsr_d-1.1".into(), - channels: vec![ChannelDesc { - name: "TESTCHAN".into(), - ty: "int64".into(), - shape: JsVal::Array(vec![JsVal::Number(serde_json::Number::from(1))]), - encoding: "little".into(), - }], - }; - let hb = serde_json::to_vec(&head_b).unwrap(); - use md5::Digest; - let mut h = md5::Md5::new(); - h.update(&hb); - let mut md5hex = String::with_capacity(32); - for c in h.finalize() { - use fmt::Write; - write!(&mut md5hex, "{:02x}", c).unwrap(); - } - let head_a = HeadA { - htype: "bsr_m-1.1".into(), - hash: md5hex, - pulse_id: serde_json::Number::from(self.pulse), - global_timestamp: GlobalTimestamp { - sec: self.ts / SEC, - ns: self.ts % SEC, - }, - }; - // TODO write directly to output buffer. - let ha = serde_json::to_vec(&head_a).unwrap(); - let hf = self.value.to_le_bytes().to_vec(); - let hp = [(self.ts / SEC).to_be_bytes(), (self.ts % SEC).to_be_bytes()].concat(); - let mut msg = ZmtpMessage { frames: vec![] }; - let fr = ZmtpFrame { - msglen: 0, - has_more: false, - is_command: false, - data: ha, - }; - msg.frames.push(fr); - let fr = ZmtpFrame { - msglen: 0, - has_more: false, - is_command: false, - data: hb, - }; - msg.frames.push(fr); - let fr = ZmtpFrame { - msglen: 0, - has_more: false, - is_command: false, - data: hf, - }; - msg.frames.push(fr); - let fr = ZmtpFrame { - msglen: 0, - has_more: false, - is_command: false, - data: hp, - }; - msg.frames.push(fr); - Ok(msg) - } -} - -struct Zmtp { - done: bool, - complete: bool, - socket_type: SocketType, - conn: TcpStream, - conn_state: ConnState, - buf: NetBuf, - outbuf: NetBuf, - out_enable: bool, - msglen: usize, - has_more: bool, - is_command: bool, - frames: Vec, - inp_eof: bool, - data_tx: Sender, - data_rx: Receiver, -} - -impl Zmtp { - fn new(conn: TcpStream, socket_type: SocketType) -> Self { - //conn.set_send_buffer_size(1024 * 64)?; - //conn.set_recv_buffer_size(1024 * 1024 * 4)?; - //info!("send_buffer_size {:8}", conn.send_buffer_size()?); - //info!("recv_buffer_size {:8}", conn.recv_buffer_size()?); - let (tx, rx) = async_channel::bounded(1); - Self { - done: false, - complete: false, - socket_type, - conn, - conn_state: ConnState::InitSend, - buf: NetBuf::new(), - outbuf: NetBuf::new(), - out_enable: false, - msglen: 0, - has_more: false, - is_command: false, - frames: vec![], - inp_eof: false, - data_tx: tx, - data_rx: rx, - } - } - - fn inpbuf_conn(&mut self) -> (&mut TcpStream, ReadBuf) { - (&mut self.conn, self.buf.read_buf_for_fill()) - } - - fn outbuf_conn(&mut self) -> (&[u8], &mut TcpStream) { - (self.outbuf.data(), &mut self.conn) - } - - fn parse_item(&mut self) -> Result, Error> { - match self.conn_state { - ConnState::InitSend => { - info!("parse_item InitSend"); - self.outbuf.put_slice(&[0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3])?; - self.conn_state = ConnState::InitRecv1; - Ok(None) - } - ConnState::InitRecv1 => { - self.buf.adv(10)?; - let ver = self.buf.read_u8()?; - info!("parse_item InitRecv1 major version {}", ver); - if ver != 3 { - return Err(Error::with_msg_no_trace(format!("bad version {}", ver))); - } - self.outbuf.put_slice(&[0, 0x4e, 0x55, 0x4c, 0x4c])?; - let a = vec![0; 48]; - self.outbuf.put_slice(&a)?; - self.conn_state = ConnState::InitRecv2; - Ok(None) - } - ConnState::InitRecv2 => { - info!("parse_item InitRecv2"); - let msgrem = self.conn_state.need_min(); - let ver_min = self.buf.read_u8()?; - let msgrem = msgrem - 1; - info!("Peer minor version {}", ver_min); - // TODO parse greeting remainder.. sec-scheme. - self.buf.adv(msgrem)?; - match self.socket_type { - SocketType::PUSH => { - self.outbuf - .put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PUSH"[..])?; - } - SocketType::PULL => { - self.outbuf - .put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PULL"[..])?; - } - } - self.out_enable = true; - self.conn_state = ConnState::ReadFrameFlags; - let tx = self.data_tx.clone(); - let fut1 = async move { - loop { - tokio::time::sleep(Duration::from_millis(1000)).await; - let dd = DummyData { - ts: 420032002200887766, - pulse: 123123123123, - value: -777, - }; - match tx.send(dd).await { - Ok(()) => { - info!("item send to channel"); - } - Err(_) => break, - } - } - }; - taskrun::spawn(fut1); - Ok(None) - } - ConnState::ReadFrameFlags => { - let flags = self.buf.read_u8()?; - let has_more = flags & 0x01 != 0; - let long_size = flags & 0x02 != 0; - let is_command = flags & 0x04 != 0; - self.has_more = has_more; - self.is_command = is_command; - trace!( - "parse_item ReadFrameFlags has_more {} long_size {} is_command {}", - has_more, - long_size, - is_command - ); - if is_command { - warn!("Got zmtp command frame"); - } - if false && is_command { - return Err(Error::with_msg_no_trace("got zmtp command frame")); - } - if long_size { - self.conn_state = ConnState::ReadFrameLong; - } else { - self.conn_state = ConnState::ReadFrameShort; - } - Ok(None) - } - ConnState::ReadFrameShort => { - self.msglen = self.buf.read_u8()? as usize; - trace!("parse_item ReadFrameShort msglen {}", self.msglen); - self.conn_state = ConnState::ReadFrameBody(self.msglen); - if self.msglen > 1024 * 64 { - return Err(Error::with_msg_no_trace(format!( - "larger msglen not yet supported {}", - self.msglen, - ))); - } - Ok(None) - } - ConnState::ReadFrameLong => { - self.msglen = self.buf.read_u64()? as usize; - trace!("parse_item ReadFrameShort msglen {}", self.msglen); - self.conn_state = ConnState::ReadFrameBody(self.msglen); - if self.msglen > 1024 * 64 { - return Err(Error::with_msg_no_trace(format!( - "larger msglen not yet supported {}", - self.msglen, - ))); - } - Ok(None) - } - ConnState::ReadFrameBody(msglen) => { - let data = self.buf.read_bytes(msglen)?.to_vec(); - self.msglen = 0; - if false { - let n1 = data.len().min(256); - let s = String::from_utf8_lossy(&data[..n1]); - trace!("parse_item ReadFrameBody msglen {} string {}", msglen, s); - } - self.conn_state = ConnState::ReadFrameFlags; - if !self.is_command { - let g = ZmtpFrame { - msglen: self.msglen, - has_more: self.has_more, - is_command: self.is_command, - data, - }; - self.frames.push(g); - } - if self.has_more { - Ok(None) - } else { - let g = ZmtpMessage { - frames: mem::replace(&mut self.frames, vec![]), - }; - Ok(Some(ZmtpEvent::ZmtpMessage(g))) - } - } - } - } -} - -#[derive(Debug)] -pub struct ZmtpMessage { - frames: Vec, -} - -impl ZmtpMessage { - pub fn frames(&self) -> &Vec { - &self.frames - } - - pub fn emit_to_buffer(&self, out: &mut NetBuf) -> Result<(), Error> { - let n = self.frames.len(); - for (i, fr) in self.frames.iter().enumerate() { - let mut flags: u8 = 2; - if i < n - 1 { - flags |= 1; - } - out.put_u8(flags)?; - out.put_u64(fr.data().len() as u64)?; - out.put_slice(fr.data())?; - } - Ok(()) - } -} - -pub struct ZmtpFrame { - msglen: usize, - has_more: bool, - is_command: bool, - data: Vec, -} - -impl ZmtpFrame { - pub fn data(&self) -> &[u8] { - &self.data - } -} - -impl fmt::Debug for ZmtpFrame { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let data = match String::from_utf8(self.data.clone()) { - Ok(s) => s - .chars() - .filter(|x| { - // - x.is_ascii_alphanumeric() || x.is_ascii_punctuation() || x.is_ascii_whitespace() - }) - .collect::(), - Err(_) => format!("Binary {{ len: {} }}", self.data.len()), - }; - f.debug_struct("ZmtpFrame") - .field("msglen", &self.msglen) - .field("has_more", &self.has_more) - .field("is_command", &self.is_command) - .field("data", &data) - .finish() - } -} - -enum Int { - NoWork, - Pending, - Empty, - Item(T), - Done, -} - -impl Int { - fn item_count(&self) -> u32 { - if let Int::Item(_) = self { - 1 - } else { - 0 - } - } -} - -impl fmt::Debug for Int { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::NoWork => write!(f, "NoWork"), - Self::Pending => write!(f, "Pending"), - Self::Empty => write!(f, "Empty"), - Self::Item(_) => write!(f, "Item"), - Self::Done => write!(f, "Done"), - } - } -} - -#[derive(Debug)] -enum ZmtpEvent { - ZmtpMessage(ZmtpMessage), -} - -impl Stream for Zmtp { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - if self.complete { - panic!("poll_next on complete") - } else if self.done { - self.complete = true; - return Ready(None); - } - loop { - let mut item_count = 0; - let serialized: Int> = if self.out_enable && self.outbuf.wrcap() >= RP_REW_PT { - match self.data_rx.poll_next_unpin(cx) { - Ready(Some(item)) => { - let msg = item.make_zmtp_msg().unwrap(); - match msg.emit_to_buffer(&mut self.outbuf) { - Ok(_) => Int::Empty, - Err(e) => { - self.done = true; - Int::Item(Err(e)) - } - } - /*let mut msgs = Vec::with_capacity(1024 * 8); - msgs.put_u8(1 | 2); - msgs.put_u64(ha.len() as u64); - msgs.put_slice(&ha); - msgs.put_u8(1 | 2); - msgs.put_u64(hb.len() as u64); - msgs.put_slice(&hb); - msgs.put_u8(1 | 2); - msgs.put_u64(hf.len() as u64); - msgs.put_slice(&hf); - msgs.put_u8(2); - msgs.put_u64(hp.len() as u64); - msgs.put_slice(&hp); - self.outbuf.put_slice(&msgs).unwrap(); - Int::Empty*/ - } - Ready(None) => Int::Done, - Pending => Int::Pending, - } - } else { - Int::NoWork - }; - item_count += serialized.item_count(); - let write: Int> = if item_count > 0 { - Int::NoWork - } else if self.outbuf.len() > 0 { - let (b, w) = self.outbuf_conn(); - pin_mut!(w); - match w.poll_write(cx, b) { - Ready(k) => match k { - Ok(k) => match self.outbuf.adv(k) { - Ok(()) => { - info!("sent {} bytes", k); - self.outbuf.rewind_if_needed(); - Int::Empty - } - Err(e) => { - error!("advance error {:?}", e); - Int::Item(Err(e)) - } - }, - Err(e) => { - error!("output write error {:?}", e); - Int::Item(Err(e.into())) - } - }, - Pending => Int::Pending, - } - } else { - Int::NoWork - }; - info!("write result: {:?} {}", write, self.outbuf.len()); - item_count += write.item_count(); - let read: Int> = if item_count > 0 || self.inp_eof { - Int::NoWork - } else { - if self.buf.cap() < self.conn_state.need_min() { - self.done = true; - let e = Error::with_msg_no_trace(format!( - "buffer too small for need_min {} {}", - self.buf.cap(), - self.conn_state.need_min() - )); - Int::Item(Err(e)) - } else if self.buf.len() < self.conn_state.need_min() { - let (w, mut rbuf) = self.inpbuf_conn(); - pin_mut!(w); - match w.poll_read(cx, &mut rbuf) { - Ready(k) => match k { - Ok(()) => { - let nf = rbuf.filled().len(); - if nf == 0 { - info!("EOF"); - self.inp_eof = true; - Int::Done - } else { - info!("received {} bytes", rbuf.filled().len()); - if false { - let t = rbuf.filled().len(); - let t = if t < 32 { t } else { 32 }; - info!("got data {:?}", &rbuf.filled()[0..t]); - } - match self.buf.wpadv(nf) { - Ok(()) => Int::Empty, - Err(e) => Int::Item(Err(e)), - } - } - } - Err(e) => Int::Item(Err(e.into())), - }, - Pending => Int::Pending, - } - } else { - Int::NoWork - } - }; - item_count += read.item_count(); - let parsed = if item_count > 0 || self.buf.len() < self.conn_state.need_min() { - Int::NoWork - } else { - match self.parse_item() { - Ok(k) => match k { - Some(k) => Int::Item(Ok(k)), - None => Int::Empty, - }, - Err(e) => Int::Item(Err(e)), - } - }; - item_count += parsed.item_count(); - let _ = item_count; - { - use Int::*; - match (serialized, write, read, parsed) { - (NoWork | Done, NoWork | Done, NoWork | Done, NoWork | Done) => { - warn!("all NoWork or Done"); - break Poll::Pending; - } - (Item(Err(e)), _, _, _) => { - self.done = true; - break Poll::Ready(Some(Err(e))); - } - (_, Item(Err(e)), _, _) => { - self.done = true; - break Poll::Ready(Some(Err(e))); - } - (_, _, Item(Err(e)), _) => { - self.done = true; - break Poll::Ready(Some(Err(e))); - } - (_, _, _, Item(Err(e))) => { - self.done = true; - break Poll::Ready(Some(Err(e))); - } - (Item(_), _, _, _) => { - continue; - } - (_, Item(_), _, _) => { - continue; - } - (_, _, Item(_), _) => { - continue; - } - (_, _, _, Item(Ok(item))) => { - break Poll::Ready(Some(Ok(item))); - } - (Empty, _, _, _) => continue, - (_, Empty, _, _) => continue, - (_, _, Empty, _) => continue, - (_, _, _, Empty) => continue, - #[allow(unreachable_patterns)] - (Pending, Pending | NoWork | Done, Pending | NoWork | Done, Pending | NoWork | Done) => { - break Poll::Pending - } - #[allow(unreachable_patterns)] - (Pending | NoWork | Done, Pending, Pending | NoWork | Done, Pending | NoWork | Done) => { - break Poll::Pending - } - #[allow(unreachable_patterns)] - (Pending | NoWork | Done, Pending | NoWork | Done, Pending, Pending | NoWork | Done) => { - break Poll::Pending - } - #[allow(unreachable_patterns)] - (Pending | NoWork | Done, Pending | NoWork | Done, Pending | NoWork | Done, Pending) => { - break Poll::Pending - } - } - }; - } - } -} diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index d0a0b44..6600abc 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1477,10 +1477,16 @@ pub struct ProxyConfig { pub name: String, pub listen: String, pub port: u16, + #[serde(default)] pub backends_status: Vec, + #[serde(default)] pub backends: Vec, + #[serde(default)] pub backends_pulse_map: Vec, + #[serde(default)] pub backends_search: Vec, + #[serde(default)] + pub backends_event_download: Vec, pub api_0_search_hosts: Option>, pub api_0_search_backends: Option>, }