diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 485507b..048d3b5 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -38,6 +38,16 @@ pub async fn channel_info(channel: &Channel, node_config: &NodeConfigCached) -> let mut pbr = PbFileReader::new(f1).await; pbr.read_header().await?; msgs.push(format!("got header {}", pbr.channel_name())); + let ev = pbr.read_msg().await; + match ev { + Ok(item) => { + msgs.push(format!("got event {:?}", item)); + } + Err(e) => { + msgs.push(format!("can not read event! {:?}", e)); + } + } + msgs.push(format!("got header {}", pbr.channel_name())); } } let ret = ChannelInfo { diff --git a/archapp/src/lib.rs b/archapp/src/lib.rs index b3ee25c..f7771a7 100644 --- a/archapp/src/lib.rs +++ b/archapp/src/lib.rs @@ -8,8 +8,11 @@ pub mod generated {} pub mod parse; #[cfg(not(feature = "devread"))] pub mod parsestub; +use items::eventvalues::EventValues; +use items::waveevents::WaveEvents; #[cfg(not(feature = "devread"))] pub use parsestub as parse; + pub mod events; #[cfg(feature = "devread")] #[cfg(test)] @@ -38,3 +41,17 @@ fn unescape_archapp_msg(inp: &[u8]) -> Result, Error> { } Ok(ret) } + +#[derive(Debug)] +pub enum EventsItem { + ScalarByte(EventValues), + ScalarShort(EventValues), + ScalarInt(EventValues), + ScalarFloat(EventValues), + ScalarDouble(EventValues), + WaveByte(WaveEvents), + WaveShort(WaveEvents), + WaveInt(WaveEvents), + WaveFloat(WaveEvents), + WaveDouble(WaveEvents), +} diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs index 48ba9e9..98316c2 100644 --- a/archapp/src/parse.rs +++ b/archapp/src/parse.rs @@ -1,8 +1,10 @@ use crate::generated::EPICSEvent::PayloadType; -use crate::unescape_archapp_msg; +use crate::{unescape_archapp_msg, EventsItem}; use archapp_xc::*; use async_channel::{bounded, Receiver}; use err::Error; +use items::eventvalues::EventValues; +use items::waveevents::WaveEvents; use netpod::log::*; use netpod::NodeConfigCached; use protobuf::Message; @@ -20,6 +22,8 @@ pub struct PbFileReader { rp: usize, channel_name: String, payload_type: PayloadType, + year: u32, + month: u32, } impl PbFileReader { @@ -31,6 +35,8 @@ impl PbFileReader { rp: 0, channel_name: String::new(), payload_type: PayloadType::V4_GENERIC_BYTES, + year: 0, + month: 0, } } @@ -43,11 +49,15 @@ impl PbFileReader { .map_err(|_| Error::with_msg("can not parse PayloadInfo"))?; self.channel_name = payload_info.get_pvname().into(); self.payload_type = payload_info.get_field_type(); + self.year = payload_info.get_year() as u32; + + // TODO only the year in the pb? + self.month = 0; self.rp = k + 1; Ok(()) } - pub async fn read_msg(&mut self) -> Result<(), Error> { + pub async fn read_msg(&mut self) -> Result { self.fill_buf().await?; let k = self.find_next_nl()?; let buf = &mut self.buf; @@ -56,10 +66,71 @@ impl PbFileReader { // Handle the different types. // Must anyways reuse the Events NTY types. Where are they? // Attempt with big enum... - let msg = crate::generated::EPICSEvent::VectorFloat::parse_from_bytes(&m) - .map_err(|_| Error::with_msg("can not parse VectorFloat"))?; + use PayloadType::*; + let ei = match self.payload_type { + SCALAR_INT => { + let msg = crate::generated::EPICSEvent::ScalarInt::parse_from_bytes(&m) + .map_err(|_| Error::with_msg("can not parse ScalarInt"))?; + let mut t = EventValues:: { + tss: vec![], + values: vec![], + }; + // TODO Translate by the file-time-offset. + let ts = msg.get_secondsintoyear() as u64; + let v = msg.get_val(); + t.tss.push(ts); + t.values.push(v); + EventsItem::ScalarInt(t) + } + SCALAR_DOUBLE => { + let msg = crate::generated::EPICSEvent::ScalarDouble::parse_from_bytes(&m) + .map_err(|_| Error::with_msg("can not parse ScalarDouble"))?; + let mut t = EventValues:: { + tss: vec![], + values: vec![], + }; + // TODO Translate by the file-time-offset. + let ts = msg.get_secondsintoyear() as u64; + let v = msg.get_val(); + t.tss.push(ts); + t.values.push(v); + EventsItem::ScalarDouble(t) + } + WAVEFORM_FLOAT => { + let msg = crate::generated::EPICSEvent::VectorFloat::parse_from_bytes(&m) + .map_err(|_| Error::with_msg("can not parse VectorFloat"))?; + // TODO homogenize struct and field names w.r.t. EventValues: + let mut t = WaveEvents:: { + tss: vec![], + vals: vec![], + }; + // TODO Translate by the file-time-offset. + let ts = msg.get_secondsintoyear() as u64; + let v = msg.get_val().to_vec(); + t.tss.push(ts); + t.vals.push(v); + EventsItem::WaveFloat(t) + } + WAVEFORM_DOUBLE => { + let msg = crate::generated::EPICSEvent::VectorDouble::parse_from_bytes(&m) + .map_err(|_| Error::with_msg("can not parse VectorDouble"))?; + let mut t = WaveEvents:: { + tss: vec![], + vals: vec![], + }; + // TODO Translate by the file-time-offset. + let ts = msg.get_secondsintoyear() as u64; + let v = msg.get_val().to_vec(); + t.tss.push(ts); + t.vals.push(v); + EventsItem::WaveDouble(t) + } + _ => { + return Err(Error::with_msg(format!("not supported: {:?}", self.payload_type))); + } + }; self.rp = k + 1; - Ok(()) + Ok(ei) } async fn fill_buf(&mut self) -> Result<(), Error> { diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index a631453..0eea68a 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -669,10 +669,19 @@ pub async fn archapp_channel_info(req: Request, node_config: &NodeConfigCa let url = Url::parse(&format!("dummy:{}", req.uri()))?; let pairs = get_url_query_pairs(&url); let channel = channel_from_pairs(&pairs)?; - let res = archapp_wrap::channel_info(&channel, node_config).await?; - let buf = serde_json::to_vec(&res)?; - let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, APP_JSON) - .body(Body::from(buf))?; - Ok(ret) + match archapp_wrap::channel_info(&channel, node_config).await { + Ok(res) => { + let buf = serde_json::to_vec(&res)?; + let ret = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, APP_JSON) + .body(Body::from(buf))?; + Ok(ret) + } + Err(e) => { + let ret = response(StatusCode::INTERNAL_SERVER_ERROR) + .header(http::header::CONTENT_TYPE, "text/text") + .body(Body::from(format!("{:?}", e)))?; + Ok(ret) + } + } }