This commit is contained in:
Dominik Werder
2023-06-20 19:00:08 +02:00
parent 1b3267c1a1
commit 2d566cc9ca
11 changed files with 452 additions and 355 deletions

View File

@@ -9,11 +9,9 @@ use bytes::BytesMut;
use disk::eventchunker::EventChunkerConf;
use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes;
use disk::raw::conn::make_local_event_blobs_stream;
use futures_util::stream;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryFutureExt;
use futures_util::TryStreamExt;
use http::Method;
use http::StatusCode;
@@ -32,7 +30,6 @@ use netpod::query::api1::Api1Query;
use netpod::range::evrange::NanoRange;
use netpod::timeunits::SEC;
use netpod::ByteSize;
use netpod::ChConf;
use netpod::ChannelSearchQuery;
use netpod::ChannelSearchResult;
use netpod::ChannelTypeConfigGen;
@@ -40,23 +37,19 @@ use netpod::DiskIoTune;
use netpod::NodeConfigCached;
use netpod::PerfOpts;
use netpod::ProxyConfig;
use netpod::ScalarType;
use netpod::SfChFetchInfo;
use netpod::SfDbChannel;
use netpod::Shape;
use netpod::ACCEPT_ALL;
use netpod::APP_JSON;
use netpod::APP_OCTET;
use parse::channelconfig::extract_matching_config_entry;
use parse::channelconfig::read_local_config;
use parse::channelconfig::ChannelConfigs;
use parse::channelconfig::ConfigEntry;
use parse::api1_parse::Api1ByteOrder;
use parse::api1_parse::Api1ChannelHeader;
use query::api4::events::PlainEventsQuery;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
@@ -519,127 +512,6 @@ async fn process_answer(res: Response<Body>) -> Result<JsonValue, Error> {
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Api1ScalarType {
#[serde(rename = "uint8")]
U8,
#[serde(rename = "uint16")]
U16,
#[serde(rename = "uint32")]
U32,
#[serde(rename = "uint64")]
U64,
#[serde(rename = "int8")]
I8,
#[serde(rename = "int16")]
I16,
#[serde(rename = "int32")]
I32,
#[serde(rename = "int64")]
I64,
#[serde(rename = "float32")]
F32,
#[serde(rename = "float64")]
F64,
#[serde(rename = "bool")]
BOOL,
#[serde(rename = "string")]
STRING,
}
impl Api1ScalarType {
pub fn to_str(&self) -> &'static str {
use Api1ScalarType as A;
match self {
A::U8 => "uint8",
A::U16 => "uint16",
A::U32 => "uint32",
A::U64 => "uint64",
A::I8 => "int8",
A::I16 => "int16",
A::I32 => "int32",
A::I64 => "int64",
A::F32 => "float32",
A::F64 => "float64",
A::BOOL => "bool",
A::STRING => "string",
}
}
}
impl fmt::Display for Api1ScalarType {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", self.to_str())
}
}
impl From<&ScalarType> for Api1ScalarType {
fn from(k: &ScalarType) -> Self {
use Api1ScalarType as B;
use ScalarType as A;
match k {
A::U8 => B::U8,
A::U16 => B::U16,
A::U32 => B::U32,
A::U64 => B::U64,
A::I8 => B::I8,
A::I16 => B::I16,
A::I32 => B::I32,
A::I64 => B::I64,
A::F32 => B::F32,
A::F64 => B::F64,
A::BOOL => B::BOOL,
A::STRING => B::STRING,
}
}
}
impl From<ScalarType> for Api1ScalarType {
fn from(x: ScalarType) -> Self {
(&x).into()
}
}
#[test]
fn test_custom_variant_name() {
let val = Api1ScalarType::F32;
assert_eq!(format!("{val:?}"), "F32");
assert_eq!(format!("{val}"), "float32");
let s = serde_json::to_string(&val).unwrap();
assert_eq!(s, "\"float32\"");
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Api1ByteOrder {
#[serde(rename = "LITTLE_ENDIAN")]
Little,
#[serde(rename = "BIG_ENDIAN")]
Big,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Api1ChannelHeader {
name: String,
#[serde(rename = "type")]
ty: Api1ScalarType,
#[serde(rename = "byteOrder")]
byte_order: Api1ByteOrder,
#[serde(default)]
shape: Vec<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
compression: Option<usize>,
}
impl Api1ChannelHeader {
pub fn name(&self) -> &str {
&self.name
}
pub fn ty(&self) -> Api1ScalarType {
self.ty.clone()
}
}
async fn find_ch_conf(
range: NanoRange,
channel: SfDbChannel,
@@ -728,21 +600,19 @@ impl DataApiPython3DataStream {
None
};
};
let compression = if let Some(_) = &b.comps[i1] { Some(1) } else { None };
if !*header_out {
let head = Api1ChannelHeader {
name: channel.name().into(),
ty: (&b.scalar_types[i1]).into(),
byte_order: if b.be[i1] {
Api1ByteOrder::Big
} else {
Api1ByteOrder::Little
},
// The shape is inconsistent on the events.
// Seems like the config is to be trusted in this case.
shape: shape.to_u32_vec(),
compression,
let byte_order = if b.be[i1] {
Api1ByteOrder::Big
} else {
Api1ByteOrder::Little
};
let head = Api1ChannelHeader::new(
channel.name().into(),
b.scalar_types.get(i1).unwrap().into(),
byte_order,
shape.clone(),
b.comps.get(i1).map(|x| x.clone()).unwrap(),
);
let h = serde_json::to_string(&head)?;
info!("sending channel header {}", h);
let l1 = 1 + h.as_bytes().len() as u32;