From bd70738e743291a6181f439f4e059b20718b49dc Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Wed, 28 Apr 2021 19:41:02 +0200
Subject: [PATCH] First test runs on remote, add U16, fix stuff

---
 dbconn/src/lib.rs              |  7 +++++-
 disk/src/agg.rs                | 42 +++++++++++++++++++++++++++++++---
 disk/src/aggtest.rs            | 12 +++++-----
 disk/src/cache/pbv.rs          | 13 ++++-------
 disk/src/channelconfig.rs      |  6 +++--
 disk/src/dataopen.rs           | 27 ++++++++++++++++------
 disk/src/gen.rs                | 42 +++++++++++++++++++---------------
 disk/src/paths.rs              | 11 ++++-----
 disk/src/raw/conn.rs           |  2 +-
 netpod/src/lib.rs              |  2 +-
 retrieval/src/bin/retrieval.rs |  4 ++--
 11 files changed, 110 insertions(+), 58 deletions(-)

diff --git a/dbconn/src/lib.rs b/dbconn/src/lib.rs
index 799ccfb..cd7e7f2 100644
--- a/dbconn/src/lib.rs
+++ b/dbconn/src/lib.rs
@@ -18,7 +18,12 @@ pub async fn channel_exists(channel: &Channel, node_config: &NodeConfig) -> Resu
         .await?;
     info!("channel_exists {} rows", rows.len());
     for row in rows {
-        info!(" db on channel search: {:?}", row);
+        info!(
+            " db on channel search: {:?} {:?} {:?}",
+            row,
+            row.columns(),
+            row.get::<_, i64>(0)
+        );
     }
     drop(cjh);
     Ok(true)
diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index f8a3326..f324640 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -366,6 +366,8 @@ where
     S: Stream<Item = Result<EventFull, Error>>,
 {
     inp: S,
+    errored: bool,
+    completed: bool,
 }
 impl<S> Stream for Dim1F32Stream<S>
 where
@@ -376,6 +378,13 @@ where
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
+        if self.completed {
+            panic!("poll_next on completed");
+        }
+        if self.errored {
+            self.completed = true;
+            return Ready(None);
+        }
         match self.inp.poll_next_unpin(cx) {
             Ready(Some(Ok(k))) => {
                 let mut ret = ValuesDim1 {
@@ -388,6 +397,26 @@ where
                     let ty = &k.scalar_types[i1];
                     let decomp = k.decomps[i1].as_ref().unwrap();
                     match ty {
+                        U16 => {
+                            const BY: usize = 2;
+                            // do the conversion
+                            let n1 = decomp.len();
+                            assert!(n1 % ty.bytes() as usize == 0);
+                            let ele_count = n1 / ty.bytes() as usize;
+                            let mut j = Vec::with_capacity(ele_count);
+                            let mut p1 = 0;
+                            for i1 in 0..ele_count {
+                                let u = unsafe {
+                                    let mut r = [0u8; BY];
+                                    std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY);
+                                    u16::from_be_bytes(r)
+                                };
+                                j.push(u as f32);
+                                p1 += BY;
+                            }
+                            ret.tss.push(k.tss[i1]);
+                            ret.values.push(j);
+                        }
                         F64 => {
                             const BY: usize = 8;
                             // do the conversion
@@ -395,7 +424,6 @@ where
                             assert!(n1 % ty.bytes() as usize == 0);
                             let ele_count = n1 / ty.bytes() as usize;
                             let mut j = Vec::with_capacity(ele_count);
-                            // this is safe for ints and floats
                             unsafe {
                                 j.set_len(ele_count);
                             }
@@ -413,7 +441,11 @@ where
                             ret.tss.push(k.tss[i1]);
                             ret.values.push(j);
                         }
-                        _ => todo!(),
+                        _ => {
+                            let e = Error::with_msg(format!("Dim1F32Stream unhandled scalar type: {:?}", ty));
+                            self.errored = true;
+                            return Ready(Some(Err(e)));
+                        }
                     }
                 }
                 Ready(Some(Ok(ret)))
@@ -436,7 +468,11 @@ where
     T: Stream<Item = Result<EventFull, Error>>,
 {
     fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T> {
-        Dim1F32Stream { inp: self }
+        Dim1F32Stream {
+            inp: self,
+            errored: false,
+            completed: false,
+        }
     }
 }
 
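
Note on the U16 arm added above: it widens big-endian u16 samples to f32 by copying two bytes per element and decoding them with u16::from_be_bytes. Below is an editor's self-contained sketch of the same conversion, added for illustration only (not code from this commit), using safe chunks_exact instead of raw pointer copies; the function name is assumed, and big-endian input is assumed as in the arm above.

// Decode a big-endian u16 waveform into f32 samples.
// Assumes `raw` holds whole 2-byte samples, as the U16 arm asserts via n1 % ty.bytes().
fn decode_u16_be_to_f32(raw: &[u8]) -> Vec<f32> {
    assert!(raw.len() % 2 == 0);
    raw.chunks_exact(2)
        .map(|c| u16::from_be_bytes([c[0], c[1]]) as f32)
        .collect()
}

fn main() {
    // 0x0001 and 0x0100 in big-endian byte order decode to 1.0 and 256.0.
    let raw = [0x00u8, 0x01, 0x01, 0x00];
    assert_eq!(decode_u16_be_to_f32(&raw), vec![1.0, 256.0]);
}
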
diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs
index 641730c..36b92d4 100644
--- a/disk/src/aggtest.rs
+++ b/disk/src/aggtest.rs
@@ -5,7 +5,7 @@ use crate::agg::binnedx::IntoBinnedXBins1;
 use crate::agg::make_test_node;
 use futures_util::StreamExt;
 use netpod::timeunits::*;
-use netpod::{BinSpecDimT, Channel, ChannelConfig, NanoRange, ScalarType, Shape};
+use netpod::{BinSpecDimT, Channel, ChannelConfig, NanoRange, Nanos, ScalarType, Shape};
 use std::future::ready;
 #[allow(unused_imports)]
 use tracing::{debug, error, info, trace, warn};
@@ -28,7 +28,7 @@ async fn agg_x_dim_0_inner() {
            name: "S10BC01-DBAM070:EOM1_T1".into(),
        },
        keyspace: 2,
-       time_bin_size: DAY,
+       time_bin_size: Nanos { ns: DAY },
        array: false,
        shape: Shape::Scalar,
        scalar_type: ScalarType::F64,
@@ -40,7 +40,7 @@ async fn agg_x_dim_0_inner() {
        buffer_size: 1024 * 4,
    };
    let bin_count = 20;
-   let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
+   let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns;
    let ts2 = ts1 + HOUR * 24;
    let range = NanoRange { beg: ts1, end: ts2 };
    let fut1 = super::eventblobs::EventBlobsComplete::new(
@@ -98,7 +98,7 @@ async fn agg_x_dim_1_inner() {
            name: "wave1".into(),
        },
        keyspace: 3,
-       time_bin_size: DAY,
+       time_bin_size: Nanos { ns: DAY },
        array: true,
        shape: Shape::Wave(1024),
        scalar_type: ScalarType::F64,
@@ -110,7 +110,7 @@ async fn agg_x_dim_1_inner() {
        buffer_size: 17,
    };
    let bin_count = 10;
-   let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
+   let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns;
    let ts2 = ts1 + HOUR * 24;
    let range = NanoRange { beg: ts1, end: ts2 };
    let fut1 = super::eventblobs::EventBlobsComplete::new(
@@ -160,7 +160,7 @@ async fn merge_0_inner() {
            name: "wave1".into(),
        },
        keyspace: 3,
-       time_bin_size: DAY,
+       time_bin_size: Nanos { ns: DAY },
        array: true,
        shape: Shape::Wave(17),
        scalar_type: ScalarType::F64,
diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs
index c67cda0..2305f8c 100644
--- a/disk/src/cache/pbv.rs
+++ b/disk/src/cache/pbv.rs
@@ -108,20 +108,15 @@ impl PreBinnedValueStream {
                 assert!(g / h > 1);
                 assert!(g / h < 200);
                 assert!(g % h == 0);
-                let bin_size = range.grid_spec.bin_t_len();
                 let channel = self.channel.clone();
                 let agg_kind = self.agg_kind.clone();
                 let node_config = self.node_config.clone();
                 let patch_it = PreBinnedPatchIterator::from_range(range);
                 let s = futures_util::stream::iter(patch_it)
-                    .map(move |coord| {
-                        PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), &node_config)
-                    })
-                    .flatten()
-                    .map(move |k| {
-                        error!("NOTE NOTE NOTE try_setup_fetch_prebinned_higher_res ITEM from sub res bin_size {} {:?}", bin_size, k);
-                        k
-                    });
+                    .map(move |coord| {
+                        PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), &node_config)
+                    })
+                    .flatten();
                 self.fut2 = Some(Box::pin(s));
             }
             None => {
diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs
index 43c65e5..2aba793 100644
--- a/disk/src/channelconfig.rs
+++ b/disk/src/channelconfig.rs
@@ -1,5 +1,6 @@
 use err::Error;
-use netpod::{Channel, NanoRange, Node};
+use netpod::timeunits::MS;
+use netpod::{Channel, NanoRange, Nanos, Node};
 use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8};
 use nom::Needed;
 #[allow(unused_imports)]
@@ -64,7 +65,7 @@ pub struct ConfigEntry {
     pub ts: u64,
     pub pulse: i64,
     pub ks: i32,
-    pub bs: i64,
+    pub bs: Nanos,
     pub split_count: i32,
     pub status: i32,
     pub bb: i8,
@@ -133,6 +134,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
     let (inp, pulse) = be_i64(inp)?;
     let (inp, ks) = be_i32(inp)?;
     let (inp, bs) = be_i64(inp)?;
+    let bs = Nanos { ns: bs as u64 * MS };
     let (inp, split_count) = be_i32(inp)?;
     let (inp, status) = be_i32(inp)?;
     let (inp, bb) = be_i8(inp)?;
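
Note on the parse_entry change above: the on-disk config entry stores its bin size as a millisecond count, and it is now wrapped as Nanos { ns: bs as u64 * MS } so downstream code only deals in nanoseconds. The sketch below is an editor's illustration of that conversion, not code from this commit; Nanos and MS are redeclared locally here only for the example (the real definitions live in netpod).

// Local stand-ins for netpod::Nanos and netpod::timeunits::MS.
const MS: u64 = 1_000_000; // nanoseconds per millisecond

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Nanos {
    ns: u64,
}

// Wrap a millisecond bin size in the nanosecond newtype, as parse_entry now does.
fn bin_size_from_ms(bs: i64) -> Nanos {
    Nanos { ns: bs as u64 * MS }
}

fn main() {
    // A one-day bin stored as 86_400_000 ms becomes 86_400_000_000_000 ns.
    assert_eq!(bin_size_from_ms(86_400_000), Nanos { ns: 86_400_000_000_000 });
}
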
diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs
index 56597f2..ed2b963 100644
--- a/disk/src/dataopen.rs
+++ b/disk/src/dataopen.rs
@@ -3,6 +3,7 @@ use bytes::BytesMut;
 use err::Error;
 use futures_util::StreamExt;
 use netpod::log::*;
+use netpod::timeunits::MS;
 use netpod::{ChannelConfig, NanoRange, Nanos, Node};
 use std::mem::size_of;
 use tokio::fs::{File, OpenOptions};
@@ -55,19 +56,22 @@ async fn open_files_inner(
         }
     }
     timebins.sort_unstable();
-    info!("TIMEBINS FOUND: {:?}", timebins);
+    let tgt_tb = (range.beg / MS) as f32 / (channel_config.time_bin_size.ns / MS) as f32;
+    trace!("tgt_tb: {:?}", tgt_tb);
+    trace!("timebins found: {:?}", timebins);
     for &tb in &timebins {
         let ts_bin = Nanos {
-            ns: tb * channel_config.time_bin_size,
+            ns: tb * channel_config.time_bin_size.ns,
         };
         if ts_bin.ns >= range.end {
             continue;
         }
-        if ts_bin.ns + channel_config.time_bin_size <= range.beg {
+        if ts_bin.ns + channel_config.time_bin_size.ns <= range.beg {
             continue;
         }
-
+        info!("opening tb {:?}", &tb);
         let path = paths::datapath(tb, &channel_config, &node);
+        info!("opening path {:?}", &path);
         let mut file = OpenOptions::new().read(true).open(&path).await?;
         info!("opened file {:?} {:?}", &path, &file);
 
@@ -76,14 +80,21 @@ async fn open_files_inner(
         match OpenOptions::new().read(true).open(&index_path).await {
             Ok(mut index_file) => {
                 let meta = index_file.metadata().await?;
-                if meta.len() > 1024 * 1024 * 10 {
+                if meta.len() > 1024 * 1024 * 20 {
                     return Err(Error::with_msg(format!(
                         "too large index file {} bytes for {}",
                         meta.len(),
                         channel_config.channel.name
                     )));
                 }
-                if meta.len() % 16 != 0 {
+                if meta.len() < 2 {
+                    return Err(Error::with_msg(format!(
+                        "bad meta len {} for {}",
+                        meta.len(),
+                        channel_config.channel.name
+                    )));
+                }
+                if meta.len() % 16 != 2 {
                     return Err(Error::with_msg(format!(
                         "bad meta len {} for {}",
                         meta.len(),
@@ -94,7 +105,7 @@ async fn open_files_inner(
                 buf.resize(buf.capacity(), 0);
                 info!("read exact index file {} {}", buf.len(), buf.len() % 16);
                 index_file.read_exact(&mut buf).await?;
-                match find_ge(range.beg, &buf)? {
+                match find_ge(range.beg, &buf[2..])? {
                     Some(o) => {
                         info!("FOUND ts IN INDEX: {:?}", o);
                         file.seek(SeekFrom::Start(o.1)).await?;
@@ -109,6 +120,7 @@ async fn open_files_inner(
                 ErrorKind::NotFound => {
                     // TODO Read first 1k, assume that channel header fits.
                     // TODO Seek via binary search. Can not read whole file into memory!
+                    error!("TODO Seek directly in scalar file");
                    todo!("Seek directly in scalar file");
                }
                _ => Err(e)?,
@@ -120,6 +132,7 @@
         chtx.send(Ok(file)).await?;
     }
 
+    warn!("OPEN FILES LOOP DONE");
     Ok(())
 }
 
+ error!("TODO Seek directly in scalar file"); todo!("Seek directly in scalar file"); } _ => Err(e)?, @@ -120,6 +132,7 @@ async fn open_files_inner( chtx.send(Ok(file)).await?; } + warn!("OPEN FILES LOOP DONE"); Ok(()) } diff --git a/disk/src/gen.rs b/disk/src/gen.rs index eef12d0..73200f7 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -2,8 +2,8 @@ use crate::ChannelConfigExt; use bitshuffle::bitshuffle_compress; use bytes::{BufMut, BytesMut}; use err::Error; -use netpod::ScalarType; use netpod::{timeunits::*, Channel, ChannelConfig, Node, Shape}; +use netpod::{Nanos, ScalarType}; use std::path::{Path, PathBuf}; use tokio::fs::{File, OpenOptions}; use tokio::io::AsyncWriteExt; @@ -25,7 +25,7 @@ pub async fn gen_test_data() -> Result<(), Error> { name: "wave1".into(), }, keyspace: 3, - time_bin_size: DAY, + time_bin_size: Nanos { ns: DAY }, array: true, scalar_type: ScalarType::F64, shape: Shape::Wave(21), @@ -84,11 +84,11 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> .await .map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?; let mut evix = 0; - let mut ts = 0; - while ts < DAY * 2 { + let mut ts = Nanos { ns: 0 }; + while ts.ns < DAY * 2 { let res = gen_timebin(evix, ts, chn.time_spacing, &channel_path, &chn.config, node, ensemble).await?; evix = res.evix; - ts = res.ts; + ts.ns = res.ts.ns; } Ok(()) } @@ -130,8 +130,8 @@ async fn gen_config( buf.put_i32(0x20202020); buf.put_i64(ts); buf.put_i64(pulse); - buf.put_i32(config.keyspace as i32); - buf.put_i64(config.time_bin_size as i64); + buf.put_u32(config.keyspace as u32); + buf.put_u64(config.time_bin_size.ns); buf.put_i32(sc); buf.put_i32(status); buf.put_i8(bb); @@ -212,25 +212,25 @@ impl CountedFile { struct GenTimebinRes { evix: u64, - ts: u64, + ts: Nanos, } async fn gen_timebin( evix: u64, - ts: u64, + ts: Nanos, ts_spacing: u64, channel_path: &Path, config: &ChannelConfig, node: &Node, ensemble: &Ensemble, ) -> Result { - let tb = ts / config.time_bin_size; + let tb = ts.ns / config.time_bin_size.ns; let path = channel_path .join(format!("{:019}", tb)) .join(format!("{:010}", node.split)); tokio::fs::create_dir_all(&path).await?; - let data_path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size / MS, 0)); - let index_path = path.join(format!("{:019}_{:05}_Data_Index", config.time_bin_size / MS, 0)); + let data_path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size.ns / MS, 0)); + let index_path = path.join(format!("{:019}_{:05}_Data_Index", config.time_bin_size.ns / MS, 0)); info!("open data file {:?}", data_path); let file = OpenOptions::new() .write(true) @@ -247,20 +247,24 @@ async fn gen_timebin( .truncate(true) .open(index_path) .await?; - Some(CountedFile::new(f)) + let mut f = CountedFile::new(f); + f.write_all(b"\x00\x00").await?; + Some(f) } else { None }; gen_datafile_header(&mut file, config).await?; let mut evix = evix; let mut ts = ts; - let tsmax = (tb + 1) * config.time_bin_size; - while ts < tsmax { + let tsmax = Nanos { + ns: (tb + 1) * config.time_bin_size.ns, + }; + while ts.ns < tsmax.ns { if evix % ensemble.nodes.len() as u64 == node.split as u64 { gen_event(&mut file, index_file.as_mut(), evix, ts, config).await?; } evix += 1; - ts += ts_spacing; + ts.ns += ts_spacing; } let ret = GenTimebinRes { evix, ts }; Ok(ret) @@ -282,13 +286,13 @@ async fn gen_event( file: &mut CountedFile, index_file: Option<&mut CountedFile>, evix: u64, - ts: u64, + ts: Nanos, config: &ChannelConfig, ) -> Result<(), Error> { let mut buf = 
diff --git a/disk/src/paths.rs b/disk/src/paths.rs
index 3dc4761..e39ab1b 100644
--- a/disk/src/paths.rs
+++ b/disk/src/paths.rs
@@ -11,10 +11,7 @@ pub fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> Pa
         .join(config.channel.name.clone())
         .join(format!("{:019}", timebin))
         .join(format!("{:010}", node.split))
-        .join(format!(
-            "{:019}_00000_Data",
-            config.time_bin_size / netpod::timeunits::MS
-        ))
+        .join(format!("{:019}_00000_Data", config.time_bin_size.ns / MS))
 }
 
 pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
@@ -28,19 +25,19 @@ pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) ->
 
 pub fn data_dir_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
     let ret = channel_timebins_dir_path(channel_config, node)?
-        .join(format!("{:019}", ts.ns / channel_config.time_bin_size))
+        .join(format!("{:019}", ts.ns / channel_config.time_bin_size.ns))
         .join(format!("{:010}", node.split));
     Ok(ret)
 }
 
 pub fn data_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
-    let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size / MS, 0);
+    let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size.ns / MS, 0);
     let ret = data_dir_path(ts, channel_config, node)?.join(fname);
     Ok(ret)
 }
 
 pub fn index_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
-    let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size / MS, 0);
+    let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size.ns / MS, 0);
     let ret = data_dir_path(ts, channel_config, node)?.join(fname);
     Ok(ret)
 }
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index e080471..5ddcf93 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -166,7 +166,7 @@ async fn raw_conn_handler_inner_try(
         channel_config: netpod::ChannelConfig {
             channel: evq.channel.clone(),
             keyspace: entry.ks as u8,
-            time_bin_size: entry.bs as u64,
+            time_bin_size: entry.bs,
             shape: shape,
             scalar_type: ScalarType::from_dtype_index(entry.dtype.to_i16() as u8),
             big_endian: entry.is_big_endian,
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 14e8114..d3df7cf 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -193,7 +193,7 @@ impl NanoRange {
 pub struct ChannelConfig {
     pub channel: Channel,
     pub keyspace: u8,
-    pub time_bin_size: u64,
+    pub time_bin_size: Nanos,
     pub scalar_type: ScalarType,
     pub compression: bool,
     pub shape: Shape,
diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs
index 694602c..ba82c3c 100644
--- a/retrieval/src/bin/retrieval.rs
+++ b/retrieval/src/bin/retrieval.rs
@@ -1,5 +1,5 @@
 use err::Error;
-use netpod::NodeConfig;
+use netpod::{Nanos, NodeConfig};
 use tokio::io::AsyncReadExt;
 #[allow(unused_imports)]
 use tracing::{debug, error, info, trace, warn};
@@ -59,7 +59,7 @@ fn simple_fetch() {
            name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),
        },
        keyspace: 3,
-       time_bin_size: DAY,
+       time_bin_size: Nanos { ns: DAY },
        array: true,
        scalar_type: ScalarType::F64,
        shape: Shape::Wave(err::todoval()),