From ba568c8850380bcb2de3dcb441a5640a3f78c768 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 27 Aug 2021 13:44:59 +0200 Subject: [PATCH] Receive source frames --- daqbuffer/Cargo.toml | 1 + daqbuffer/src/bin/daqbuffer.rs | 3 + daqbuffer/src/cli.rs | 7 + dbconn/src/search.rs | 1 + disk/src/cache.rs | 3 +- httpret/src/api1.rs | 28 ++- httpret/src/gather.rs | 26 +- httpret/src/proxy.rs | 107 +++++++- httpret/static/documentation/api4.html | 23 +- netfetch/src/zmtp.rs | 331 +++++++++++++++++++++++-- netpod/src/lib.rs | 4 + 11 files changed, 481 insertions(+), 53 deletions(-) diff --git a/daqbuffer/Cargo.toml b/daqbuffer/Cargo.toml index d641f98..b485b3c 100644 --- a/daqbuffer/Cargo.toml +++ b/daqbuffer/Cargo.toml @@ -30,3 +30,4 @@ netpod = { path = "../netpod" } #httpret = { path = "../httpret" } disk = { path = "../disk" } daqbufp2 = { path = "../daqbufp2" } +netfetch = { path = "../netfetch" } diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index f4b345b..9ecc623 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -94,6 +94,9 @@ async fn go() -> Result<(), Error> { SubCmd::GenerateTestData => { disk::gen::gen_test_data().await?; } + SubCmd::Zmtp(zmtp) => { + netfetch::zmtp::zmtp_client(&zmtp.addr).await?; + } } Ok(()) } diff --git a/daqbuffer/src/cli.rs b/daqbuffer/src/cli.rs index 94f7358..dd1699f 100644 --- a/daqbuffer/src/cli.rs +++ b/daqbuffer/src/cli.rs @@ -15,6 +15,7 @@ pub enum SubCmd { Proxy(Proxy), Client(Client), GenerateTestData, + Zmtp(Zmtp), } #[derive(Debug, Clap)] @@ -70,3 +71,9 @@ pub struct BinnedClient { #[clap(long, default_value = "1048576")] pub disk_stats_every_kb: u32, } + +#[derive(Debug, Clap)] +pub struct Zmtp { + #[clap(long)] + pub addr: String, +} diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs index 8577128..f1ab47f 100644 --- a/dbconn/src/search.rs +++ b/dbconn/src/search.rs @@ -52,6 +52,7 @@ pub async fn search_channel( shape: shape, unit: row.get(5), description: row.get(6), + is_api_0: None, }; res.push(k); } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index efec22e..0fff4ea 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -229,10 +229,11 @@ pub async fn clear_cache_all(node_config: &NodeConfigCached, dry: bool) -> Resul } let mut dirs = VecDeque::new(); let mut stack = VecDeque::new(); - stack.push_front(node_config.node.data_base_path.join("cache")); + stack.push_front(node_config.node.cache_base_path.join("cache")); loop { match stack.pop_front() { Some(path) => { + info!("clear_cache_all try read dir {:?}", path); let mut rd = tokio::fs::read_dir(path).await?; while let Some(entry) = rd.next_entry().await? { let path = entry.path(); diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index a9b9214..afda07d 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -150,9 +150,17 @@ pub async fn channel_search_list_v1(req: Request, proxy_config: &ProxyConf .body(Body::from(serde_json::to_string(&res)?))?; Ok(res) }; - let ret = - gather_get_json_generic(http::Method::GET, urls, None, tags, nt, ft, Duration::from_millis(3000)) - .await?; + let bodies = (0..urls.len()).into_iter().map(|_| None).collect(); + let ret = gather_get_json_generic( + http::Method::GET, + urls, + bodies, + tags, + nt, + ft, + Duration::from_millis(3000), + ) + .await?; Ok(ret) } else { Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) @@ -252,9 +260,17 @@ pub async fn channel_search_configs_v1( .body(Body::from(serde_json::to_string(&res)?))?; Ok(res) }; - let ret = - gather_get_json_generic(http::Method::GET, urls, None, tags, nt, ft, Duration::from_millis(3000)) - .await?; + let bodies = (0..urls.len()).into_iter().map(|_| None).collect(); + let ret = gather_get_json_generic( + http::Method::GET, + urls, + bodies, + tags, + nt, + ft, + Duration::from_millis(3000), + ) + .await?; Ok(ret) } else { Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) diff --git a/httpret/src/gather.rs b/httpret/src/gather.rs index c07cb1d..c20bb87 100644 --- a/httpret/src/gather.rs +++ b/httpret/src/gather.rs @@ -170,7 +170,7 @@ pub struct SubRes { pub async fn gather_get_json_generic( method: http::Method, urls: Vec, - bodies: Option>, + bodies: Vec>, tags: Vec, nt: NT, ft: FT, @@ -181,11 +181,8 @@ where NT: Fn(Response) -> Pin> + Send>> + Send + Sync + Copy + 'static, FT: Fn(Vec>) -> Result, Error>, { + assert!(urls.len() == bodies.len()); assert!(urls.len() == tags.len()); - let bodies: Vec<_> = match bodies { - None => (0..urls.len()).into_iter().map(|_| Body::empty()).collect(), - Some(bodies) => bodies, - }; let spawned: Vec<_> = urls .into_iter() .zip(bodies.into_iter()) @@ -193,9 +190,22 @@ where .map(move |((url, body), tag)| { let url_str = url.as_str(); let is_tls = if url_str.starts_with("https://") { true } else { false }; - let req = Request::builder().method(method.clone()).uri(url_str); - //let req = req.header("x-log-from-node-name", format!("{}", node_config.node_config.name)); + let req = if body.is_some() { + Request::builder().method(Method::POST).uri(url_str) + } else { + Request::builder().method(Method::GET).uri(url_str) + }; let req = req.header(http::header::ACCEPT, APP_JSON); + let req = if body.is_some() { + req.header(http::header::CONTENT_TYPE, APP_JSON) + } else { + req + }; + //let req = req.header("x-log-from-node-name", format!("{}", node_config.node_config.name)); + let body = match body { + None => Body::empty(), + Some(body) => body, + }; let req = req.body(body); let task = tokio::spawn(async move { select! { @@ -248,7 +258,7 @@ mod test { let fut = gather_get_json_generic( hyper::Method::GET, vec![], - None, + vec![], vec![], |_res| { let fut = async { Ok(()) }; diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 237650a..b6c862d 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -12,8 +12,8 @@ use hyper::{Body, Request, Response, Server}; use itertools::Itertools; use netpod::log::*; use netpod::{ - AppendToUrl, ChannelConfigQuery, ChannelSearchQuery, ChannelSearchResult, FromUrl, HasBackend, HasTimeout, - ProxyConfig, APP_JSON, + AppendToUrl, ChannelConfigQuery, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, FromUrl, + HasBackend, HasTimeout, ProxyConfig, APP_JSON, }; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; @@ -59,6 +59,7 @@ async fn proxy_http_service(req: Request, proxy_config: ProxyConfig) -> Re async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { let uri = req.uri().clone(); let path = uri.path(); + let distri_pre = "/distri/"; if path == "/api/1/channels" { Ok(channel_search_list_v1(req, proxy_config).await?) } else if path == "/api/1/channels/config" { @@ -93,10 +94,15 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } - } else if path.starts_with("/distri/daqbuffer") { + } else if path.starts_with(distri_pre) + && path + .chars() + .all(|c| c.is_ascii_alphanumeric() || ['/', '.', '-', '_'].contains(&c)) + && !path.contains("..") + { if req.method() == Method::GET { let s = FileStream { - file: File::open("/opt/distri/daqbuffer").await?, + file: File::open(format!("/opt/distri/{}", &path[distri_pre.len()..])).await?, }; Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?) } else { @@ -162,7 +168,8 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R if v == APP_JSON { let url = Url::parse(&format!("dummy:{}", head.uri))?; let query = ChannelSearchQuery::from_url(&url)?; - let urls = proxy_config + let mut bodies = vec![]; + let mut urls = proxy_config .search_hosts .iter() .map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) { @@ -174,16 +181,83 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R }) .fold_ok(vec![], |mut a, x| { a.push(x); + bodies.push(None); a })?; + if let (Some(hosts), Some(backends)) = + (&proxy_config.api_0_search_hosts, &proxy_config.api_0_search_backends) + { + #[derive(Serialize)] + struct QueryApi0 { + backends: Vec, + regex: String, + #[serde(rename = "sourceRegex")] + source_regex: String, + ordering: String, + reload: bool, + }; + hosts.iter().zip(backends.iter()).for_each(|(sh, back)| { + let url = Url::parse(&format!("{}/channels/config", sh)).unwrap(); + urls.push(url); + let q = QueryApi0 { + backends: vec![back.into()], + ordering: "asc".into(), + reload: false, + regex: query.name_regex.clone(), + source_regex: query.source_regex.clone(), + }; + let qs = serde_json::to_string(&q).unwrap(); + bodies.push(Some(Body::from(qs))); + }); + } let tags = urls.iter().map(|k| k.to_string()).collect(); let nt = |res| { let fut = async { let body = hyper::body::to_bytes(res).await?; - info!("got a result {:?}", body); - let res: ChannelSearchResult = match serde_json::from_slice(&body) { + //info!("got a result {:?}", body); + let res: ChannelSearchResult = match serde_json::from_slice::(&body) { Ok(k) => k, - Err(_) => ChannelSearchResult { channels: vec![] }, + Err(_) => { + #[derive(Deserialize)] + struct ResItemApi0 { + name: String, + source: String, + backend: String, + #[serde(rename = "type")] + ty: String, + }; + #[derive(Deserialize)] + struct ResContApi0 { + backend: String, + channels: Vec, + }; + match serde_json::from_slice::>(&body) { + Ok(k) => { + let mut a = vec![]; + if let Some(g) = k.first() { + for c in &g.channels { + let mut z = ChannelSearchSingleResult { + backend: c.backend.clone(), + description: String::new(), + name: c.name.clone(), + shape: vec![], + source: c.source.clone(), + ty: c.ty.clone(), + unit: String::new(), + is_api_0: Some(true), + }; + a.push(z); + } + } + let ret = ChannelSearchResult { channels: a }; + ret + } + Err(_) => { + error!("Channel search response parse failed"); + ChannelSearchResult { channels: vec![] } + } + } + } }; Ok(res) }; @@ -202,9 +276,16 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R .body(Body::from(serde_json::to_string(&res)?))?; Ok(res) }; - let ret = - gather_get_json_generic(http::Method::GET, urls, None, tags, nt, ft, Duration::from_millis(3000)) - .await?; + let ret = gather_get_json_generic( + http::Method::GET, + urls, + bodies, + tags, + nt, + ft, + Duration::from_millis(3000), + ) + .await?; Ok(ret) } else { Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) @@ -265,7 +346,9 @@ where return Err(Error::with_msg("no response from upstream")); } }; - let ret = gather_get_json_generic(http::Method::GET, urls, None, tags, nt, ft, query.timeout()).await?; + let bodies = (0..urls.len()).into_iter().map(|_| None).collect(); + let ret = + gather_get_json_generic(http::Method::GET, urls, bodies, tags, nt, ft, query.timeout()).await?; Ok(ret) } else { Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) diff --git a/httpret/static/documentation/api4.html b/httpret/static/documentation/api4.html index d709589..5c34af7 100644 --- a/httpret/static/documentation/api4.html +++ b/httpret/static/documentation/api4.html @@ -8,13 +8,14 @@ -

