Read first event of pb file
This commit is contained in:
@@ -38,6 +38,16 @@ pub async fn channel_info(channel: &Channel, node_config: &NodeConfigCached) ->
|
||||
let mut pbr = PbFileReader::new(f1).await;
|
||||
pbr.read_header().await?;
|
||||
msgs.push(format!("got header {}", pbr.channel_name()));
|
||||
let ev = pbr.read_msg().await;
|
||||
match ev {
|
||||
Ok(item) => {
|
||||
msgs.push(format!("got event {:?}", item));
|
||||
}
|
||||
Err(e) => {
|
||||
msgs.push(format!("can not read event! {:?}", e));
|
||||
}
|
||||
}
|
||||
msgs.push(format!("got header {}", pbr.channel_name()));
|
||||
}
|
||||
}
|
||||
let ret = ChannelInfo {
|
||||
|
||||
@@ -8,8 +8,11 @@ pub mod generated {}
|
||||
pub mod parse;
|
||||
#[cfg(not(feature = "devread"))]
|
||||
pub mod parsestub;
|
||||
use items::eventvalues::EventValues;
|
||||
use items::waveevents::WaveEvents;
|
||||
#[cfg(not(feature = "devread"))]
|
||||
pub use parsestub as parse;
|
||||
|
||||
pub mod events;
|
||||
#[cfg(feature = "devread")]
|
||||
#[cfg(test)]
|
||||
@@ -38,3 +41,17 @@ fn unescape_archapp_msg(inp: &[u8]) -> Result<Vec<u8>, Error> {
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum EventsItem {
|
||||
ScalarByte(EventValues<i32>),
|
||||
ScalarShort(EventValues<i32>),
|
||||
ScalarInt(EventValues<i32>),
|
||||
ScalarFloat(EventValues<f32>),
|
||||
ScalarDouble(EventValues<f64>),
|
||||
WaveByte(WaveEvents<i32>),
|
||||
WaveShort(WaveEvents<i32>),
|
||||
WaveInt(WaveEvents<i32>),
|
||||
WaveFloat(WaveEvents<f32>),
|
||||
WaveDouble(WaveEvents<f64>),
|
||||
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
use crate::generated::EPICSEvent::PayloadType;
|
||||
use crate::unescape_archapp_msg;
|
||||
use crate::{unescape_archapp_msg, EventsItem};
|
||||
use archapp_xc::*;
|
||||
use async_channel::{bounded, Receiver};
|
||||
use err::Error;
|
||||
use items::eventvalues::EventValues;
|
||||
use items::waveevents::WaveEvents;
|
||||
use netpod::log::*;
|
||||
use netpod::NodeConfigCached;
|
||||
use protobuf::Message;
|
||||
@@ -20,6 +22,8 @@ pub struct PbFileReader {
|
||||
rp: usize,
|
||||
channel_name: String,
|
||||
payload_type: PayloadType,
|
||||
year: u32,
|
||||
month: u32,
|
||||
}
|
||||
|
||||
impl PbFileReader {
|
||||
@@ -31,6 +35,8 @@ impl PbFileReader {
|
||||
rp: 0,
|
||||
channel_name: String::new(),
|
||||
payload_type: PayloadType::V4_GENERIC_BYTES,
|
||||
year: 0,
|
||||
month: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,11 +49,15 @@ impl PbFileReader {
|
||||
.map_err(|_| Error::with_msg("can not parse PayloadInfo"))?;
|
||||
self.channel_name = payload_info.get_pvname().into();
|
||||
self.payload_type = payload_info.get_field_type();
|
||||
self.year = payload_info.get_year() as u32;
|
||||
|
||||
// TODO only the year in the pb?
|
||||
self.month = 0;
|
||||
self.rp = k + 1;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn read_msg(&mut self) -> Result<(), Error> {
|
||||
pub async fn read_msg(&mut self) -> Result<EventsItem, Error> {
|
||||
self.fill_buf().await?;
|
||||
let k = self.find_next_nl()?;
|
||||
let buf = &mut self.buf;
|
||||
@@ -56,10 +66,71 @@ impl PbFileReader {
|
||||
// Handle the different types.
|
||||
// Must anyways reuse the Events NTY types. Where are they?
|
||||
// Attempt with big enum...
|
||||
let msg = crate::generated::EPICSEvent::VectorFloat::parse_from_bytes(&m)
|
||||
.map_err(|_| Error::with_msg("can not parse VectorFloat"))?;
|
||||
use PayloadType::*;
|
||||
let ei = match self.payload_type {
|
||||
SCALAR_INT => {
|
||||
let msg = crate::generated::EPICSEvent::ScalarInt::parse_from_bytes(&m)
|
||||
.map_err(|_| Error::with_msg("can not parse ScalarInt"))?;
|
||||
let mut t = EventValues::<i32> {
|
||||
tss: vec![],
|
||||
values: vec![],
|
||||
};
|
||||
// TODO Translate by the file-time-offset.
|
||||
let ts = msg.get_secondsintoyear() as u64;
|
||||
let v = msg.get_val();
|
||||
t.tss.push(ts);
|
||||
t.values.push(v);
|
||||
EventsItem::ScalarInt(t)
|
||||
}
|
||||
SCALAR_DOUBLE => {
|
||||
let msg = crate::generated::EPICSEvent::ScalarDouble::parse_from_bytes(&m)
|
||||
.map_err(|_| Error::with_msg("can not parse ScalarDouble"))?;
|
||||
let mut t = EventValues::<f64> {
|
||||
tss: vec![],
|
||||
values: vec![],
|
||||
};
|
||||
// TODO Translate by the file-time-offset.
|
||||
let ts = msg.get_secondsintoyear() as u64;
|
||||
let v = msg.get_val();
|
||||
t.tss.push(ts);
|
||||
t.values.push(v);
|
||||
EventsItem::ScalarDouble(t)
|
||||
}
|
||||
WAVEFORM_FLOAT => {
|
||||
let msg = crate::generated::EPICSEvent::VectorFloat::parse_from_bytes(&m)
|
||||
.map_err(|_| Error::with_msg("can not parse VectorFloat"))?;
|
||||
// TODO homogenize struct and field names w.r.t. EventValues:
|
||||
let mut t = WaveEvents::<f32> {
|
||||
tss: vec![],
|
||||
vals: vec![],
|
||||
};
|
||||
// TODO Translate by the file-time-offset.
|
||||
let ts = msg.get_secondsintoyear() as u64;
|
||||
let v = msg.get_val().to_vec();
|
||||
t.tss.push(ts);
|
||||
t.vals.push(v);
|
||||
EventsItem::WaveFloat(t)
|
||||
}
|
||||
WAVEFORM_DOUBLE => {
|
||||
let msg = crate::generated::EPICSEvent::VectorDouble::parse_from_bytes(&m)
|
||||
.map_err(|_| Error::with_msg("can not parse VectorDouble"))?;
|
||||
let mut t = WaveEvents::<f64> {
|
||||
tss: vec![],
|
||||
vals: vec![],
|
||||
};
|
||||
// TODO Translate by the file-time-offset.
|
||||
let ts = msg.get_secondsintoyear() as u64;
|
||||
let v = msg.get_val().to_vec();
|
||||
t.tss.push(ts);
|
||||
t.vals.push(v);
|
||||
EventsItem::WaveDouble(t)
|
||||
}
|
||||
_ => {
|
||||
return Err(Error::with_msg(format!("not supported: {:?}", self.payload_type)));
|
||||
}
|
||||
};
|
||||
self.rp = k + 1;
|
||||
Ok(())
|
||||
Ok(ei)
|
||||
}
|
||||
|
||||
async fn fill_buf(&mut self) -> Result<(), Error> {
|
||||
|
||||
@@ -669,10 +669,19 @@ pub async fn archapp_channel_info(req: Request<Body>, node_config: &NodeConfigCa
|
||||
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
||||
let pairs = get_url_query_pairs(&url);
|
||||
let channel = channel_from_pairs(&pairs)?;
|
||||
let res = archapp_wrap::channel_info(&channel, node_config).await?;
|
||||
let buf = serde_json::to_vec(&res)?;
|
||||
let ret = response(StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, APP_JSON)
|
||||
.body(Body::from(buf))?;
|
||||
Ok(ret)
|
||||
match archapp_wrap::channel_info(&channel, node_config).await {
|
||||
Ok(res) => {
|
||||
let buf = serde_json::to_vec(&res)?;
|
||||
let ret = response(StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, APP_JSON)
|
||||
.body(Body::from(buf))?;
|
||||
Ok(ret)
|
||||
}
|
||||
Err(e) => {
|
||||
let ret = response(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.header(http::header::CONTENT_TYPE, "text/text")
|
||||
.body(Body::from(format!("{:?}", e)))?;
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user