diff --git a/crates/disk/src/dataopen.rs b/crates/disk/src/dataopen.rs index 567a51e..e69b57e 100644 --- a/crates/disk/src/dataopen.rs +++ b/crates/disk/src/dataopen.rs @@ -18,6 +18,7 @@ use tokio::io::AsyncReadExt; use tokio::io::AsyncSeekExt; use tokio::io::ErrorKind; use tokio::io::SeekFrom; +use tracing::Instrument; #[cfg(test)] const BACKEND: &str = "testbackend-00"; @@ -212,28 +213,33 @@ impl fmt::Debug for OpenedFile { pub fn open_files( range: &NanoRange, fetch_info: &SfChFetchInfo, + reqid: &str, node: Node, ) -> async_channel::Receiver<Result<OpenedFile, Error>> { + let span = tracing::span!(tracing::Level::DEBUG, "open_files", reqid); let (chtx, chrx) = async_channel::bounded(2); let range = range.clone(); let fetch_info = fetch_info.clone(); - tokio::spawn(async move { - match open_files_inner(&chtx, &range, &fetch_info, node).await { - Ok(_) => {} - Err(e) => { - let e = e.add_public_msg(format!( - "Can not open file for channel: {fetch_info:?} range: {range:?}" - )); - match chtx.send(Err(e.into())).await { - Ok(_) => {} - Err(e) => { - // This case is fine. - debug!("open_files channel send error {:?}", e); - } - } - } - } - }); + tokio::spawn( + async move { + match open_files_inner(&chtx, &range, &fetch_info, node).await { + Ok(_) => {} + Err(e) => { + let e = e.add_public_msg(format!( + "Can not open file for channel: {fetch_info:?} range: {range:?}" + )); + match chtx.send(Err(e.into())).await { + Ok(_) => {} + Err(e) => { + // This case is fine. 
+ debug!("open_files channel send error {:?}", e); + } } } } } - }); + .instrument(span), + ); chrx } diff --git a/crates/disk/src/eventblobs.rs b/crates/disk/src/eventblobs.rs index 6592e10..dc3d274 100644 --- a/crates/disk/src/eventblobs.rs +++ b/crates/disk/src/eventblobs.rs @@ -72,7 +72,7 @@ impl EventChunkerMultifile { let file_chan = if expand { open_expanded_files(&range, &fetch_info, node) } else { - open_files(&range, &fetch_info, node) + open_files(&range, &fetch_info, reqctx.reqid(), node) }; Self { file_chan, diff --git a/crates/err/Cargo.toml b/crates/err/Cargo.toml index 0ab6c8f..4e0531e 100644 --- a/crates/err/Cargo.toml +++ b/crates/err/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "err" -version = "0.0.3" +version = "0.0.4" authors = ["Dominik Werder "] edition = "2021" @@ -8,17 +8,15 @@ edition = "2021" doctest = false [dependencies] -backtrace = "0.3" +backtrace = "0.3.68" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -serde_cbor = "0.11" -erased-serde = "0.3" +serde_cbor = "0.11.2" rmp-serde = "1.1.1" -async-channel = "1.8.0" -chrono = { version = "0.4", features = ["serde"] } -url = "2.2" -regex = "1.5" -http = "0.2" -#thiserror = "1.0" +async-channel = "1.9.0" +chrono = { version = "0.4.26", features = ["serde"] } +url = "2.4.0" +regex = "1.9.1" +http = "0.2.9" thiserror = { path = "../../../thiserror" } anyhow = "1.0" diff --git a/crates/err/src/lib.rs b/crates/err/src/lib.rs index ea36ca0..85e04a6 100644 --- a/crates/err/src/lib.rs +++ b/crates/err/src/lib.rs @@ -371,12 +371,6 @@ impl From for Error { } } -impl From<erased_serde::Error> for Error { - fn from(k: erased_serde::Error) -> Self { - Self::with_msg(k.to_string()) - } -} - impl From<std::fmt::Error> for Error { fn from(k: std::fmt::Error) -> Self { Self::with_msg(k.to_string()) diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index eb17877..66bc380 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -604,20 +604,25 @@ impl DataApiPython3DataStream { }; }; 
if !*header_out { - let byte_order = if b.be[i1] { + let byte_order = if *b.be.get(i1).unwrap() { Api1ByteOrder::Big } else { Api1ByteOrder::Little }; + let comp = if do_decompress { + None + } else { + b.comps.get(i1).unwrap().clone() + }; let head = Api1ChannelHeader::new( channel.name().into(), b.scalar_types.get(i1).unwrap().into(), byte_order, shape.clone(), - b.comps.get(i1).map(|x| x.clone()).unwrap(), + comp.clone(), ); let h = serde_json::to_string(&head)?; - debug!("sending channel header {}", h); + debug!("sending channel header comp {:?} {}", comp, h); let l1 = 1 + h.as_bytes().len() as u32; d.put_u32(l1); d.put_u8(0); diff --git a/crates/parse/Cargo.toml b/crates/parse/Cargo.toml index 02fb862..4c02396 100644 --- a/crates/parse/Cargo.toml +++ b/crates/parse/Cargo.toml @@ -14,7 +14,5 @@ bytes = "1.4" byteorder = "1.4" hex = "0.4.3" nom = "7.1.3" -num-traits = "0.2" -num-derive = "0.3" err = { path = "../err" } netpod = { path = "../netpod" } diff --git a/crates/parse/src/api1_parse.rs b/crates/parse/src/api1_parse.rs index e933f11..64f3160 100644 --- a/crates/parse/src/api1_parse.rs +++ b/crates/parse/src/api1_parse.rs @@ -129,8 +129,8 @@ pub struct Api1ChannelHeader { byte_order: Api1ByteOrder, #[serde(default)] shape: Vec<u32>, - #[serde(default, skip_serializing_if = "Option::is_none")] - compression: Option<usize>, + #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_compression_method")] + compression: Option<CompressionMethod>, } impl Api1ChannelHeader { @@ -146,7 +146,7 @@ impl Api1ChannelHeader { ty, byte_order, shape: shape.to_u32_vec(), - compression: compression.map(|x| x.to_i16() as usize), + compression, } } @@ -159,6 +159,148 @@ impl Api1ChannelHeader { } } +mod serde_compression_method { + use super::CompressionMethod; + use serde::de; + use serde::de::Visitor; + use serde::Deserializer; + use serde::Serializer; + use std::fmt; + + pub fn serialize<S>(v: &Option<CompressionMethod>, ser: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + match v { + Some(v) => { + let n = match
v { + CompressionMethod::BitshuffleLZ4 => 1, + }; + ser.serialize_some(&n) + } + None => ser.serialize_none(), + } + } + + struct VisC; + + impl<'de> Visitor<'de> for VisC { + type Value = Option<CompressionMethod>; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "compression method index") + } + + fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> + where + E: de::Error, + { + match v { + 0 => Ok(None), + 1 => Ok(Some(CompressionMethod::BitshuffleLZ4)), + _ => Err(de::Error::unknown_variant("compression variant index", &["0"])), + } + } + } + + struct Vis; + + impl<'de> Visitor<'de> for Vis { + type Value = Option<CompressionMethod>; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "optional compression method index") + } + + fn visit_some<D>(self, de: D) -> Result<Self::Value, D::Error> + where + D: Deserializer<'de>, + { + de.deserialize_u64(VisC) + } + + fn visit_none<E>(self) -> Result<Self::Value, E> + where + E: de::Error, + { + Ok(None) + } + } + + pub fn deserialize<'de, D>(de: D) -> Result<Option<CompressionMethod>, D::Error> + where + D: Deserializer<'de>, + { + de.deserialize_option(Vis) + } +} + +#[test] +fn basic_header_ser_00() { + let h = Api1ChannelHeader { + name: "Name".into(), + ty: Api1ScalarType::F32, + byte_order: Api1ByteOrder::Big, + shape: Vec::new(), + compression: None, + }; + let js = serde_json::to_string(&h).unwrap(); + let vals = serde_json::from_str::<serde_json::Value>(&js).unwrap(); + let x = vals.as_object().unwrap().get("compression"); + assert_eq!(x, None) +} + +#[test] +fn basic_header_ser_01() { + let h = Api1ChannelHeader { + name: "Name".into(), + ty: Api1ScalarType::F32, + byte_order: Api1ByteOrder::Big, + shape: Vec::new(), + compression: Some(CompressionMethod::BitshuffleLZ4), + }; + let js = serde_json::to_string(&h).unwrap(); + let vals = serde_json::from_str::<serde_json::Value>(&js).unwrap(); + let x = vals.as_object().unwrap().get("compression").unwrap().as_i64(); + assert_eq!(x, Some(1)) +} + +#[test] +fn basic_header_deser_00() { + let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN" 
}"#; + let h: Api1ChannelHeader = serde_json::from_str(js).unwrap(); + assert!(h.compression.is_none()); +} + +#[test] +fn basic_header_deser_01() { + let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": null }"#; + let h: Api1ChannelHeader = serde_json::from_str(js).unwrap(); + assert!(h.compression.is_none()); +} + +#[test] +fn basic_header_deser_02() { + let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": 0 }"#; + let h: Api1ChannelHeader = serde_json::from_str(js).unwrap(); + assert!(h.compression.is_none()); +} + +#[test] +fn basic_header_deser_03() { + let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": 1 }"#; + let h: Api1ChannelHeader = serde_json::from_str(js).unwrap(); + assert!(h.compression.is_some()); + assert_eq!(h.compression, Some(CompressionMethod::BitshuffleLZ4)); +} + +#[test] +fn basic_header_deser_04() { + let js = r#"{ "name": "ch1", "type": "float64", "byteOrder": "LITTLE_ENDIAN", "compression": 2 }"#; + let res = serde_json::from_str::<Api1ChannelHeader>(js); + assert!(res.is_err()); +} + // u32be length_1. // there is exactly length_1 more bytes in this message. 
// u8 mtype: 0: channel-header, 1: data diff --git a/crates/parse/src/channelconfig.rs b/crates/parse/src/channelconfig.rs index 22fffe1..e9122b6 100644 --- a/crates/parse/src/channelconfig.rs +++ b/crates/parse/src/channelconfig.rs @@ -17,9 +17,6 @@ use nom::number::complete::be_i64; use nom::number::complete::be_i8; use nom::number::complete::be_u8; use nom::Needed; -use num_derive::FromPrimitive; -use num_derive::ToPrimitive; -use num_traits::ToPrimitive; use serde::Deserialize; use serde::Serialize; use std::fmt; @@ -69,15 +66,9 @@ where Err(nom::Err::Error(e)) } -#[derive(Clone, Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum CompressionMethod { - BitshuffleLZ4 = 0, -} - -impl CompressionMethod { - pub fn to_i16(&self) -> i16 { - ToPrimitive::to_i16(self).unwrap() - } + BitshuffleLZ4, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -205,11 +196,9 @@ pub fn parse_entry(inp: &[u8]) -> NRes> { false => (inp, None), true => { let (inp, cm) = be_u8(inp)?; - match num_traits::FromPrimitive::from_u8(cm) { - Some(k) => (inp, Some(k)), - None => { - return mkerr(format!("unknown compression")); - } + match cm { + 0 => (inp, Some(CompressionMethod::BitshuffleLZ4)), + _ => return mkerr(format!("unknown compression")), } } }; diff --git a/crates/scyllaconn/Cargo.toml b/crates/scyllaconn/Cargo.toml index 09cd365..9ec4e70 100644 --- a/crates/scyllaconn/Cargo.toml +++ b/crates/scyllaconn/Cargo.toml @@ -8,21 +8,9 @@ edition = "2021" path = "src/scyllaconn.rs" [dependencies] -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -serde_cbor = "0.11.2" -erased-serde = "0.3" -tokio = { version = "1.23.0", default-features = false, features = ["time", "sync"] } -tracing = "0.1.37" -byteorder = "1.4.3" -bytes = "1.2.1" -num-traits = "0.2.15" -chrono = { version = "0.4.19", features = ["serde"] } -crc32fast = "1.3.2" futures-util = "0.3.24" -async-channel = "1.8.0" -scylla = 
"0.8.1" -tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4", "with-serde_json-1"] } +async-channel = "1.9.0" +scylla = "0.8.2" err = { path = "../err" } netpod = { path = "../netpod" } query = { path = "../query" } diff --git a/crates/scyllaconn/src/errconv.rs b/crates/scyllaconn/src/errconv.rs index 9841060..c1fef92 100644 --- a/crates/scyllaconn/src/errconv.rs +++ b/crates/scyllaconn/src/errconv.rs @@ -8,15 +8,6 @@ pub trait ErrConv { fn err_conv(self) -> Result; } -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg(e.to_string())), - } - } -} - impl ErrConv for Result> { fn err_conv(self) -> Result { match self {