Dominik Werder
2023-07-19 19:02:28 +02:00
parent 907eed350d
commit df091c0eb7
10 changed files with 188 additions and 77 deletions

View File

@@ -18,6 +18,7 @@ use tokio::io::AsyncReadExt;
 use tokio::io::AsyncSeekExt;
 use tokio::io::ErrorKind;
 use tokio::io::SeekFrom;
+use tracing::Instrument;

 #[cfg(test)]
 const BACKEND: &str = "testbackend-00";
@@ -212,28 +213,33 @@ impl fmt::Debug for OpenedFile {
 pub fn open_files(
     range: &NanoRange,
     fetch_info: &SfChFetchInfo,
+    reqid: &str,
     node: Node,
 ) -> async_channel::Receiver<Result<OpenedFileSet, Error>> {
+    let span = tracing::span!(tracing::Level::DEBUG, "open_files", reqid);
     let (chtx, chrx) = async_channel::bounded(2);
     let range = range.clone();
     let fetch_info = fetch_info.clone();
-    tokio::spawn(async move {
-        match open_files_inner(&chtx, &range, &fetch_info, node).await {
-            Ok(_) => {}
-            Err(e) => {
-                let e = e.add_public_msg(format!(
-                    "Can not open file for channel: {fetch_info:?} range: {range:?}"
-                ));
-                match chtx.send(Err(e.into())).await {
-                    Ok(_) => {}
-                    Err(e) => {
-                        // This case is fine.
-                        debug!("open_files channel send error {:?}", e);
-                    }
-                }
-            }
-        }
-    });
+    tokio::spawn(
+        async move {
+            match open_files_inner(&chtx, &range, &fetch_info, node).await {
+                Ok(_) => {}
+                Err(e) => {
+                    let e = e.add_public_msg(format!(
+                        "Can not open file for channel: {fetch_info:?} range: {range:?}"
+                    ));
+                    match chtx.send(Err(e.into())).await {
+                        Ok(_) => {}
+                        Err(e) => {
+                            // This case is fine.
+                            debug!("open_files channel send error {:?}", e);
+                        }
+                    }
+                }
+            }
+        }
+        .instrument(span),
+    );
    chrx
}
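
Editorial note on the hunk above: `tokio::spawn` does not carry the caller's tracing context, so the commit creates the span inside `open_files` and attaches it to the future with `Instrument::instrument` before spawning. A minimal self-contained sketch of the same technique, assuming the usual `tracing` / `tracing-subscriber` / `tokio` crates; the task body, span fields, and subscriber setup are illustrative, not from this repository:

use tracing::Instrument;

async fn do_work() {
    // This event carries the fields of the span attached below (reqid).
    tracing::debug!("working");
}

#[tokio::main]
async fn main() {
    // Illustrative subscriber setup so debug events are visible.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();
    let reqid = "req-0001";
    // Create the span first, then attach it to the future; without
    // .instrument(span) the spawned task would log outside the span.
    let span = tracing::span!(tracing::Level::DEBUG, "open_files", reqid);
    tokio::spawn(do_work().instrument(span)).await.unwrap();
}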

View File

@@ -72,7 +72,7 @@ impl EventChunkerMultifile {
         let file_chan = if expand {
             open_expanded_files(&range, &fetch_info, node)
         } else {
-            open_files(&range, &fetch_info, node)
+            open_files(&range, &fetch_info, reqctx.reqid(), node)
         };
         Self {
             file_chan,

View File

@@ -1,6 +1,6 @@
[package]
name = "err"
version = "0.0.3"
version = "0.0.4"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
@@ -8,17 +8,15 @@ edition = "2021"
 doctest = false

 [dependencies]
-backtrace = "0.3"
+backtrace = "0.3.68"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
-serde_cbor = "0.11"
-erased-serde = "0.3"
+serde_cbor = "0.11.2"
 rmp-serde = "1.1.1"
-async-channel = "1.8.0"
-chrono = { version = "0.4", features = ["serde"] }
-url = "2.2"
-regex = "1.5"
-http = "0.2"
-#thiserror = "1.0"
+async-channel = "1.9.0"
+chrono = { version = "0.4.26", features = ["serde"] }
+url = "2.4.0"
+regex = "1.9.1"
+http = "0.2.9"
+thiserror = { path = "../../../thiserror" }
 anyhow = "1.0"

View File

@@ -371,12 +371,6 @@ impl From<serde_cbor::Error> for Error {
     }
 }

-impl From<erased_serde::Error> for Error {
-    fn from(k: erased_serde::Error) -> Self {
-        Self::with_msg(k.to_string())
-    }
-}
-
 impl From<std::fmt::Error> for Error {
     fn from(k: std::fmt::Error) -> Self {
         Self::with_msg(k.to_string())
View File

@@ -604,20 +604,25 @@ impl DataApiPython3DataStream {
                     };
                 };
                 if !*header_out {
-                    let byte_order = if b.be[i1] {
+                    let byte_order = if *b.be.get(i1).unwrap() {
                         Api1ByteOrder::Big
                     } else {
                         Api1ByteOrder::Little
                     };
+                    let comp = if do_decompress {
+                        None
+                    } else {
+                        b.comps.get(i1).unwrap().clone()
+                    };
                     let head = Api1ChannelHeader::new(
                         channel.name().into(),
                         b.scalar_types.get(i1).unwrap().into(),
                         byte_order,
                         shape.clone(),
-                        b.comps.get(i1).map(|x| x.clone()).unwrap(),
+                        comp.clone(),
                     );
                     let h = serde_json::to_string(&head)?;
-                    debug!("sending channel header {}", h);
+                    debug!("sending channel header comp {:?} {}", comp, h);
                     let l1 = 1 + h.as_bytes().len() as u32;
                     d.put_u32(l1);
                     d.put_u8(0);
View File

@@ -14,7 +14,5 @@ bytes = "1.4"
 byteorder = "1.4"
 hex = "0.4.3"
 nom = "7.1.3"
-num-traits = "0.2"
-num-derive = "0.3"
 err = { path = "../err" }
 netpod = { path = "../netpod" }

View File

@@ -129,8 +129,8 @@ pub struct Api1ChannelHeader {
     byte_order: Api1ByteOrder,
     #[serde(default)]
     shape: Vec<u32>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    compression: Option<usize>,
+    #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_compression_method")]
+    compression: Option<CompressionMethod>,
 }

 impl Api1ChannelHeader {
@@ -146,7 +146,7 @@ impl Api1ChannelHeader {
             ty,
             byte_order,
             shape: shape.to_u32_vec(),
-            compression: compression.map(|x| x.to_i16() as usize),
+            compression,
         }
     }
@@ -159,6 +159,148 @@ impl Api1ChannelHeader {
     }
 }

+mod serde_compression_method {
+    use super::CompressionMethod;
+    use serde::de;
+    use serde::de::Visitor;
+    use serde::Deserializer;
+    use serde::Serializer;
+    use std::fmt;
+
+    pub fn serialize<S>(v: &Option<CompressionMethod>, ser: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        match v {
+            Some(v) => {
+                let n = match v {
+                    CompressionMethod::BitshuffleLZ4 => 1,
+                };
+                ser.serialize_some(&n)
+            }
+            None => ser.serialize_none(),
+        }
+    }
+
+    struct VisC;
+
+    impl<'de> Visitor<'de> for VisC {
+        type Value = Option<CompressionMethod>;
+
+        fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+            write!(fmt, "compression method index")
+        }
+
+        fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
+        where
+            E: de::Error,
+        {
+            match v {
+                0 => Ok(None),
+                1 => Ok(Some(CompressionMethod::BitshuffleLZ4)),
+                _ => Err(de::Error::unknown_variant("compression variant index", &["0"])),
+            }
+        }
+    }
+
+    struct Vis;
+
+    impl<'de> Visitor<'de> for Vis {
+        type Value = Option<CompressionMethod>;
+
+        fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+            write!(fmt, "optional compression method index")
+        }
+
+        fn visit_some<D>(self, de: D) -> Result<Self::Value, D::Error>
+        where
+            D: Deserializer<'de>,
+        {
+            de.deserialize_u64(VisC)
+        }
+
+        fn visit_none<E>(self) -> Result<Self::Value, E>
+        where
+            E: de::Error,
+        {
+            Ok(None)
+        }
+    }
+
+    pub fn deserialize<'de, D>(de: D) -> Result<Option<CompressionMethod>, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        de.deserialize_option(Vis)
+    }
+}
+
+#[test]
+fn basic_header_ser_00() {
+    let h = Api1ChannelHeader {
+        name: "Name".into(),
+        ty: Api1ScalarType::F32,
+        byte_order: Api1ByteOrder::Big,
+        shape: Vec::new(),
+        compression: None,
+    };
+    let js = serde_json::to_string(&h).unwrap();
+    let vals = serde_json::from_str::<serde_json::Value>(&js).unwrap();
+    let x = vals.as_object().unwrap().get("compression");
+    assert_eq!(x, None)
+}
+
+#[test]
+fn basic_header_ser_01() {
+    let h = Api1ChannelHeader {
+        name: "Name".into(),
+        ty: Api1ScalarType::F32,
+        byte_order: Api1ByteOrder::Big,
+        shape: Vec::new(),
+        compression: Some(CompressionMethod::BitshuffleLZ4),
+    };
+    let js = serde_json::to_string(&h).unwrap();
+    let vals = serde_json::from_str::<serde_json::Value>(&js).unwrap();
+    let x = vals.as_object().unwrap().get("compression").unwrap().as_i64();
+    assert_eq!(x, Some(1))
+}
+
+#[test]
+fn basic_header_deser_00() {
+    let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN" }"#;
+    let h: Api1ChannelHeader = serde_json::from_str(js).unwrap();
+    assert!(h.compression.is_none());
+}
+
+#[test]
+fn basic_header_deser_01() {
+    let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": null }"#;
+    let h: Api1ChannelHeader = serde_json::from_str(js).unwrap();
+    assert!(h.compression.is_none());
+}
+
+#[test]
+fn basic_header_deser_02() {
+    let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": 0 }"#;
+    let h: Api1ChannelHeader = serde_json::from_str(js).unwrap();
+    assert!(h.compression.is_none());
+}
+
+#[test]
+fn basic_header_deser_03() {
+    let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": 1 }"#;
+    let h: Api1ChannelHeader = serde_json::from_str(js).unwrap();
+    assert!(h.compression.is_some());
+    assert_eq!(h.compression, Some(CompressionMethod::BitshuffleLZ4));
+}
+
+#[test]
+fn basic_header_deser_04() {
+    let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": 2 }"#;
+    let res = serde_json::from_str::<Api1ChannelHeader>(js);
+    assert!(res.is_err());
+}
+
 // u32be length_1.
 // there is exactly length_1 more bytes in this message.
 // u8 mtype: 0: channel-header, 1: data
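
Combining the framing comments above with the header-emitting code earlier in this commit (`l1 = 1 + h.as_bytes().len()`, `put_u32`, `put_u8(0)`), here is a hedged sketch of assembling one channel-header frame. It assumes the `bytes` crate, which the `put_u32`/`put_u8` calls suggest; the function name and JSON payload are illustrative:

use bytes::{BufMut, BytesMut};

// Frame layout: u32be length_1, then exactly length_1 more bytes,
// consisting of one mtype byte (0 = channel-header) plus the JSON header.
fn frame_channel_header(header_json: &str) -> BytesMut {
    let mut d = BytesMut::new();
    let l1 = 1 + header_json.as_bytes().len() as u32;
    d.put_u32(l1); // bytes::BufMut writes big-endian by default
    d.put_u8(0); // mtype 0: channel-header
    d.put_slice(header_json.as_bytes());
    d
}

#[test]
fn frame_has_length_prefix_and_mtype() {
    let js = r#"{"name":"ch1","type":"float64","byteOrder":"LITTLE_ENDIAN"}"#;
    let d = frame_channel_header(js);
    assert_eq!(&d[..4], &(1 + js.len() as u32).to_be_bytes()[..]);
    assert_eq!(d[4], 0);
    assert_eq!(&d[5..], js.as_bytes());
}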

View File

@@ -17,9 +17,6 @@ use nom::number::complete::be_i64;
 use nom::number::complete::be_i8;
 use nom::number::complete::be_u8;
 use nom::Needed;
-use num_derive::FromPrimitive;
-use num_derive::ToPrimitive;
-use num_traits::ToPrimitive;
 use serde::Deserialize;
 use serde::Serialize;
 use std::fmt;
@@ -69,15 +66,9 @@ where
     Err(nom::Err::Error(e))
 }

-#[derive(Clone, Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub enum CompressionMethod {
-    BitshuffleLZ4 = 0,
-}
-
-impl CompressionMethod {
-    pub fn to_i16(&self) -> i16 {
-        ToPrimitive::to_i16(self).unwrap()
-    }
+    BitshuffleLZ4,
 }

 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -205,11 +196,9 @@ pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
         false => (inp, None),
         true => {
             let (inp, cm) = be_u8(inp)?;
-            match num_traits::FromPrimitive::from_u8(cm) {
-                Some(k) => (inp, Some(k)),
-                None => {
-                    return mkerr(format!("unknown compression"));
-                }
+            match cm {
+                0 => (inp, Some(CompressionMethod::BitshuffleLZ4)),
+                _ => return mkerr(format!("unknown compression")),
             }
         }
     };
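
With num-traits gone, the on-disk compression byte is decoded by a plain match (note that the file format uses byte 0 for BitshuffleLZ4, while the JSON header earlier in this commit uses index 1). A self-contained hedged sketch of the same decode step with nom's `be_u8`, using nom's stock error type in place of the crate's `mkerr`:

use nom::number::complete::be_u8;
use nom::IResult;

#[derive(Clone, Debug, PartialEq)]
pub enum CompressionMethod {
    BitshuffleLZ4,
}

// Decode one byte into a compression method; only 0 is known.
fn parse_compression(inp: &[u8]) -> IResult<&[u8], CompressionMethod> {
    let (inp, cm) = be_u8(inp)?;
    match cm {
        0 => Ok((inp, CompressionMethod::BitshuffleLZ4)),
        _ => Err(nom::Err::Failure(nom::error::Error::new(
            inp,
            nom::error::ErrorKind::Verify,
        ))),
    }
}

#[test]
fn decode_compression_byte() {
    let (rest, cm) = parse_compression(&[0, 7]).unwrap();
    assert_eq!(cm, CompressionMethod::BitshuffleLZ4);
    assert_eq!(rest, &[7u8]);
    assert!(parse_compression(&[9]).is_err());
}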

View File

@@ -8,21 +8,9 @@ edition = "2021"
 path = "src/scyllaconn.rs"

 [dependencies]
 serde = { version = "1.0", features = ["derive"] }
-serde_json = "1.0"
-serde_cbor = "0.11.2"
-erased-serde = "0.3"
-tokio = { version = "1.23.0", default-features = false, features = ["time", "sync"] }
-tracing = "0.1.37"
-byteorder = "1.4.3"
-bytes = "1.2.1"
-num-traits = "0.2.15"
-chrono = { version = "0.4.19", features = ["serde"] }
-crc32fast = "1.3.2"
-futures-util = "0.3.24"
-async-channel = "1.8.0"
-scylla = "0.8.1"
-tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4", "with-serde_json-1"] }
+async-channel = "1.9.0"
+scylla = "0.8.2"
 err = { path = "../err" }
 netpod = { path = "../netpod" }
 query = { path = "../query" }

View File

@@ -8,15 +8,6 @@ pub trait ErrConv<T> {
     fn err_conv(self) -> Result<T, Error>;
 }

-impl<T> ErrConv<T> for Result<T, tokio_postgres::Error> {
-    fn err_conv(self) -> Result<T, Error> {
-        match self {
-            Ok(k) => Ok(k),
-            Err(e) => Err(Error::with_msg(e.to_string())),
-        }
-    }
-}
-
 impl<T, A> ErrConv<T> for Result<T, async_channel::SendError<A>> {
     fn err_conv(self) -> Result<T, Error> {
         match self {