From 34217967e9ef5663cd6e394e03fef1265c8975ea Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Wed, 12 May 2021 17:45:12 +0200
Subject: [PATCH] Run first time with scalar, refactor value extraction

---
 Cargo.toml               |   2 +-
 dbconn/src/lib.rs        |  37 +++++++++
 disk/src/agg.rs          | 121 ++++++++++++++++++---------
 disk/src/binnedstream.rs |   3 +-
 disk/src/cache/pbv.rs    |   2 +-
 disk/src/dataopen.rs     |  57 +------------
 disk/src/eventchunker.rs |  27 ++++--
 disk/src/frame/inmem.rs  |  17 ++--
 disk/src/index.rs        | 172 ++++++++++++++++++++++++++++++++++++++-
 disk/src/lib.rs          |   3 +-
 disk/src/raw/conn.rs     |  13 +--
 err/src/lib.rs           |   5 +-
 httpret/src/lib.rs       |  31 +++++++
 retrieval/src/client.rs  |   1 +
 14 files changed, 363 insertions(+), 128 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index b7fe624..514ac3e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,6 +3,6 @@ members = ["retrieval"]
 
 [profile.release]
 debug = 1
-#opt-level = 1
+opt-level = 1
 #overflow-checks = true
 #debug-assertions = true
diff --git a/dbconn/src/lib.rs b/dbconn/src/lib.rs
index dda2fbd..3ee0338 100644
--- a/dbconn/src/lib.rs
+++ b/dbconn/src/lib.rs
@@ -49,3 +49,40 @@ pub async fn database_size(node_config: &NodeConfigCached) -> Result<u64, Error>
     let size = size as u64;
     Ok(size)
 }
+
+pub struct TableSizes {
+    pub sizes: Vec<(String, String)>,
+}
+
+pub async fn table_sizes(node_config: &NodeConfigCached) -> Result<TableSizes, Error> {
+    let sql = format!(
+        "{} {} {} {} {} {} {}",
+        "SELECT nspname || '.' || relname AS relation, pg_size_pretty(pg_total_relation_size(C.oid)) AS total_size",
+        "FROM pg_class C",
+        "LEFT JOIN pg_namespace N ON (N.oid = C.relnamespace)",
+        "WHERE nspname NOT IN ('pg_catalog', 'information_schema')",
+        "AND C.relkind <> 'i'",
+        "AND nspname !~ '^pg_toast'",
+        "ORDER BY pg_total_relation_size(C.oid) DESC LIMIT 20",
+    );
+    let sql = sql.as_str();
+    let cl = create_connection(node_config).await?;
+    let rows = cl.query(sql, &[]).await?;
+    let mut sizes = TableSizes { sizes: vec![] };
+    for row in rows {
+        sizes.sizes.push((row.get(0), row.get(1)));
+    }
+    sizes.sizes.push((format!("dummy0"), format!("A")));
+    sizes.sizes.push((format!("dummy1"), format!("B")));
+    Ok(sizes)
+}
+
+pub async fn random_channel(node_config: &NodeConfigCached) -> Result<String, Error> {
+    let sql = "select name from channels order by rowid limit 1 offset (random() * (select count(rowid) from channels))::bigint";
+    let cl = create_connection(node_config).await?;
+    let rows = cl.query(sql, &[]).await?;
+    if rows.len() == 0 {
+        Err(Error::with_msg("can not get random channel"))?;
+    }
+    Ok(rows[0].get(0))
+}
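The two dbconn helpers above are read-only diagnostics: table_sizes returns the 20 largest relations as pretty-printed by Postgres, and random_channel picks a uniformly distributed row via a random OFFSET. A minimal caller sketch, not part of the patch; print_db_overview is a hypothetical helper, and it assumes err::Error and netpod's NodeConfigCached are in scope as elsewhere in this crate:

    // Hypothetical usage of the new dbconn helpers (sketch only).
    async fn print_db_overview(node_config: &NodeConfigCached) -> Result<(), Error> {
        let sizes = dbconn::table_sizes(node_config).await?;
        for (relation, total) in sizes.sizes {
            // `total` is already human-readable via pg_size_pretty.
            println!("{:60} {:>12}", relation, total);
        }
        let name = dbconn::random_channel(node_config).await?;
        println!("random channel: {}", name);
        Ok(())
    }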
diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index 9a99a3d..b982300 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -6,6 +6,7 @@ use super::eventchunker::EventFull;
 use crate::agg::binnedt::AggregatableTdim;
 use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem};
 use crate::eventchunker::EventChunkerItem;
+use bytes::BytesMut;
 use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
@@ -307,6 +308,61 @@ pub struct Dim1F32Stream<S> {
     completed: bool,
 }
 
+trait NumEx {
+    const BY: usize;
+}
+
+struct NumF32;
+impl NumEx for NumF32 {
+    const BY: usize = 4;
+}
+
+macro_rules! make_get_values {
+    ($n:ident, $TY:ident, $FROM_BYTES:ident, $BY:expr) => {
+        fn $n(decomp: &BytesMut, ty: &ScalarType) -> Result<Vec<f32>, Error> {
+            let n1 = decomp.len();
+            if ty.bytes() as usize != $BY {
+                Err(Error::with_msg(format!(
+                    "ty.bytes() != BY {} vs {}",
+                    ty.bytes(),
+                    $BY
+                )))?;
+            }
+            if n1 % ty.bytes() as usize != 0 {
+                Err(Error::with_msg(format!(
+                    "n1 % ty.bytes() as usize != 0 {} vs {}",
+                    n1,
+                    ty.bytes()
+                )))?;
+            }
+            let ele_count = n1 / ty.bytes() as usize;
+            let mut j = Vec::with_capacity(ele_count);
+            let mut p2 = j.as_mut_ptr();
+            let mut p1 = 0;
+            for _ in 0..ele_count {
+                unsafe {
+                    let mut r = [0u8; $BY];
+                    std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), $BY);
+                    *p2 = $TY::$FROM_BYTES(r) as f32;
+                    p1 += $BY;
+                    p2 = p2.add(1);
+                };
+            }
+            unsafe {
+                j.set_len(ele_count);
+            }
+            Ok(j)
+        }
+    };
+}
+
+make_get_values!(get_values_u16_le, u16, from_le_bytes, 2);
+make_get_values!(get_values_f32_le, f32, from_le_bytes, 4);
+make_get_values!(get_values_f64_le, f64, from_le_bytes, 8);
+make_get_values!(get_values_u16_be, u16, from_be_bytes, 2);
+make_get_values!(get_values_f32_be, f32, from_be_bytes, 4);
+make_get_values!(get_values_f64_be, f64, from_be_bytes, 8);
+
 impl<S> Dim1F32Stream<S> {
     pub fn new(inp: S) -> Self {
         Self {
@@ -322,51 +378,38 @@ impl<S> Dim1F32Stream<S>
         for i1 in 0..k.tss.len() {
             // TODO iterate sibling arrays after single bounds check
             let ty = &k.scalar_types[i1];
+            let be = k.be[i1];
             let decomp = k.decomps[i1].as_ref().unwrap();
             match ty {
-                U16 => {
-                    const BY: usize = 2;
-                    // do the conversion
-                    let n1 = decomp.len();
-                    assert!(n1 % ty.bytes() as usize == 0);
-                    let ele_count = n1 / ty.bytes() as usize;
-                    let mut j = Vec::with_capacity(ele_count);
-                    let mut p1 = 0;
-                    for _ in 0..ele_count {
-                        let u = unsafe {
-                            let mut r = [0u8; BY];
-                            std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY);
-                            u16::from_be_bytes(r)
-                        };
-                        j.push(u as f32);
-                        p1 += BY;
-                    }
+                U16 if be => {
+                    let value = get_values_u16_be(decomp, ty)?;
                     ret.tss.push(k.tss[i1]);
-                    ret.values.push(j);
+                    ret.values.push(value);
+                }
+                U16 => {
+                    let value = get_values_u16_le(decomp, ty)?;
+                    ret.tss.push(k.tss[i1]);
+                    ret.values.push(value);
+                }
+                F32 if be => {
+                    let value = get_values_f32_be(decomp, ty)?;
+                    ret.tss.push(k.tss[i1]);
+                    ret.values.push(value);
+                }
+                F32 => {
+                    let value = get_values_f32_le(decomp, ty)?;
+                    ret.tss.push(k.tss[i1]);
+                    ret.values.push(value);
+                }
+                F64 if be => {
+                    let value = get_values_f64_be(decomp, ty)?;
+                    ret.tss.push(k.tss[i1]);
+                    ret.values.push(value);
                 }
                 F64 => {
-                    const BY: usize = 8;
-                    // do the conversion
-                    let n1 = decomp.len();
-                    assert!(n1 % ty.bytes() as usize == 0);
-                    let ele_count = n1 / ty.bytes() as usize;
-                    let mut j = Vec::with_capacity(ele_count);
-                    unsafe {
-                        j.set_len(ele_count);
-                    }
-                    let mut p1 = 0;
-                    for i1 in 0..ele_count {
-                        let u = unsafe {
-                            let mut r = [0u8; BY];
-                            std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY);
-                            f64::from_be_bytes(r)
-                            //f64::from_be_bytes(std::mem::transmute::<_, [u8; 8]>(&decomp[p1]))
-                        };
-                        j[i1] = u as f32;
-                        p1 += BY;
-                    }
+                    let value = get_values_f64_le(decomp, ty)?;
                     ret.tss.push(k.tss[i1]);
-                    ret.values.push(j);
+                    ret.values.push(value);
                 }
                 _ => {
                     let e = Error::with_msg(format!("Dim1F32Stream unhandled scalar type: {:?}", ty));
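The make_get_values! macro above stamps out one converter per (type, endianness) pair; the unsafe pointer loop is equivalent to a chunks_exact pass. A safe reference version of one instantiation, shown only to document the intended semantics (sketch, not part of the patch; assumes err::Error as above):

    // Safe equivalent of the generated get_values_f32_be (sketch).
    fn get_values_f32_be_ref(decomp: &[u8]) -> Result<Vec<f32>, Error> {
        if decomp.len() % 4 != 0 {
            return Err(Error::with_msg(format!("bad payload length {}", decomp.len())));
        }
        let vals: Vec<f32> = decomp
            .chunks_exact(4)
            .map(|c| f32::from_be_bytes([c[0], c[1], c[2], c[3]]))
            .collect();
        Ok(vals)
    }

Note that every generated converter widens to f32, so u16 and f64 channels lose their native width at this stage by design.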
diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs
index 41ac5f3..f2ec23c 100644
--- a/disk/src/binnedstream.rs
+++ b/disk/src/binnedstream.rs
@@ -48,7 +48,7 @@ impl BinnedStreamFromPreBinnedPatches {
             match PreBinnedValueFetchedStream::new(&query, &node_config) {
                 Ok(k) => Box::pin(k),
                 Err(e) => {
-                    error!("see error {:?}", e);
+                    error!("error from PreBinnedValueFetchedStream::new {:?}", e);
                     Box::pin(futures_util::stream::iter(vec![Err(e)]))
                 }
             };
@@ -73,7 +73,6 @@ impl BinnedStreamFromPreBinnedPatches {
                 }
                 Ok(PreBinnedItem::RangeComplete) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)),
                 Ok(PreBinnedItem::EventDataReadStats(stats)) => {
-                    //info!("BinnedStream ''''''''''''''''''' observes stats {:?}", stats);
                     Some(Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)))
                 }
                 Ok(PreBinnedItem::Log(item)) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Log(item))),
diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs
index 19a74f5..8e5c68b 100644
--- a/disk/src/cache/pbv.rs
+++ b/disk/src/cache/pbv.rs
@@ -254,7 +254,7 @@ impl Stream for PreBinnedValueStream {
                         match self.query.cache_usage {
                             super::CacheUsage::Use | super::CacheUsage::Recreate => {
                                 let msg = format!(
-                                    "Write cache file query: {:?} bin count: {}",
+                                    "write cache file query: {:?} bin count: {}",
                                     self.query.patch,
                                     self.values.ts1s.len()
                                 );
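The dataopen change below removes the local find_ge and the todo!() for index-less scalar files: when a channel index file exists, the timestamp is looked up there; otherwise the data file itself is now bisected via super::index::position_file. The find_ge contract (first big-endian (ts, offset) pair with ts >= target) can be pinned down with a small test-style sketch (hypothetical values; assumes disk::index::find_ge and err::Error in scope; the real caller skips a 2-byte index header first):

    fn find_ge_contract() -> Result<(), Error> {
        let mut buf = Vec::new();
        buf.extend_from_slice(&100u64.to_be_bytes()); // ts of event 0
        buf.extend_from_slice(&0u64.to_be_bytes());   // file offset of event 0
        buf.extend_from_slice(&200u64.to_be_bytes()); // ts of event 1
        buf.extend_from_slice(&32u64.to_be_bytes());  // file offset of event 1
        assert_eq!(find_ge(50, &buf)?, Some((100, 0)));
        assert_eq!(find_ge(150, &buf)?, Some((200, 32)));
        assert_eq!(find_ge(201, &buf)?, None);
        Ok(())
    }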
diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs
index d89f19b..cca2639 100644
--- a/disk/src/dataopen.rs
+++ b/disk/src/dataopen.rs
@@ -5,7 +5,6 @@ use futures_util::StreamExt;
 use netpod::log::*;
 use netpod::timeunits::MS;
 use netpod::{ChannelConfig, NanoRange, Nanos, Node};
-use std::mem::size_of;
 use tokio::fs::{File, OpenOptions};
 use tokio::io::{AsyncReadExt, AsyncSeekExt, ErrorKind, SeekFrom};
@@ -38,7 +37,6 @@ async fn open_files_inner(
     node: Node,
 ) -> Result<(), Error> {
     let channel_config = channel_config.clone();
-    // TODO reduce usage of `query` and see what we actually need.
     let mut timebins = vec![];
     {
         let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?;
@@ -74,7 +72,6 @@ async fn open_files_inner(
         debug!("opening path {:?}", &path);
         let mut file = OpenOptions::new().read(true).open(&path).await?;
         debug!("opened file {:?} {:?}", &path, &file);
-
         {
             let index_path = paths::index_path(ts_bin, &channel_config, &node)?;
             match OpenOptions::new().read(true).open(&index_path).await {
@@ -105,7 +102,7 @@ async fn open_files_inner(
                     buf.resize(buf.capacity(), 0);
                     debug!("read exact index file {} {}", buf.len(), buf.len() % 16);
                     index_file.read_exact(&mut buf).await?;
-                    match find_ge(range.beg, &buf[2..])? {
+                    match super::index::find_ge(range.beg, &buf[2..])? {
                         Some(o) => {
                             debug!("FOUND ts IN INDEX: {:?}", o);
                             file.seek(SeekFrom::Start(o.1)).await?;
@@ -118,65 +115,15 @@ async fn open_files_inner(
                 }
                 Err(e) => match e.kind() {
                     ErrorKind::NotFound => {
-                        // TODO Read first 1k, assume that channel header fits.
-                        // TODO Seek via binary search. Can not read whole file into memory!
-                        error!("TODO Seek directly in scalar file");
-                        todo!("Seek directly in scalar file");
+                        file = super::index::position_file(file, range.beg).await?;
                     }
                     _ => Err(e)?,
                 },
             }
         }
-
-        // TODO Since I want to seek into the data file, the consumer of this channel must not expect a file channel name header.
-
         chtx.send(Ok(file)).await?;
     }
     // TODO keep track of number of running
     debug!("open_files_inner done");
     Ok(())
 }
-
-fn find_ge(h: u64, buf: &[u8]) -> Result<Option<(u64, u64)>, Error> {
-    trace!("find_ge {}", h);
-    const N: usize = 2 * size_of::<u64>();
-    let n1 = buf.len();
-    if n1 % N != 0 {
-        return Err(Error::with_msg(format!("find_ge bad len {}", n1)));
-    }
-    if n1 == 0 {
-        warn!("Empty index data");
-        return Ok(None);
-    }
-    let n1 = n1 / N;
-    let a = unsafe {
-        let ptr = &buf[0] as *const u8 as *const ([u8; 8], [u8; 8]);
-        std::slice::from_raw_parts(ptr, n1)
-    };
-    let mut j = 0;
-    let mut k = n1 - 1;
-    let x = u64::from_be_bytes(a[j].0);
-    let y = u64::from_be_bytes(a[k].0);
-    trace!("first/last ts: {} {}", x, y);
-    if x >= h {
-        return Ok(Some((u64::from_be_bytes(a[j].0), u64::from_be_bytes(a[j].1))));
-    }
-    if y < h {
-        return Ok(None);
-    }
-    loop {
-        if k - j < 2 {
-            let ret = (u64::from_be_bytes(a[k].0), u64::from_be_bytes(a[k].1));
-            trace!("FOUND {:?}", ret);
-            return Ok(Some(ret));
-        }
-        let m = (k + j) / 2;
-        let x = u64::from_be_bytes(a[m].0);
-        trace!("CHECK NEW M: {}", x);
-        if x < h {
-            j = m;
-        } else {
-            k = m;
-        }
-    }
-}
diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs
index 3c21e47..a0ad273 100644
--- a/disk/src/eventchunker.rs
+++ b/disk/src/eventchunker.rs
@@ -160,7 +160,7 @@ impl EventChunker {
                     use super::dtflags::*;
                     let is_compressed = type_flags & COMPRESSION != 0;
                     let is_array = type_flags & ARRAY != 0;
-                    let _is_big_endian = type_flags & BIG_ENDIAN != 0;
+                    let is_big_endian = type_flags & BIG_ENDIAN != 0;
                    let is_shaped = type_flags & SHAPE != 0;
                     if let Shape::Wave(_) = self.channel_config.shape {
                         assert!(is_array);
@@ -228,6 +228,7 @@ impl EventChunker {
                                 pulse,
                                 Some(decomp),
                                 ScalarType::from_dtype_index(type_index)?,
+                                is_big_endian,
                             );
                         }
                     }
@@ -236,9 +237,22 @@ impl EventChunker {
                 }
             };
         } else {
-            Err(Error::with_msg(format!(
-                "TODO uncompressed event parsing not yet implemented"
-            )))?;
+            let p1 = sl.position();
+            //info!("len: {} p1: {}", len, p1);
+            if len < p1 as u32 + 4 {
+                let msg = format!("uncomp len: {} p1: {}", len, p1);
+                Err(Error::with_msg(msg))?;
+            }
+            let vlen = len - p1 as u32 - 4;
+            //info!("vlen: {}", vlen);
+            let decomp = BytesMut::from(&buf[p1 as usize..(p1 as u32 + vlen) as usize]);
+            ret.add_event(
+                ts,
+                pulse,
+                Some(decomp),
+                ScalarType::from_dtype_index(type_index)?,
+                is_big_endian,
+            );
         }
         buf.advance(len as usize);
         parsed_bytes += len as u64;
@@ -259,6 +273,7 @@ pub struct EventFull {
     pub pulses: Vec<u64>,
     pub decomps: Vec<Option<BytesMut>>,
     pub scalar_types: Vec<ScalarType>,
+    pub be: Vec<bool>,
 }
 
 impl EventFull {
@@ -268,14 +283,16 @@ impl EventFull {
             pulses: vec![],
             decomps: vec![],
             scalar_types: vec![],
+            be: vec![],
         }
     }
 
-    fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option<BytesMut>, scalar_type: ScalarType) {
+    fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option<BytesMut>, scalar_type: ScalarType, be: bool) {
         self.tss.push(ts);
         self.pulses.push(pulse);
         self.decomps.push(decomp);
         self.scalar_types.push(scalar_type);
+        self.be.push(be);
     }
 }
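With is_big_endian now captured and stored per event, downstream consumers can decode without re-reading the flags. In the uncompressed branch above the payload sits between the cursor and the trailing 4-byte length word, e.g. len = 40 and p1 = 28 gives vlen = 40 - 28 - 4 = 8, i.e. one scalar f64. A sketch of how a consumer might pair the stored flag with a decoder (hypothetical helper, assumes an 8-byte payload):

    fn decode_one_f64(payload: &[u8], be: bool) -> f64 {
        let mut r = [0u8; 8];
        r.copy_from_slice(&payload[0..8]);
        if be { f64::from_be_bytes(r) } else { f64::from_le_bytes(r) }
    }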
diff --git a/disk/src/frame/inmem.rs b/disk/src/frame/inmem.rs
index 61f654f..d300344 100644
--- a/disk/src/frame/inmem.rs
+++ b/disk/src/frame/inmem.rs
@@ -109,15 +109,16 @@ where
         let tyid = u32::from_le_bytes(*arrayref::array_ref![buf, 8, 4]);
         let len = u32::from_le_bytes(*arrayref::array_ref![buf, 12, 4]);
         if magic != INMEM_FRAME_MAGIC {
-            error!("InMemoryFrameAsyncReadStream tryparse incorrect magic: {}", magic);
-            return (
-                Some(Some(Err(Error::with_msg(format!(
-                    "InMemoryFrameAsyncReadStream tryparse incorrect magic: {}",
-                    magic
-                ))))),
-                buf,
-                wp,
+            let z = nb.min(32);
+            let u = String::from_utf8_lossy(&buf[0..z]);
+            let e = Error::with_msg("INCORRECT MAGIC");
+            error!("incorrect magic buf as utf8: {:?} error: {:?}", u, e);
+            let msg = format!(
+                "InMemoryFrameAsyncReadStream tryparse incorrect magic: {} buf as utf8: {:?}",
+                magic, u
             );
+            error!("{}", msg);
+            return (Some(Some(Err(Error::with_msg(format!("{}", msg))))), buf, wp);
         }
         if len == 0 {
             if nb != INMEM_FRAME_HEAD + INMEM_FRAME_FOOT {
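The reworked error path above logs up to 32 leading bytes as lossy UTF-8, which makes a misrouted text or HTTP response immediately recognizable in the log instead of an opaque magic-number mismatch. The same diagnostic idea as a standalone helper (sketch, not part of the patch):

    // Render an unknown buffer prefix for logging, capped at 32 bytes.
    fn buf_prefix_for_log(buf: &[u8]) -> String {
        let z = buf.len().min(32);
        String::from_utf8_lossy(&buf[..z]).into_owned()
    }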
diff --git a/disk/src/index.rs b/disk/src/index.rs
index e7e3076..0a27c25 100644
--- a/disk/src/index.rs
+++ b/disk/src/index.rs
@@ -1,8 +1,10 @@
+use arrayref::array_ref;
 use err::Error;
 use netpod::log::*;
 use netpod::{ChannelConfig, Nanos, Node};
-use tokio::fs::OpenOptions;
-use tokio::io::ErrorKind;
+use std::mem::size_of;
+use tokio::fs::{File, OpenOptions};
+use tokio::io::{AsyncReadExt, AsyncSeekExt, ErrorKind, SeekFrom};
 
 pub async fn find_start_pos_for_config(
     ts: Nanos,
@@ -24,3 +26,169 @@ pub async fn find_start_pos_for_config(
     };
     Ok(ret)
 }
+
+pub fn find_ge(h: u64, buf: &[u8]) -> Result<Option<(u64, u64)>, Error> {
+    const N: usize = 2 * size_of::<u64>();
+    let n1 = buf.len();
+    if n1 % N != 0 {
+        return Err(Error::with_msg(format!("find_ge bad len {}", n1)));
+    }
+    if n1 == 0 {
+        warn!("Empty index data");
+        return Ok(None);
+    }
+    let n1 = n1 / N;
+    let a = unsafe {
+        let ptr = &buf[0] as *const u8 as *const ([u8; 8], [u8; 8]);
+        std::slice::from_raw_parts(ptr, n1)
+    };
+    let mut j = 0;
+    let mut k = n1 - 1;
+    let x = u64::from_be_bytes(a[j].0);
+    let y = u64::from_be_bytes(a[k].0);
+    if x >= h {
+        return Ok(Some((u64::from_be_bytes(a[j].0), u64::from_be_bytes(a[j].1))));
+    }
+    if y < h {
+        return Ok(None);
+    }
+    loop {
+        if k - j < 2 {
+            let ret = (u64::from_be_bytes(a[k].0), u64::from_be_bytes(a[k].1));
+            return Ok(Some(ret));
+        }
+        let m = (k + j) / 2;
+        let x = u64::from_be_bytes(a[m].0);
+        if x < h {
+            j = m;
+        } else {
+            k = m;
+        }
+    }
+}
+
+async fn read(buf: &mut [u8], file: &mut File) -> Result<usize, Error> {
+    let mut wp = 0;
+    loop {
+        let n1 = file.read(&mut buf[wp..]).await?;
+        if n1 == 0 {
+            break;
+        } else {
+            wp += n1;
+        }
+        if wp >= buf.len() {
+            break;
+        }
+    }
+    Ok(wp)
+}
+
+pub fn parse_channel_header(buf: &[u8]) -> Result<(u32,), Error> {
+    if buf.len() < 6 {
+        return Err(Error::with_msg(format!("parse_channel_header buf len: {}", buf.len())));
+    }
+    let ver = i16::from_be_bytes(*array_ref![buf, 0, 2]);
+    if ver != 0 {
+        return Err(Error::with_msg(format!("unknown file version: {}", ver)));
+    }
+    let len1 = u32::from_be_bytes(*array_ref![buf, 2, 4]);
+    if len1 < 9 || len1 > 256 {
+        return Err(Error::with_msg(format!("unexpected data file header len1: {}", len1)));
+    }
+    if buf.len() < 2 + len1 as usize {
+        return Err(Error::with_msg(format!(
+            "data file header not contained in buffer len1: {} vs {}",
+            len1,
+            buf.len()
+        )));
+    }
+    let len2 = u32::from_be_bytes(*array_ref![buf, 2 + len1 as usize - 4, 4]);
+    if len1 != len2 {
+        return Err(Error::with_msg(format!("len mismatch len1: {} len2: {}", len1, len2)));
+    }
+    Ok((len1 as u32,))
+}
+
+pub fn parse_event(buf: &[u8]) -> Result<(u32, Nanos), Error> {
+    if buf.len() < 4 {
+        return Err(Error::with_msg(format!("parse_event buf len: {}", buf.len())));
+    }
+    let len1 = u32::from_be_bytes(*array_ref![buf, 0, 4]);
+    if len1 < 9 || len1 > 512 {
+        return Err(Error::with_msg(format!("unexpected event len1: {}", len1)));
+    }
+    if buf.len() < len1 as usize {
+        return Err(Error::with_msg(format!(
+            "event not contained in buffer len1: {} vs {}",
+            len1,
+            buf.len()
+        )));
+    }
+    let len2 = u32::from_be_bytes(*array_ref![buf, len1 as usize - 4, 4]);
+    if len1 != len2 {
+        return Err(Error::with_msg(format!("len mismatch len1: {} len2: {}", len1, len2)));
+    }
+    let ts = u64::from_be_bytes(*array_ref![buf, 12, 8]);
+    Ok((len1 as u32, Nanos { ns: ts }))
+}
+
+pub async fn read_event_at(pos: u64, file: &mut File) -> Result<(u32, Nanos), Error> {
+    file.seek(SeekFrom::Start(pos)).await?;
+    let mut buf = vec![0; 1024];
+    let _n1 = read(&mut buf, file).await?;
+    let ev = parse_event(&buf)?;
+    Ok(ev)
+}
+
+pub async fn position_file(mut file: File, beg: u64) -> Result<File, Error> {
+    // Read first chunk which should include channel name packet, and a first event.
+    // It can be that file is empty.
+    // It can be that there is a channel header but zero events.
+    let flen = file.seek(SeekFrom::End(0)).await?;
+    file.seek(SeekFrom::Start(0)).await?;
+    let mut buf = vec![0; 1024];
+    let n1 = read(&mut buf, &mut file).await?;
+    if n1 < buf.len() {
+        // file has less content than our buffer
+    } else {
+        //
+    }
+    let hres = parse_channel_header(&buf)?;
+    info!("hres: {:?}", hres);
+    let headoff = 2 + hres.0 as u64;
+    let ev = parse_event(&buf[headoff as usize..])?;
+    info!("ev: {:?}", ev);
+    let evlen = ev.0 as u64;
+    info!("flen: {} flen - headoff mod evlen: {}", flen, (flen - headoff) % evlen);
+    let mut j = headoff;
+    let mut k = ((flen - headoff) / evlen - 1) * evlen + headoff;
+    info!("j {} k {}", j, k);
+    let x = ev.1.ns;
+    let y = read_event_at(k, &mut file).await?.1.ns;
+    info!("x {} y {}", x, y);
+    if x >= beg {
+        info!("found A");
+        file.seek(SeekFrom::Start(j)).await?;
+        return Ok(file);
+    }
+    if y < beg {
+        info!("found B");
+        file.seek(SeekFrom::Start(j)).await?;
+        return Ok(file);
+    }
+    loop {
+        if k - j < 2 * evlen {
+            info!("found C");
+            file.seek(SeekFrom::Start(k)).await?;
+            return Ok(file);
+        }
+        let m = j + (k - j) / 2 / evlen * evlen;
+        let x = read_event_at(m, &mut file).await?.1.ns;
+        info!("event at m: {} ts: {}", m, x);
+        if x < beg {
+            j = m;
+        } else {
+            k = m;
+        }
+    }
+}
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index dc01b52..131c5aa 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -6,6 +6,7 @@ use err::Error;
 use futures_core::Stream;
 use futures_util::future::FusedFuture;
 use futures_util::{pin_mut, select, FutureExt, StreamExt};
+use netpod::log::*;
 use netpod::{ChannelConfig, NanoRange, Node, Shape};
 use std::future::Future;
 use std::path::PathBuf;
@@ -14,8 +15,6 @@ use std::task::{Context, Poll};
 use std::time::{Duration, Instant};
 use tokio::fs::{File, OpenOptions};
 use tokio::io::AsyncRead;
-#[allow(unused_imports)]
-use tracing::{debug, error, info, span, trace, warn, Level};
 
 pub mod agg;
 #[cfg(test)]
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index 65bdf14..0eafcee 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -173,18 +173,7 @@ async fn raw_conn_handler_inner_try(
         event_chunker_conf,
     )
     .into_dim_1_f32_stream()
-    .into_binned_x_bins_1()
-    .map(|k| {
-        if false {
-            match &k {
-                Ok(MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats)) => {
-                    info!("raw::conn ✑ ✑ ✑ ✑ ✑ ✑ seeing stats: {:?}", stats);
-                }
-                _ => {}
-            }
-        }
-        k
-    });
+    .into_binned_x_bins_1();
     let mut e = 0;
     while let Some(item) = s1.next().await {
         match &item {
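position_file above relies on every probe landing on a record boundary: offsets are always headoff + n * evlen, and the midpoint is rounded down onto that grid, which is what guarantees parse_event sees a record start at each probe. The invariant, isolated into hypothetical helpers for review (not part of the patch):

    // Probe offsets form the grid headoff + n*evlen, n = 0, 1, ...
    fn probe_offset(headoff: u64, evlen: u64, n: u64) -> u64 {
        headoff + n * evlen
    }

    // Midpoint rounded down to the grid, mirroring
    // `let m = j + (k - j) / 2 / evlen * evlen;` in position_file.
    fn mid_on_grid(j: u64, k: u64, evlen: u64) -> u64 {
        j + (k - j) / 2 / evlen * evlen
    }

This only holds for fixed-length (scalar) events, matching the commit subject; variable-length records would break the grid assumption.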
diff --git a/err/src/lib.rs b/err/src/lib.rs
index 3e786fc..9269542 100644
--- a/err/src/lib.rs
+++ b/err/src/lib.rs
@@ -48,7 +48,10 @@ fn fmt_backtrace(trace: &backtrace::Backtrace) -> String {
         for sy in fr.symbols() {
             let is_ours = match sy.filename() {
                 None => false,
-                Some(s) => s.to_str().unwrap().contains("dev/daqbuffer"),
+                Some(s) => {
+                    let s = s.to_str().unwrap();
+                    s.contains("dev/daqbuffer/") || s.contains("/retrsbld/")
+                }
             };
             let name = match sy.name() {
                 Some(k) => k.to_string(),
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 33ad4af..57b9064 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -92,6 +92,18 @@ async fn data_api_proxy_try(req: Request<Body>, node_config: &NodeConfigCached)
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
+    } else if path == "/api/1/table_sizes" {
+        if req.method() == Method::GET {
+            Ok(table_sizes(req, &node_config).await?)
+        } else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
+    } else if path == "/api/1/random_channel" {
+        if req.method() == Method::GET {
+            Ok(random_channel(req, &node_config).await?)
+        } else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
     } else if path == "/api/1/parsed_raw" {
         if req.method() == Method::POST {
             Ok(parsed_raw(req, &node_config.node).await?)
@@ -278,3 +290,22 @@ async fn node_status(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error>
     let ret = response(StatusCode::OK).body(Body::from(ret))?;
     Ok(ret)
 }
+
+async fn table_sizes(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
+    let (_head, _body) = req.into_parts();
+    let sizes = dbconn::table_sizes(node_config).await?;
+    let mut ret = String::new();
+    for size in sizes.sizes {
+        use std::fmt::Write;
+        write!(ret, "{:60} {:20}\n", size.0, size.1)?;
+    }
+    let ret = response(StatusCode::OK).body(Body::from(ret))?;
+    Ok(ret)
+}
+
+pub async fn random_channel(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
+    let (_head, _body) = req.into_parts();
+    let ret = dbconn::random_channel(node_config).await?;
+    let ret = response(StatusCode::OK).body(Body::from(ret))?;
+    Ok(ret)
+}
diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs
index 1e0ac8c..eb0a5c4 100644
--- a/retrieval/src/client.rs
+++ b/retrieval/src/client.rs
@@ -62,6 +62,7 @@ pub async fn get_binned(
     let req = hyper::Request::builder()
         .method(http::Method::GET)
         .uri(uri)
+        .header("aCCepT", "application/octet-stream")
         .body(Body::empty())?;
     let client = hyper::Client::new();
     let res = client.request(req).await?;
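For manual verification, the two new admin endpoints can be exercised with a client along these lines (sketch, not part of the patch; host and port are placeholders, and the `?` conversions assume the same err::Error From-impls already used in retrieval/src/client.rs):

    async fn check_admin_endpoints(host: &str, port: u16) -> Result<(), Error> {
        for path in ["/api/1/table_sizes", "/api/1/random_channel"].iter() {
            let uri = format!("http://{}:{}{}", host, port, path);
            let req = hyper::Request::builder()
                .method(http::Method::GET)
                .uri(uri)
                .body(hyper::Body::empty())?;
            let client = hyper::Client::new();
            let res = client.request(req).await?;
            // Expect 200 with a text body; anything else indicates a routing problem.
            println!("{} -> {}", path, res.status());
        }
        Ok(())
    }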