Start fetching events
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
[workspace]
|
||||
members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "nodenet"]
|
||||
members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "nodenet", "httpclient"]
|
||||
|
||||
[profile.release]
|
||||
debug = 1
|
||||
|
||||
@@ -1,40 +1,157 @@
|
||||
use crate::parse::PbFileReader;
|
||||
use crate::EventsItem;
|
||||
use chrono::{TimeZone, Utc};
|
||||
use err::Error;
|
||||
use futures_core::Stream;
|
||||
use items::Framable;
|
||||
use items::eventvalues::EventValues;
|
||||
use items::{Framable, RangeCompletableItem, StreamItem};
|
||||
use netpod::log::*;
|
||||
use netpod::query::RawEventsQuery;
|
||||
use netpod::{ArchiverAppliance, Channel, ChannelInfo, NodeConfigCached, Shape};
|
||||
use netpod::timeunits::DAY;
|
||||
use netpod::{ArchiverAppliance, Channel, ChannelInfo, ScalarType, Shape};
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use tokio::fs::{read_dir, File};
|
||||
|
||||
pub async fn make_event_pipe(
|
||||
_evq: &RawEventsQuery,
|
||||
_aa: &ArchiverAppliance,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
|
||||
err::todoval()
|
||||
struct DataFilename {
|
||||
year: u32,
|
||||
month: u32,
|
||||
}
|
||||
|
||||
pub async fn channel_info(channel: &Channel, node_config: &NodeConfigCached) -> Result<ChannelInfo, Error> {
|
||||
fn parse_data_filename(s: &str) -> Result<DataFilename, Error> {
|
||||
if !s.ends_with(".pb") {
|
||||
return Err(Error::with_msg_no_trace("not a .pb file"));
|
||||
}
|
||||
if s.len() < 12 {
|
||||
return Err(Error::with_msg_no_trace("filename too short"));
|
||||
}
|
||||
let j = &s[s.len() - 11..];
|
||||
if &j[0..1] != ":" {
|
||||
return Err(Error::with_msg_no_trace("no colon"));
|
||||
}
|
||||
if &j[5..6] != "_" {
|
||||
return Err(Error::with_msg_no_trace("no underscore"));
|
||||
}
|
||||
let year: u32 = j[1..5].parse()?;
|
||||
let month: u32 = j[6..8].parse()?;
|
||||
let ret = DataFilename { year, month };
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn make_event_pipe(
|
||||
evq: &RawEventsQuery,
|
||||
aa: &ArchiverAppliance,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
|
||||
info!("make_event_pipe {:?}", evq);
|
||||
let evq = evq.clone();
|
||||
let DirAndPrefix { dir, prefix } = directory_for_channel_files(&evq.channel, aa)?;
|
||||
let channel_info = channel_info(&evq.channel, aa).await?;
|
||||
//let dtbeg = Utc.timestamp((evq.range.beg / 1000000000) as i64, (evq.range.beg % 1000000000) as u32);
|
||||
let (tx, rx) = async_channel::bounded(16);
|
||||
let block1 = async move {
|
||||
let mut rd = tokio::fs::read_dir(&dir).await?;
|
||||
while let Some(de) = rd.next_entry().await? {
|
||||
let s = de.file_name().to_string_lossy().into_owned();
|
||||
if s.starts_with(&prefix) && s.ends_with(".pb") {
|
||||
match parse_data_filename(&s) {
|
||||
Ok(df) => {
|
||||
let ts0 = Utc.ymd(df.year as i32, df.month, 0).and_hms(0, 0, 0);
|
||||
let ts1 = ts0.timestamp() as u64 * 1000000000 + ts0.timestamp_subsec_nanos() as u64;
|
||||
if evq.range.beg < ts1 + DAY * 32 && evq.range.end > ts1 {
|
||||
let f1 = File::open(de.path()).await?;
|
||||
info!("opened {:?}", de.path());
|
||||
let mut pbr = PbFileReader::new(f1).await;
|
||||
pbr.read_header().await?;
|
||||
loop {
|
||||
match pbr.read_msg().await {
|
||||
Ok(ev) => match ev {
|
||||
EventsItem::ScalarDouble(h) => {
|
||||
//
|
||||
let (x, y) = h
|
||||
.tss
|
||||
.into_iter()
|
||||
.zip(h.values.into_iter())
|
||||
.filter_map(|(j, k)| {
|
||||
if j < evq.range.beg || j >= evq.range.end {
|
||||
None
|
||||
} else {
|
||||
Some((j, k))
|
||||
}
|
||||
})
|
||||
.fold((vec![], vec![]), |(mut a, mut b), (j, k)| {
|
||||
a.push(j);
|
||||
b.push(k);
|
||||
(a, b)
|
||||
});
|
||||
let b = EventValues { tss: x, values: y };
|
||||
let b = Ok(StreamItem::DataItem(RangeCompletableItem::Data(b)));
|
||||
tx.send(Box::new(b) as Box<dyn Framable>).await?;
|
||||
}
|
||||
_ => {
|
||||
//
|
||||
error!("case not covered");
|
||||
return Err(Error::with_msg_no_trace("todo"));
|
||||
}
|
||||
},
|
||||
Err(e) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok::<_, Error>(())
|
||||
};
|
||||
let block2 = async move {
|
||||
match block1.await {
|
||||
Ok(_) => {
|
||||
info!("block1 done ok");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{:?}", e);
|
||||
}
|
||||
}
|
||||
};
|
||||
tokio::task::spawn(block2);
|
||||
Ok(Box::pin(rx))
|
||||
}
|
||||
|
||||
struct DirAndPrefix {
|
||||
dir: PathBuf,
|
||||
prefix: String,
|
||||
}
|
||||
|
||||
fn directory_for_channel_files(channel: &Channel, aa: &ArchiverAppliance) -> Result<DirAndPrefix, Error> {
|
||||
// SARUN11/CVME/DBLM546/IOC_CPU_LOAD
|
||||
// SARUN11-CVME-DBLM546:IOC_CPU_LOAD
|
||||
let a: Vec<_> = channel.name.split("-").map(|s| s.split(":")).flatten().collect();
|
||||
let path1 = node_config
|
||||
.node
|
||||
.archiver_appliance
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.data_base_path
|
||||
.clone();
|
||||
let path2 = a.iter().take(a.len() - 1).fold(path1, |a, &x| a.join(x));
|
||||
let path = aa.data_base_path.clone();
|
||||
let path = a.iter().take(a.len() - 1).fold(path, |a, &x| a.join(x));
|
||||
let ret = DirAndPrefix {
|
||||
dir: path,
|
||||
prefix: a
|
||||
.last()
|
||||
.ok_or_else(|| Error::with_msg_no_trace("no prefix in file"))?
|
||||
.to_string(),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result<ChannelInfo, Error> {
|
||||
let DirAndPrefix { dir, prefix } = directory_for_channel_files(channel, aa)?;
|
||||
let mut msgs = vec![];
|
||||
msgs.push(format!("a: {:?}", a));
|
||||
msgs.push(format!("path2: {}", path2.to_string_lossy()));
|
||||
let mut rd = tokio::fs::read_dir(&path2).await?;
|
||||
msgs.push(format!("path: {}", dir.to_string_lossy()));
|
||||
let mut scalar_type = None;
|
||||
let mut shape = None;
|
||||
let mut rd = read_dir(&dir).await?;
|
||||
while let Some(de) = rd.next_entry().await? {
|
||||
let s = de.file_name().to_string_lossy().into_owned();
|
||||
if s.starts_with(a.last().unwrap()) && s.ends_with(".pb") {
|
||||
if s.starts_with(&prefix) && s.ends_with(".pb") {
|
||||
msgs.push(s);
|
||||
let f1 = tokio::fs::File::open(de.path()).await?;
|
||||
let f1 = File::open(de.path()).await?;
|
||||
let mut pbr = PbFileReader::new(f1).await;
|
||||
pbr.read_header().await?;
|
||||
msgs.push(format!("got header {}", pbr.channel_name()));
|
||||
@@ -42,6 +159,57 @@ pub async fn channel_info(channel: &Channel, node_config: &NodeConfigCached) ->
|
||||
match ev {
|
||||
Ok(item) => {
|
||||
msgs.push(format!("got event {:?}", item));
|
||||
shape = Some(match &item {
|
||||
EventsItem::ScalarByte(_) => Shape::Scalar,
|
||||
EventsItem::ScalarShort(_) => Shape::Scalar,
|
||||
EventsItem::ScalarInt(_) => Shape::Scalar,
|
||||
EventsItem::ScalarFloat(_) => Shape::Scalar,
|
||||
EventsItem::ScalarDouble(_) => Shape::Scalar,
|
||||
EventsItem::WaveByte(item) => Shape::Wave(
|
||||
item.vals
|
||||
.first()
|
||||
.ok_or_else(|| Error::with_msg_no_trace("empty event batch"))?
|
||||
.len() as u32,
|
||||
),
|
||||
EventsItem::WaveShort(item) => Shape::Wave(
|
||||
item.vals
|
||||
.first()
|
||||
.ok_or_else(|| Error::with_msg_no_trace("empty event batch"))?
|
||||
.len() as u32,
|
||||
),
|
||||
EventsItem::WaveInt(item) => Shape::Wave(
|
||||
item.vals
|
||||
.first()
|
||||
.ok_or_else(|| Error::with_msg_no_trace("empty event batch"))?
|
||||
.len() as u32,
|
||||
),
|
||||
EventsItem::WaveFloat(item) => Shape::Wave(
|
||||
item.vals
|
||||
.first()
|
||||
.ok_or_else(|| Error::with_msg_no_trace("empty event batch"))?
|
||||
.len() as u32,
|
||||
),
|
||||
EventsItem::WaveDouble(item) => Shape::Wave(
|
||||
item.vals
|
||||
.first()
|
||||
.ok_or_else(|| Error::with_msg_no_trace("empty event batch"))?
|
||||
.len() as u32,
|
||||
),
|
||||
});
|
||||
// These type mappings are defined by the protobuffer schema.
|
||||
scalar_type = Some(match item {
|
||||
EventsItem::ScalarByte(_) => ScalarType::U8,
|
||||
EventsItem::ScalarShort(_) => ScalarType::I32,
|
||||
EventsItem::ScalarInt(_) => ScalarType::I32,
|
||||
EventsItem::ScalarFloat(_) => ScalarType::F32,
|
||||
EventsItem::ScalarDouble(_) => ScalarType::F64,
|
||||
EventsItem::WaveByte(_) => ScalarType::U8,
|
||||
EventsItem::WaveShort(_) => ScalarType::I32,
|
||||
EventsItem::WaveInt(_) => ScalarType::I32,
|
||||
EventsItem::WaveFloat(_) => ScalarType::F32,
|
||||
EventsItem::WaveDouble(_) => ScalarType::F64,
|
||||
});
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
msgs.push(format!("can not read event! {:?}", e));
|
||||
@@ -50,8 +218,12 @@ pub async fn channel_info(channel: &Channel, node_config: &NodeConfigCached) ->
|
||||
msgs.push(format!("got header {}", pbr.channel_name()));
|
||||
}
|
||||
}
|
||||
let shape = shape.ok_or_else(|| Error::with_msg("could not determine shape"))?;
|
||||
let scalar_type = scalar_type.ok_or_else(|| Error::with_msg("could not determine scalar_type"))?;
|
||||
let ret = ChannelInfo {
|
||||
shape: Shape::Scalar,
|
||||
scalar_type,
|
||||
byte_order: None,
|
||||
shape,
|
||||
msg: JsonValue::Array(msgs.into_iter().map(JsonValue::String).collect()),
|
||||
};
|
||||
Ok(ret)
|
||||
|
||||
@@ -2,11 +2,12 @@ use crate::generated::EPICSEvent::PayloadType;
|
||||
use crate::{unescape_archapp_msg, EventsItem};
|
||||
use archapp_xc::*;
|
||||
use async_channel::{bounded, Receiver};
|
||||
use chrono::{TimeZone, Utc};
|
||||
use err::Error;
|
||||
use items::eventvalues::EventValues;
|
||||
use items::waveevents::WaveEvents;
|
||||
use netpod::log::*;
|
||||
use netpod::NodeConfigCached;
|
||||
use netpod::{ArchiverAppliance, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached};
|
||||
use protobuf::Message;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value as JsonValue;
|
||||
@@ -25,16 +26,32 @@ pub struct PbFileReader {
|
||||
year: u32,
|
||||
}
|
||||
|
||||
fn parse_scalar_byte(m: &[u8], year: u32) -> Result<EventsItem, Error> {
|
||||
let msg = crate::generated::EPICSEvent::ScalarByte::parse_from_bytes(m)
|
||||
.map_err(|_| Error::with_msg(format!("can not parse pb-type {}", "ScalarByte")))?;
|
||||
let mut t = EventValues::<i32> {
|
||||
tss: vec![],
|
||||
values: vec![],
|
||||
};
|
||||
let yd = Utc.ymd(year as i32, 1, 1).and_hms(0, 0, 0);
|
||||
let ts = yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64;
|
||||
let v = msg.get_val().first().map_or(0, |k| *k as i32);
|
||||
t.tss.push(ts);
|
||||
t.values.push(v);
|
||||
Ok(EventsItem::ScalarByte(t))
|
||||
}
|
||||
|
||||
macro_rules! scalar_parse {
|
||||
($m:expr, $pbt:ident, $eit:ident, $evty:ident) => {{
|
||||
($m:expr, $year:expr, $pbt:ident, $eit:ident, $evty:ident) => {{
|
||||
let msg = crate::generated::EPICSEvent::$pbt::parse_from_bytes($m)
|
||||
.map_err(|_| Error::with_msg(format!("can not parse pb-type {}", stringify!($pbt))))?;
|
||||
let mut t = EventValues::<$evty> {
|
||||
tss: vec![],
|
||||
values: vec![],
|
||||
};
|
||||
// TODO Translate by the file-time-offset.
|
||||
let ts = msg.get_secondsintoyear() as u64;
|
||||
let yd = Utc.ymd($year as i32, 1, 1).and_hms(0, 0, 0);
|
||||
let ts =
|
||||
yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64;
|
||||
let v = msg.get_val();
|
||||
t.tss.push(ts);
|
||||
t.values.push(v);
|
||||
@@ -43,15 +60,16 @@ macro_rules! scalar_parse {
|
||||
}
|
||||
|
||||
macro_rules! wave_parse {
|
||||
($m:expr, $pbt:ident, $eit:ident, $evty:ident) => {{
|
||||
($m:expr, $year:expr, $pbt:ident, $eit:ident, $evty:ident) => {{
|
||||
let msg = crate::generated::EPICSEvent::$pbt::parse_from_bytes($m)
|
||||
.map_err(|_| Error::with_msg(format!("can not parse pb-type {}", stringify!($pbt))))?;
|
||||
let mut t = WaveEvents::<$evty> {
|
||||
tss: vec![],
|
||||
vals: vec![],
|
||||
};
|
||||
// TODO Translate by the file-time-offset.
|
||||
let ts = msg.get_secondsintoyear() as u64;
|
||||
let yd = Utc.ymd($year as i32, 1, 1).and_hms(0, 0, 0);
|
||||
let ts =
|
||||
yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64;
|
||||
let v = msg.get_val();
|
||||
t.tss.push(ts);
|
||||
t.vals.push(v.to_vec());
|
||||
@@ -59,11 +77,13 @@ macro_rules! wave_parse {
|
||||
}};
|
||||
}
|
||||
|
||||
const MIN_BUF_FILL: usize = 1024 * 16;
|
||||
|
||||
impl PbFileReader {
|
||||
pub async fn new(file: File) -> Self {
|
||||
Self {
|
||||
file,
|
||||
buf: vec![0; 1024 * 128],
|
||||
buf: vec![0; MIN_BUF_FILL * 4],
|
||||
wp: 0,
|
||||
rp: 0,
|
||||
channel_name: String::new(),
|
||||
@@ -93,42 +113,39 @@ impl PbFileReader {
|
||||
let m = unescape_archapp_msg(&buf[self.rp..k])?;
|
||||
use PayloadType::*;
|
||||
let ei = match self.payload_type {
|
||||
SCALAR_BYTE => {
|
||||
//scalar_parse!(&m, ScalarByte, ScalarByte, u8)
|
||||
err::todoval()
|
||||
}
|
||||
SCALAR_BYTE => parse_scalar_byte(&m, self.year)?,
|
||||
SCALAR_ENUM => {
|
||||
scalar_parse!(&m, ScalarEnum, ScalarInt, i32)
|
||||
scalar_parse!(&m, self.year, ScalarEnum, ScalarInt, i32)
|
||||
}
|
||||
SCALAR_SHORT => {
|
||||
scalar_parse!(&m, ScalarShort, ScalarShort, i32)
|
||||
scalar_parse!(&m, self.year, ScalarShort, ScalarShort, i32)
|
||||
}
|
||||
SCALAR_INT => {
|
||||
scalar_parse!(&m, ScalarInt, ScalarInt, i32)
|
||||
scalar_parse!(&m, self.year, ScalarInt, ScalarInt, i32)
|
||||
}
|
||||
SCALAR_FLOAT => {
|
||||
scalar_parse!(&m, ScalarFloat, ScalarFloat, f32)
|
||||
scalar_parse!(&m, self.year, ScalarFloat, ScalarFloat, f32)
|
||||
}
|
||||
SCALAR_DOUBLE => {
|
||||
scalar_parse!(&m, ScalarDouble, ScalarDouble, f64)
|
||||
scalar_parse!(&m, self.year, ScalarDouble, ScalarDouble, f64)
|
||||
}
|
||||
WAVEFORM_BYTE => {
|
||||
wave_parse!(&m, VectorChar, WaveByte, u8)
|
||||
wave_parse!(&m, self.year, VectorChar, WaveByte, u8)
|
||||
}
|
||||
WAVEFORM_SHORT => {
|
||||
wave_parse!(&m, VectorShort, WaveShort, i32)
|
||||
wave_parse!(&m, self.year, VectorShort, WaveShort, i32)
|
||||
}
|
||||
WAVEFORM_ENUM => {
|
||||
wave_parse!(&m, VectorEnum, WaveInt, i32)
|
||||
wave_parse!(&m, self.year, VectorEnum, WaveInt, i32)
|
||||
}
|
||||
WAVEFORM_INT => {
|
||||
wave_parse!(&m, VectorInt, WaveInt, i32)
|
||||
wave_parse!(&m, self.year, VectorInt, WaveInt, i32)
|
||||
}
|
||||
WAVEFORM_FLOAT => {
|
||||
wave_parse!(&m, VectorFloat, WaveFloat, f32)
|
||||
wave_parse!(&m, self.year, VectorFloat, WaveFloat, f32)
|
||||
}
|
||||
WAVEFORM_DOUBLE => {
|
||||
wave_parse!(&m, VectorDouble, WaveDouble, f64)
|
||||
wave_parse!(&m, self.year, VectorDouble, WaveDouble, f64)
|
||||
}
|
||||
SCALAR_STRING | WAVEFORM_STRING | V4_GENERIC_BYTES => {
|
||||
return Err(Error::with_msg(format!("not supported: {:?}", self.payload_type)));
|
||||
@@ -139,10 +156,10 @@ impl PbFileReader {
|
||||
}
|
||||
|
||||
async fn fill_buf(&mut self) -> Result<(), Error> {
|
||||
if self.wp - self.rp >= 1024 * 16 {
|
||||
if self.wp - self.rp >= MIN_BUF_FILL {
|
||||
return Ok(());
|
||||
}
|
||||
if self.rp >= 1024 * 42 {
|
||||
if self.rp >= self.buf.len() - MIN_BUF_FILL {
|
||||
let n = self.wp - self.rp;
|
||||
for i in 0..n {
|
||||
self.buf[i] = self.buf[self.rp + i];
|
||||
@@ -397,3 +414,14 @@ pub async fn scan_files_inner(
|
||||
tokio::spawn(block2);
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
pub async fn channel_config(q: &ChannelConfigQuery, aa: &ArchiverAppliance) -> Result<ChannelConfigResponse, Error> {
|
||||
let ci = crate::events::channel_info(&q.channel, aa).await?;
|
||||
let ret = ChannelConfigResponse {
|
||||
channel: q.channel.clone(),
|
||||
scalar_type: ci.scalar_type,
|
||||
byte_order: ci.byte_order,
|
||||
shape: ci.shape,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use err::Error;
|
||||
use futures_core::Stream;
|
||||
use items::Framable;
|
||||
use netpod::query::RawEventsQuery;
|
||||
use netpod::{ArchiverAppliance, Channel, ChannelInfo, NodeConfigCached};
|
||||
use netpod::{ArchiverAppliance, Channel, ChannelConfigQuery, ChannelConfigResponse, ChannelInfo, NodeConfigCached};
|
||||
use std::collections::BTreeMap;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
@@ -24,5 +24,9 @@ pub async fn make_event_pipe(
|
||||
}
|
||||
|
||||
pub async fn channel_info(channel: &Channel, node_config: &NodeConfigCached) -> Result<ChannelInfo, Error> {
|
||||
archapp::events::channel_info(channel, node_config).await
|
||||
archapp::events::channel_info(channel, node_config.node.archiver_appliance.as_ref().unwrap()).await
|
||||
}
|
||||
|
||||
pub async fn channel_config(q: &ChannelConfigQuery, aa: &ArchiverAppliance) -> Result<ChannelConfigResponse, Error> {
|
||||
archapp::parse::channel_config(q, aa).await
|
||||
}
|
||||
|
||||
@@ -39,3 +39,4 @@ bitshuffle = { path = "../bitshuffle" }
|
||||
dbconn = { path = "../dbconn" }
|
||||
parse = { path = "../parse" }
|
||||
items = { path = "../items" }
|
||||
httpclient = { path = "../httpclient" }
|
||||
|
||||
@@ -11,8 +11,7 @@ use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items::numops::{BoolNum, NumOps};
|
||||
use items::{Appendable, EventsNodeProcessor, Framable, FrameType, PushableIndex, Sitemty, TimeBinnableType};
|
||||
use netpod::{AggKind, ByteOrder, NodeConfigCached, ScalarType, Shape};
|
||||
use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
|
||||
use netpod::{AggKind, ByteOrder, ChannelConfigQuery, NodeConfigCached, ScalarType, Shape};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::Serialize;
|
||||
use std::pin::Pin;
|
||||
@@ -123,6 +122,9 @@ macro_rules! match_end {
|
||||
};
|
||||
}
|
||||
|
||||
// TODO is the distinction on byte order necessary here?
|
||||
// We should rely on the "events" http api to deliver data, and the cache, both
|
||||
// of those have fixed endianness.
|
||||
fn make_num_pipeline(
|
||||
scalar_type: ScalarType,
|
||||
byte_order: ByteOrder,
|
||||
@@ -166,28 +168,17 @@ pub async fn pre_binned_bytes_for_http(
|
||||
));
|
||||
return Err(err);
|
||||
}
|
||||
let channel_config = match read_local_config(&query.channel(), &node_config.node).await {
|
||||
Ok(k) => k,
|
||||
Err(e) => {
|
||||
if e.msg().contains("ErrorKind::NotFound") {
|
||||
let s = futures_util::stream::empty();
|
||||
let ret = Box::pin(s);
|
||||
return Ok(ret);
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
let entry_res = extract_matching_config_entry(&query.patch().patch_range(), &channel_config)?;
|
||||
let entry = match entry_res {
|
||||
MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found")),
|
||||
MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found")),
|
||||
MatchingConfigEntry::Entry(entry) => entry,
|
||||
|
||||
let q = ChannelConfigQuery {
|
||||
channel: query.channel().clone(),
|
||||
range: query.patch().patch_range(),
|
||||
};
|
||||
let conf = httpclient::get_channel_config(&q, node_config).await?;
|
||||
let ret = make_num_pipeline(
|
||||
entry.scalar_type.clone(),
|
||||
entry.byte_order.clone(),
|
||||
entry.to_shape()?,
|
||||
conf.scalar_type.clone(),
|
||||
// TODO actually, make_num_pipeline should not depend on endianness.
|
||||
conf.byte_order.unwrap_or(ByteOrder::LE).clone(),
|
||||
conf.shape.clone(),
|
||||
query.agg_kind().clone(),
|
||||
query.clone(),
|
||||
node_config,
|
||||
|
||||
@@ -433,3 +433,13 @@ impl ChannelExecFunction for PlainEventsJson {
|
||||
Box::pin(futures_util::stream::empty())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn dummy_impl() {
|
||||
let channel: Channel = err::todoval();
|
||||
let range: NanoRange = err::todoval();
|
||||
let agg_kind: AggKind = err::todoval();
|
||||
let node_config: NodeConfigCached = err::todoval();
|
||||
let timeout: Duration = err::todoval();
|
||||
let f = PlainEventsJson::new(channel.clone(), range.clone(), 0, timeout, node_config.clone(), false);
|
||||
let _ = channel_exec(f, &channel, &range, agg_kind, &node_config);
|
||||
}
|
||||
|
||||
@@ -34,6 +34,14 @@ impl Error {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_msg_no_trace<S: Into<String>>(s: S) -> Self {
|
||||
Self {
|
||||
msg: s.into(),
|
||||
trace: None,
|
||||
trace_str: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn msg(&self) -> &str {
|
||||
&self.msg
|
||||
}
|
||||
|
||||
22
httpclient/Cargo.toml
Normal file
22
httpclient/Cargo.toml
Normal file
@@ -0,0 +1,22 @@
|
||||
[package]
|
||||
name = "httpclient"
|
||||
version = "0.0.1-a.0"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
serde_json = "1.0"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
http = "0.2"
|
||||
url = "2.2"
|
||||
tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
|
||||
hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
|
||||
hyper-tls = { version="0.5.0" }
|
||||
bytes = "1.0.1"
|
||||
futures-core = "0.3.14"
|
||||
futures-util = "0.3.14"
|
||||
tracing = "0.1.25"
|
||||
async-channel = "1.6"
|
||||
err = { path = "../err" }
|
||||
netpod = { path = "../netpod" }
|
||||
parse = { path = "../parse" }
|
||||
27
httpclient/src/lib.rs
Normal file
27
httpclient/src/lib.rs
Normal file
@@ -0,0 +1,27 @@
|
||||
use err::Error;
|
||||
use hyper::{Body, Method};
|
||||
use netpod::{AppendToUrl, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached};
|
||||
use url::Url;
|
||||
|
||||
pub async fn get_channel_config(
|
||||
q: &ChannelConfigQuery,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<ChannelConfigResponse, Error> {
|
||||
let mut url = Url::parse(&format!(
|
||||
"http://{}:{}/api/4/channel/config",
|
||||
"localhost", node_config.node.port
|
||||
))?;
|
||||
q.append_to_url(&mut url);
|
||||
let req = hyper::Request::builder()
|
||||
.method(Method::GET)
|
||||
.uri(url.as_str())
|
||||
.body(Body::empty())?;
|
||||
let client = hyper::Client::new();
|
||||
let res = client.request(req).await?;
|
||||
if !res.status().is_success() {
|
||||
return Err(Error::with_msg("http client error"));
|
||||
}
|
||||
let buf = hyper::body::to_bytes(res.into_body()).await?;
|
||||
let ret: ChannelConfigResponse = serde_json::from_slice(&buf)?;
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -13,8 +13,8 @@ use hyper::{server::Server, Body, Request, Response};
|
||||
use net::SocketAddr;
|
||||
use netpod::log::*;
|
||||
use netpod::{
|
||||
channel_from_pairs, get_url_query_pairs, AggKind, Channel, FromUrl, NodeConfigCached, APP_JSON, APP_JSON_LINES,
|
||||
APP_OCTET,
|
||||
channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached, APP_JSON,
|
||||
APP_JSON_LINES, APP_OCTET,
|
||||
};
|
||||
use nodenet::conn::events_service;
|
||||
use panic::{AssertUnwindSafe, UnwindSafe};
|
||||
@@ -431,11 +431,19 @@ async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Res
|
||||
.get(http::header::ACCEPT)
|
||||
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
|
||||
if accept == APP_JSON {
|
||||
Ok(plain_events_json(req, node_config).await?)
|
||||
Ok(plain_events_json(req, node_config).await.map_err(|e| {
|
||||
error!("{:?}", e);
|
||||
e
|
||||
})?)
|
||||
} else if accept == APP_OCTET {
|
||||
Ok(plain_events_binary(req, node_config).await?)
|
||||
Ok(plain_events_binary(req, node_config).await.map_err(|e| {
|
||||
error!("{:?}", e);
|
||||
e
|
||||
})?)
|
||||
} else {
|
||||
Err(Error::with_msg(format!("unexpected Accept: {:?}", accept)))
|
||||
let e = Error::with_msg(format!("unexpected Accept: {:?}", accept));
|
||||
error!("{:?}", e);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -455,6 +463,7 @@ async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached)
|
||||
}
|
||||
|
||||
async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
info!("plain_events_json req: {:?}", req);
|
||||
let (head, _body) = req.into_parts();
|
||||
let query = PlainEventsJsonQuery::from_request_head(&head)?;
|
||||
let op = disk::channelexec::PlainEventsJson::new(
|
||||
@@ -610,17 +619,19 @@ pub async fn update_search_cache(req: Request<Body>, node_config: &NodeConfigCac
|
||||
}
|
||||
|
||||
pub async fn channel_config(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
info!("channel_config");
|
||||
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
||||
let pairs = get_url_query_pairs(&url);
|
||||
let _dry = pairs.contains_key("dry");
|
||||
let channel = Channel {
|
||||
backend: node_config.node.backend.clone(),
|
||||
name: pairs.get("channelName").unwrap().into(),
|
||||
//let pairs = get_url_query_pairs(&url);
|
||||
let q = ChannelConfigQuery::from_url(&url)?;
|
||||
info!("ChannelConfigQuery {:?}", q);
|
||||
let conf = if let Some(aa) = &node_config.node.archiver_appliance {
|
||||
archapp_wrap::channel_config(&q, aa).await?
|
||||
} else {
|
||||
parse::channelconfig::channel_config(&q, &node_config.node).await?
|
||||
};
|
||||
let res = parse::channelconfig::read_local_config(&channel, &node_config.node).await?;
|
||||
let ret = response(StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, APP_JSON)
|
||||
.body(Body::from(serde_json::to_string(&res)?))?;
|
||||
.body(Body::from(serde_json::to_string(&conf)?))?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
|
||||
@@ -970,9 +970,16 @@ pub fn get_url_query_pairs(url: &Url) -> BTreeMap<String, String> {
|
||||
BTreeMap::from_iter(url.query_pairs().map(|(j, k)| (j.to_string(), k.to_string())))
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
/**
|
||||
Request type of the channel/config api. \
|
||||
At least on some backends the channel configuration may change depending on the queried range.
|
||||
Therefore, the query includes the range.
|
||||
The presence of a configuration in some range does not imply that there is any data available.
|
||||
*/
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ChannelConfigQuery {
|
||||
pub channel: Channel,
|
||||
pub range: NanoRange,
|
||||
}
|
||||
|
||||
impl HasBackend for ChannelConfigQuery {
|
||||
@@ -990,8 +997,14 @@ impl HasTimeout for ChannelConfigQuery {
|
||||
impl FromUrl for ChannelConfigQuery {
|
||||
fn from_url(url: &Url) -> Result<Self, Error> {
|
||||
let pairs = get_url_query_pairs(url);
|
||||
let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?;
|
||||
let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?;
|
||||
let ret = Self {
|
||||
channel: channel_from_pairs(&pairs)?,
|
||||
range: NanoRange {
|
||||
beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
|
||||
end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
|
||||
},
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -999,12 +1012,30 @@ impl FromUrl for ChannelConfigQuery {
|
||||
|
||||
impl AppendToUrl for ChannelConfigQuery {
|
||||
fn append_to_url(&self, url: &mut Url) {
|
||||
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
|
||||
let mut g = url.query_pairs_mut();
|
||||
g.append_pair("channelBackend", &self.channel.backend);
|
||||
g.append_pair("channelName", &self.channel.name);
|
||||
g.append_pair(
|
||||
"begDate",
|
||||
&Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(),
|
||||
);
|
||||
g.append_pair(
|
||||
"endDate",
|
||||
&Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ChannelConfigResponse {
|
||||
pub channel: Channel,
|
||||
#[serde(rename = "scalarType")]
|
||||
pub scalar_type: ScalarType,
|
||||
pub byte_order: Option<ByteOrder>,
|
||||
pub shape: Shape,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct EventQueryJsonStringFrame(pub String);
|
||||
|
||||
@@ -1013,6 +1044,8 @@ Provide basic information about a channel, especially it's shape.
|
||||
*/
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ChannelInfo {
|
||||
pub scalar_type: ScalarType,
|
||||
pub byte_order: Option<ByteOrder>,
|
||||
pub shape: Shape,
|
||||
pub msg: serde_json::Value,
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use err::Error;
|
||||
use netpod::timeunits::MS;
|
||||
use netpod::{ByteOrder, Channel, NanoRange, Nanos, Node, ScalarType, Shape};
|
||||
use netpod::{
|
||||
ByteOrder, Channel, ChannelConfigQuery, ChannelConfigResponse, NanoRange, Nanos, Node, ScalarType, Shape,
|
||||
};
|
||||
use nom::bytes::complete::take;
|
||||
use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8};
|
||||
use nom::Needed;
|
||||
@@ -253,6 +255,23 @@ pub fn parse_config(inp: &[u8]) -> NRes<Config> {
|
||||
Ok((inp, ret))
|
||||
}
|
||||
|
||||
pub async fn channel_config(q: &ChannelConfigQuery, node: &Node) -> Result<ChannelConfigResponse, Error> {
|
||||
let conf = read_local_config(&q.channel, node).await?;
|
||||
let entry_res = extract_matching_config_entry(&q.range, &conf)?;
|
||||
let entry = match entry_res {
|
||||
MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found")),
|
||||
MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found")),
|
||||
MatchingConfigEntry::Entry(entry) => entry,
|
||||
};
|
||||
let ret = ChannelConfigResponse {
|
||||
channel: q.channel.clone(),
|
||||
scalar_type: entry.scalar_type.clone(),
|
||||
byte_order: Some(entry.byte_order.clone()),
|
||||
shape: entry.to_shape()?,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn read_local_config(channel: &Channel, node: &Node) -> Result<Config, Error> {
|
||||
let path = node
|
||||
.data_base_path
|
||||
|
||||
Reference in New Issue
Block a user