From 1150bb3c5574cf4934107fd255e16084f24b809d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 16 Apr 2021 14:38:46 +0200 Subject: [PATCH] WIP --- disk/src/agg.rs | 3 + disk/src/cache.rs | 12 +- disk/src/channelconfig.rs | 233 +++++++++++++++++++++++++++++++++ disk/src/gen.rs | 72 +++++++++- disk/src/lib.rs | 24 ++++ disk/src/raw.rs | 46 +++++++ httpret/Cargo.toml | 6 +- httpret/src/lib.rs | 24 ++++ netpod/src/lib.rs | 90 ++++++------- retrieval/src/bin/retrieval.rs | 1 + rustfmt.toml | 3 + taskrun/src/lib.rs | 7 + 12 files changed, 462 insertions(+), 59 deletions(-) create mode 100644 disk/src/channelconfig.rs create mode 100644 disk/src/raw.rs create mode 100644 rustfmt.toml diff --git a/disk/src/agg.rs b/disk/src/agg.rs index acc0928..ad54494 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -736,6 +736,7 @@ async fn agg_x_dim_0_inner() { }, keyspace: 2, time_bin_size: DAY, + array: false, shape: Shape::Scalar, scalar_type: ScalarType::F64, big_endian: true, @@ -791,6 +792,7 @@ async fn agg_x_dim_1_inner() { }, keyspace: 3, time_bin_size: DAY, + array: true, shape: Shape::Wave(1024), scalar_type: ScalarType::F64, big_endian: true, @@ -840,6 +842,7 @@ async fn merge_0_inner() { }, keyspace: 3, time_bin_size: DAY, + array: true, shape: Shape::Wave(17), scalar_type: ScalarType::F64, big_endian: true, diff --git a/disk/src/cache.rs b/disk/src/cache.rs index a463a98..1014960 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -54,7 +54,7 @@ pub fn binned_bytes_for_http(node_config: Arc, query: &Query) -> Res // TODO // Translate the Query TimeRange + AggKind into an iterator over the pre-binned patches. - let grid = PreBinnedPatchRange::covering_range(query.range.clone(), query.count, 0); + let grid = PreBinnedPatchRange::covering_range(query.range.clone(), query.count); match grid { Some(spec) => { info!("GOT PreBinnedPatchGridSpec: {:?}", spec); @@ -215,13 +215,13 @@ impl PreBinnedValueStream { beg: self.patch_coord.patch_beg(), end: self.patch_coord.patch_end(), }; - match PreBinnedPatchRange::covering_range(range, 2, 0) { + match PreBinnedPatchRange::covering_range(range, self.patch_coord.bin_count() + 1) { Some(range) => { let h = range.grid_spec.bin_t_len(); + info!("FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}", g, h, g/h, g%h, range); assert!(g / h > 1); assert!(g / h < 20); assert!(g % h == 0); - info!("FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}", g, h, g/h, g%h, range); let bin_size = range.grid_spec.bin_t_len(); let channel = self.channel.clone(); let agg_kind = self.agg_kind.clone(); @@ -239,6 +239,12 @@ impl PreBinnedValueStream { self.fut2 = Some(Box::pin(s)); } None => { + // TODO now try to read raw data. + // TODO Request the whole pre bin patch so that I have the option to save it as cache file if complete. + + // TODO The merging and other compute will be done by this node. + // TODO This node needs as input the splitted data streams. + // TODO Add a separate tcp server which can provide the parsed, unpacked, event-local-processed, reserialized streams. error!("TODO NO BETTER GRAN FOUND FOR g {}", g); todo!(); } diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs new file mode 100644 index 0000000..4cd369d --- /dev/null +++ b/disk/src/channelconfig.rs @@ -0,0 +1,233 @@ +#[allow(unused_imports)] +use nom::{IResult, bytes::complete::{tag, take, take_while_m_n}, combinator::map_res, sequence::tuple}; +use nom::number::complete::{be_i8, be_u8, be_i16, be_i32, be_i64}; +use crate::{Error, BadError}; +use num_derive::{FromPrimitive, ToPrimitive}; +use num_traits::{ToPrimitive}; +use serde_derive::{Serialize, Deserialize}; + +#[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)] +pub enum DType { + Bool = 0, + Bool8 = 1, + Int8 = 2, + Uint8 = 3, + Int16 = 4, + Uint16 = 5, + Character = 6, + Int32 = 7, + Uint32 = 8, + Int64 = 9, + Uint64 = 10, + Float32 = 11, + Float64 = 12, + String = 13, +} + +impl DType { + pub fn to_i16(&self) -> i16 { ToPrimitive::to_i16(self).unwrap() } +} + +#[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)] +pub enum CompressionMethod { + BitshuffleLZ4 = 0, +} + +impl CompressionMethod { + pub fn to_i16(&self) -> i16 { ToPrimitive::to_i16(self).unwrap() } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ConfigEntry { + pub ts: i64, + pub pulse: i64, + pub ks: i32, + pub bs: i64, + pub splitCount: i32, + pub status: i32, + pub bb: i8, + pub modulo: i32, + pub offset: i32, + /* + Precision: + 0 'default' whatever that is + -7 f32 + -16 f64 + */ + pub precision: i16, + pub dtype: DType, + pub isCompressed: bool, + pub isShaped: bool, + pub isArray: bool, + pub isBigEndian: bool, + pub compressionMethod: Option, + pub shape: Option>, + pub sourceName: Option, + unit: Option, + description: Option, + optionalFields: Option, + valueConverter: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Config { + pub formatVersion: i16, + pub channelName: String, + pub entries: Vec, +} + +fn parseShortString(inp: &[u8]) -> Result<(&[u8], Option), Error> { + let (inp, len1) = be_i32(inp)?; + if len1 == -1 { + return Ok((inp, None)); + } + if len1 < 4 { + return BadError(format!("bad string len {}", len1)); + } + if len1 > 500 { + return BadError(format!("large string len {}", len1)); + } + let (inp, snb) = take((len1 - 4) as usize)(inp)?; + let s1 = String::from_utf8(snb.to_vec())?; + Ok((inp, Some(s1))) +} + +/* +Parse a single configuration entry. +*/ +pub fn parseEntry(inp: &[u8]) -> Result<(&[u8], Option), Error> { + let (inp, len1) = be_i32(inp)?; + if len1 < 0 || len1 > 4000 { + return BadError(format!("ConfigEntry bad len1 {}", len1)); + } + if inp.len() == 0 { + return Ok((inp, None)); + } + if inp.len() < len1 as usize - 4 { + return BadError(format!("incomplete input")); + } + let inpE = &inp[(len1-8) as usize ..]; + let (inp, ts) = be_i64(inp)?; + let (inp, pulse) = be_i64(inp)?; + let (inp, ks) = be_i32(inp)?; + let (inp, bs) = be_i64(inp)?; + let (inp, splitCount) = be_i32(inp)?; + let (inp, status) = be_i32(inp)?; + let (inp, bb) = be_i8(inp)?; + let (inp, modulo) = be_i32(inp)?; + let (inp, offset) = be_i32(inp)?; + let (inp, precision) = be_i16(inp)?; + let (inp, dtlen) = be_i32(inp)?; + if dtlen > 100 { + return BadError(format!("unexpected data type len {}", dtlen)); + } + let (inp, dtmask) = be_u8(inp)?; + let isCompressed = dtmask & 0x80 != 0; + let isArray = dtmask & 0x40 != 0; + let isBigEndian = dtmask & 0x20 != 0; + let isShaped = dtmask & 0x10 != 0; + let (inp, dtype) = be_i8(inp)?; + if dtype > 13 { + return BadError(format!("unexpected data type {}", dtype)); + } + let dtype = match num_traits::FromPrimitive::from_i8(dtype) { + Some(k) => k, + None => return BadError(format!("Can not convert {} to DType", dtype)) + }; + let (inp, compressionMethod) = match isCompressed { + false => (inp, None), + true => { + let (inp, cm) = be_u8(inp)?; + match num_traits::FromPrimitive::from_u8(cm) { + Some(k) => (inp, Some(k)), + None => return BadError("unknown compression"), + } + } + }; + let (inp, shape) = match isShaped { + false => (inp, None), + true => { + let (mut inp, dim) = be_u8(inp)?; + if dim > 4 { return BadError(format!("unexpected number of dimensions: {}", dim)); } + let mut shape = vec![]; + for _ in 0..dim { + let t1 = be_i32(inp)?; + inp = t1.0; + shape.push(t1.1 as u32); + } + (inp, Some(shape)) + } + }; + let (inp, sourceName) = parseShortString(inp)?; + let (inp, unit) = parseShortString(inp)?; + let (inp, description) = parseShortString(inp)?; + let (inp, optionalFields) = parseShortString(inp)?; + let (inp, valueConverter) = parseShortString(inp)?; + assert_eq!(inp.len(), inpE.len()); + let (inpE, len2) = be_i32(inpE)?; + if len1 != len2 { + return BadError(format!("mismatch len1 {} len2 {}", len1, len2)); + } + Ok((inpE, Some(ConfigEntry { + ts, pulse, ks, bs, splitCount, status, bb, modulo, offset, precision, dtype, + isCompressed, isArray, isShaped, isBigEndian, compressionMethod, shape, + sourceName, unit, description, optionalFields, valueConverter, + }))) +} + +/* +Parse the full configuration file. +*/ +pub fn parseConfig(inp: &[u8]) -> Result { + let (inp, ver) = be_i16(inp)?; + let (inp, len1) = be_i32(inp)?; + if len1 <= 8 || len1 > 500 { + return BadError(format!("no channel name. len1 {}", len1)); + } + let (inp, chn) = take((len1 - 8) as usize)(inp)?; + let (inp, len2) = be_i32(inp)?; + if len1 != len2 { + return BadError(format!("Mismatch len1 {} len2 {}", len1, len2)); + } + let mut entries = vec![]; + let mut inpA = inp; + while inpA.len() > 0 { + let inp = inpA; + let (inp, e) = parseEntry(inp)?; + if let Some(e) = e { entries.push(e); } + inpA = inp; + } + Ok(Config{ + formatVersion: ver, + channelName: String::from_utf8(chn.to_vec())?, + entries: entries, + }) +} + +#[cfg(test)] +fn read_data() -> Vec { + use std::io::Read; + let mut f1 = std::fs::File::open("ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config").unwrap(); + let mut buf = vec![]; + f1.read_to_end(&mut buf).unwrap(); + buf +} + +#[test] fn parse_dummy() { + let config = parseConfig(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, + 0, 0, 0, 1, + ]).unwrap(); + assert_eq!(0, config.formatVersion); + assert_eq!("abc", config.channelName); +} + +#[test] fn open_file() { + let config = parseConfig(&readData()).unwrap(); + assert_eq!(0, config.formatVersion); + assert_eq!(9, config.entries.len()); + for e in &config.entries { + assert!(e.ts >= 631152000000000000); + assert!(e.ts <= 1591106812800073974); + assert!(e.shape.is_some()); + } +} diff --git a/disk/src/gen.rs b/disk/src/gen.rs index cddec8f..a3c57ce 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -15,13 +15,16 @@ use bitshuffle::bitshuffle_compress; use netpod::ScalarType; use std::sync::Arc; use netpod::{Node, Channel, ChannelConfig, Shape, timeunits::*}; +use crate::ChannelConfigExt; #[test] fn test_gen_test_data() { - taskrun::run(async { + let res = taskrun::run(async { gen_test_data().await?; Ok(()) - }).unwrap(); + }); + info!("{:?}", res); + res.unwrap(); } pub async fn gen_test_data() -> Result<(), Error> { @@ -40,6 +43,7 @@ pub async fn gen_test_data() -> Result<(), Error> { }, keyspace: 3, time_bin_size: DAY, + array: true, scalar_type: ScalarType::F64, shape: Shape::Wave(17), big_endian: true, @@ -87,12 +91,12 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> let config_path = node.data_base_path .join("config") .join(&chn.config.channel.name); - tokio::fs::create_dir_all(&config_path).await?; let channel_path = node.data_base_path .join(format!("{}_{}", node.ksprefix, chn.config.keyspace)) .join("byTime") .join(&chn.config.channel.name); tokio::fs::create_dir_all(&channel_path).await?; + gen_config(&config_path, &chn.config, node, ensemble).await.map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?; let mut evix = 0; let mut ts = 0; while ts < DAY { @@ -103,6 +107,68 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> Ok(()) } +async fn gen_config(config_path: &Path, config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<(), Error> { + let path = config_path.join("latest"); + tokio::fs::create_dir_all(&path).await?; + let path = path.join("00000_Config"); + info!("try to open {:?}", path); + let mut file = OpenOptions::new().write(true).create(true).truncate(true).open(path).await?; + let mut buf = BytesMut::with_capacity(1024 * 1); + let ver = 0; + buf.put_i16(ver); + let cnenc = config.channel.name.as_bytes(); + let len1 = cnenc.len() + 8; + buf.put_i32(len1 as i32); + buf.put(cnenc); + buf.put_i32(len1 as i32); + + let ts = 0; + let pulse = 0; + let sc = 0; + let status = 0; + let bb = 0; + let modulo = 0; + let offset = 0; + let precision = 0; + let p1 = buf.len(); + buf.put_i32(0x20202020); + buf.put_i64(ts); + buf.put_i64(pulse); + buf.put_i32(config.keyspace as i32); + buf.put_i64(config.time_bin_size as i64); + buf.put_i32(sc); + buf.put_i32(status); + buf.put_i8(bb); + buf.put_i32(modulo); + buf.put_i32(offset); + buf.put_i16(precision); + + { + // this len does not include itself and there seems to be no copy of it afterwards. + buf.put_i32(0x20202020); + let p3 = buf.len(); + buf.put_u8(config.dtflags()); + buf.put_u8(config.scalar_type.index()); + if config.compression { + let method = 0; + buf.put_i8(method); + } + match config.shape { + Shape::Scalar => {} + Shape::Wave(k) => { buf.put_i32(k as i32); } + } + let len = buf.len() - p3; + buf.as_mut()[p3..].as_mut().put_i32(len as i32); + } + + let p2 = buf.len(); + let len = p2 - p1 + 4; + buf.put_i32(len as i32); + buf.as_mut()[p1..].as_mut().put_i32(len as i32); + file.write(&buf); + Ok(()) +} + struct GenTimebinRes { evix: u64, ts: u64, diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 4f0db5d..0c7afac 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -14,11 +14,14 @@ use std::path::PathBuf; use bitshuffle::bitshuffle_decompress; use netpod::{ScalarType, Shape, Node, ChannelConfig}; use std::sync::Arc; +use crate::dtflags::{COMPRESSION, BIG_ENDIAN, ARRAY, SHAPE}; pub mod agg; pub mod gen; pub mod merge; pub mod cache; +pub mod raw; +pub mod channelconfig; pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Arc) -> Result { @@ -949,3 +952,24 @@ pub mod dtflags { pub const BIG_ENDIAN: u8 = 0x20; pub const SHAPE: u8 = 0x10; } + + +trait ChannelConfigExt { + fn dtflags(&self) -> u8; +} + +impl ChannelConfigExt for ChannelConfig { + + fn dtflags(&self) -> u8 { + let mut ret = 0; + if self.compression { ret |= COMPRESSION; } + match self.shape { + Shape::Scalar => {} + Shape::Wave(_) => { ret |= SHAPE; } + } + if self.big_endian { ret |= BIG_ENDIAN; } + if self.array { ret |= ARRAY; } + ret + } + +} diff --git a/disk/src/raw.rs b/disk/src/raw.rs new file mode 100644 index 0000000..5f40643 --- /dev/null +++ b/disk/src/raw.rs @@ -0,0 +1,46 @@ + + + +/* +Provide ser/de of value data to a good net exchange format. +*/ + + + +async fn local_unpacked_test() { + + // TODO what kind of query format? What information do I need here? + // Don't need exact details of channel because I need to parse the databuffer config anyway. + + /*let query = netpod::AggQuerySingleChannel { + channel_config: ChannelConfig { + channel: Channel { + backend: "ks".into(), + name: "wave1".into(), + }, + keyspace: 3, + time_bin_size: DAY, + shape: Shape::Wave(17), + scalar_type: ScalarType::F64, + big_endian: true, + compression: true, + }, + timebin: 0, + tb_file_count: 1, + buffer_size: 1024 * 8, + };*/ + + let query = todo!(); + let node = todo!(); + + // TODO generate channel configs for my test data. + + // TODO open and parse the channel config. + + // TODO find the matching config entry. (bonus: fuse consecutive compatible entries) + + + use crate::agg::{IntoDim1F32Stream}; + let stream = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node) + .into_dim_1_f32_stream(); +} diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index ed6de7e..85cfdb1 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -5,15 +5,17 @@ authors = ["Dominik Werder "] edition = "2018" [dependencies] -hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } +serde_json = "1.0" http = "0.2" url = "2.2" +tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } bytes = "1.0.1" futures-core = "0.3.14" futures-util = "0.3.14" tracing = "0.1.25" -serde_json = "1.0" async-channel = "1.6" err = { path = "../err" } netpod = { path = "../netpod" } disk = { path = "../disk" } +taskrun = { path = "../taskrun" } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 55e7303..21d58e1 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -16,8 +16,11 @@ use sync::Arc; use disk::cache::PreBinnedQuery; use panic::{UnwindSafe, AssertUnwindSafe}; use bytes::Bytes; +use tokio::net::TcpStream; +use tokio::io::AsyncWriteExt; pub async fn host(node_config: Arc) -> Result<(), Error> { + let rawjh = taskrun::spawn(raw_service(node_config.clone())); let addr = SocketAddr::from(([0, 0, 0, 0], node_config.node.port)); let make_service = make_service_fn({ move |conn| { @@ -34,6 +37,7 @@ pub async fn host(node_config: Arc) -> Result<(), Error> { } }); Server::bind(&addr).serve(make_service).await?; + rawjh.await; Ok(()) } @@ -263,3 +267,23 @@ async fn prebinned(req: Request, node_config: Arc) -> Result) -> Result<(), Error> { + let lis = tokio::net::TcpListener::bind("0.0.0.0:5555").await?; + loop { + match lis.accept().await { + Ok((stream, addr)) => { + taskrun::spawn(raw_conn_handler(stream, addr)); + } + Err(e) => Err(e)? + } + } + Ok(()) +} + +async fn raw_conn_handler(mut stream: TcpStream, addr: SocketAddr) -> Result<(), Error> { + info!("RAW HANDLER for {:?}", addr); + stream.write_i32_le(123).await?; + stream.flush().await?; + Ok(()) +} diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 0e0eee4..e80cd7d 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -174,11 +174,13 @@ pub struct ChannelConfig { pub keyspace: u8, pub time_bin_size: u64, pub scalar_type: ScalarType, - pub shape: Shape, - pub big_endian: bool, pub compression: bool, + pub shape: Shape, + pub array: bool, + pub big_endian: bool, } + #[derive(Clone, Debug, Serialize, Deserialize)] pub enum Shape { Scalar, @@ -297,7 +299,7 @@ impl PreBinnedPatchGridSpec { } pub fn is_valid_bin_t_len(bin_t_len: u64) -> bool { - for &j in PATCH_T_LEN_OPTIONS.iter() { + for &j in BIN_T_LEN_OPTIONS.iter() { if bin_t_len == j { return true; } @@ -306,32 +308,17 @@ impl PreBinnedPatchGridSpec { } pub fn patch_t_len(&self) -> u64 { - if self.bin_t_len == PATCH_T_LEN_OPTIONS[0] { - PATCH_T_LEN_OPTIONS[1] - } - else if self.bin_t_len == PATCH_T_LEN_OPTIONS[1] { - PATCH_T_LEN_OPTIONS[3] - } - else if self.bin_t_len == PATCH_T_LEN_OPTIONS[2] { - PATCH_T_LEN_OPTIONS[4] - } - else if self.bin_t_len == PATCH_T_LEN_OPTIONS[3] { - PATCH_T_LEN_OPTIONS[5] - } - else if self.bin_t_len == PATCH_T_LEN_OPTIONS[4] { - PATCH_T_LEN_OPTIONS[5] * 64 - } - else if self.bin_t_len == PATCH_T_LEN_OPTIONS[5] { - PATCH_T_LEN_OPTIONS[5] * 1024 - } - else { - panic!() + for (i1, &j) in BIN_T_LEN_OPTIONS.iter().enumerate() { + if self.bin_t_len == j { + return PATCH_T_LEN_OPTIONS[i1]; + } } + panic!() } } -const PATCH_T_LEN_OPTIONS: [u64; 6] = [ +const BIN_T_LEN_OPTIONS: [u64; 6] = [ SEC * 10, MIN * 10, HOUR, @@ -340,6 +327,15 @@ const PATCH_T_LEN_OPTIONS: [u64; 6] = [ DAY * 4, ]; +const PATCH_T_LEN_OPTIONS: [u64; 6] = [ + MIN * 10, + HOUR, + HOUR * 4, + DAY, + DAY * 4, + DAY * 12, +]; + #[derive(Clone, Debug)] pub struct PreBinnedPatchRange { @@ -350,47 +346,35 @@ pub struct PreBinnedPatchRange { impl PreBinnedPatchRange { - pub fn covering_range(range: NanoRange, min_bin_count: u64, finer: u64) -> Option { + pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option { assert!(min_bin_count >= 1); assert!(min_bin_count <= 2000); let dt = range.delta(); assert!(dt <= DAY * 14); let bs = dt / min_bin_count; //info!("BASIC bs {}", bs); - let mut found_count = 0; - let mut i1 = PATCH_T_LEN_OPTIONS.len(); + let mut i1 = BIN_T_LEN_OPTIONS.len(); loop { if i1 <= 0 { break None; } else { i1 -= 1; - let t = PATCH_T_LEN_OPTIONS[i1]; + let t = BIN_T_LEN_OPTIONS[i1]; //info!("look at threshold {} bs {}", t, bs); if t <= bs { - found_count += 1; - if found_count > finer { - let bs = t; - let ts1 = range.beg / bs * bs; - let ts2 = (range.end + bs - 1) / bs * bs; - let count = range.delta() / bs; - let patch_t_len = if i1 >= PATCH_T_LEN_OPTIONS.len() - 1 { - bs * 8 - } - else { - PATCH_T_LEN_OPTIONS[i1 + 1] * 8 - }; - let offset = ts1 / bs; - break Some(Self { - grid_spec: PreBinnedPatchGridSpec { - bin_t_len: bs, - }, - count, - offset, - }); - } - else { - } + let bs = t; + let ts1 = range.beg / bs * bs; + let ts2 = (range.end + bs - 1) / bs * bs; + let count = range.delta() / bs; + let offset = ts1 / bs; + break Some(Self { + grid_spec: PreBinnedPatchGridSpec { + bin_t_len: bs, + }, + count, + offset, + }); } else { } @@ -425,6 +409,10 @@ impl PreBinnedPatchCoord { self.spec.patch_t_len() * (self.ix + 1) } + pub fn bin_count(&self) -> u64 { + self.patch_t_len() / self.spec.bin_t_len + } + pub fn spec(&self) -> &PreBinnedPatchGridSpec { &self.spec } diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs index a3de740..cc166de 100644 --- a/retrieval/src/bin/retrieval.rs +++ b/retrieval/src/bin/retrieval.rs @@ -50,6 +50,7 @@ fn simple_fetch() { }, keyspace: 3, time_bin_size: DAY, + array: true, scalar_type: ScalarType::F64, shape: Shape::Wave(todo!()), big_endian: true, diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..f8f0f42 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,3 @@ +unstable_features = true +empty_item_single_line = false +control_brace_style = "ClosingNextLine" diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index bc8e889..1706602 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -2,6 +2,8 @@ use tracing::{error, warn, info, debug, trace}; use err::Error; use std::panic; +use tokio::task::JoinHandle; +use std::future::Future; pub fn run>>(f: F) -> Result { tracing_init(); @@ -32,3 +34,8 @@ pub fn tracing_init() { .with_env_filter(tracing_subscriber::EnvFilter::new("info,retrieval=trace,disk=trace,tokio_postgres=info")) .init(); } + + +pub fn spawn(task: T) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static { + tokio::spawn(task) +}