Databuffer API 4 Documentation

-

Documented here are the endpoints for databuffer API 4. The endpoints of the "original" unversioned API is documented at - this location.

+

Documented here is the databuffer http api 4. The "original" unversioned api is documented at +this location.

+

In order to keep the api surface as small as possible in comparison to api 0, we add functionality on demand, +so please feel free to create some Jira ticket!

Timestamp format

@@ -72,13 +73,14 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/backends'

Search channel

Method: GET

URL: https://data-api.psi.ch/api/4/search/channel

-

Query parameters:

+

Query parameters: (all optional)

  • nameRegex (e.g. "LSCP.*6")
  • sourceRegex (e.g. "178:9999")
  • descriptionRegex (e.g. "celsius")

Request header: "Accept" must be "application/json"

+

Full channel list is long, so it's encouraged to provide a search string of some minimal length.

CURL example:

@@ -86,6 +88,9 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel
 

Example response:

+

Keys always present: name, backend.

+

Other keys are optional, it depends on the data found on disk: sometimes there is an empty string on disk, sometimes +that key is missing.

{
   "channels": [
     {
@@ -105,6 +110,16 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel
       "shape": [],
       "unit": "",
       "description": ""
+    },
+    {
+      "isApi0": true,
+      "name": "EXAMPLE-CHANNEL-FROM-API-0-BACKEND",
+      "backend": "twlha-databuffer",
+      "source": "tcp://.....",
+      "type": "int32",
+      "shape": [],
+      "unit": "",
+      "description": ""
     }
   ]
 }
diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index 8ba9785..b625a56 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -2,14 +2,14 @@ use err::Error; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; use netpod::log::*; +use std::fmt; use std::mem; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::io::{AsyncRead, ReadBuf}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; pub async fn zmtp_00() -> Result<(), Error> { - // PV:BSREADCONFIG let addr = "S10-CPPM-MOT0991:9999"; let conn = tokio::net::TcpStream::connect(addr).await?; let mut zmtp = Zmtp::new(conn); @@ -19,52 +19,339 @@ pub async fn zmtp_00() -> Result<(), Error> { Ok(()) } +pub async fn zmtp_client(addr: &str) -> Result<(), Error> { + let conn = tokio::net::TcpStream::connect(addr).await?; + let mut zmtp = Zmtp::new(conn); + while let Some(ev) = zmtp.next().await { + info!("got zmtp event: {:?}", ev); + } + Ok(()) +} + enum ConnState { - Init, + InitSend, + InitRecv1, + InitRecv2, + ReadFrameFlags, + ReadFrameShort, + ReadFrameLong, + ReadFrameBody, } struct Zmtp { + done: bool, conn: TcpStream, conn_state: ConnState, - buf1: Vec, + buf: NetBuf, + outbuf: NetBuf, need_min: usize, + msglen: usize, + has_more: bool, + is_command: bool, + frames: Vec, } impl Zmtp { fn new(conn: TcpStream) -> Self { + //conn.set_send_buffer_size(1024 * 64)?; + //conn.set_recv_buffer_size(1024 * 1024 * 4)?; + //info!("send_buffer_size {:8}", conn.send_buffer_size()?); + //info!("recv_buffer_size {:8}", conn.recv_buffer_size()?); Self { + done: false, conn, - conn_state: ConnState::Init, - buf1: vec![0; 1024], - need_min: 4, + conn_state: ConnState::InitSend, + buf: NetBuf::new(), + outbuf: NetBuf::new(), + need_min: 0, + msglen: 0, + has_more: false, + is_command: false, + frames: vec![], + } + } + + fn buf_conn(&mut self) -> (&mut TcpStream, ReadBuf) { + (&mut self.conn, self.buf.read_buf_for_fill()) + } + + fn outbuf_conn(&mut self) -> (&[u8], &mut TcpStream) { + let b = &self.outbuf.buf[self.outbuf.rp..self.outbuf.wp]; + let w = &mut self.conn; + (b, w) + } + + fn parse_item(&mut self) -> Result, Error> { + match self.conn_state { + ConnState::InitSend => { + info!("parse_item InitSend"); + // TODO allow to specify a minimum amount of needed space. + // TODO factor writing into the buffer in some way... + let mut b = self.outbuf.read_buf_for_fill(); + b.put_slice(&[0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3]); + self.outbuf.wp += b.filled().len(); + self.conn_state = ConnState::InitRecv1; + self.need_min = 11; + Ok(None) + } + ConnState::InitRecv1 => { + let ver = self.buf.buf[self.buf.rp + 10]; + self.buf.rp += self.need_min; + info!("parse_item InitRecv1 major version {}", ver); + if ver != 3 { + return Err(Error::with_msg_no_trace(format!("bad version {}", ver))); + } + let mut b = self.outbuf.read_buf_for_fill(); + b.put_slice(&[0, 0x4e, 0x55, 0x4c, 0x4c]); + let a = vec![0; 48]; + b.put_slice(&a); + self.outbuf.wp += b.filled().len(); + self.conn_state = ConnState::InitRecv2; + self.need_min = 53; + Ok(None) + } + ConnState::InitRecv2 => { + info!("parse_item InitRecv2"); + self.buf.rp += self.need_min; + let mut b = self.outbuf.read_buf_for_fill(); + b.put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PULL"[..]); + self.outbuf.wp += b.filled().len(); + self.conn_state = ConnState::ReadFrameFlags; + self.need_min = 1; + Ok(None) + } + ConnState::ReadFrameFlags => { + let flags = self.buf.buf[self.buf.rp + 0]; + self.buf.rp += self.need_min; + let has_more = flags & 0x01 != 0; + let long_size = flags & 0x02 != 0; + let is_command = flags & 0x04 != 0; + self.has_more = has_more; + self.is_command = is_command; + trace!( + "parse_item ReadFrameFlags has_more {} long_size {} is_command {}", + has_more, + long_size, + is_command + ); + if false && is_command { + return Err(Error::with_msg_no_trace("got zmtp command frame")); + } + if long_size { + self.conn_state = ConnState::ReadFrameLong; + self.need_min = 8; + } else { + self.conn_state = ConnState::ReadFrameShort; + self.need_min = 1; + } + Ok(None) + } + ConnState::ReadFrameShort => { + let len = self.buf.buf[self.buf.rp + 0]; + self.buf.rp += self.need_min; + self.msglen = len as usize; + trace!("parse_item ReadFrameShort self.msglen {}", self.msglen); + self.conn_state = ConnState::ReadFrameBody; + self.need_min = self.msglen; + Ok(None) + } + ConnState::ReadFrameLong => { + let mut a = [0; 8]; + for i1 in 0..8 { + a[i1] = self.buf.buf[self.buf.rp + i1]; + } + self.buf.rp += self.need_min; + self.msglen = usize::from_be_bytes(a); + trace!("parse_item ReadFrameLong self.msglen {}", self.msglen); + self.conn_state = ConnState::ReadFrameBody; + self.need_min = self.msglen; + Ok(None) + } + ConnState::ReadFrameBody => { + let n1 = self.buf.len(); + let n1 = if n1 < 256 { n1 } else { 256 }; + let data = self.buf.buf[self.buf.rp..(self.buf.rp + self.msglen)].to_vec(); + if false { + let s = String::from_utf8_lossy(&self.buf.buf[self.buf.rp..(self.buf.rp + n1)]); + trace!( + "parse_item ReadFrameBody self.need_min {} string {}", + self.need_min, + s + ); + } + self.buf.rp += self.need_min; + self.conn_state = ConnState::ReadFrameFlags; + self.need_min = 1; + if !self.is_command { + let g = ZmtpFrame { + msglen: self.msglen, + has_more: self.has_more, + is_command: self.is_command, + data, + }; + self.frames.push(g); + } + if self.has_more { + Ok(None) + } else { + let g = ZmtpMessage { + frames: mem::replace(&mut self.frames, vec![]), + }; + Ok(Some(ZmtpEvent::ZmtpMessage(g))) + } + } + } + } +} + +struct NetBuf { + buf: Vec, + wp: usize, + rp: usize, +} + +impl NetBuf { + fn new() -> Self { + Self { + buf: vec![0; 1024 * 128], + wp: 0, + rp: 0, + } + } + + fn len(&self) -> usize { + self.wp - self.rp + } + + fn read_buf_for_fill(&mut self) -> ReadBuf { + self.rewind_if_needed(); + let read_buf = ReadBuf::new(&mut self.buf[self.wp..]); + read_buf + } + + fn rewind_if_needed(&mut self) { + if self.rp != 0 && self.rp == self.wp { + self.rp = 0; + self.wp = 0; + } else { + self.buf.copy_within(self.rp..self.wp, 0); + self.wp -= self.rp; + self.rp = 0; } } } #[derive(Debug)] -struct ZmtpEvent {} +struct ZmtpMessage { + frames: Vec, +} + +struct ZmtpFrame { + msglen: usize, + has_more: bool, + is_command: bool, + data: Vec, +} + +impl fmt::Debug for ZmtpFrame { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let s = String::from_utf8(self.data.clone()).unwrap_or_else(|_| String::new()); + let s = if s.is_ascii() && !s.contains("\x00") { + s + } else { + "...".into() + }; + f.debug_struct("ZmtpFrame") + .field("msglen", &self.msglen) + .field("has_more", &self.has_more) + .field("is_command", &self.is_command) + .field("data", &s) + .finish() + } +} + +#[derive(Debug)] +enum ZmtpEvent { + ZmtpMessage(ZmtpMessage), +} impl Stream for Zmtp { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - loop { - break if let ConnState::Init = self.conn_state { - // can it be that we already have enough bytes received in the buffer? - let mut buf1 = mem::replace(&mut self.buf1, vec![]); - let mut rbuf = ReadBuf::new(&mut buf1); - let w = &mut self.conn; - pin_mut!(w); - let m1 = w.poll_read(cx, &mut rbuf); - self.buf1 = buf1; - match m1 { - Ready(item) => Pending, - Pending => Pending, + if self.done { + return Ready(None); + } + 'outer: loop { + let write_pending = loop { + if self.outbuf.len() > 0 { + let (b, w) = self.outbuf_conn(); + pin_mut!(w); + match w.poll_write(cx, b) { + Ready(k) => match k { + Ok(k) => { + self.outbuf.rp += k; + info!("sent {} bytes", k); + } + Err(e) => { + self.done = true; + break 'outer Ready(Some(Err(e.into()))); + } + }, + Pending => break true, + } + } else { + break false; } - } else { - Pending }; + let read_pending = loop { + if self.buf.len() < self.need_min { + let nf1 = self.buf.buf.len() - self.buf.rp; + let nf2 = self.need_min; + let (w, mut rbuf) = self.buf_conn(); + if nf1 < nf2 { + break 'outer Ready(Some(Err(Error::with_msg_no_trace("buffer too small for need_min")))); + } + pin_mut!(w); + let r = w.poll_read(cx, &mut rbuf); + match r { + Ready(k) => match k { + Ok(_) => { + info!("received {} bytes", rbuf.filled().len()); + if false { + let t = rbuf.filled().len(); + let t = if t < 32 { t } else { 32 }; + info!("got data {:?}", &rbuf.filled()[0..t]); + } + self.buf.wp += rbuf.filled().len(); + } + Err(e) => { + self.done = true; + break 'outer Ready(Some(Err(e.into()))); + } + }, + Pending => break true, + } + } else { + break false; + } + }; + if self.buf.len() >= self.need_min { + match self.parse_item() { + Ok(k) => match k { + Some(k) => break 'outer Ready(Some(Ok(k))), + None => (), + }, + Err(e) => { + self.done = true; + break 'outer Ready(Some(Err(e.into()))); + } + } + } + if write_pending || read_pending { + break 'outer Pending; + } } } } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index f7582db..253658c 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -938,6 +938,8 @@ pub struct ChannelSearchSingleResult { pub shape: Vec, pub unit: String, pub description: String, + #[serde(rename = "isApi0", skip_serializing_if = "Option::is_none")] + pub is_api_0: Option, } #[derive(Serialize, Deserialize)] @@ -957,6 +959,8 @@ pub struct ProxyConfig { pub port: u16, pub search_hosts: Vec, pub backends: Vec, + pub api_0_search_hosts: Option>, + pub api_0_search_backends: Option>, } pub trait HasBackend {