Read some bsread and put into scylla
This commit is contained in:
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
/target
|
||||
/Cargo.lock
|
||||
14
Cargo.toml
Normal file
14
Cargo.toml
Normal file
@@ -0,0 +1,14 @@
|
||||
[workspace]
|
||||
members = ["log", "netfetch", "daqingest"]
|
||||
|
||||
[profile.release]
|
||||
opt-level = 1
|
||||
debug = 0
|
||||
overflow-checks = false
|
||||
debug-assertions = false
|
||||
lto = false
|
||||
codegen-units = 32
|
||||
incremental = true
|
||||
|
||||
[patch.crates-io]
|
||||
tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" }
|
||||
21
daqingest/Cargo.toml
Normal file
21
daqingest/Cargo.toml
Normal file
@@ -0,0 +1,21 @@
|
||||
[package]
|
||||
name = "daqingest"
|
||||
version = "0.1.0"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
[lib]
|
||||
path = "src/daqingest.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "daqingest"
|
||||
path = "src/bin/daqingest.rs"
|
||||
|
||||
[dependencies]
|
||||
clap = { version = "3", features = ["derive", "cargo"] }
|
||||
chrono = "0.4"
|
||||
bytes = "1.1"
|
||||
scylla = "0.4"
|
||||
err = { path = "../../daqbuffer/err" }
|
||||
taskrun = { path = "../../daqbuffer/taskrun" }
|
||||
netfetch = { path = "../netfetch" }
|
||||
16
daqingest/src/bin/daqingest.rs
Normal file
16
daqingest/src/bin/daqingest.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
use clap::Parser;
|
||||
use daqingest::{DaqIngestOpts, SubCmd};
|
||||
use err::Error;
|
||||
|
||||
pub fn main() -> Result<(), Error> {
|
||||
taskrun::run(async {
|
||||
if false {
|
||||
return Err(Error::with_msg_no_trace(format!("unknown command")));
|
||||
} else {
|
||||
}
|
||||
let opts = DaqIngestOpts::parse();
|
||||
match opts.subcmd {
|
||||
SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(&k.source, k.rcvbuf).await,
|
||||
}
|
||||
})
|
||||
}
|
||||
24
daqingest/src/daqingest.rs
Normal file
24
daqingest/src/daqingest.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
use clap::Parser;
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
//#[clap(name = "daqingest", version)]
|
||||
//#[clap(version)]
|
||||
pub struct DaqIngestOpts {
|
||||
#[clap(long, parse(from_occurrences))]
|
||||
pub verbose: u32,
|
||||
#[clap(subcommand)]
|
||||
pub subcmd: SubCmd,
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub enum SubCmd {
|
||||
Bsread(Bsread),
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct Bsread {
|
||||
#[clap(long)]
|
||||
pub source: String,
|
||||
#[clap(long)]
|
||||
pub rcvbuf: Option<u32>,
|
||||
}
|
||||
11
log/Cargo.toml
Normal file
11
log/Cargo.toml
Normal file
@@ -0,0 +1,11 @@
|
||||
[package]
|
||||
name = "log"
|
||||
version = "0.0.1"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
[lib]
|
||||
path = "src/log.rs"
|
||||
|
||||
[dependencies]
|
||||
tracing = "0.1"
|
||||
2
log/src/log.rs
Normal file
2
log/src/log.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
#[allow(unused_imports)]
|
||||
pub use tracing::{debug, error, info, trace, warn};
|
||||
28
netfetch/Cargo.toml
Normal file
28
netfetch/Cargo.toml
Normal file
@@ -0,0 +1,28 @@
|
||||
[package]
|
||||
name = "netfetch"
|
||||
version = "0.0.1-a.0"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
[lib]
|
||||
path = "src/netfetch.rs"
|
||||
|
||||
[dependencies]
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
serde_cbor = "0.11"
|
||||
tokio = { version = "1.7", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
|
||||
tokio-stream = { version = "0.1", features = ["fs"]}
|
||||
async-channel = "1.6"
|
||||
bytes = "1.0"
|
||||
arrayref = "0.3"
|
||||
byteorder = "1.4"
|
||||
futures-core = "0.3"
|
||||
futures-util = "0.3"
|
||||
scylla = "0.4"
|
||||
md-5 = "0.9"
|
||||
libc = "0.2"
|
||||
log = { path = "../log" }
|
||||
err = { path = "../../daqbuffer/err" }
|
||||
netpod = { path = "../../daqbuffer/netpod" }
|
||||
taskrun = { path = "../../daqbuffer/taskrun" }
|
||||
98
netfetch/src/bsread.rs
Normal file
98
netfetch/src/bsread.rs
Normal file
@@ -0,0 +1,98 @@
|
||||
use crate::zmtp::ZmtpMessage;
|
||||
use err::Error;
|
||||
#[allow(unused)]
|
||||
use log::*;
|
||||
use netpod::{ByteOrder, ScalarType, Shape};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value as JsVal;
|
||||
use std::fmt;
|
||||
|
||||
// TODO
|
||||
pub struct ParseError {
|
||||
pub err: Error,
|
||||
pub msg: ZmtpMessage,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct GlobalTimestamp {
|
||||
pub sec: u64,
|
||||
pub ns: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ChannelDesc {
|
||||
pub name: String,
|
||||
#[serde(rename = "type")]
|
||||
pub ty: String,
|
||||
pub shape: JsVal,
|
||||
pub encoding: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct HeadA {
|
||||
pub htype: String,
|
||||
pub hash: String,
|
||||
pub pulse_id: serde_json::Number,
|
||||
pub global_timestamp: GlobalTimestamp,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct HeadB {
|
||||
pub htype: String,
|
||||
pub channels: Vec<ChannelDesc>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct BsreadMessage {
|
||||
pub head_a: HeadA,
|
||||
pub head_b: HeadB,
|
||||
pub values: Vec<Box<dyn fmt::Debug>>,
|
||||
}
|
||||
|
||||
pub fn parse_zmtp_message(msg: &ZmtpMessage) -> Result<BsreadMessage, Error> {
|
||||
if msg.frames().len() < 3 {
|
||||
return Err(Error::with_msg_no_trace("not enough frames for bsread"));
|
||||
}
|
||||
let head_a: HeadA = serde_json::from_slice(&msg.frames()[0].data())?;
|
||||
let head_b: HeadB = serde_json::from_slice(&msg.frames()[1].data())?;
|
||||
let mut values = vec![];
|
||||
if msg.frames().len() == head_b.channels.len() + 3 {
|
||||
for (ch, fr) in head_b.channels.iter().zip(msg.frames()[2..].iter()) {
|
||||
let sty = ScalarType::from_bsread_str(ch.ty.as_str())?;
|
||||
let bo = ByteOrder::from_bsread_str(&ch.encoding)?;
|
||||
let shape = Shape::from_bsread_jsval(&ch.shape)?;
|
||||
match sty {
|
||||
ScalarType::I64 => match &bo {
|
||||
ByteOrder::LE => match &shape {
|
||||
Shape::Scalar => {
|
||||
assert_eq!(fr.data().len(), 8);
|
||||
let v = i64::from_le_bytes(fr.data().try_into()?);
|
||||
values.push(Box::new(v) as _);
|
||||
}
|
||||
Shape::Wave(_) => {}
|
||||
Shape::Image(_, _) => {}
|
||||
},
|
||||
_ => {}
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
let fr = &msg.frames()[msg.frames().len() - 1];
|
||||
if fr.data().len() == 8 {
|
||||
let pulse = u64::from_le_bytes(fr.data().try_into()?);
|
||||
info!("pulse {}", pulse);
|
||||
}
|
||||
}
|
||||
let ret = BsreadMessage { head_a, head_b, values };
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub struct BsreadCollector {}
|
||||
|
||||
impl BsreadCollector {
|
||||
pub fn new<S: Into<String>>(_addr: S) -> Self {
|
||||
err::todoval()
|
||||
}
|
||||
}
|
||||
80
netfetch/src/ca.rs
Normal file
80
netfetch/src/ca.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
use async_channel::{bounded, Receiver};
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use err::{ErrStr, Error};
|
||||
use futures_util::FutureExt;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Message {
|
||||
cmd: u16,
|
||||
payload_len: u16,
|
||||
type_type: u16,
|
||||
data_len: u16,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum FetchItem {
|
||||
Log(String),
|
||||
Message(Message),
|
||||
}
|
||||
|
||||
pub async fn ca_connect_1() -> Result<Receiver<Result<FetchItem, Error>>, Error> {
|
||||
let (tx, rx) = bounded(16);
|
||||
let tx2 = tx.clone();
|
||||
tokio::task::spawn(
|
||||
async move {
|
||||
let mut conn = tokio::net::TcpStream::connect("S30CB06-CVME-LLRF2.psi.ch:5064").await?;
|
||||
let (mut inp, mut out) = conn.split();
|
||||
tx.send(Ok(FetchItem::Log(format!("connected")))).await.errstr()?;
|
||||
let mut buf = [0; 64];
|
||||
|
||||
let mut b2 = BytesMut::with_capacity(128);
|
||||
b2.put_u16(0x00);
|
||||
b2.put_u16(0);
|
||||
b2.put_u16(0);
|
||||
b2.put_u16(0xb);
|
||||
b2.put_u32(0);
|
||||
b2.put_u32(0);
|
||||
out.write_all(&b2).await?;
|
||||
tx.send(Ok(FetchItem::Log(format!("written")))).await.errstr()?;
|
||||
let n1 = inp.read(&mut buf).await?;
|
||||
tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf))))
|
||||
.await
|
||||
.errstr()?;
|
||||
|
||||
// Search to get cid:
|
||||
let chn = b"SATCB01-DBPM220:Y2";
|
||||
b2.clear();
|
||||
b2.put_u16(0x06);
|
||||
b2.put_u16((16 + chn.len()) as u16);
|
||||
b2.put_u16(0x00);
|
||||
b2.put_u16(0x0b);
|
||||
b2.put_u32(0x71803472);
|
||||
b2.put_u32(0x71803472);
|
||||
b2.put_slice(chn);
|
||||
out.write_all(&b2).await?;
|
||||
tx.send(Ok(FetchItem::Log(format!("written")))).await.errstr()?;
|
||||
let n1 = inp.read(&mut buf).await?;
|
||||
tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf))))
|
||||
.await
|
||||
.errstr()?;
|
||||
|
||||
Ok::<_, Error>(())
|
||||
}
|
||||
.then({
|
||||
move |item| async move {
|
||||
match item {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
tx2.send(Ok(FetchItem::Log(format!("Seeing error: {:?}", e))))
|
||||
.await
|
||||
.errstr()?;
|
||||
}
|
||||
}
|
||||
Ok::<_, Error>(())
|
||||
}
|
||||
}),
|
||||
);
|
||||
Ok(rx)
|
||||
}
|
||||
143
netfetch/src/netbuf.rs
Normal file
143
netfetch/src/netbuf.rs
Normal file
@@ -0,0 +1,143 @@
|
||||
use err::Error;
|
||||
use tokio::io::ReadBuf;
|
||||
|
||||
pub const BUFCAP: usize = 1024 * 128;
|
||||
pub const RP_REW_PT: usize = 1024 * 64;
|
||||
|
||||
pub struct NetBuf {
|
||||
buf: Vec<u8>,
|
||||
wp: usize,
|
||||
rp: usize,
|
||||
}
|
||||
|
||||
impl NetBuf {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
buf: vec![0; BUFCAP],
|
||||
wp: 0,
|
||||
rp: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.wp - self.rp
|
||||
}
|
||||
|
||||
pub fn cap(&self) -> usize {
|
||||
self.buf.len()
|
||||
}
|
||||
|
||||
pub fn wcap(&self) -> usize {
|
||||
self.buf.len() - self.wp
|
||||
}
|
||||
|
||||
pub fn data(&self) -> &[u8] {
|
||||
&self.buf[self.rp..self.wp]
|
||||
}
|
||||
|
||||
pub fn adv(&mut self, x: usize) -> Result<(), Error> {
|
||||
if self.len() < x {
|
||||
return Err(Error::with_msg_no_trace("not enough bytes"));
|
||||
} else {
|
||||
self.rp += x;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wadv(&mut self, x: usize) -> Result<(), Error> {
|
||||
if self.wcap() < x {
|
||||
return Err(Error::with_msg_no_trace("not enough space"));
|
||||
} else {
|
||||
self.wp += x;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_u8(&mut self) -> Result<u8, Error> {
|
||||
type T = u8;
|
||||
const TS: usize = std::mem::size_of::<T>();
|
||||
if self.len() < TS {
|
||||
return Err(Error::with_msg_no_trace("not enough bytes"));
|
||||
} else {
|
||||
let val = self.buf[self.rp];
|
||||
self.rp += TS;
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_u64(&mut self) -> Result<u64, Error> {
|
||||
type T = u64;
|
||||
const TS: usize = std::mem::size_of::<T>();
|
||||
if self.len() < TS {
|
||||
return Err(Error::with_msg_no_trace("not enough bytes"));
|
||||
} else {
|
||||
let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?);
|
||||
self.rp += TS;
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_bytes(&mut self, n: usize) -> Result<&[u8], Error> {
|
||||
if self.len() < n {
|
||||
return Err(Error::with_msg_no_trace("not enough bytes"));
|
||||
} else {
|
||||
let val = self.buf[self.rp..self.rp + n].as_ref();
|
||||
self.rp += n;
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_buf_for_fill(&mut self) -> ReadBuf {
|
||||
self.rewind_if_needed();
|
||||
let read_buf = ReadBuf::new(&mut self.buf[self.wp..]);
|
||||
read_buf
|
||||
}
|
||||
|
||||
pub fn rewind_if_needed(&mut self) {
|
||||
if self.rp != 0 && self.rp == self.wp {
|
||||
self.rp = 0;
|
||||
self.wp = 0;
|
||||
} else if self.rp > RP_REW_PT {
|
||||
self.buf.copy_within(self.rp..self.wp, 0);
|
||||
self.wp -= self.rp;
|
||||
self.rp = 0;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn put_slice(&mut self, buf: &[u8]) -> Result<(), Error> {
|
||||
self.rewind_if_needed();
|
||||
if self.wcap() < buf.len() {
|
||||
return Err(Error::with_msg_no_trace("not enough space"));
|
||||
} else {
|
||||
self.buf[self.wp..self.wp + buf.len()].copy_from_slice(buf);
|
||||
self.wp += buf.len();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn put_u8(&mut self, v: u8) -> Result<(), Error> {
|
||||
type T = u8;
|
||||
const TS: usize = std::mem::size_of::<T>();
|
||||
self.rewind_if_needed();
|
||||
if self.wcap() < TS {
|
||||
return Err(Error::with_msg_no_trace("not enough space"));
|
||||
} else {
|
||||
self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes());
|
||||
self.wp += TS;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn put_u64(&mut self, v: u64) -> Result<(), Error> {
|
||||
type T = u64;
|
||||
const TS: usize = std::mem::size_of::<T>();
|
||||
self.rewind_if_needed();
|
||||
if self.wcap() < TS {
|
||||
return Err(Error::with_msg_no_trace("not enough space"));
|
||||
} else {
|
||||
self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes());
|
||||
self.wp += TS;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
6
netfetch/src/netfetch.rs
Normal file
6
netfetch/src/netfetch.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
pub mod bsread;
|
||||
pub mod ca;
|
||||
pub mod netbuf;
|
||||
#[cfg(test)]
|
||||
pub mod test;
|
||||
pub mod zmtp;
|
||||
1
netfetch/src/test.rs
Normal file
1
netfetch/src/test.rs
Normal file
@@ -0,0 +1 @@
|
||||
// TODO
|
||||
827
netfetch/src/zmtp.rs
Normal file
827
netfetch/src/zmtp.rs
Normal file
@@ -0,0 +1,827 @@
|
||||
use crate::bsread::parse_zmtp_message;
|
||||
use crate::bsread::ChannelDesc;
|
||||
use crate::bsread::GlobalTimestamp;
|
||||
use crate::bsread::HeadA;
|
||||
use crate::bsread::HeadB;
|
||||
use crate::netbuf::NetBuf;
|
||||
use crate::netbuf::RP_REW_PT;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
#[allow(unused)]
|
||||
use bytes::BufMut;
|
||||
use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::{pin_mut, StreamExt};
|
||||
use log::*;
|
||||
use netpod::timeunits::*;
|
||||
use scylla::batch::Consistency;
|
||||
use serde_json::Value as JsVal;
|
||||
use std::ffi::CStr;
|
||||
use std::fmt;
|
||||
use std::mem;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
#[allow(unused)]
|
||||
fn test_listen() -> Result<(), Error> {
|
||||
use std::time::Duration;
|
||||
let fut = async move {
|
||||
let _ = tokio::time::timeout(Duration::from_millis(16000), futures_util::future::ready(0u32)).await;
|
||||
Ok::<_, Error>(())
|
||||
};
|
||||
taskrun::run(fut)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn test_service() -> Result<(), Error> {
|
||||
let fut = async move {
|
||||
let sock = tokio::net::TcpListener::bind("0.0.0.0:9999").await?;
|
||||
loop {
|
||||
info!("accepting...");
|
||||
let (conn, remote) = sock.accept().await?;
|
||||
info!("new connection from {:?}", remote);
|
||||
let mut zmtp = Zmtp::new(conn, SocketType::PUSH);
|
||||
let fut = async move {
|
||||
while let Some(item) = zmtp.next().await {
|
||||
info!("item from {:?} {:?}", remote, item);
|
||||
}
|
||||
Ok::<_, Error>(())
|
||||
};
|
||||
taskrun::spawn(fut);
|
||||
}
|
||||
};
|
||||
taskrun::run(fut)
|
||||
}
|
||||
|
||||
pub async fn zmtp_client(addr: &str, rcvbuf: Option<u32>) -> Result<(), Error> {
|
||||
let mut conn = tokio::net::TcpStream::connect(addr).await?;
|
||||
if let Some(v) = rcvbuf {
|
||||
set_rcv_sock_opts(&mut conn, v)?;
|
||||
}
|
||||
let mut zmtp = Zmtp::new(conn, SocketType::PULL);
|
||||
let mut i1 = 0u64;
|
||||
let mut msgc = 0u64;
|
||||
let mut vals = vec![];
|
||||
let scy = scylla::SessionBuilder::new()
|
||||
.known_node("127.0.0.1:19042")
|
||||
.default_consistency(Consistency::One)
|
||||
.build()
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let qu1 = scy
|
||||
.prepare("insert into ks1.pulse (tsA, tsB, pulse) values (?, ?, ?) using ttl 120")
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
while let Some(item) = zmtp.next().await {
|
||||
match item {
|
||||
Ok(ev) => match ev {
|
||||
ZmtpEvent::ZmtpCommand(cmd) => {
|
||||
info!("{:?}", cmd);
|
||||
}
|
||||
ZmtpEvent::ZmtpMessage(msg) => {
|
||||
msgc += 1;
|
||||
trace!("Message frames: {}", msg.frames.len());
|
||||
match parse_zmtp_message(&msg) {
|
||||
Ok(bm) => {
|
||||
trace!("{:?}", bm);
|
||||
trace!("len A {} len B {}", bm.head_b.channels.len(), bm.values.len());
|
||||
let mut i3 = u32::MAX;
|
||||
for (i, ch) in bm.head_b.channels.iter().enumerate() {
|
||||
if ch.name == "SINEG01-RLLE-STA:MASTER-EVRPULSEID" {
|
||||
i3 = i as u32;
|
||||
}
|
||||
}
|
||||
if i3 < u32::MAX {
|
||||
trace!("insert value frame {}", i3);
|
||||
let i4 = 2 * i3 + 2;
|
||||
if i4 >= msg.frames.len() as u32 {
|
||||
} else {
|
||||
let fr = &msg.frames[i4 as usize];
|
||||
trace!("data len {}", fr.data.len());
|
||||
let pulse_f64 = f64::from_be_bytes(fr.data[..].try_into().unwrap());
|
||||
trace!("pulse_f64 {pulse_f64}");
|
||||
let pulse = pulse_f64 as u64;
|
||||
if false {
|
||||
// TODO this next frame should be described somehow in the json header or?
|
||||
info!("next val len {}", msg.frames[i4 as usize + 1].data.len());
|
||||
let ts_a = u64::from_be_bytes(
|
||||
msg.frames[i4 as usize + 1].data[0..8].try_into().unwrap(),
|
||||
);
|
||||
let ts_b = u64::from_be_bytes(
|
||||
msg.frames[i4 as usize + 1].data[8..16].try_into().unwrap(),
|
||||
);
|
||||
info!("ts_a {ts_a} ts_b {ts_b}");
|
||||
}
|
||||
vals.push((bm.head_a.global_timestamp.sec, bm.head_a.global_timestamp.ns, pulse));
|
||||
if vals.len() >= 20 {
|
||||
for &(sec, ns, pulse) in &vals {
|
||||
scy.execute(&qu1, (sec as i32, ns as i32, pulse as i64))
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
}
|
||||
vals.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{}", e);
|
||||
for frame in &msg.frames {
|
||||
info!("Frame: {:?}", frame);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("{}", e);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
i1 += 1;
|
||||
if false && i1 > 10000 {
|
||||
break;
|
||||
}
|
||||
if false && msgc > 10000 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> {
|
||||
use std::mem::size_of;
|
||||
use std::os::unix::prelude::AsRawFd;
|
||||
let fd = conn.as_raw_fd();
|
||||
unsafe {
|
||||
type N = libc::c_int;
|
||||
let n: N = rcvbuf as _;
|
||||
let ec = libc::setsockopt(
|
||||
fd,
|
||||
libc::SOL_SOCKET,
|
||||
libc::SO_RCVBUF,
|
||||
&n as *const N as _,
|
||||
size_of::<N>() as _,
|
||||
);
|
||||
if ec != 0 {
|
||||
error!("ec {ec}");
|
||||
if ec != 0 {
|
||||
return Err(Error::with_msg_no_trace(format!("can not set socket option")));
|
||||
}
|
||||
}
|
||||
}
|
||||
unsafe {
|
||||
type N = libc::c_int;
|
||||
let mut n: N = -1;
|
||||
let mut l = size_of::<N>() as libc::socklen_t;
|
||||
let ec = libc::getsockopt(
|
||||
fd,
|
||||
libc::SOL_SOCKET,
|
||||
libc::SO_RCVBUF,
|
||||
&mut n as *mut N as _,
|
||||
&mut l as _,
|
||||
);
|
||||
if ec != 0 {
|
||||
let errno = *libc::__errno_location();
|
||||
let es = CStr::from_ptr(libc::strerror(errno));
|
||||
warn!("can not query socket option ec {ec} errno {errno} es {es:?}");
|
||||
error!("can not query socket option");
|
||||
} else {
|
||||
info!("SO_RCVBUF {n}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
enum ConnState {
|
||||
InitSend,
|
||||
InitRecv1,
|
||||
InitRecv2,
|
||||
InitRecv3,
|
||||
InitRecv4,
|
||||
InitRecv5,
|
||||
ReadFrameFlags,
|
||||
ReadFrameShort,
|
||||
ReadFrameLong,
|
||||
ReadFrameBody(usize),
|
||||
}
|
||||
|
||||
impl ConnState {
|
||||
fn need_min(&self) -> usize {
|
||||
use ConnState::*;
|
||||
match self {
|
||||
InitSend => 0,
|
||||
InitRecv1 => 1,
|
||||
InitRecv2 => 9,
|
||||
InitRecv3 => 1,
|
||||
InitRecv4 => 1,
|
||||
InitRecv5 => 52,
|
||||
ReadFrameFlags => 1,
|
||||
ReadFrameShort => 1,
|
||||
ReadFrameLong => 8,
|
||||
ReadFrameBody(msglen) => *msglen,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum SocketType {
|
||||
PUSH,
|
||||
PULL,
|
||||
}
|
||||
|
||||
pub struct Zmtp {
|
||||
done: bool,
|
||||
complete: bool,
|
||||
socket_type: SocketType,
|
||||
conn: TcpStream,
|
||||
conn_state: ConnState,
|
||||
buf: NetBuf,
|
||||
outbuf: NetBuf,
|
||||
out_enable: bool,
|
||||
msglen: usize,
|
||||
has_more: bool,
|
||||
is_command: bool,
|
||||
peer_ver: (u8, u8),
|
||||
frames: Vec<ZmtpFrame>,
|
||||
inp_eof: bool,
|
||||
data_tx: Sender<u32>,
|
||||
data_rx: Receiver<u32>,
|
||||
}
|
||||
|
||||
impl Zmtp {
|
||||
fn new(conn: TcpStream, socket_type: SocketType) -> Self {
|
||||
let (tx, rx) = async_channel::bounded(1);
|
||||
Self {
|
||||
done: false,
|
||||
complete: false,
|
||||
socket_type,
|
||||
conn,
|
||||
conn_state: ConnState::InitSend,
|
||||
buf: NetBuf::new(),
|
||||
outbuf: NetBuf::new(),
|
||||
out_enable: false,
|
||||
msglen: 0,
|
||||
has_more: false,
|
||||
is_command: false,
|
||||
peer_ver: (0, 0),
|
||||
frames: vec![],
|
||||
inp_eof: false,
|
||||
data_tx: tx,
|
||||
data_rx: rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn out_channel(&self) -> Sender<u32> {
|
||||
self.data_tx.clone()
|
||||
}
|
||||
|
||||
fn inpbuf_conn(&mut self) -> (&mut TcpStream, ReadBuf) {
|
||||
(&mut self.conn, self.buf.read_buf_for_fill())
|
||||
}
|
||||
|
||||
fn outbuf_conn(&mut self) -> (&[u8], &mut TcpStream) {
|
||||
(self.outbuf.data(), &mut self.conn)
|
||||
}
|
||||
|
||||
fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Option<Poll<Result<ZmtpEvent, Error>>> {
|
||||
use Poll::*;
|
||||
let mut item_count = 0;
|
||||
let serialized: Int<Result<(), Error>> = if self.out_enable && self.outbuf.wcap() >= RP_REW_PT {
|
||||
match self.data_rx.poll_next_unpin(cx) {
|
||||
Ready(Some(_item)) => {
|
||||
// TODO item should be something that we can convert into a zmtp message.
|
||||
Int::Empty
|
||||
}
|
||||
Ready(None) => Int::Done,
|
||||
Pending => Int::Pend,
|
||||
}
|
||||
} else {
|
||||
Int::NoWork
|
||||
};
|
||||
item_count += serialized.item_count();
|
||||
let write: Int<Result<(), _>> = if item_count > 0 {
|
||||
Int::NoWork
|
||||
} else if self.outbuf.len() > 0 {
|
||||
let (b, w) = self.outbuf_conn();
|
||||
pin_mut!(w);
|
||||
match w.poll_write(cx, b) {
|
||||
Ready(k) => match k {
|
||||
Ok(k) => match self.outbuf.adv(k) {
|
||||
Ok(()) => {
|
||||
info!("sent {} bytes", k);
|
||||
self.outbuf.rewind_if_needed();
|
||||
Int::Empty
|
||||
}
|
||||
Err(e) => {
|
||||
error!("advance error {:?}", e);
|
||||
Int::Item(Err(e))
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("output write error {:?}", e);
|
||||
Int::Item(Err(e.into()))
|
||||
}
|
||||
},
|
||||
Pending => Int::Pend,
|
||||
}
|
||||
} else {
|
||||
Int::NoWork
|
||||
};
|
||||
match write {
|
||||
Int::NoWork => {}
|
||||
_ => {
|
||||
info!("write result: {:?} {}", write, self.outbuf.len());
|
||||
}
|
||||
}
|
||||
item_count += write.item_count();
|
||||
let read: Int<Result<(), _>> = if item_count > 0 || self.inp_eof {
|
||||
Int::NoWork
|
||||
} else {
|
||||
if self.buf.cap() < self.conn_state.need_min() {
|
||||
self.done = true;
|
||||
let e = Error::with_msg_no_trace(format!(
|
||||
"buffer too small for need_min {} {}",
|
||||
self.buf.cap(),
|
||||
self.conn_state.need_min()
|
||||
));
|
||||
Int::Item(Err(e))
|
||||
} else if self.buf.len() < self.conn_state.need_min() {
|
||||
let (w, mut rbuf) = self.inpbuf_conn();
|
||||
pin_mut!(w);
|
||||
match w.poll_read(cx, &mut rbuf) {
|
||||
Ready(k) => match k {
|
||||
Ok(()) => {
|
||||
let nf = rbuf.filled().len();
|
||||
if nf == 0 {
|
||||
info!("EOF");
|
||||
self.inp_eof = true;
|
||||
Int::Done
|
||||
} else {
|
||||
trace!("received {} bytes", rbuf.filled().len());
|
||||
if false {
|
||||
let t = rbuf.filled().len();
|
||||
let t = if t < 32 { t } else { 32 };
|
||||
trace!("got data {:?}", &rbuf.filled()[0..t]);
|
||||
}
|
||||
match self.buf.wadv(nf) {
|
||||
Ok(()) => Int::Empty,
|
||||
Err(e) => Int::Item(Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => Int::Item(Err(e.into())),
|
||||
},
|
||||
Pending => Int::Pend,
|
||||
}
|
||||
} else {
|
||||
Int::NoWork
|
||||
}
|
||||
};
|
||||
item_count += read.item_count();
|
||||
let parsed = if item_count > 0 || self.buf.len() < self.conn_state.need_min() {
|
||||
Int::NoWork
|
||||
} else {
|
||||
match self.parse_item() {
|
||||
Ok(k) => match k {
|
||||
Some(k) => Int::Item(Ok(k)),
|
||||
None => Int::Empty,
|
||||
},
|
||||
Err(e) => Int::Item(Err(e)),
|
||||
}
|
||||
};
|
||||
item_count += parsed.item_count();
|
||||
let _ = item_count;
|
||||
{
|
||||
use Int::*;
|
||||
match (serialized, write, read, parsed) {
|
||||
(NoWork | Done, NoWork | Done, NoWork | Done, NoWork | Done) => {
|
||||
warn!("all NoWork or Done");
|
||||
return Some(Pending);
|
||||
}
|
||||
(Item(Err(e)), _, _, _) => {
|
||||
self.done = true;
|
||||
return Some(Ready(Err(e)));
|
||||
}
|
||||
(_, Item(Err(e)), _, _) => {
|
||||
self.done = true;
|
||||
return Some(Ready(Err(e)));
|
||||
}
|
||||
(_, _, Item(Err(e)), _) => {
|
||||
self.done = true;
|
||||
return Some(Ready(Err(e)));
|
||||
}
|
||||
(_, _, _, Item(Err(e))) => {
|
||||
self.done = true;
|
||||
return Some(Ready(Err(e)));
|
||||
}
|
||||
(Item(_), _, _, _) => {
|
||||
return None;
|
||||
}
|
||||
(_, Item(_), _, _) => {
|
||||
return None;
|
||||
}
|
||||
(_, _, Item(_), _) => {
|
||||
return None;
|
||||
}
|
||||
(_, _, _, Item(Ok(item))) => {
|
||||
return Some(Ready(Ok(item)));
|
||||
}
|
||||
(Empty, _, _, _) => return None,
|
||||
(_, Empty, _, _) => return None,
|
||||
(_, _, Empty, _) => return None,
|
||||
(_, _, _, Empty) => return None,
|
||||
#[allow(unreachable_patterns)]
|
||||
(Pend, Pend | NoWork | Done, Pend | NoWork | Done, Pend | NoWork | Done) => return Some(Pending),
|
||||
#[allow(unreachable_patterns)]
|
||||
(Pend | NoWork | Done, Pend, Pend | NoWork | Done, Pend | NoWork | Done) => return Some(Pending),
|
||||
#[allow(unreachable_patterns)]
|
||||
(Pend | NoWork | Done, Pend | NoWork | Done, Pend, Pend | NoWork | Done) => return Some(Pending),
|
||||
#[allow(unreachable_patterns)]
|
||||
(Pend | NoWork | Done, Pend | NoWork | Done, Pend | NoWork | Done, Pend) => return Some(Pending),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn parse_item(&mut self) -> Result<Option<ZmtpEvent>, Error> {
|
||||
match self.conn_state {
|
||||
ConnState::InitSend => {
|
||||
info!("parse_item InitSend");
|
||||
self.outbuf.put_slice(&[0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 0])?;
|
||||
self.conn_state = ConnState::InitRecv1;
|
||||
Ok(None)
|
||||
}
|
||||
ConnState::InitRecv1 => {
|
||||
let b = self.buf.read_u8()?;
|
||||
if b != 0xff {
|
||||
Err(Error::with_msg_no_trace(format!("InitRecv1 peer is not zmtp 3.x")))
|
||||
} else {
|
||||
self.conn_state = ConnState::InitRecv2;
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
ConnState::InitRecv2 => {
|
||||
self.buf.adv(8)?;
|
||||
let b = self.buf.read_u8()?;
|
||||
if b & 0x01 != 1 {
|
||||
Err(Error::with_msg_no_trace(format!("InitRecv2 peer is not zmtp 3.x")))
|
||||
} else {
|
||||
self.conn_state = ConnState::InitRecv3;
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
ConnState::InitRecv3 => {
|
||||
let maj = self.buf.read_u8()?;
|
||||
if maj != 3 {
|
||||
Err(Error::with_msg_no_trace(format!("InitRecv3 peer is not zmtp 3.x")))
|
||||
} else {
|
||||
self.peer_ver.0 = maj;
|
||||
self.outbuf.put_slice(&[0x4e, 0x55, 0x4c, 0x4c])?;
|
||||
let a = vec![0; 48];
|
||||
self.outbuf.put_slice(&a)?;
|
||||
self.conn_state = ConnState::InitRecv4;
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
ConnState::InitRecv4 => {
|
||||
let minver = self.buf.read_u8()?;
|
||||
if minver > 1 {
|
||||
Err(Error::with_msg_no_trace(format!(
|
||||
"InitRecv3 peer is not zmtp 3.0 or 3.1"
|
||||
)))
|
||||
} else {
|
||||
self.peer_ver.1 = minver;
|
||||
info!("InitRecv4 peer version {:?}", self.peer_ver);
|
||||
self.conn_state = ConnState::InitRecv5;
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
ConnState::InitRecv5 => {
|
||||
{
|
||||
let b2 = self.buf.read_bytes(20)?;
|
||||
let mut i = 0;
|
||||
while i < b2.len() && b2[i] != 0 {
|
||||
i += 1;
|
||||
}
|
||||
if i >= b2.len() {
|
||||
return Err(Error::with_msg_no_trace(format!("InitRecv5 bad mechanism from peer")));
|
||||
} else {
|
||||
let sec = String::from_utf8(b2[..i].to_vec())?;
|
||||
info!("Peer security mechanism {} [{}]", sec.len(), sec);
|
||||
}
|
||||
}
|
||||
self.buf.adv(32)?;
|
||||
match self.socket_type {
|
||||
SocketType::PUSH => {
|
||||
self.outbuf
|
||||
.put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PUSH"[..])?;
|
||||
}
|
||||
SocketType::PULL => {
|
||||
self.outbuf
|
||||
.put_slice(&b"\x04\x1a\x05READY\x0bSocket-Type\x00\x00\x00\x04PULL"[..])?;
|
||||
}
|
||||
}
|
||||
self.out_enable = true;
|
||||
self.conn_state = ConnState::ReadFrameFlags;
|
||||
Ok(None)
|
||||
}
|
||||
ConnState::ReadFrameFlags => {
|
||||
let flags = self.buf.read_u8()?;
|
||||
let has_more = flags & 0x01 != 0;
|
||||
let long_size = flags & 0x02 != 0;
|
||||
let is_command = flags & 0x04 != 0;
|
||||
self.has_more = has_more;
|
||||
self.is_command = is_command;
|
||||
trace!(
|
||||
"parse_item ReadFrameFlags has_more {} long_size {} is_command {}",
|
||||
has_more,
|
||||
long_size,
|
||||
is_command
|
||||
);
|
||||
if long_size {
|
||||
self.conn_state = ConnState::ReadFrameLong;
|
||||
} else {
|
||||
self.conn_state = ConnState::ReadFrameShort;
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
ConnState::ReadFrameShort => {
|
||||
self.msglen = self.buf.read_u8()? as usize;
|
||||
trace!("parse_item ReadFrameShort msglen {}", self.msglen);
|
||||
self.conn_state = ConnState::ReadFrameBody(self.msglen);
|
||||
if self.msglen > 1024 * 64 {
|
||||
return Err(Error::with_msg_no_trace(format!(
|
||||
"larger msglen not yet supported {}",
|
||||
self.msglen,
|
||||
)));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
ConnState::ReadFrameLong => {
|
||||
self.msglen = self.buf.read_u64()? as usize;
|
||||
trace!("parse_item ReadFrameShort msglen {}", self.msglen);
|
||||
self.conn_state = ConnState::ReadFrameBody(self.msglen);
|
||||
if self.msglen > 1024 * 64 {
|
||||
return Err(Error::with_msg_no_trace(format!(
|
||||
"larger msglen not yet supported {}",
|
||||
self.msglen,
|
||||
)));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
ConnState::ReadFrameBody(msglen) => {
|
||||
// TODO do not copy here...
|
||||
let data = self.buf.read_bytes(msglen)?.to_vec();
|
||||
self.msglen = 0;
|
||||
if false {
|
||||
let n1 = data.len().min(256);
|
||||
let s = String::from_utf8_lossy(&data[..n1]);
|
||||
trace!("parse_item ReadFrameBody msglen {} string {}", msglen, s);
|
||||
}
|
||||
self.conn_state = ConnState::ReadFrameFlags;
|
||||
if self.is_command {
|
||||
info!("command data {:?}", data);
|
||||
if data.len() >= 7 {
|
||||
if &data[0..5] == b"\x04PING" {
|
||||
if data.len() > 32 {
|
||||
// TODO close connection?
|
||||
error!("Oversized PING");
|
||||
} else {
|
||||
let ttl = u16::from_be_bytes(data[5..7].try_into().unwrap());
|
||||
let ctx = &data[7..];
|
||||
info!("GOT PING ttl {ttl} ctx.len {}", ctx.len());
|
||||
if self.outbuf.wcap() < data.len() {
|
||||
error!("can not respond with PONG because output buffer full");
|
||||
} else {
|
||||
let size = 5 + ctx.len() as u8;
|
||||
self.outbuf.put_u8(0x04).unwrap();
|
||||
self.outbuf.put_u8(size).unwrap();
|
||||
self.outbuf.put_slice(b"\x04PONG").unwrap();
|
||||
self.outbuf.put_slice(ctx).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let g = ZmtpFrame {
|
||||
msglen: self.msglen,
|
||||
has_more: self.has_more,
|
||||
is_command: self.is_command,
|
||||
data,
|
||||
};
|
||||
self.frames.clear();
|
||||
Ok(Some(ZmtpEvent::ZmtpCommand(g)))
|
||||
} else {
|
||||
let g = ZmtpFrame {
|
||||
msglen: self.msglen,
|
||||
has_more: self.has_more,
|
||||
is_command: self.is_command,
|
||||
data,
|
||||
};
|
||||
self.frames.push(g);
|
||||
if self.has_more {
|
||||
Ok(None)
|
||||
} else {
|
||||
let g = ZmtpMessage {
|
||||
frames: mem::replace(&mut self.frames, vec![]),
|
||||
};
|
||||
Ok(Some(ZmtpEvent::ZmtpMessage(g)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ZmtpMessage {
|
||||
frames: Vec<ZmtpFrame>,
|
||||
}
|
||||
|
||||
impl ZmtpMessage {
|
||||
pub fn frames(&self) -> &Vec<ZmtpFrame> {
|
||||
&self.frames
|
||||
}
|
||||
|
||||
pub fn emit_to_buffer(&self, out: &mut NetBuf) -> Result<(), Error> {
|
||||
let n = self.frames.len();
|
||||
for (i, fr) in self.frames.iter().enumerate() {
|
||||
let mut flags: u8 = 2;
|
||||
if i < n - 1 {
|
||||
flags |= 1;
|
||||
}
|
||||
out.put_u8(flags)?;
|
||||
out.put_u64(fr.data().len() as u64)?;
|
||||
out.put_slice(fr.data())?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ZmtpFrame {
|
||||
msglen: usize,
|
||||
has_more: bool,
|
||||
is_command: bool,
|
||||
data: Vec<u8>,
|
||||
}
|
||||
|
||||
impl ZmtpFrame {
|
||||
pub fn data(&self) -> &[u8] {
|
||||
&self.data
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for ZmtpFrame {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let data = match String::from_utf8(self.data.clone()) {
|
||||
Ok(s) => s
|
||||
.chars()
|
||||
.take(32)
|
||||
.filter(|x| {
|
||||
//
|
||||
x.is_ascii_alphanumeric() || x.is_ascii_punctuation() || x.is_ascii_whitespace()
|
||||
})
|
||||
.collect::<String>(),
|
||||
Err(_) => format!("Binary {{ len: {} }}", self.data.len()),
|
||||
};
|
||||
f.debug_struct("ZmtpFrame")
|
||||
.field("msglen", &self.msglen)
|
||||
.field("has_more", &self.has_more)
|
||||
.field("is_command", &self.is_command)
|
||||
.field("data.len", &self.data.len())
|
||||
.field("data", &data)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
enum Int<T> {
|
||||
NoWork,
|
||||
Pend,
|
||||
Empty,
|
||||
Item(T),
|
||||
Done,
|
||||
}
|
||||
|
||||
impl<T> Int<T> {
|
||||
fn item_count(&self) -> u32 {
|
||||
if let Int::Item(_) = self {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> fmt::Debug for Int<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
Self::NoWork => write!(f, "NoWork"),
|
||||
Self::Pend => write!(f, "Pend"),
|
||||
Self::Empty => write!(f, "Empty"),
|
||||
Self::Item(_) => write!(f, "Item"),
|
||||
Self::Done => write!(f, "Done"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ZmtpEvent {
|
||||
ZmtpCommand(ZmtpFrame),
|
||||
ZmtpMessage(ZmtpMessage),
|
||||
}
|
||||
|
||||
impl Stream for Zmtp {
|
||||
type Item = Result<ZmtpEvent, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
if self.complete {
|
||||
panic!("poll_next on complete")
|
||||
} else if self.done {
|
||||
self.complete = true;
|
||||
return Ready(None);
|
||||
} else {
|
||||
loop {
|
||||
match Self::loop_body(self.as_mut(), cx) {
|
||||
Some(Ready(k)) => break Ready(Some(k)),
|
||||
Some(Pending) => break Pending,
|
||||
None => continue,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
struct DummyData {
|
||||
ts: u64,
|
||||
pulse: u64,
|
||||
value: i64,
|
||||
}
|
||||
|
||||
impl DummyData {
|
||||
#[allow(unused)]
|
||||
fn make_zmtp_msg(&self) -> Result<ZmtpMessage, Error> {
|
||||
let head_b = HeadB {
|
||||
htype: "bsr_d-1.1".into(),
|
||||
channels: vec![ChannelDesc {
|
||||
name: "TESTCHAN".into(),
|
||||
ty: "int64".into(),
|
||||
shape: JsVal::Array(vec![JsVal::Number(serde_json::Number::from(1))]),
|
||||
encoding: "little".into(),
|
||||
}],
|
||||
};
|
||||
let hb = serde_json::to_vec(&head_b).unwrap();
|
||||
use md5::Digest;
|
||||
let mut h = md5::Md5::new();
|
||||
h.update(&hb);
|
||||
let mut md5hex = String::with_capacity(32);
|
||||
for c in h.finalize() {
|
||||
use fmt::Write;
|
||||
write!(&mut md5hex, "{:02x}", c).unwrap();
|
||||
}
|
||||
let head_a = HeadA {
|
||||
htype: "bsr_m-1.1".into(),
|
||||
hash: md5hex,
|
||||
pulse_id: serde_json::Number::from(self.pulse),
|
||||
global_timestamp: GlobalTimestamp {
|
||||
sec: self.ts / SEC,
|
||||
ns: self.ts % SEC,
|
||||
},
|
||||
};
|
||||
// TODO write directly to output buffer.
|
||||
let ha = serde_json::to_vec(&head_a).unwrap();
|
||||
let hf = self.value.to_le_bytes().to_vec();
|
||||
let hp = [(self.ts / SEC).to_be_bytes(), (self.ts % SEC).to_be_bytes()].concat();
|
||||
let mut msg = ZmtpMessage { frames: vec![] };
|
||||
let fr = ZmtpFrame {
|
||||
msglen: 0,
|
||||
has_more: false,
|
||||
is_command: false,
|
||||
data: ha,
|
||||
};
|
||||
msg.frames.push(fr);
|
||||
let fr = ZmtpFrame {
|
||||
msglen: 0,
|
||||
has_more: false,
|
||||
is_command: false,
|
||||
data: hb,
|
||||
};
|
||||
msg.frames.push(fr);
|
||||
let fr = ZmtpFrame {
|
||||
msglen: 0,
|
||||
has_more: false,
|
||||
is_command: false,
|
||||
data: hf,
|
||||
};
|
||||
msg.frames.push(fr);
|
||||
let fr = ZmtpFrame {
|
||||
msglen: 0,
|
||||
has_more: false,
|
||||
is_command: false,
|
||||
data: hp,
|
||||
};
|
||||
msg.frames.push(fr);
|
||||
Ok(msg)
|
||||
}
|
||||
}
|
||||
2
rustfmt.toml
Normal file
2
rustfmt.toml
Normal file
@@ -0,0 +1,2 @@
|
||||
max_width = 120
|
||||
#control_brace_style = "ClosingNextLine"
|
||||
Reference in New Issue
Block a user