>(&body) {
+ Ok(k) => {
+ let mut a = vec![];
+ if let Some(g) = k.first() {
+ for c in &g.channels {
+ let mut z = ChannelSearchSingleResult {
+ backend: c.backend.clone(),
+ description: String::new(),
+ name: c.name.clone(),
+ shape: vec![],
+ source: c.source.clone(),
+ ty: c.ty.clone(),
+ unit: String::new(),
+ is_api_0: Some(true),
+ };
+ a.push(z);
+ }
+ }
+ let ret = ChannelSearchResult { channels: a };
+ ret
+ }
+ Err(_) => {
+ error!("Channel search response parse failed");
+ ChannelSearchResult { channels: vec![] }
+ }
+ }
+ }
};
Ok(res)
};
@@ -202,9 +276,16 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R
.body(Body::from(serde_json::to_string(&res)?))?;
Ok(res)
};
- let ret =
- gather_get_json_generic(http::Method::GET, urls, None, tags, nt, ft, Duration::from_millis(3000))
- .await?;
+ let ret = gather_get_json_generic(
+ http::Method::GET,
+ urls,
+ bodies,
+ tags,
+ nt,
+ ft,
+ Duration::from_millis(3000),
+ )
+ .await?;
Ok(ret)
} else {
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
@@ -265,7 +346,9 @@ where
return Err(Error::with_msg("no response from upstream"));
}
};
- let ret = gather_get_json_generic(http::Method::GET, urls, None, tags, nt, ft, query.timeout()).await?;
+ let bodies = (0..urls.len()).into_iter().map(|_| None).collect();
+ let ret =
+ gather_get_json_generic(http::Method::GET, urls, bodies, tags, nt, ft, query.timeout()).await?;
Ok(ret)
} else {
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
diff --git a/httpret/static/documentation/api4.html b/httpret/static/documentation/api4.html
index d709589..5c34af7 100644
--- a/httpret/static/documentation/api4.html
+++ b/httpret/static/documentation/api4.html
@@ -8,13 +8,14 @@
-
Databuffer API 4 Documentation
-Documented here are the endpoints for databuffer API 4. The endpoints of the "original" unversioned API is documented at
- this location.
+Documented here is the databuffer http api 4. The "original" unversioned api is documented at
+this location.
+In order to keep the api surface as small as possible in comparison to api 0, we add functionality on demand,
+so please feel free to create some Jira ticket!
Timestamp format
@@ -72,13 +73,14 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/backends'
Search channel
Method: GET
URL: https://data-api.psi.ch/api/4/search/channel
-Query parameters:
+Query parameters: (all optional)
- nameRegex (e.g. "LSCP.*6")
- sourceRegex (e.g. "178:9999")
- descriptionRegex (e.g. "celsius")
Request header: "Accept" must be "application/json"
+Full channel list is long, so it's encouraged to provide a search string of some minimal length.
CURL example:
@@ -86,6 +88,9 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel
Example response:
+Keys always present: name, backend.
+Other keys are optional, it depends on the data found on disk: sometimes there is an empty string on disk, sometimes
+that key is missing.
{
"channels": [
{
@@ -105,6 +110,16 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel
"shape": [],
"unit": "",
"description": ""
+ },
+ {
+ "isApi0": true,
+ "name": "EXAMPLE-CHANNEL-FROM-API-0-BACKEND",
+ "backend": "twlha-databuffer",
+ "source": "tcp://.....",
+ "type": "int32",
+ "shape": [],
+ "unit": "",
+ "description": ""
}
]
}
diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs
index 8ba9785..b625a56 100644
--- a/netfetch/src/zmtp.rs
+++ b/netfetch/src/zmtp.rs
@@ -2,14 +2,14 @@ use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use netpod::log::*;
+use std::fmt;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
-use tokio::io::{AsyncRead, ReadBuf};
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
pub async fn zmtp_00() -> Result<(), Error> {
- // PV:BSREADCONFIG
let addr = "S10-CPPM-MOT0991:9999";
let conn = tokio::net::TcpStream::connect(addr).await?;
let mut zmtp = Zmtp::new(conn);
@@ -19,52 +19,339 @@ pub async fn zmtp_00() -> Result<(), Error> {
Ok(())
}
+pub async fn zmtp_client(addr: &str) -> Result<(), Error> {
+ let conn = tokio::net::TcpStream::connect(addr).await?;
+ let mut zmtp = Zmtp::new(conn);
+ while let Some(ev) = zmtp.next().await {
+ info!("got zmtp event: {:?}", ev);
+ }
+ Ok(())
+}
+
enum ConnState {
- Init,
+ InitSend,
+ InitRecv1,
+ InitRecv2,
+ ReadFrameFlags,
+ ReadFrameShort,
+ ReadFrameLong,
+ ReadFrameBody,
}
struct Zmtp {
+ done: bool,
conn: TcpStream,
conn_state: ConnState,
- buf1: Vec,
+ buf: NetBuf,
+ outbuf: NetBuf,
need_min: usize,
+ msglen: usize,
+ has_more: bool,
+ is_command: bool,
+ frames: Vec,
}
impl Zmtp {
fn new(conn: TcpStream) -> Self {
+ //conn.set_send_buffer_size(1024 * 64)?;
+ //conn.set_recv_buffer_size(1024 * 1024 * 4)?;
+ //info!("send_buffer_size {:8}", conn.send_buffer_size()?);
+ //info!("recv_buffer_size {:8}", conn.recv_buffer_size()?);
Self {
+ done: false,
conn,
- conn_state: ConnState::Init,
- buf1: vec![0; 1024],
- need_min: 4,
+ conn_state: ConnState::InitSend,
+ buf: NetBuf::new(),
+ outbuf: NetBuf::new(),
+ need_min: 0,
+ msglen: 0,
+ has_more: false,
+ is_command: false,
+ frames: vec![],
+ }
+ }
+
+ fn buf_conn(&mut self) -> (&mut TcpStream, ReadBuf) {
+ (&mut self.conn, self.buf.read_buf_for_fill())
+ }
+
+ fn outbuf_conn(&mut self) -> (&[u8], &mut TcpStream) {
+ let b = &self.outbuf.buf[self.outbuf.rp..self.outbuf.wp];
+ let w = &mut self.conn;
+ (b, w)
+ }
+
+ fn parse_item(&mut self) -> Result