From 21aa59e0d0fc0167a67d703ce9083b18501be650 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Wed, 12 May 2021 22:48:41 +0200
Subject: [PATCH] Extract support for all numeric types

---
 Cargo.toml               |   2 +-
 disk/src/agg.rs          | 104 ++++++++++++++++++++++++++++++++-------
 disk/src/eventchunker.rs |   8 +--
 disk/src/index.rs        |   9 ----
 disk/src/raw/conn.rs     |  32 +++++-------
 netpod/src/lib.rs        |   4 +-
 6 files changed, 104 insertions(+), 55 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 514ac3e..556a776 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,6 +3,6 @@ members = ["retrieval"]
 
 [profile.release]
 debug = 1
-opt-level = 1
+opt-level = 2
 #overflow-checks = true
 #debug-assertions = true
diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index b982300..db318d4 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -356,10 +356,25 @@
     };
 }
 
+make_get_values!(get_values_u8_le, u8, from_le_bytes, 1);
 make_get_values!(get_values_u16_le, u16, from_le_bytes, 2);
+make_get_values!(get_values_u32_le, u32, from_le_bytes, 4);
+make_get_values!(get_values_u64_le, u64, from_le_bytes, 8);
+make_get_values!(get_values_i8_le, i8, from_le_bytes, 1);
+make_get_values!(get_values_i16_le, i16, from_le_bytes, 2);
+make_get_values!(get_values_i32_le, i32, from_le_bytes, 4);
+make_get_values!(get_values_i64_le, i64, from_le_bytes, 8);
 make_get_values!(get_values_f32_le, f32, from_le_bytes, 4);
 make_get_values!(get_values_f64_le, f64, from_le_bytes, 8);
 
+
+make_get_values!(get_values_u8_be, u8, from_be_bytes, 1);
 make_get_values!(get_values_u16_be, u16, from_be_bytes, 2);
+make_get_values!(get_values_u32_be, u32, from_be_bytes, 4);
+make_get_values!(get_values_u64_be, u64, from_be_bytes, 8);
+make_get_values!(get_values_i8_be, i8, from_be_bytes, 1);
+make_get_values!(get_values_i16_be, i16, from_be_bytes, 2);
+make_get_values!(get_values_i32_be, i32, from_be_bytes, 4);
+make_get_values!(get_values_i64_be, i64, from_be_bytes, 8);
 make_get_values!(get_values_f32_be, f32, from_be_bytes, 4);
 make_get_values!(get_values_f64_be, f64, from_be_bytes, 8);
@@ -381,41 +396,96 @@ impl Dim1F32Stream {
             let be = k.be[i1];
             let decomp = k.decomps[i1].as_ref().unwrap();
             match ty {
-                U16 if be => {
-                    let value = get_values_u16_be(decomp, ty)?;
+                U8 => {
+                    let value = if be {
+                        get_values_u8_be(decomp, ty)?
+                    } else {
+                        get_values_u8_le(decomp, ty)?
+                    };
                     ret.tss.push(k.tss[i1]);
                     ret.values.push(value);
                 }
                 U16 => {
-                    let value = get_values_u16_le(decomp, ty)?;
+                    let value = if be {
+                        get_values_u16_be(decomp, ty)?
+                    } else {
+                        get_values_u16_le(decomp, ty)?
+                    };
                     ret.tss.push(k.tss[i1]);
                     ret.values.push(value);
                 }
-                F32 if be => {
-                    let value = get_values_f32_be(decomp, ty)?;
+                U32 => {
+                    let value = if be {
+                        get_values_u32_be(decomp, ty)?
+                    } else {
+                        get_values_u32_le(decomp, ty)?
+                    };
+                    ret.tss.push(k.tss[i1]);
+                    ret.values.push(value);
+                }
+                U64 => {
+                    let value = if be {
+                        get_values_u64_be(decomp, ty)?
+                    } else {
+                        get_values_u64_le(decomp, ty)?
+                    };
+                    ret.tss.push(k.tss[i1]);
+                    ret.values.push(value);
+                }
+                I8 => {
+                    let value = if be {
+                        get_values_i8_be(decomp, ty)?
+                    } else {
+                        get_values_i8_le(decomp, ty)?
+                    };
+                    ret.tss.push(k.tss[i1]);
+                    ret.values.push(value);
+                }
+                I16 => {
+                    let value = if be {
+                        get_values_i16_be(decomp, ty)?
+                    } else {
+                        get_values_i16_le(decomp, ty)?
+                    };
+                    ret.tss.push(k.tss[i1]);
+                    ret.values.push(value);
+                }
+                I32 => {
+                    let value = if be {
+                        get_values_i32_be(decomp, ty)?
+                    } else {
+                        get_values_i32_le(decomp, ty)?
+                    };
+                    ret.tss.push(k.tss[i1]);
+                    ret.values.push(value);
+                }
+                I64 => {
+                    let value = if be {
+                        get_values_i64_be(decomp, ty)?
+                    } else {
+                        get_values_i64_le(decomp, ty)?
+                    };
                     ret.tss.push(k.tss[i1]);
                     ret.values.push(value);
                 }
                 F32 => {
-                    let value = get_values_f32_le(decomp, ty)?;
-                    ret.tss.push(k.tss[i1]);
-                    ret.values.push(value);
-                }
-                F64 if be => {
-                    let value = get_values_f64_be(decomp, ty)?;
+                    let value = if be {
+                        get_values_f32_be(decomp, ty)?
+                    } else {
+                        get_values_f32_le(decomp, ty)?
+                    };
                     ret.tss.push(k.tss[i1]);
                     ret.values.push(value);
                 }
                 F64 => {
-                    let value = get_values_f64_le(decomp, ty)?;
+                    let value = if be {
+                        get_values_f64_be(decomp, ty)?
+                    } else {
+                        get_values_f64_le(decomp, ty)?
+                    };
                     ret.tss.push(k.tss[i1]);
                     ret.values.push(value);
                 }
-                _ => {
-                    let e = Error::with_msg(format!("Dim1F32Stream unhandled scalar type: {:?}", ty));
-                    self.errored = true;
-                    return Err(e);
-                }
             }
         }
         Ok(ret)
diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs
index a0ad273..2bbca4e 100644
--- a/disk/src/eventchunker.rs
+++ b/disk/src/eventchunker.rs
@@ -182,7 +182,6 @@ impl EventChunker {
                 //debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size);
                 assert!(value_bytes < 1024 * 256);
                 assert!(block_size < 1024 * 32);
-                //let value_bytes = value_bytes;
                 let type_size = scalar_type.bytes() as u32;
                 let ele_count = value_bytes / type_size as u64;
                 let ele_size = type_size;
@@ -208,7 +207,6 @@ impl EventChunker {
                     unsafe {
                         decomp.set_len(decomp_bytes);
                     }
-                    //debug!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index);
                     match bitshuffle_decompress(
                         &buf.as_ref()[p1 as usize..],
                         &mut decomp,
@@ -218,7 +216,6 @@ impl EventChunker {
                     ) {
                         Ok(c1) => {
                             assert!(c1 as u32 == k1);
-                            //trace!("decompress result c1 {} k1 {}", c1, k1);
                             if ts < self.range.beg {
                             } else if ts >= self.range.end {
                                 Err(Error::with_msg(format!("event after range {}", ts / SEC)))?;
@@ -238,13 +235,11 @@ impl EventChunker {
                    };
                } else {
                    let p1 = sl.position();
-                   //info!("len: {} p1: {}", len, p1);
                    if len < p1 as u32 + 4 {
                        let msg = format!("uncomp len: {} p1: {}", len, p1);
                        Err(Error::with_msg(msg))?;
                    }
                    let vlen = len - p1 as u32 - 4;
-                   //info!("vlen: {}", vlen);
                    let decomp = BytesMut::from(&buf[p1 as usize..(p1 as u32 + vlen) as usize]);
                    ret.add_event(
                        ts,
@@ -315,8 +310,7 @@ impl Stream for EventChunker {
                    Ready(None)
                } else if self.parsed_bytes >= self.stats_conf.disk_stats_every.bytes() as u64 {
                    let item = EventDataReadStats {
-                        //parsed_bytes: self.parsed_bytes,
-                        parsed_bytes: 1000,
+                        parsed_bytes: self.parsed_bytes,
                    };
                    self.parsed_bytes = 0;
                    let ret = EventChunkerItem::EventDataReadStats(item);
diff --git a/disk/src/index.rs b/disk/src/index.rs
index 0a27c25..db37d05 100644
--- a/disk/src/index.rs
+++ b/disk/src/index.rs
@@ -154,37 +154,28 @@ pub async fn position_file(mut file: File, beg: u64) -> Result {
     // }
     let hres = parse_channel_header(&buf)?;
-    info!("hres: {:?}", hres);
     let headoff = 2 + hres.0 as u64;
     let ev = parse_event(&buf[headoff as usize..])?;
-    info!("ev: {:?}", ev);
     let evlen = ev.0 as u64;
-    info!("flen: {} flen - headoff mod evlen: {}", flen, (flen - headoff) % evlen);
     let mut j = headoff;
     let mut k = ((flen - headoff) / evlen - 1) * evlen + headoff;
-    info!("j {} k {}", j, k);
     let x = ev.1.ns;
     let y = read_event_at(k, &mut file).await?.1.ns;
-    info!("x {} y {}", x, y);
     if x >= beg {
-        info!("found A");
         file.seek(SeekFrom::Start(j)).await?;
         return Ok(file);
     }
     if y < beg {
-        info!("found B");
         file.seek(SeekFrom::Start(j)).await?;
         return Ok(file);
     }
 
     loop {
         if k - j < 2 * evlen {
-            info!("found C");
             file.seek(SeekFrom::Start(k)).await?;
             return Ok(file);
         }
         let m = j + (k - j) / 2 / evlen * evlen;
         let x = read_event_at(m, &mut file).await?.1.ns;
-        info!("event at m: {} ts: {}", m, x);
         if x < beg {
             j = m;
         } else {
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index 0eafcee..138f65f 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -133,7 +133,6 @@ async fn raw_conn_handler_inner_try(
         Err(e) => return Err((e, netout))?,
     };
     debug!("found config entry {:?}", entry);
-
     let shape = match &entry.shape {
         Some(lens) => {
             if lens.len() == 1 {
@@ -147,29 +146,24 @@ async fn raw_conn_handler_inner_try(
         }
         None => Shape::Scalar,
     };
-    let query = netpod::AggQuerySingleChannel {
-        channel_config: netpod::ChannelConfig {
-            channel: evq.channel.clone(),
-            keyspace: entry.ks as u8,
-            time_bin_size: entry.bs,
-            shape: shape,
-            scalar_type: entry.scalar_type.clone(),
-            big_endian: entry.is_big_endian,
-            array: entry.is_array,
-            compression: entry.is_compressed,
-        },
-        // TODO use a NanoRange and search for matching files
-        timebin: 0,
-        tb_file_count: 1,
-        // TODO use the requested buffer size
-        buffer_size: 1024 * 4,
+    let channel_config = netpod::ChannelConfig {
+        channel: evq.channel.clone(),
+        keyspace: entry.ks as u8,
+        time_bin_size: entry.bs,
+        shape: shape,
+        scalar_type: entry.scalar_type.clone(),
+        big_endian: entry.is_big_endian,
+        array: entry.is_array,
+        compression: entry.is_compressed,
     };
+    // TODO use a requested buffer size
+    let buffer_size = 1024 * 4;
     let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
     let mut s1 = EventBlobsComplete::new(
         range.clone(),
-        query.channel_config.clone(),
+        channel_config.clone(),
         node_config.node.clone(),
-        query.buffer_size as usize,
+        buffer_size,
         event_chunker_conf,
     )
     .into_dim_1_f32_stream()
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 8d16cb9..a04400a 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -248,9 +248,9 @@ pub mod timeunits {
     pub const DAY: u64 = HOUR * 24;
 }
 
-const BIN_T_LEN_OPTIONS: [u64; 3] = [SEC, MIN * 10, HOUR * 2];
+const BIN_T_LEN_OPTIONS: [u64; 4] = [SEC, MIN * 10, HOUR * 2, DAY];
 
-const PATCH_T_LEN_OPTIONS: [u64; 3] = [MIN * 20, HOUR * 12, DAY * 16];
+const PATCH_T_LEN_OPTIONS: [u64; 4] = [MIN * 10, HOUR * 2, DAY * 4, DAY * 32];
 
 const BIN_THRESHOLDS: [u64; 31] = [
     2,
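Note (not part of the patch): the make_get_values! macro definition itself lies outside the hunks above, so the exact signature and return type of the generated get_values_* helpers are not visible here. The sketch below only illustrates the pattern the new invocations rely on: one generated decoder per element type and byte order, with the caller choosing between the _be and _le variant via the `be` flag as in the rewritten match arms of disk/src/agg.rs. The function signature, return type, and the standalone main are assumptions made for the example, not the crate's actual API.

// Illustrative stand-in for a make_get_values!-style macro (assumed shape,
// not the definition used in disk/src/agg.rs): for an element type, a
// conversion function and an element size it defines a helper that decodes
// a raw byte buffer into a Vec of that type.
macro_rules! make_get_values {
    ($fname:ident, $ty:ident, $conv:ident, $size:expr) => {
        fn $fname(buf: &[u8]) -> Vec<$ty> {
            buf.chunks_exact($size)
                .map(|chunk| {
                    // Copy the fixed-size chunk into an array and convert it
                    // with the requested from_le_bytes / from_be_bytes.
                    let mut a = [0u8; $size];
                    a.copy_from_slice(chunk);
                    <$ty>::$conv(a)
                })
                .collect()
        }
    };
}

make_get_values!(get_values_u16_le, u16, from_le_bytes, 2);
make_get_values!(get_values_u16_be, u16, from_be_bytes, 2);

fn main() {
    // Two u16 values encoded in little-endian byte order.
    let raw = [0x01u8, 0x00, 0x02, 0x00];
    // Endianness dispatch, mirroring the `if be { ..._be } else { ..._le }`
    // arms introduced in the Dim1F32Stream match above.
    let be = false;
    let vals = if be {
        get_values_u16_be(&raw)
    } else {
        get_values_u16_le(&raw)
    };
    assert_eq!(vals, vec![1u16, 2]);
}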