First test runs on remote, add U16, fix stuff
This commit is contained in:
@@ -366,6 +366,8 @@ where
|
||||
S: Stream<Item = Result<EventFull, Error>>,
|
||||
{
|
||||
inp: S,
|
||||
errored: bool,
|
||||
completed: bool,
|
||||
}
|
||||
|
||||
impl<S> Stream for Dim1F32Stream<S>
|
||||
@@ -376,6 +378,13 @@ where
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
if self.completed {
|
||||
panic!("poll_next on completed");
|
||||
}
|
||||
if self.errored {
|
||||
self.completed = true;
|
||||
return Ready(None);
|
||||
}
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(k))) => {
|
||||
let mut ret = ValuesDim1 {
|
||||
@@ -388,6 +397,26 @@ where
|
||||
let ty = &k.scalar_types[i1];
|
||||
let decomp = k.decomps[i1].as_ref().unwrap();
|
||||
match ty {
|
||||
U16 => {
|
||||
const BY: usize = 2;
|
||||
// do the conversion
|
||||
let n1 = decomp.len();
|
||||
assert!(n1 % ty.bytes() as usize == 0);
|
||||
let ele_count = n1 / ty.bytes() as usize;
|
||||
let mut j = Vec::with_capacity(ele_count);
|
||||
let mut p1 = 0;
|
||||
for i1 in 0..ele_count {
|
||||
let u = unsafe {
|
||||
let mut r = [0u8; BY];
|
||||
std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY);
|
||||
u16::from_be_bytes(r)
|
||||
};
|
||||
j.push(u as f32);
|
||||
p1 += BY;
|
||||
}
|
||||
ret.tss.push(k.tss[i1]);
|
||||
ret.values.push(j);
|
||||
}
|
||||
F64 => {
|
||||
const BY: usize = 8;
|
||||
// do the conversion
|
||||
@@ -395,7 +424,6 @@ where
|
||||
assert!(n1 % ty.bytes() as usize == 0);
|
||||
let ele_count = n1 / ty.bytes() as usize;
|
||||
let mut j = Vec::with_capacity(ele_count);
|
||||
// this is safe for ints and floats
|
||||
unsafe {
|
||||
j.set_len(ele_count);
|
||||
}
|
||||
@@ -413,7 +441,11 @@ where
|
||||
ret.tss.push(k.tss[i1]);
|
||||
ret.values.push(j);
|
||||
}
|
||||
_ => todo!(),
|
||||
_ => {
|
||||
let e = Error::with_msg(format!("Dim1F32Stream unhandled scalar type: {:?}", ty));
|
||||
self.errored = true;
|
||||
return Ready(Some(Err(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ready(Some(Ok(ret)))
|
||||
@@ -436,7 +468,11 @@ where
|
||||
T: Stream<Item = Result<EventFull, Error>>,
|
||||
{
|
||||
/// Consumes `self` and wraps it as the input (`inp`) of a new `Dim1F32Stream`.
///
/// In the multi-line constructor below, the `errored` and `completed` flags
/// both start out as `false`.
fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T> {
|
||||
// NOTE(review): two constructor expressions follow back to back. This listing
// looks like a diff rendering, so the single-field form is presumably the
// pre-change line and the three-field form the post-change one — confirm.
Dim1F32Stream { inp: self }
|
||||
Dim1F32Stream {
|
||||
inp: self,
|
||||
errored: false,
|
||||
|
||||
completed: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::agg::binnedx::IntoBinnedXBins1;
|
||||
use crate::agg::make_test_node;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::timeunits::*;
|
||||
use netpod::{BinSpecDimT, Channel, ChannelConfig, NanoRange, ScalarType, Shape};
|
||||
use netpod::{BinSpecDimT, Channel, ChannelConfig, NanoRange, Nanos, ScalarType, Shape};
|
||||
use std::future::ready;
|
||||
#[allow(unused_imports)]
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
@@ -28,7 +28,7 @@ async fn agg_x_dim_0_inner() {
|
||||
name: "S10BC01-DBAM070:EOM1_T1".into(),
|
||||
},
|
||||
keyspace: 2,
|
||||
time_bin_size: DAY,
|
||||
time_bin_size: Nanos { ns: DAY },
|
||||
array: false,
|
||||
shape: Shape::Scalar,
|
||||
scalar_type: ScalarType::F64,
|
||||
@@ -40,7 +40,7 @@ async fn agg_x_dim_0_inner() {
|
||||
buffer_size: 1024 * 4,
|
||||
};
|
||||
let bin_count = 20;
|
||||
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
|
||||
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns;
|
||||
let ts2 = ts1 + HOUR * 24;
|
||||
let range = NanoRange { beg: ts1, end: ts2 };
|
||||
let fut1 = super::eventblobs::EventBlobsComplete::new(
|
||||
@@ -98,7 +98,7 @@ async fn agg_x_dim_1_inner() {
|
||||
name: "wave1".into(),
|
||||
},
|
||||
keyspace: 3,
|
||||
time_bin_size: DAY,
|
||||
time_bin_size: Nanos { ns: DAY },
|
||||
array: true,
|
||||
shape: Shape::Wave(1024),
|
||||
scalar_type: ScalarType::F64,
|
||||
@@ -110,7 +110,7 @@ async fn agg_x_dim_1_inner() {
|
||||
buffer_size: 17,
|
||||
};
|
||||
let bin_count = 10;
|
||||
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
|
||||
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns;
|
||||
let ts2 = ts1 + HOUR * 24;
|
||||
let range = NanoRange { beg: ts1, end: ts2 };
|
||||
let fut1 = super::eventblobs::EventBlobsComplete::new(
|
||||
@@ -160,7 +160,7 @@ async fn merge_0_inner() {
|
||||
name: "wave1".into(),
|
||||
},
|
||||
keyspace: 3,
|
||||
time_bin_size: DAY,
|
||||
time_bin_size: Nanos { ns: DAY },
|
||||
array: true,
|
||||
shape: Shape::Wave(17),
|
||||
scalar_type: ScalarType::F64,
|
||||
|
||||
13
disk/src/cache/pbv.rs
vendored
13
disk/src/cache/pbv.rs
vendored
@@ -108,20 +108,15 @@ impl PreBinnedValueStream {
|
||||
assert!(g / h > 1);
|
||||
assert!(g / h < 200);
|
||||
assert!(g % h == 0);
|
||||
let bin_size = range.grid_spec.bin_t_len();
|
||||
let channel = self.channel.clone();
|
||||
let agg_kind = self.agg_kind.clone();
|
||||
let node_config = self.node_config.clone();
|
||||
let patch_it = PreBinnedPatchIterator::from_range(range);
|
||||
let s = futures_util::stream::iter(patch_it)
|
||||
.map(move |coord| {
|
||||
PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), &node_config)
|
||||
})
|
||||
.flatten()
|
||||
.map(move |k| {
|
||||
error!("NOTE NOTE NOTE try_setup_fetch_prebinned_higher_res ITEM from sub res bin_size {} {:?}", bin_size, k);
|
||||
k
|
||||
});
|
||||
.map(move |coord| {
|
||||
PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), &node_config)
|
||||
})
|
||||
.flatten();
|
||||
self.fut2 = Some(Box::pin(s));
|
||||
}
|
||||
None => {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use err::Error;
|
||||
use netpod::{Channel, NanoRange, Node};
|
||||
use netpod::timeunits::MS;
|
||||
use netpod::{Channel, NanoRange, Nanos, Node};
|
||||
use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8};
|
||||
use nom::Needed;
|
||||
#[allow(unused_imports)]
|
||||
@@ -64,7 +65,7 @@ pub struct ConfigEntry {
|
||||
pub ts: u64,
|
||||
pub pulse: i64,
|
||||
pub ks: i32,
|
||||
pub bs: i64,
|
||||
pub bs: Nanos,
|
||||
pub split_count: i32,
|
||||
pub status: i32,
|
||||
pub bb: i8,
|
||||
@@ -133,6 +134,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
|
||||
let (inp, pulse) = be_i64(inp)?;
|
||||
let (inp, ks) = be_i32(inp)?;
|
||||
let (inp, bs) = be_i64(inp)?;
|
||||
let bs = Nanos { ns: bs as u64 * MS };
|
||||
let (inp, split_count) = be_i32(inp)?;
|
||||
let (inp, status) = be_i32(inp)?;
|
||||
let (inp, bb) = be_i8(inp)?;
|
||||
|
||||
@@ -3,6 +3,7 @@ use bytes::BytesMut;
|
||||
use err::Error;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::MS;
|
||||
use netpod::{ChannelConfig, NanoRange, Nanos, Node};
|
||||
use std::mem::size_of;
|
||||
use tokio::fs::{File, OpenOptions};
|
||||
@@ -55,19 +56,22 @@ async fn open_files_inner(
|
||||
}
|
||||
}
|
||||
timebins.sort_unstable();
|
||||
info!("TIMEBINS FOUND: {:?}", timebins);
|
||||
let tgt_tb = (range.beg / MS) as f32 / (channel_config.time_bin_size.ns / MS) as f32;
|
||||
trace!("tgt_tb: {:?}", tgt_tb);
|
||||
trace!("timebins found: {:?}", timebins);
|
||||
for &tb in &timebins {
|
||||
let ts_bin = Nanos {
|
||||
ns: tb * channel_config.time_bin_size,
|
||||
ns: tb * channel_config.time_bin_size.ns,
|
||||
};
|
||||
if ts_bin.ns >= range.end {
|
||||
continue;
|
||||
}
|
||||
if ts_bin.ns + channel_config.time_bin_size <= range.beg {
|
||||
if ts_bin.ns + channel_config.time_bin_size.ns <= range.beg {
|
||||
continue;
|
||||
}
|
||||
|
||||
info!("opening tb {:?}", &tb);
|
||||
let path = paths::datapath(tb, &channel_config, &node);
|
||||
info!("opening path {:?}", &path);
|
||||
let mut file = OpenOptions::new().read(true).open(&path).await?;
|
||||
info!("opened file {:?} {:?}", &path, &file);
|
||||
|
||||
@@ -76,14 +80,21 @@ async fn open_files_inner(
|
||||
match OpenOptions::new().read(true).open(&index_path).await {
|
||||
Ok(mut index_file) => {
|
||||
let meta = index_file.metadata().await?;
|
||||
if meta.len() > 1024 * 1024 * 10 {
|
||||
if meta.len() > 1024 * 1024 * 20 {
|
||||
return Err(Error::with_msg(format!(
|
||||
"too large index file {} bytes for {}",
|
||||
meta.len(),
|
||||
channel_config.channel.name
|
||||
)));
|
||||
}
|
||||
if meta.len() % 16 != 0 {
|
||||
if meta.len() < 2 {
|
||||
return Err(Error::with_msg(format!(
|
||||
"bad meta len {} for {}",
|
||||
meta.len(),
|
||||
channel_config.channel.name
|
||||
)));
|
||||
}
|
||||
if meta.len() % 16 != 2 {
|
||||
return Err(Error::with_msg(format!(
|
||||
"bad meta len {} for {}",
|
||||
meta.len(),
|
||||
@@ -94,7 +105,7 @@ async fn open_files_inner(
|
||||
buf.resize(buf.capacity(), 0);
|
||||
info!("read exact index file {} {}", buf.len(), buf.len() % 16);
|
||||
index_file.read_exact(&mut buf).await?;
|
||||
match find_ge(range.beg, &buf)? {
|
||||
match find_ge(range.beg, &buf[2..])? {
|
||||
Some(o) => {
|
||||
info!("FOUND ts IN INDEX: {:?}", o);
|
||||
file.seek(SeekFrom::Start(o.1)).await?;
|
||||
@@ -109,6 +120,7 @@ async fn open_files_inner(
|
||||
ErrorKind::NotFound => {
|
||||
// TODO Read first 1k, assume that channel header fits.
|
||||
// TODO Seek via binary search. Can not read whole file into memory!
|
||||
error!("TODO Seek directly in scalar file");
|
||||
todo!("Seek directly in scalar file");
|
||||
}
|
||||
_ => Err(e)?,
|
||||
@@ -120,6 +132,7 @@ async fn open_files_inner(
|
||||
|
||||
chtx.send(Ok(file)).await?;
|
||||
}
|
||||
warn!("OPEN FILES LOOP DONE");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -2,8 +2,8 @@ use crate::ChannelConfigExt;
|
||||
use bitshuffle::bitshuffle_compress;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use err::Error;
|
||||
use netpod::ScalarType;
|
||||
use netpod::{timeunits::*, Channel, ChannelConfig, Node, Shape};
|
||||
use netpod::{Nanos, ScalarType};
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::fs::{File, OpenOptions};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
@@ -25,7 +25,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
|
||||
name: "wave1".into(),
|
||||
},
|
||||
keyspace: 3,
|
||||
time_bin_size: DAY,
|
||||
time_bin_size: Nanos { ns: DAY },
|
||||
array: true,
|
||||
scalar_type: ScalarType::F64,
|
||||
shape: Shape::Wave(21),
|
||||
@@ -84,11 +84,11 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) ->
|
||||
.await
|
||||
.map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?;
|
||||
let mut evix = 0;
|
||||
let mut ts = 0;
|
||||
while ts < DAY * 2 {
|
||||
let mut ts = Nanos { ns: 0 };
|
||||
while ts.ns < DAY * 2 {
|
||||
let res = gen_timebin(evix, ts, chn.time_spacing, &channel_path, &chn.config, node, ensemble).await?;
|
||||
evix = res.evix;
|
||||
ts = res.ts;
|
||||
ts.ns = res.ts.ns;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -130,8 +130,8 @@ async fn gen_config(
|
||||
buf.put_i32(0x20202020);
|
||||
buf.put_i64(ts);
|
||||
buf.put_i64(pulse);
|
||||
buf.put_i32(config.keyspace as i32);
|
||||
buf.put_i64(config.time_bin_size as i64);
|
||||
buf.put_u32(config.keyspace as u32);
|
||||
buf.put_u64(config.time_bin_size.ns);
|
||||
buf.put_i32(sc);
|
||||
buf.put_i32(status);
|
||||
buf.put_i8(bb);
|
||||
@@ -212,25 +212,25 @@ impl CountedFile {
|
||||
|
||||
/// Value returned by `gen_timebin`: the event index and the timestamp that the
/// generating loop had reached when it stopped.
struct GenTimebinRes {
|
||||
/// Event index after the last loop iteration.
evix: u64,
|
||||
// NOTE(review): `ts` is listed twice with different types (`u64` and `Nanos`).
// This listing looks like a diff rendering, so the `u64` line is presumably
// the pre-change declaration and the `Nanos` line the post-change one — confirm.
ts: u64,
|
||||
/// Timestamp after the last loop iteration.
ts: Nanos,
|
||||
}
|
||||
|
||||
async fn gen_timebin(
|
||||
evix: u64,
|
||||
ts: u64,
|
||||
ts: Nanos,
|
||||
ts_spacing: u64,
|
||||
channel_path: &Path,
|
||||
config: &ChannelConfig,
|
||||
node: &Node,
|
||||
ensemble: &Ensemble,
|
||||
) -> Result<GenTimebinRes, Error> {
|
||||
let tb = ts / config.time_bin_size;
|
||||
let tb = ts.ns / config.time_bin_size.ns;
|
||||
let path = channel_path
|
||||
.join(format!("{:019}", tb))
|
||||
.join(format!("{:010}", node.split));
|
||||
tokio::fs::create_dir_all(&path).await?;
|
||||
let data_path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size / MS, 0));
|
||||
let index_path = path.join(format!("{:019}_{:05}_Data_Index", config.time_bin_size / MS, 0));
|
||||
let data_path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size.ns / MS, 0));
|
||||
let index_path = path.join(format!("{:019}_{:05}_Data_Index", config.time_bin_size.ns / MS, 0));
|
||||
info!("open data file {:?}", data_path);
|
||||
let file = OpenOptions::new()
|
||||
.write(true)
|
||||
@@ -247,20 +247,24 @@ async fn gen_timebin(
|
||||
.truncate(true)
|
||||
.open(index_path)
|
||||
.await?;
|
||||
Some(CountedFile::new(f))
|
||||
let mut f = CountedFile::new(f);
|
||||
f.write_all(b"\x00\x00").await?;
|
||||
Some(f)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
gen_datafile_header(&mut file, config).await?;
|
||||
let mut evix = evix;
|
||||
let mut ts = ts;
|
||||
let tsmax = (tb + 1) * config.time_bin_size;
|
||||
while ts < tsmax {
|
||||
let tsmax = Nanos {
|
||||
ns: (tb + 1) * config.time_bin_size.ns,
|
||||
};
|
||||
while ts.ns < tsmax.ns {
|
||||
if evix % ensemble.nodes.len() as u64 == node.split as u64 {
|
||||
gen_event(&mut file, index_file.as_mut(), evix, ts, config).await?;
|
||||
}
|
||||
evix += 1;
|
||||
ts += ts_spacing;
|
||||
ts.ns += ts_spacing;
|
||||
}
|
||||
let ret = GenTimebinRes { evix, ts };
|
||||
Ok(ret)
|
||||
@@ -282,13 +286,13 @@ async fn gen_event(
|
||||
file: &mut CountedFile,
|
||||
index_file: Option<&mut CountedFile>,
|
||||
evix: u64,
|
||||
ts: u64,
|
||||
ts: Nanos,
|
||||
config: &ChannelConfig,
|
||||
) -> Result<(), Error> {
|
||||
let mut buf = BytesMut::with_capacity(1024 * 16);
|
||||
buf.put_i32(0xcafecafe as u32 as i32);
|
||||
buf.put_u64(0xcafecafe);
|
||||
buf.put_u64(ts);
|
||||
buf.put_u64(ts.ns);
|
||||
buf.put_u64(2323);
|
||||
buf.put_u64(0xcafecafe);
|
||||
buf.put_u8(0);
|
||||
@@ -341,7 +345,7 @@ async fn gen_event(
|
||||
file.write_all(buf.as_ref()).await?;
|
||||
if let Some(f) = index_file {
|
||||
let mut buf = BytesMut::with_capacity(16);
|
||||
buf.put_u64(ts);
|
||||
buf.put_u64(ts.ns);
|
||||
buf.put_u64(z);
|
||||
f.write_all(&buf).await?;
|
||||
}
|
||||
|
||||
@@ -11,10 +11,7 @@ pub fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> Pa
|
||||
.join(config.channel.name.clone())
|
||||
.join(format!("{:019}", timebin))
|
||||
.join(format!("{:010}", node.split))
|
||||
.join(format!(
|
||||
"{:019}_00000_Data",
|
||||
config.time_bin_size / netpod::timeunits::MS
|
||||
))
|
||||
.join(format!("{:019}_00000_Data", config.time_bin_size.ns / MS))
|
||||
}
|
||||
|
||||
pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
|
||||
@@ -28,19 +25,19 @@ pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) ->
|
||||
|
||||
/// Builds the directory path for the time bin that contains `ts`.
///
/// Starts from `channel_timebins_dir_path(channel_config, node)` and appends
/// two components: the time-bin index (`ts.ns` divided by the channel's time
/// bin size, zero-padded to 19 digits) and `node.split` zero-padded to 10 digits.
///
/// # Errors
///
/// Propagates any error returned by `channel_timebins_dir_path`.
pub fn data_dir_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
|
||||
let ret = channel_timebins_dir_path(channel_config, node)?
|
||||
// NOTE(review): the time-bin component appears twice, once dividing by
// `time_bin_size` and once by `time_bin_size.ns`. This looks like a diff
// rendering showing the line before and after the change — confirm.
.join(format!("{:019}", ts.ns / channel_config.time_bin_size))
|
||||
.join(format!("{:019}", ts.ns / channel_config.time_bin_size.ns))
|
||||
.join(format!("{:010}", node.split));
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
/// Builds the path of the data file for the time bin that contains `ts`.
///
/// The file name is the channel's time bin size divided by `MS` (zero-padded
/// to 19 digits), then `_`, then the literal `0` padded to 5 digits, then
/// `_Data`. It is joined onto the directory returned by `data_dir_path`.
///
/// # Errors
///
/// Propagates any error returned by `data_dir_path`.
pub fn data_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
|
||||
// NOTE(review): `fname` is bound twice, once from `time_bin_size / MS` and
// once from `time_bin_size.ns / MS`. This looks like a diff rendering showing
// the line before and after the change — confirm which one is current.
let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size / MS, 0);
|
||||
let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size.ns / MS, 0);
|
||||
let ret = data_dir_path(ts, channel_config, node)?.join(fname);
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
/// Builds the path of the index file for the time bin that contains `ts`.
///
/// Uses the same naming scheme as `data_path` but with the suffix
/// `_Data_Index`: the channel's time bin size divided by `MS` (zero-padded to
/// 19 digits), `_`, the literal `0` padded to 5 digits, `_Data_Index`. The name
/// is joined onto the directory returned by `data_dir_path`.
///
/// # Errors
///
/// Propagates any error returned by `data_dir_path`.
pub fn index_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
|
||||
// NOTE(review): `fname` is bound twice, once from `time_bin_size / MS` and
// once from `time_bin_size.ns / MS`. This looks like a diff rendering showing
// the line before and after the change — confirm which one is current.
let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size / MS, 0);
|
||||
let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size.ns / MS, 0);
|
||||
let ret = data_dir_path(ts, channel_config, node)?.join(fname);
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
@@ -166,7 +166,7 @@ async fn raw_conn_handler_inner_try(
|
||||
channel_config: netpod::ChannelConfig {
|
||||
channel: evq.channel.clone(),
|
||||
keyspace: entry.ks as u8,
|
||||
time_bin_size: entry.bs as u64,
|
||||
time_bin_size: entry.bs,
|
||||
shape: shape,
|
||||
scalar_type: ScalarType::from_dtype_index(entry.dtype.to_i16() as u8),
|
||||
big_endian: entry.is_big_endian,
|
||||
|
||||
Reference in New Issue
Block a user