Remove unused

Dominik Werder
2022-03-28 10:31:44 +02:00
parent c67e8e4dbb
commit b865558641
15 changed files with 64 additions and 1139 deletions

View File

@@ -30,4 +30,3 @@ netpod = { path = "../netpod" }
#httpret = { path = "../httpret" }
disk = { path = "../disk" }
daqbufp2 = { path = "../daqbufp2" }
netfetch = { path = "../netfetch" }

View File

@@ -92,9 +92,6 @@ async fn go() -> Result<(), Error> {
        SubCmd::GenerateTestData => {
            disk::gen::gen_test_data().await?;
        }
        SubCmd::Zmtp(zmtp) => {
            netfetch::zmtp::zmtp_client(&zmtp.addr).await?;
        }
        SubCmd::Logappend(k) => {
            let jh = tokio::task::spawn_blocking(move || {
                taskrun::append::append(&k.dir, std::io::stdin()).unwrap();

View File

@@ -15,7 +15,6 @@ pub enum SubCmd {
    Proxy(Proxy),
    Client(Client),
    GenerateTestData,
    Zmtp(Zmtp),
    Logappend(Logappend),
    Test,
}
@@ -74,12 +73,6 @@ pub struct BinnedClient {
    pub disk_stats_every_kb: u32,
}
#[derive(Debug, Parser)]
pub struct Zmtp {
    #[clap(long)]
    pub addr: String,
}
#[derive(Debug, Parser)]
pub struct Logappend {
    #[clap(long)]

View File

@@ -208,6 +208,16 @@ where
    }
}
pub trait ToErr {
    fn to_err(self) -> Error;
}
impl<T: ToErr> From<T> for Error {
    fn from(k: T) -> Self {
        k.to_err()
    }
}
impl From<PublicError> for Error {
    fn from(k: PublicError) -> Self {
        Self {

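Aside from the deletions, this hunk adds a blanket conversion to the err crate: any type that implements ToErr now converts into Error, so it can be propagated with `?`. A minimal self-contained sketch of that pattern (the simplified Error struct and the ParseFailure example type below are illustrative stand-ins, not code from this repository):

```rust
// Sketch of the blanket-impl pattern added above. `Error` here is a stripped-down
// stand-in for the err crate's type; `ParseFailure` is a made-up example.
#[derive(Debug)]
struct Error {
    msg: String,
}

trait ToErr {
    fn to_err(self) -> Error;
}

// Blanket conversion: anything that implements ToErr becomes an Error.
impl<T: ToErr> From<T> for Error {
    fn from(k: T) -> Self {
        k.to_err()
    }
}

struct ParseFailure(String);

impl ToErr for ParseFailure {
    fn to_err(self) -> Error {
        Error { msg: self.0 }
    }
}

fn parse_port(s: &str) -> Result<u16, ParseFailure> {
    s.parse().map_err(|_| ParseFailure(format!("bad port: {s}")))
}

fn run() -> Result<u16, Error> {
    // `?` converts ParseFailure via the blanket From impl.
    Ok(parse_port("9999")?)
}

fn main() {
    println!("{:?}", run());
}
```

This only stays coherent because Error itself does not implement ToErr; otherwise the blanket impl would overlap with core's `impl<T> From<T> for T`.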
View File

@@ -29,7 +29,6 @@ dbconn = { path = "../dbconn" }
disk = { path = "../disk" }
items = { path = "../items" }
parse = { path = "../parse" }
netfetch = { path = "../netfetch" }
archapp_wrap = { path = "../archapp_wrap" }
nodenet = { path = "../nodenet" }
commonio = { path = "../commonio" }

View File

@@ -205,8 +205,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
        if req.method() == Method::GET {
            let ret = serde_json::json!({
                "data_api_version": {
                    "major": 4,
                    "minor": 0,
                    "major": 4u32,
                    "minor": 0u32,
                },
            });
            Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
@@ -279,12 +279,6 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
        } else {
            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
        }
    } else if path == "/api/4/ca_connect_1" {
        if req.method() == Method::GET {
            Ok(ca_connect_1(req, &node_config).await?)
        } else {
            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
        }
    } else if path == "/api/4/archapp/files/scan/msgs" {
        if req.method() == Method::GET {
            Ok(archapp_scan_files(req, &node_config).await?)
@@ -922,22 +916,6 @@ pub fn status_board() -> Result<RwLockWriteGuard<'static, StatusBoard>, Error> {
    }
}
pub async fn ca_connect_1(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
    let url = Url::parse(&format!("dummy:{}", req.uri()))?;
    let pairs = get_url_query_pairs(&url);
    let res = netfetch::ca::ca_connect_1(pairs, node_config).await?;
    let ret = response(StatusCode::OK)
        .header(http::header::CONTENT_TYPE, APP_JSON_LINES)
        .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {
            Ok(mut item) => {
                item.push('\n');
                Ok(item)
            }
            Err(e) => Err(e),
        })))?;
    Ok(ret)
}
pub async fn archapp_scan_files(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
    let url = Url::parse(&format!("dummy:{}", req.uri()))?;
    let pairs = get_url_query_pairs(&url);

View File

@@ -454,7 +454,52 @@ pub async fn proxy_api1_single_backend_query(
    _req: Request<Body>,
    _proxy_config: &ProxyConfig,
) -> Result<Response<Body>, Error> {
    panic!()
    // TODO
    /*
    if let Some(back) = proxy_config.backends_event_download.first() {
        let is_tls = req
            .uri()
            .scheme()
            .ok_or_else(|| Error::with_msg_no_trace("no uri scheme"))?
            == &http::uri::Scheme::HTTPS;
        let bld = Request::builder().method(req.method());
        let bld = bld.uri(req.uri());
        // TODO to proxy events over multiple backends, we also have to concat results from different backends.
        // TODO Carry on needed headers (but should not simply append all)
        for (k, v) in req.headers() {
            bld.header(k, v);
        }
        {
            use std::collections::hash_map::DefaultHasher;
            use std::hash::{Hash, Hasher};
            let mut hasher = DefaultHasher::new();
            proxy_config.name.hash(&mut hasher);
            let mid = hasher.finish();
            bld.header(format!("proxy-mark-{mid:0x}"), proxy_config.name);
        }
        let body_data = hyper::body::to_bytes(req.into_body()).await?;
        let reqout = bld.body(Body::from(body_data))?;
        let resfut = {
            use hyper::Client;
            if is_tls {
                let https = HttpsConnector::new();
                let client = Client::builder().build::<_, Body>(https);
                let req = client.request(reqout);
                let req = Box::pin(req) as Pin<Box<dyn Future<Output = Result<Response<Body>, hyper::Error>> + Send>>;
                req
            } else {
                let client = Client::new();
                let req = client.request(reqout);
                let req = Box::pin(req) as _;
                req
            }
        };
        resfut.timeout();
    } else {
        Err(Error::with_msg_no_trace(format!("no api1 event backend configured")))
    }
    */
    todo!()
}
pub async fn proxy_single_backend_query<QT>(

View File

@@ -1,25 +0,0 @@
[package]
name = "netfetch"
version = "0.0.1-a.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[lib]
path = "src/netfetch.rs"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11.1"
tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
tokio-stream = {version = "0.1.5", features = ["fs"]}
async-channel = "1.6"
bytes = "1.0.1"
arrayref = "0.3.6"
byteorder = "1.4.3"
futures-core = "0.3.14"
futures-util = "0.3.14"
md-5 = "0.9.1"
err = { path = "../err" }
netpod = { path = "../netpod" }
taskrun = { path = "../taskrun" }

View File

@@ -1,100 +0,0 @@
use crate::zmtp::ZmtpMessage;
use err::Error;
#[allow(unused)]
use netpod::log::*;
use netpod::ByteOrder;
use netpod::ScalarType;
use netpod::Shape;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsVal;
use std::fmt;
// TODO
pub struct ParseError {
pub err: Error,
pub msg: ZmtpMessage,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct GlobalTimestamp {
pub sec: u64,
pub ns: u64,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ChannelDesc {
pub name: String,
#[serde(rename = "type")]
pub ty: String,
pub shape: JsVal,
pub encoding: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct HeadA {
pub htype: String,
pub hash: String,
pub pulse_id: serde_json::Number,
pub global_timestamp: GlobalTimestamp,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct HeadB {
pub htype: String,
pub channels: Vec<ChannelDesc>,
}
#[derive(Debug)]
pub struct BsreadMessage {
pub head_a: HeadA,
pub head_b: HeadB,
pub values: Vec<Box<dyn fmt::Debug>>,
}
pub fn parse_zmtp_message(msg: &ZmtpMessage) -> Result<BsreadMessage, Error> {
if msg.frames().len() < 3 {
return Err(Error::with_msg_no_trace("not enough frames for bsread"));
}
let head_a: HeadA = serde_json::from_slice(&msg.frames()[0].data())?;
let head_b: HeadB = serde_json::from_slice(&msg.frames()[1].data())?;
let mut values = vec![];
if msg.frames().len() == head_b.channels.len() + 3 {
for (ch, fr) in head_b.channels.iter().zip(msg.frames()[2..].iter()) {
let sty = ScalarType::from_bsread_str(ch.ty.as_str())?;
let bo = ByteOrder::from_bsread_str(&ch.encoding)?;
let shape = Shape::from_bsread_jsval(&ch.shape)?;
match sty {
ScalarType::I64 => match &bo {
ByteOrder::LE => match &shape {
Shape::Scalar => {
assert_eq!(fr.data().len(), 8);
let v = i64::from_le_bytes(fr.data().try_into()?);
values.push(Box::new(v) as _);
}
Shape::Wave(_) => {}
Shape::Image(_, _) => {}
},
_ => {}
},
_ => {}
}
}
}
{
let fr = &msg.frames()[msg.frames().len() - 1];
if fr.data().len() == 8 {
let pulse = u64::from_le_bytes(fr.data().try_into()?);
info!("pulse {}", pulse);
}
}
let ret = BsreadMessage { head_a, head_b, values };
Ok(ret)
}
pub struct BsreadCollector {}
impl BsreadCollector {
pub fn new<S: Into<String>>(_addr: S) -> Self {
err::todoval()
}
}

View File

@@ -1,85 +0,0 @@
use async_channel::{bounded, Receiver};
use bytes::{BufMut, BytesMut};
use err::{ErrStr, Error};
use futures_util::FutureExt;
use netpod::NodeConfigCached;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[derive(Debug, Serialize, Deserialize)]
pub struct Message {
cmd: u16,
payload_len: u16,
type_type: u16,
data_len: u16,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum FetchItem {
Log(String),
Message(Message),
}
pub async fn ca_connect_1(
_pairs: BTreeMap<String, String>,
_node_config: &NodeConfigCached,
) -> Result<Receiver<Result<FetchItem, Error>>, Error> {
let (tx, rx) = bounded(16);
let tx2 = tx.clone();
tokio::task::spawn(
async move {
let mut conn = tokio::net::TcpStream::connect("S30CB06-CVME-LLRF2.psi.ch:5064").await?;
let (mut inp, mut out) = conn.split();
tx.send(Ok(FetchItem::Log(format!("connected")))).await.errstr()?;
let mut buf = [0; 64];
let mut b2 = BytesMut::with_capacity(128);
b2.put_u16(0x00);
b2.put_u16(0);
b2.put_u16(0);
b2.put_u16(0xb);
b2.put_u32(0);
b2.put_u32(0);
out.write_all(&b2).await?;
tx.send(Ok(FetchItem::Log(format!("written")))).await.errstr()?;
let n1 = inp.read(&mut buf).await?;
tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf))))
.await
.errstr()?;
// Search to get cid:
let chn = b"SATCB01-DBPM220:Y2";
b2.clear();
b2.put_u16(0x06);
b2.put_u16((16 + chn.len()) as u16);
b2.put_u16(0x00);
b2.put_u16(0x0b);
b2.put_u32(0x71803472);
b2.put_u32(0x71803472);
b2.put_slice(chn);
out.write_all(&b2).await?;
tx.send(Ok(FetchItem::Log(format!("written")))).await.errstr()?;
let n1 = inp.read(&mut buf).await?;
tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf))))
.await
.errstr()?;
Ok::<_, Error>(())
}
.then({
move |item| async move {
match item {
Ok(_) => {}
Err(e) => {
tx2.send(Ok(FetchItem::Log(format!("Seeing error: {:?}", e))))
.await
.errstr()?;
}
}
Ok::<_, Error>(())
}
}),
);
Ok(rx)
}

View File

@@ -1,143 +0,0 @@
use err::Error;
use tokio::io::ReadBuf;
pub const BUFCAP: usize = 1024 * 128;
pub const RP_REW_PT: usize = 1024 * 64;
pub struct NetBuf {
buf: Vec<u8>,
wp: usize,
rp: usize,
}
impl NetBuf {
pub fn new() -> Self {
Self {
buf: vec![0; BUFCAP],
wp: 0,
rp: 0,
}
}
pub fn len(&self) -> usize {
self.wp - self.rp
}
pub fn cap(&self) -> usize {
self.buf.len()
}
pub fn wrcap(&self) -> usize {
self.buf.len() - self.wp
}
pub fn data(&self) -> &[u8] {
&self.buf[self.rp..self.wp]
}
pub fn adv(&mut self, x: usize) -> Result<(), Error> {
if self.len() < x {
return Err(Error::with_msg_no_trace("not enough bytes"));
} else {
self.rp += x;
Ok(())
}
}
pub fn wpadv(&mut self, x: usize) -> Result<(), Error> {
if self.wrcap() < x {
return Err(Error::with_msg_no_trace("not enough space"));
} else {
self.wp += x;
Ok(())
}
}
pub fn read_u8(&mut self) -> Result<u8, Error> {
type T = u8;
const TS: usize = std::mem::size_of::<T>();
if self.len() < TS {
return Err(Error::with_msg_no_trace("not enough bytes"));
} else {
let val = self.buf[self.rp];
self.rp += TS;
Ok(val)
}
}
pub fn read_u64(&mut self) -> Result<u64, Error> {
type T = u64;
const TS: usize = std::mem::size_of::<T>();
if self.len() < TS {
return Err(Error::with_msg_no_trace("not enough bytes"));
} else {
let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?);
self.rp += TS;
Ok(val)
}
}
pub fn read_bytes(&mut self, n: usize) -> Result<&[u8], Error> {
if self.len() < n {
return Err(Error::with_msg_no_trace("not enough bytes"));
} else {
let val = self.buf[self.rp..self.rp + n].as_ref();
self.rp += n;
Ok(val)
}
}
pub fn read_buf_for_fill(&mut self) -> ReadBuf {
self.rewind_if_needed();
let read_buf = ReadBuf::new(&mut self.buf[self.wp..]);
read_buf
}
pub fn rewind_if_needed(&mut self) {
if self.rp != 0 && self.rp == self.wp {
self.rp = 0;
self.wp = 0;
} else if self.rp > RP_REW_PT {
self.buf.copy_within(self.rp..self.wp, 0);
self.wp -= self.rp;
self.rp = 0;
}
}
pub fn put_slice(&mut self, buf: &[u8]) -> Result<(), Error> {
self.rewind_if_needed();
if self.wrcap() < buf.len() {
return Err(Error::with_msg_no_trace("not enough space"));
} else {
self.buf[self.wp..self.wp + buf.len()].copy_from_slice(buf);
self.wp += buf.len();
Ok(())
}
}
pub fn put_u8(&mut self, v: u8) -> Result<(), Error> {
type T = u8;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed();
if self.wrcap() < TS {
return Err(Error::with_msg_no_trace("not enough space"));
} else {
self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes());
self.wp += TS;
Ok(())
}
}
pub fn put_u64(&mut self, v: u64) -> Result<(), Error> {
type T = u64;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed();
if self.wrcap() < TS {
return Err(Error::with_msg_no_trace("not enough space"));
} else {
self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes());
self.wp += TS;
Ok(())
}
}
}

View File

@@ -1,6 +0,0 @@
pub mod bsread;
pub mod ca;
pub mod netbuf;
#[cfg(test)]
pub mod test;
pub mod zmtp;

View File

@@ -1,60 +0,0 @@
use err::Error;
use futures_util::StreamExt;
use netpod::{log::*, SfDatabuffer};
use netpod::{Cluster, Database, Node, NodeConfig, NodeConfigCached};
use std::collections::BTreeMap;
use std::iter::FromIterator;
use std::time::Duration;
#[test]
fn ca_connect_1() {
let fut = async {
let it = vec![(String::new(), String::new())].into_iter();
let pairs = BTreeMap::from_iter(it);
let node_config = NodeConfigCached {
node: Node {
host: "".into(),
port: 123,
port_raw: 123,
cache_base_path: "".into(),
listen: "".into(),
sf_databuffer: Some(SfDatabuffer {
data_base_path: "".into(),
ksprefix: "".into(),
splits: None,
}),
archiver_appliance: None,
channel_archiver: None,
},
node_config: NodeConfig {
name: "".into(),
cluster: Cluster {
backend: "".into(),
nodes: vec![],
database: Database {
host: "".into(),
name: "".into(),
user: "".into(),
pass: "".into(),
},
run_map_pulse_task: false,
is_central_storage: false,
file_io_buffer_size: Default::default(),
},
},
ix: 0,
};
let mut rx = super::ca::ca_connect_1(pairs, &node_config).await?;
while let Some(item) = rx.next().await {
debug!("got next: {:?}", item);
}
Ok::<_, Error>(())
};
let fut = async move {
let ret = tokio::time::timeout(Duration::from_millis(4000), fut)
.await
.map_err(Error::from_string)??;
Ok(ret)
};
taskrun::run(fut).unwrap();
}

View File

@@ -1,683 +0,0 @@
use crate::bsread::parse_zmtp_message;
use crate::bsread::ChannelDesc;
use crate::bsread::GlobalTimestamp;
use crate::bsread::HeadA;
use crate::bsread::HeadB;
use crate::netbuf::NetBuf;
use crate::netbuf::RP_REW_PT;
use async_channel::Receiver;
use async_channel::Sender;
#[allow(unused)]
use bytes::BufMut;
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use netpod::log::*;
use netpod::timeunits::SEC;
use serde_json::Value as JsVal;
use std::fmt;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
//#[test]
#[allow(unused)]
fn test_listen() -> Result<(), Error> {
use std::time::Duration;
let fut = async move {
let _ = tokio::time::timeout(Duration::from_millis(16000), zmtp_client("camtest:9999")).await;
Ok::<_, Error>(())
};
taskrun::run(fut)
}
//#[test]
#[allow(unused)]
fn test_service() -> Result<(), Error> {
//use std::time::Duration;
let fut = async move {
let sock = tokio::net::TcpListener::bind("0.0.0.0:9999").await?;
loop {
info!("accepting...");
let (conn, remote) = sock.accept().await?;
info!("new connection from {:?}", remote);
let mut zmtp = Zmtp::new(conn, SocketType::PUSH);
let fut = async move {
while let Some(item) = zmtp.next().await {
info!("item from {:?} {:?}", remote, item);
}
Ok::<_, Error>(())
};
taskrun::spawn(fut);
}
//Ok::<_, Error>(())
};
taskrun::run(fut)
}
pub async fn zmtp_00() -> Result<(), Error> {
let addr = "S10-CPPM-MOT0991:9999";
zmtp_client(addr).await?;
Ok(())
}
pub async fn zmtp_client(addr: &str) -> Result<(), Error> {
let conn = tokio::net::TcpStream::connect(addr).await?;
let mut zmtp = Zmtp::new(conn, SocketType::PULL);
let mut i1 = 0;
while let Some(item) = zmtp.next().await {
match item {
Ok(ev) => match ev {
ZmtpEvent::ZmtpMessage(msg) => {
info!("Message frames: {}", msg.frames.len());
match parse_zmtp_message(&msg) {
Ok(msg) => info!("{:?}", msg),
Err(e) => {
error!("{}", e);
for frame in &msg.frames {
info!("Frame: {:?}", frame);
}
}
}
}
},
Err(e) => {
error!("{}", e);
return Err(e);
}
}
i1 += 1;
if i1 > 100 {
break;
}
}
Ok(())
}
enum ConnState {
InitSend,
InitRecv1,
InitRecv2,
ReadFrameFlags,
ReadFrameShort,
ReadFrameLong,
ReadFrameBody(usize),
}
impl ConnState {
fn need_min(&self) -> usize {
use ConnState::*;
match self {
InitSend => 0,
InitRecv1 => 11,
InitRecv2 => 53,
ReadFrameFlags => 1,
ReadFrameShort => 1,
ReadFrameLong => 8,
ReadFrameBody(msglen) => *msglen,
}
}
}
pub enum SocketType {
PUSH,
PULL,
}
struct DummyData {
ts: u64,
pulse: u64,
value: i64,
}
impl DummyData {
fn make_zmtp_msg(&self) -> Result<ZmtpMessage, Error> {
let head_b = HeadB {
htype: "bsr_d-1.1".into(),
channels: vec![ChannelDesc {
name: "TESTCHAN".into(),
ty: "int64".into(),
shape: JsVal::Array(vec![JsVal::Number(serde_json::Number::from(1))]),
encoding: "little".into(),
}],
};
let hb = serde_json::to_vec(&head_b).unwrap();
use md5::Digest;
let mut h = md5::Md5::new();
h.update(&hb);
let mut md5hex = String::with_capacity(32);
for c in h.finalize() {
use fmt::Write;
write!(&mut md5hex, "{:02x}", c).unwrap();
}
let head_a = HeadA {
htype: "bsr_m-1.1".into(),
hash: md5hex,
pulse_id: serde_json::Number::from(self.pulse),
global_timestamp: GlobalTimestamp {
sec: self.ts / SEC,
ns: self.ts % SEC,
},
};
// TODO write directly to output buffer.
let ha = serde_json::to_vec(&head_a).unwrap();
let hf = self.value.to_le_bytes().to_vec();
let hp = [(self.ts / SEC).to_be_bytes(), (self.ts % SEC).to_be_bytes()].concat();
let mut msg = ZmtpMessage { frames: vec![] };
let fr = ZmtpFrame {
msglen: 0,
has_more: false,
is_command: false,
data: ha,
};
msg.frames.push(fr);
let fr = ZmtpFrame {
msglen: 0,
has_more: false,
is_command: false,
data: hb,
};
msg.frames.push(fr);
let fr = ZmtpFrame {
msglen: 0,
has_more: false,
is_command: false,
data: hf,
};
msg.frames.push(fr);
let fr = ZmtpFrame {
msglen: 0,
has_more: false,
is_command: false,
data: hp,
};
msg.frames.push(fr);
Ok(msg)
}
}
struct Zmtp {
done: bool,
complete: bool,
socket_type: SocketType,
conn: TcpStream,
conn_state: ConnState,
buf: NetBuf,
outbuf: NetBuf,
out_enable: bool,
msglen: usize,
has_more: bool,
is_command: bool,
frames: Vec<ZmtpFrame>,
inp_eof: bool,
data_tx: Sender<DummyData>,
data_rx: Receiver<DummyData>,
}
impl Zmtp {
fn new(conn: TcpStream, socket_type: SocketType) -> Self {
//conn.set_send_buffer_size(1024 * 64)?;
//conn.set_recv_buffer_size(1024 * 1024 * 4)?;
//info!("send_buffer_size {:8}", conn.send_buffer_size()?);
//info!("recv_buffer_size {:8}", conn.recv_buffer_size()?);
let (tx, rx) = async_channel::bounded(1);
Self {
done: false,
complete: false,
socket_type,
conn,
conn_state: ConnState::InitSend,
buf: NetBuf::new(),
outbuf: NetBuf::new(),
out_enable: false,
msglen: 0,
has_more: false,
is_command: false,
frames: vec![],
inp_eof: false,
data_tx: tx,
data_rx: rx,
}
}
fn inpbuf_conn(&mut self) -> (&mut TcpStream, ReadBuf) {
(&mut self.conn, self.buf.read_buf_for_fill())
}
fn outbuf_conn(&mut self) -> (&[u8], &mut TcpStream) {
(self.outbuf.data(), &mut self.conn)
}
fn parse_item(&mut self) -> Result<Option<ZmtpEvent>, Error> {
match self.conn_state {
ConnState::InitSend => {
info!("parse_item InitSend");
self.outbuf.put_slice(&[0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3])?;
self.conn_state = ConnState::InitRecv1;
Ok(None)
}
ConnState::InitRecv1 => {
self.buf.adv(10)?;
let ver = self.buf.read_u8()?;
info!("parse_item InitRecv1 major version {}", ver);
if ver != 3 {
return Err(Error::with_msg_no_trace(format!("bad version {}", ver)));
}
self.outbuf.put_slice(&[0, 0x4e, 0x55, 0x4c, 0x4c])?;
let a = vec![0; 48];
self.outbuf.put_slice(&a)?;
self.conn_state = ConnState::InitRecv2;
Ok(None)
}
ConnState::InitRecv2 => {
info!("parse_item InitRecv2");
let msgrem = self.conn_state.need_min();
let ver_min = self.buf.read_u8()?;
let msgrem = msgrem - 1;
info!("Peer minor version {}", ver_min);
// TODO parse greeting remainder.. sec-scheme.
self.buf.adv(msgrem)?;
match self.socket_type {
SocketType::PUSH => {
self.outbuf
.put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PUSH"[..])?;
}
SocketType::PULL => {
self.outbuf
.put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PULL"[..])?;
}
}
self.out_enable = true;
self.conn_state = ConnState::ReadFrameFlags;
let tx = self.data_tx.clone();
let fut1 = async move {
loop {
tokio::time::sleep(Duration::from_millis(1000)).await;
let dd = DummyData {
ts: 420032002200887766,
pulse: 123123123123,
value: -777,
};
match tx.send(dd).await {
Ok(()) => {
info!("item send to channel");
}
Err(_) => break,
}
}
};
taskrun::spawn(fut1);
Ok(None)
}
ConnState::ReadFrameFlags => {
let flags = self.buf.read_u8()?;
let has_more = flags & 0x01 != 0;
let long_size = flags & 0x02 != 0;
let is_command = flags & 0x04 != 0;
self.has_more = has_more;
self.is_command = is_command;
trace!(
"parse_item ReadFrameFlags has_more {} long_size {} is_command {}",
has_more,
long_size,
is_command
);
if is_command {
warn!("Got zmtp command frame");
}
if false && is_command {
return Err(Error::with_msg_no_trace("got zmtp command frame"));
}
if long_size {
self.conn_state = ConnState::ReadFrameLong;
} else {
self.conn_state = ConnState::ReadFrameShort;
}
Ok(None)
}
ConnState::ReadFrameShort => {
self.msglen = self.buf.read_u8()? as usize;
trace!("parse_item ReadFrameShort msglen {}", self.msglen);
self.conn_state = ConnState::ReadFrameBody(self.msglen);
if self.msglen > 1024 * 64 {
return Err(Error::with_msg_no_trace(format!(
"larger msglen not yet supported {}",
self.msglen,
)));
}
Ok(None)
}
ConnState::ReadFrameLong => {
self.msglen = self.buf.read_u64()? as usize;
trace!("parse_item ReadFrameShort msglen {}", self.msglen);
self.conn_state = ConnState::ReadFrameBody(self.msglen);
if self.msglen > 1024 * 64 {
return Err(Error::with_msg_no_trace(format!(
"larger msglen not yet supported {}",
self.msglen,
)));
}
Ok(None)
}
ConnState::ReadFrameBody(msglen) => {
let data = self.buf.read_bytes(msglen)?.to_vec();
self.msglen = 0;
if false {
let n1 = data.len().min(256);
let s = String::from_utf8_lossy(&data[..n1]);
trace!("parse_item ReadFrameBody msglen {} string {}", msglen, s);
}
self.conn_state = ConnState::ReadFrameFlags;
if !self.is_command {
let g = ZmtpFrame {
msglen: self.msglen,
has_more: self.has_more,
is_command: self.is_command,
data,
};
self.frames.push(g);
}
if self.has_more {
Ok(None)
} else {
let g = ZmtpMessage {
frames: mem::replace(&mut self.frames, vec![]),
};
Ok(Some(ZmtpEvent::ZmtpMessage(g)))
}
}
}
}
}
#[derive(Debug)]
pub struct ZmtpMessage {
frames: Vec<ZmtpFrame>,
}
impl ZmtpMessage {
pub fn frames(&self) -> &Vec<ZmtpFrame> {
&self.frames
}
pub fn emit_to_buffer(&self, out: &mut NetBuf) -> Result<(), Error> {
let n = self.frames.len();
for (i, fr) in self.frames.iter().enumerate() {
let mut flags: u8 = 2;
if i < n - 1 {
flags |= 1;
}
out.put_u8(flags)?;
out.put_u64(fr.data().len() as u64)?;
out.put_slice(fr.data())?;
}
Ok(())
}
}
pub struct ZmtpFrame {
msglen: usize,
has_more: bool,
is_command: bool,
data: Vec<u8>,
}
impl ZmtpFrame {
pub fn data(&self) -> &[u8] {
&self.data
}
}
impl fmt::Debug for ZmtpFrame {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let data = match String::from_utf8(self.data.clone()) {
Ok(s) => s
.chars()
.filter(|x| {
//
x.is_ascii_alphanumeric() || x.is_ascii_punctuation() || x.is_ascii_whitespace()
})
.collect::<String>(),
Err(_) => format!("Binary {{ len: {} }}", self.data.len()),
};
f.debug_struct("ZmtpFrame")
.field("msglen", &self.msglen)
.field("has_more", &self.has_more)
.field("is_command", &self.is_command)
.field("data", &data)
.finish()
}
}
enum Int<T> {
NoWork,
Pending,
Empty,
Item(T),
Done,
}
impl<T> Int<T> {
fn item_count(&self) -> u32 {
if let Int::Item(_) = self {
1
} else {
0
}
}
}
impl<T> fmt::Debug for Int<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::NoWork => write!(f, "NoWork"),
Self::Pending => write!(f, "Pending"),
Self::Empty => write!(f, "Empty"),
Self::Item(_) => write!(f, "Item"),
Self::Done => write!(f, "Done"),
}
}
}
#[derive(Debug)]
enum ZmtpEvent {
ZmtpMessage(ZmtpMessage),
}
impl Stream for Zmtp {
type Item = Result<ZmtpEvent, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.complete {
panic!("poll_next on complete")
} else if self.done {
self.complete = true;
return Ready(None);
}
loop {
let mut item_count = 0;
let serialized: Int<Result<(), Error>> = if self.out_enable && self.outbuf.wrcap() >= RP_REW_PT {
match self.data_rx.poll_next_unpin(cx) {
Ready(Some(item)) => {
let msg = item.make_zmtp_msg().unwrap();
match msg.emit_to_buffer(&mut self.outbuf) {
Ok(_) => Int::Empty,
Err(e) => {
self.done = true;
Int::Item(Err(e))
}
}
/*let mut msgs = Vec::with_capacity(1024 * 8);
msgs.put_u8(1 | 2);
msgs.put_u64(ha.len() as u64);
msgs.put_slice(&ha);
msgs.put_u8(1 | 2);
msgs.put_u64(hb.len() as u64);
msgs.put_slice(&hb);
msgs.put_u8(1 | 2);
msgs.put_u64(hf.len() as u64);
msgs.put_slice(&hf);
msgs.put_u8(2);
msgs.put_u64(hp.len() as u64);
msgs.put_slice(&hp);
self.outbuf.put_slice(&msgs).unwrap();
Int::Empty*/
}
Ready(None) => Int::Done,
Pending => Int::Pending,
}
} else {
Int::NoWork
};
item_count += serialized.item_count();
let write: Int<Result<(), _>> = if item_count > 0 {
Int::NoWork
} else if self.outbuf.len() > 0 {
let (b, w) = self.outbuf_conn();
pin_mut!(w);
match w.poll_write(cx, b) {
Ready(k) => match k {
Ok(k) => match self.outbuf.adv(k) {
Ok(()) => {
info!("sent {} bytes", k);
self.outbuf.rewind_if_needed();
Int::Empty
}
Err(e) => {
error!("advance error {:?}", e);
Int::Item(Err(e))
}
},
Err(e) => {
error!("output write error {:?}", e);
Int::Item(Err(e.into()))
}
},
Pending => Int::Pending,
}
} else {
Int::NoWork
};
info!("write result: {:?} {}", write, self.outbuf.len());
item_count += write.item_count();
let read: Int<Result<(), _>> = if item_count > 0 || self.inp_eof {
Int::NoWork
} else {
if self.buf.cap() < self.conn_state.need_min() {
self.done = true;
let e = Error::with_msg_no_trace(format!(
"buffer too small for need_min {} {}",
self.buf.cap(),
self.conn_state.need_min()
));
Int::Item(Err(e))
} else if self.buf.len() < self.conn_state.need_min() {
let (w, mut rbuf) = self.inpbuf_conn();
pin_mut!(w);
match w.poll_read(cx, &mut rbuf) {
Ready(k) => match k {
Ok(()) => {
let nf = rbuf.filled().len();
if nf == 0 {
info!("EOF");
self.inp_eof = true;
Int::Done
} else {
info!("received {} bytes", rbuf.filled().len());
if false {
let t = rbuf.filled().len();
let t = if t < 32 { t } else { 32 };
info!("got data {:?}", &rbuf.filled()[0..t]);
}
match self.buf.wpadv(nf) {
Ok(()) => Int::Empty,
Err(e) => Int::Item(Err(e)),
}
}
}
Err(e) => Int::Item(Err(e.into())),
},
Pending => Int::Pending,
}
} else {
Int::NoWork
}
};
item_count += read.item_count();
let parsed = if item_count > 0 || self.buf.len() < self.conn_state.need_min() {
Int::NoWork
} else {
match self.parse_item() {
Ok(k) => match k {
Some(k) => Int::Item(Ok(k)),
None => Int::Empty,
},
Err(e) => Int::Item(Err(e)),
}
};
item_count += parsed.item_count();
let _ = item_count;
{
use Int::*;
match (serialized, write, read, parsed) {
(NoWork | Done, NoWork | Done, NoWork | Done, NoWork | Done) => {
warn!("all NoWork or Done");
break Poll::Pending;
}
(Item(Err(e)), _, _, _) => {
self.done = true;
break Poll::Ready(Some(Err(e)));
}
(_, Item(Err(e)), _, _) => {
self.done = true;
break Poll::Ready(Some(Err(e)));
}
(_, _, Item(Err(e)), _) => {
self.done = true;
break Poll::Ready(Some(Err(e)));
}
(_, _, _, Item(Err(e))) => {
self.done = true;
break Poll::Ready(Some(Err(e)));
}
(Item(_), _, _, _) => {
continue;
}
(_, Item(_), _, _) => {
continue;
}
(_, _, Item(_), _) => {
continue;
}
(_, _, _, Item(Ok(item))) => {
break Poll::Ready(Some(Ok(item)));
}
(Empty, _, _, _) => continue,
(_, Empty, _, _) => continue,
(_, _, Empty, _) => continue,
(_, _, _, Empty) => continue,
#[allow(unreachable_patterns)]
(Pending, Pending | NoWork | Done, Pending | NoWork | Done, Pending | NoWork | Done) => {
break Poll::Pending
}
#[allow(unreachable_patterns)]
(Pending | NoWork | Done, Pending, Pending | NoWork | Done, Pending | NoWork | Done) => {
break Poll::Pending
}
#[allow(unreachable_patterns)]
(Pending | NoWork | Done, Pending | NoWork | Done, Pending, Pending | NoWork | Done) => {
break Poll::Pending
}
#[allow(unreachable_patterns)]
(Pending | NoWork | Done, Pending | NoWork | Done, Pending | NoWork | Done, Pending) => {
break Poll::Pending
}
}
};
}
}
}

View File

@@ -1477,10 +1477,16 @@ pub struct ProxyConfig {
    pub name: String,
    pub listen: String,
    pub port: u16,
    #[serde(default)]
    pub backends_status: Vec<ProxyBackend>,
    #[serde(default)]
    pub backends: Vec<ProxyBackend>,
    #[serde(default)]
    pub backends_pulse_map: Vec<ProxyBackend>,
    #[serde(default)]
    pub backends_search: Vec<ProxyBackend>,
    #[serde(default)]
    pub backends_event_download: Vec<ProxyBackend>,
    pub api_0_search_hosts: Option<Vec<String>>,
    pub api_0_search_backends: Option<Vec<String>>,
}
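The `#[serde(default)]` attributes added here mean a proxy configuration may simply omit any backend list it does not use; such a field then deserializes to an empty Vec. A small sketch of that behavior using a stripped-down stand-in for ProxyConfig (the ProxyBackend fields and the JSON input are invented for illustration, and the real config file may use a different format entirely):

```rust
// Illustrative stand-in only: shows how #[serde(default)] makes the backend
// lists optional in the input. Field names on ProxyBackend are made up.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ProxyBackend {
    name: String,
    url: String,
}

#[derive(Debug, Deserialize)]
struct ProxyConfig {
    name: String,
    listen: String,
    port: u16,
    #[serde(default)]
    backends_search: Vec<ProxyBackend>,
    #[serde(default)]
    backends_event_download: Vec<ProxyBackend>,
}

fn main() -> Result<(), serde_json::Error> {
    // Neither backend list is present; both default to an empty Vec instead
    // of triggering a "missing field" error.
    let cfg: ProxyConfig = serde_json::from_str(
        r#"{ "name": "proxy0", "listen": "0.0.0.0", "port": 8080 }"#,
    )?;
    assert!(cfg.backends_search.is_empty());
    assert!(cfg.backends_event_download.is_empty());
    println!("{cfg:?}");
    Ok(())
}
```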