diff --git a/dbconn/Cargo.toml b/dbconn/Cargo.toml index f03bf42..d7def8d 100644 --- a/dbconn/Cargo.toml +++ b/dbconn/Cargo.toml @@ -20,6 +20,8 @@ bincode = "1.3.3" #dashmap = "3" tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } chrono = "0.4" +regex = "1.5.4" err = { path = "../err" } netpod = { path = "../netpod" } +parse = { path = "../parse" } taskrun = { path = "../taskrun" } diff --git a/dbconn/src/lib.rs b/dbconn/src/lib.rs index 4d23ea9..4f2b26d 100644 --- a/dbconn/src/lib.rs +++ b/dbconn/src/lib.rs @@ -1,10 +1,24 @@ use err::Error; use netpod::log::*; use netpod::{Channel, NodeConfigCached}; +use std::time::Duration; use tokio_postgres::{Client, NoTls}; +pub mod scan; pub mod search; +pub async fn delay_us(mu: u64) { + tokio::time::sleep(Duration::from_micros(mu)).await; +} + +pub async fn delay_io_short() { + delay_us(1000).await; +} + +pub async fn delay_io_medium() { + delay_us(2000).await; +} + pub async fn create_connection(node_config: &NodeConfigCached) -> Result { let d = &node_config.node_config.cluster.database; let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name); diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs new file mode 100644 index 0000000..cd03e28 --- /dev/null +++ b/dbconn/src/scan.rs @@ -0,0 +1,462 @@ +use chrono::{DateTime, Utc}; +use err::Error; +use netpod::log::*; +use netpod::NodeConfigCached; +use serde::{Deserialize, Serialize}; +use std::cell::RefCell; +use std::future::Future; +use std::io::ErrorKind; +use std::os::unix::ffi::OsStringExt; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::rc::Rc; +use tokio_postgres::Client; + +#[derive(Debug, Serialize, Deserialize)] +pub struct NodeDiskIdent { + pub rowid: i64, + pub facility: i64, + pub split: i32, + pub hostname: String, +} + +impl NodeDiskIdent { + pub fn rowid(&self) -> i64 { + self.rowid + } + pub fn facility(&self) -> i64 { + self.facility + } +} + +fn _get_hostname() -> Result { + let out = std::process::Command::new("hostname").output()?; + Ok(String::from_utf8(out.stdout[..out.stdout.len() - 1].to_vec())?) +} + +pub async fn get_node_disk_ident(node_config: &NodeConfigCached) -> Result { + let con1 = crate::create_connection(node_config).await?; + let rows = con1.query("select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2", &[&node_config.node.backend, &node_config.node.host]).await?; + if rows.len() != 1 { + return Err(Error::with_msg(format!( + "get_node can't find unique entry for {} {}", + node_config.node.host, node_config.node.backend + ))); + } + let row = &rows[0]; + Ok(NodeDiskIdent { + rowid: row.get(0), + facility: row.get(1), + split: row.get(2), + hostname: row.get(3), + }) +} + +async fn find_channel_names_from_config(base_dir: impl AsRef, mut cb: F) -> Result<(), Error> +where + F: FnMut(&str) -> Fut, + Fut: Future>, +{ + let mut path2: PathBuf = base_dir.as_ref().into(); + path2.push("config"); + let mut rd = tokio::fs::read_dir(&path2).await?; + while let Ok(Some(entry)) = rd.next_entry().await { + let fname = String::from_utf8(entry.file_name().into_vec())?; + cb(&fname).await?; + } + Ok(()) +} + +pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Result<(), Error> { + let node_ident = get_node_disk_ident(node_config).await?; + let c1 = Rc::new(RefCell::new(0u32)); + let dbc = crate::create_connection(node_config).await?; + let rows = dbc + .query("select rowid from facilities where name = $1", &[&node_ident.facility]) + .await?; + if rows.len() != 1 { + return Err(Error::with_msg(format!( + "can not find facility {}", + node_ident.facility + ))); + } + let facility: i64 = rows[0].try_get(0)?; + dbc.query("begin", &[]).await?; + let dbc = Rc::new(dbc); + find_channel_names_from_config(&node_config.node.data_base_path, |ch| { + let ch = ch.to_owned(); + let dbc = dbc.clone(); + let c1 = c1.clone(); + async move { + crate::delay_io_short().await; + dbc.query( + "insert into channels (facility, name) values ($1, $2) on conflict do nothing", + &[&facility, &ch], + ) + .await?; + *c1.borrow_mut() += 1; + let c2 = *c1.borrow(); + if c2 % 200 == 0 { + trace!("channels {:6} current {}", c2, ch); + dbc.query("commit", &[]).await?; + crate::delay_io_medium().await; + dbc.query("begin", &[]).await?; + } + Ok(()) + } + }) + .await?; + dbc.query("commit", &[]).await?; + Ok(()) +} + +pub async fn update_db_with_all_channel_configs(node_config: &NodeConfigCached, ks_prefix: &str) -> Result<(), Error> { + let dbc = crate::create_connection(node_config).await?; + let dbc = Rc::new(dbc); + let node_disk_ident = &get_node_disk_ident(node_config).await?; + let rows = dbc + .query( + "select rowid, facility, name from channels where facility = $1 order by facility, name", + &[&node_config.node.backend], + ) + .await?; + let mut c1 = 0; + dbc.query("begin", &[]).await?; + let mut count_inserted = 0; + let mut count_updated = 0; + for row in rows { + let rowid: i64 = row.try_get(0)?; + let _facility: i64 = row.try_get(1)?; + let channel: String = row.try_get(2)?; + match update_db_with_channel_config( + node_config, + node_disk_ident, + ks_prefix, + rowid, + &channel, + dbc.clone(), + &mut count_inserted, + &mut count_updated, + ) + .await + { + /*Err(Error::ChannelConfigdirNotFound { .. }) => { + warn!("can not find channel config {}", channel); + crate::delay_io_medium().await; + }*/ + Err(e) => { + error!("{:?}", e); + crate::delay_io_medium().await; + } + _ => { + c1 += 1; + if c1 % 200 == 0 { + trace!( + "channel no {:6} inserted {:6} updated {:6}", + c1, + count_inserted, + count_updated + ); + dbc.query("commit", &[]).await?; + dbc.query("begin", &[]).await?; + } + crate::delay_io_short().await; + } + } + } + dbc.query("commit", &[]).await?; + Ok(()) +} + +pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result<(), Error> { + let dbc = crate::create_connection(node_config).await?; + dbc.query("select update_cache()", &[]).await?; + Ok(()) +} + +/** +Parse the config of the given channel and update database. +*/ +pub async fn update_db_with_channel_config( + node_config: &NodeConfigCached, + node_disk_ident: &NodeDiskIdent, + _ks_prefix: &str, + channel_id: i64, + channel: &str, + dbc: Rc, + count_inserted: &mut usize, + count_updated: &mut usize, +) -> Result<(), Error> { + let path = node_config + .node + .data_base_path + .join("config") + .join(channel) + .join("latest") + .join("00000_Config"); + let meta = tokio::fs::metadata(&path).await?; + if meta.len() > 8 * 1024 * 1024 { + return Err(Error::with_msg("meta data too long")); + } + let rows = dbc + .query( + "select rowid, fileSize, parsedUntil, channel from configs where node = $1 and channel = $2", + &[&node_disk_ident.rowid(), &channel_id], + ) + .await?; + if rows.len() > 1 { + return Err(Error::with_msg("more than one row")); + } + let (config_id, do_parse) = if rows.len() > 0 { + let row = &rows[0]; + let rowid: i64 = row.get(0); + let file_size: u32 = row.get::<_, i64>(1) as u32; + let parsed_until: u32 = row.get::<_, i64>(2) as u32; + let _channel_id = row.get::<_, i64>(2) as i64; + if meta.len() < file_size as u64 || meta.len() < parsed_until as u64 { + dbc.query( + "insert into configs_history (rowid_original, node, channel, fileSize, parsedUntil, config, tsinsert) select rowid as rowid_original, node, channel, fileSize, parsedUntil, config, now() from configs where rowid = $1", + &[&rowid], + ).await?; + } + //ensure!(meta.len() >= parsed_until as u64, ConfigFileOnDiskShrunk{path}); + (Some(rowid), true) + } else { + (None, true) + }; + if do_parse { + let buf = tokio::fs::read(&path).await?; + let config = parse::channelconfig::parse_config(&buf)?; + match config_id { + None => { + dbc.query( + "insert into configs (node, channel, fileSize, parsedUntil, config) values ($1, $2, $3, $4, $5)", + &[ + &node_disk_ident.rowid(), + &channel_id, + &(meta.len() as i64), + &(buf.len() as i64), + &serde_json::to_value(config)?, + ], + ) + .await?; + *count_inserted += 1; + } + Some(_config_id_2) => { + dbc.query( + "insert into configs (node, channel, fileSize, parsedUntil, config) values ($1, $2, $3, $4, $5) on conflict (node, channel) do update set fileSize = $3, parsedUntil = $4, config = $5", + &[&node_disk_ident.rowid(), &channel_id, &(meta.len() as i64), &(buf.len() as i64), &serde_json::to_value(config)?], + ).await?; + *count_updated += 1; + } + } + } + Ok(()) +} + +pub async fn update_db_with_all_channel_datafiles( + node_config: &NodeConfigCached, + node_disk_ident: &NodeDiskIdent, + ks_prefix: &str, +) -> Result<(), Error> { + let dbc = Rc::new(crate::create_connection(node_config).await?); + let rows = dbc + .query( + "select rowid, facility, name from channels where facility = $1 order by facility, name", + &[&node_disk_ident.facility()], + ) + .await?; + let mut c1 = 0; + dbc.query("begin", &[]).await?; + for row in rows { + let rowid: i64 = row.try_get(0)?; + let _facility: i64 = row.try_get(1)?; + let channel: String = row.try_get(2)?; + update_db_with_channel_datafiles(node_config, node_disk_ident, ks_prefix, rowid, &channel, dbc.clone()).await?; + c1 += 1; + if c1 % 40 == 0 { + trace!("import datafiles {} {}", c1, channel); + dbc.query("commit", &[]).await?; + dbc.query("begin", &[]).await?; + } + if false && c1 >= 30 { + break; + } + } + dbc.query("commit", &[]).await?; + Ok(()) +} + +struct DatafileDbWriter { + channel_id: i64, + node_id: i64, + dbc: Rc, + c1: Rc>, +} +#[derive(Debug, Serialize)] +pub struct ChannelDesc { + name: String, +} + +#[derive(Debug, Serialize)] +pub struct ChannelDatafileDesc { + channel: ChannelDesc, + ks: u32, + tb: u32, + sp: u32, + bs: u32, + fs: u64, + mt: DateTime, + ix_fs: Option, + ix_mt: Option>, +} + +impl ChannelDatafileDesc { + pub fn timebin(&self) -> u32 { + self.tb + } + pub fn binsize(&self) -> u32 { + self.bs + } + pub fn keyspace(&self) -> u32 { + self.ks + } + pub fn split(&self) -> u32 { + self.sp + } +} + +pub trait ChannelDatafileDescSink { + fn sink(&self, k: ChannelDatafileDesc) -> Pin>>>; +} + +impl ChannelDatafileDescSink for DatafileDbWriter { + fn sink(&self, k: ChannelDatafileDesc) -> Pin>>> { + let dbc = self.dbc.clone(); + let c1 = self.c1.clone(); + let channel_id = self.channel_id; + let node_id = self.node_id; + Box::pin(async move { + dbc.query( + "insert into datafiles (node, channel, tsbeg, tsend, props) values ($1, $2, $3, $4, $5) on conflict do nothing", + &[ + &node_id, + &channel_id, + &(k.timebin() as i64 * k.binsize() as i64), + &((k.timebin() + 1) as i64 * k.binsize() as i64), + &serde_json::to_value(k)?, + ] + ).await?; + *c1.try_borrow_mut().unwrap() += 1; + Ok(()) + }) + } +} + +pub async fn find_channel_datafiles_in_ks( + base_dir: impl AsRef, + ks_prefix: &str, + ks: u32, + channel: &str, + cb: &dyn ChannelDatafileDescSink, +) -> Result, Error> { + let data_dir_path: PathBuf = base_dir + .as_ref() + .join(format!("{}_{}", ks_prefix, ks)) + .join("byTime") + .join(channel); + let re1 = regex::Regex::new(r"^\d{19}$")?; + let re2 = regex::Regex::new(r"^\d{10}$")?; + let re4 = regex::Regex::new(r"^(\d{19})_0{5}_Data$")?; + let mut rd = match tokio::fs::read_dir(&data_dir_path).await { + Ok(k) => k, + Err(e) => match e.kind() { + ErrorKind::NotFound => return Ok(None), + _ => Err(e)?, + }, + }; + while let Ok(Some(entry)) = rd.next_entry().await { + let fname = String::from_utf8(entry.file_name().into_vec())?; + if !re1.is_match(&fname) { + warn!("unexpected file {}", fname); + continue; + } + let timebin: u32 = fname.parse()?; + let path = data_dir_path.join(fname); + let mut rd = tokio::fs::read_dir(&path).await?; + while let Ok(Some(entry)) = rd.next_entry().await { + let fname = String::from_utf8(entry.file_name().into_vec())?; + if !re2.is_match(&fname) { + warn!("unexpected file {}", fname); + continue; + } + let split: u32 = fname.parse()?; + let path = path.join(fname); + let mut rd = tokio::fs::read_dir(&path).await?; + while let Ok(Some(entry)) = rd.next_entry().await { + let fname = String::from_utf8(entry.file_name().into_vec())?; + if let Some(m) = re4.captures(&fname) { + let binsize: u32 = m.get(1).unwrap().as_str().parse()?; + let data_path = path.join(&fname); + let meta = tokio::fs::metadata(&data_path).await?; + let index_path = path.join(format!("{}_Index", fname)); + let (ix_size, ix_tmod) = if let Ok(meta) = tokio::fs::metadata(&index_path).await { + (Some(meta.len()), Some(meta.modified().unwrap().into())) + } else { + (None, None) + }; + cb.sink(ChannelDatafileDesc { + channel: ChannelDesc { name: channel.into() }, + ks: ks, + tb: timebin, + sp: split, + bs: binsize, + fs: meta.len(), + ix_fs: ix_size, + ix_mt: ix_tmod, + mt: meta.modified().unwrap().into(), + }) + .await?; + } + } + } + } + Ok(Some(())) +} + +pub async fn update_db_with_channel_datafiles( + node_config: &NodeConfigCached, + node_disk_ident: &NodeDiskIdent, + ks_prefix: &str, + channel_id: i64, + channel: &str, + dbc: Rc, +) -> Result<(), Error> { + let writer = DatafileDbWriter { + node_id: node_disk_ident.rowid(), + channel_id: channel_id, + dbc: dbc.clone(), + c1: Rc::new(RefCell::new(0)), + }; + let mut n_nothing = 0; + for ks in &[2, 3, 4] { + match find_channel_datafiles_in_ks(&node_config.node.data_base_path, ks_prefix, *ks, channel, &writer).await { + /*Err(Error::ChannelDatadirNotFound { .. }) => { + n_nothing += 1; + }*/ + Ok(None) => { + n_nothing += 1; + } + x => { + x?; + } + }; + if false && *writer.c1.borrow() >= 10 { + break; + } + } + if n_nothing >= 3 { + //warn!("No datafile directories in any keyspace writer got {:5} n_nothing {} channel {}", writer.c1.borrow(), n_nothing, channel); + } + Ok(()) +} diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs index 025f22b..3bd2cce 100644 --- a/dbconn/src/search.rs +++ b/dbconn/src/search.rs @@ -8,7 +8,7 @@ pub async fn search_channel( ) -> Result { let sql = format!(concat!( "select ", - "channel_id, channel_name, source_name, dtype, shape, unit, description", + "channel_id, channel_name, source_name, dtype, shape, unit, description, channel_backend", " from searchext($1, $2, $3, $4)", )); let cl = create_connection(node_config).await?; @@ -44,6 +44,7 @@ pub async fn search_channel( None => vec![], }; let k = ChannelSearchSingleResult { + backend: row.get(7), name: row.get(1), source: row.get(2), ty: row.get(3), diff --git a/disk/Cargo.toml b/disk/Cargo.toml index bf97a01..1dbfc00 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -36,3 +36,4 @@ taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } bitshuffle = { path = "../bitshuffle" } dbconn = { path = "../dbconn" } +parse = { path = "../parse" } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 78b41c1..85ce579 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -8,7 +8,6 @@ use crate::agg::{Fits, FitsInside}; use crate::binned::scalar::binned_stream; use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream}; use crate::cache::{BinnedQuery, MergedFromRemotes}; -use crate::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use crate::frame::makeframe::FrameType; use crate::raw::EventsQuery; use bytes::Bytes; @@ -21,6 +20,7 @@ use netpod::{ AggKind, BinnedRange, NanoRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, }; use num_traits::Zero; +use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize, Serializer}; use serde_json::Map; diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs index 5eb88e1..8b13789 100644 --- a/disk/src/channelconfig.rs +++ b/disk/src/channelconfig.rs @@ -1,335 +1 @@ -use err::Error; -use netpod::timeunits::MS; -use netpod::{Channel, NanoRange, Nanos, Node, ScalarType, Shape}; -use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8}; -use nom::Needed; -#[allow(unused_imports)] -use nom::{ - bytes::complete::{tag, take, take_while_m_n}, - combinator::map_res, - sequence::tuple, -}; -use num_derive::{FromPrimitive, ToPrimitive}; -use num_traits::ToPrimitive; -use serde::{Deserialize, Serialize}; -#[allow(unused_imports)] -use tracing::{debug, error, info, trace, warn}; -type NRes<'a, O> = nom::IResult<&'a [u8], O, err::Error>; - -fn mkerr<'a, S, O>(msg: S) -> NRes<'a, O> -where - S: Into, -{ - let e = Error::with_msg(msg); - Err(nom::Err::Error(e)) -} - -#[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)] -pub enum CompressionMethod { - BitshuffleLZ4 = 0, -} - -impl CompressionMethod { - pub fn to_i16(&self) -> i16 { - ToPrimitive::to_i16(self).unwrap() - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct ConfigEntry { - pub ts: u64, - pub pulse: i64, - pub ks: i32, - pub bs: Nanos, - pub split_count: i32, - pub status: i32, - pub bb: i8, - pub modulo: i32, - pub offset: i32, - /* - Precision: - 0 'default' whatever that is - -7 f32 - -16 f64 - */ - pub precision: i16, - pub scalar_type: ScalarType, - pub is_compressed: bool, - pub is_shaped: bool, - pub is_array: bool, - pub is_big_endian: bool, - pub compression_method: Option, - pub shape: Option>, - pub source_name: Option, - unit: Option, - description: Option, - optional_fields: Option, - value_converter: Option, -} - -impl ConfigEntry { - pub fn to_shape(&self) -> Result { - let ret = match &self.shape { - Some(lens) => { - if lens.len() == 1 { - Shape::Wave(lens[0]) - } else { - return Err(Error::with_msg(format!("Channel config unsupported shape {:?}", self)))?; - } - } - None => Shape::Scalar, - }; - Ok(ret) - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct Config { - pub format_version: i16, - pub channel_name: String, - pub entries: Vec, -} - -fn parse_short_string(inp: &[u8]) -> NRes> { - let (inp, len1) = be_i32(inp)?; - if len1 == -1 { - return Ok((inp, None)); - } - if len1 < 4 { - return mkerr(format!("bad string len {}", len1)); - } - if len1 > 500 { - return mkerr(format!("large string len {}", len1)); - } - let (inp, snb) = take((len1 - 4) as usize)(inp)?; - match String::from_utf8(snb.to_vec()) { - Ok(s1) => Ok((inp, Some(s1))), - Err(e) => mkerr(format!("{:?}", e)), - } -} - -pub fn parse_entry(inp: &[u8]) -> NRes> { - let (inp, len1) = be_i32(inp)?; - if len1 < 0 || len1 > 4000 { - return mkerr(format!("ConfigEntry bad len1 {}", len1)); - } - if inp.len() == 0 { - return Ok((inp, None)); - } - if inp.len() < len1 as usize - 4 { - return Err(nom::Err::Incomplete(Needed::new(len1 as usize - 4))); - } - let inp_e = &inp[(len1 - 8) as usize..]; - let (inp, ts) = be_i64(inp)?; - let (inp, pulse) = be_i64(inp)?; - let (inp, ks) = be_i32(inp)?; - let (inp, bs) = be_i64(inp)?; - let bs = Nanos { ns: bs as u64 * MS }; - let (inp, split_count) = be_i32(inp)?; - let (inp, status) = be_i32(inp)?; - let (inp, bb) = be_i8(inp)?; - let (inp, modulo) = be_i32(inp)?; - let (inp, offset) = be_i32(inp)?; - let (inp, precision) = be_i16(inp)?; - let (inp, dtlen) = be_i32(inp)?; - if dtlen > 100 { - return mkerr(format!("unexpected data type len {}", dtlen)); - } - let (inp, dtmask) = be_u8(inp)?; - let is_compressed = dtmask & 0x80 != 0; - let is_array = dtmask & 0x40 != 0; - let is_big_endian = dtmask & 0x20 != 0; - let is_shaped = dtmask & 0x10 != 0; - let (inp, dtype) = be_u8(inp)?; - if dtype > 13 { - return mkerr(format!("unexpected data type {}", dtype)); - } - let scalar_type = match ScalarType::from_dtype_index(dtype) { - Ok(k) => k, - Err(e) => { - return mkerr(format!("Can not convert {} to DType {:?}", dtype, e)); - } - }; - let (inp, compression_method) = match is_compressed { - false => (inp, None), - true => { - let (inp, cm) = be_u8(inp)?; - match num_traits::FromPrimitive::from_u8(cm) { - Some(k) => (inp, Some(k)), - None => { - return mkerr(format!("unknown compression")); - } - } - } - }; - let (inp, shape) = match is_shaped { - false => (inp, None), - true => { - let (mut inp, dim) = be_u8(inp)?; - if dim > 4 { - return mkerr(format!("unexpected number of dimensions: {}", dim)); - } - let mut shape = vec![]; - for _ in 0..dim { - let t1 = be_i32(inp)?; - inp = t1.0; - shape.push(t1.1 as u32); - } - (inp, Some(shape)) - } - }; - let (inp, source_name) = parse_short_string(inp)?; - let (inp, unit) = parse_short_string(inp)?; - let (inp, description) = parse_short_string(inp)?; - let (inp, optional_fields) = parse_short_string(inp)?; - let (inp, value_converter) = parse_short_string(inp)?; - assert_eq!(inp.len(), inp_e.len()); - let (inp_e, len2) = be_i32(inp_e)?; - if len1 != len2 { - return mkerr(format!("mismatch len1 {} len2 {}", len1, len2)); - } - Ok(( - inp_e, - Some(ConfigEntry { - ts: ts as u64, - pulse, - ks, - bs, - split_count: split_count, - status, - bb, - modulo, - offset, - precision, - scalar_type, - is_compressed: is_compressed, - is_array: is_array, - is_shaped: is_shaped, - is_big_endian: is_big_endian, - compression_method: compression_method, - shape, - source_name: source_name, - unit, - description, - optional_fields: optional_fields, - value_converter: value_converter, - }), - )) -} - -/* -Parse the full configuration file. -*/ -pub fn parse_config(inp: &[u8]) -> NRes { - let (inp, ver) = be_i16(inp)?; - let (inp, len1) = be_i32(inp)?; - if len1 <= 8 || len1 > 500 { - return mkerr(format!("no channel name. len1 {}", len1)); - } - let (inp, chn) = take((len1 - 8) as usize)(inp)?; - let (inp, len2) = be_i32(inp)?; - if len1 != len2 { - return mkerr(format!("Mismatch len1 {} len2 {}", len1, len2)); - } - let mut entries = vec![]; - let mut inp_a = inp; - while inp_a.len() > 0 { - let inp = inp_a; - let (inp, e) = parse_entry(inp)?; - if let Some(e) = e { - entries.push(e); - } - inp_a = inp; - } - let channel_name = match String::from_utf8(chn.to_vec()) { - Ok(k) => k, - Err(e) => { - return mkerr(format!("channelName utf8 error {:?}", e)); - } - }; - let ret = Config { - format_version: ver, - channel_name: channel_name, - entries: entries, - }; - Ok((inp, ret)) -} - -pub async fn read_local_config(channel: &Channel, node: &Node) -> Result { - let path = node - .data_base_path - .join("config") - .join(&channel.name) - .join("latest") - .join("00000_Config"); - let buf = tokio::fs::read(&path).await?; - let config = parse_config(&buf)?; - Ok(config.1) -} - -pub enum MatchingConfigEntry<'a> { - None, - Multiple, - Entry(&'a ConfigEntry), -} - -pub fn extract_matching_config_entry<'a>( - range: &NanoRange, - channel_config: &'a Config, -) -> Result, Error> { - let mut ixs = vec![]; - for i1 in 0..channel_config.entries.len() { - let e1 = &channel_config.entries[i1]; - if i1 + 1 < channel_config.entries.len() { - let e2 = &channel_config.entries[i1 + 1]; - if e1.ts < range.end && e2.ts >= range.beg { - ixs.push(i1); - } - } else { - if e1.ts < range.end { - ixs.push(i1); - } - } - } - if ixs.len() == 0 { - Ok(MatchingConfigEntry::None) - } else if ixs.len() > 1 { - Ok(MatchingConfigEntry::Multiple) - } else { - Ok(MatchingConfigEntry::Entry(&channel_config.entries[ixs[0]])) - } -} - -#[cfg(test)] -mod test { - use super::parse_config; - - fn read_data() -> Vec { - use std::io::Read; - let path = "ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config"; - let mut f1 = std::fs::File::open(path).unwrap(); - let mut buf = vec![]; - f1.read_to_end(&mut buf).unwrap(); - buf - } - - #[test] - fn parse_dummy() { - let config = parse_config(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap(); - assert_eq!(0, config.1.format_version); - assert_eq!("abc", config.1.channel_name); - } - - #[test] - fn open_file() { - let config = parse_config(&read_data()).unwrap().1; - assert_eq!(0, config.format_version); - assert_eq!(9, config.entries.len()); - for e in &config.entries { - assert!(e.ts >= 631152000000000000); - assert!(e.ts <= 1591106812800073974); - assert!(e.shape.is_some()); - } - } -} diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index c7e2957..56ccb16 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -3,7 +3,6 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::streams::StreamItem; use crate::agg::IntoDim1F32Stream; use crate::binned::{BinnedStreamKind, BinnedStreamKindScalar, RangeCompletableItem}; -use crate::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use crate::eventblobs::EventBlobsComplete; use crate::eventchunker::EventChunkerConf; use crate::frame::inmem::InMemoryFrameAsyncReadStream; @@ -13,6 +12,7 @@ use err::Error; use futures_util::StreamExt; use netpod::log::*; use netpod::{AggKind, ByteSize, NodeConfigCached, PerfOpts}; +use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; use std::io; use std::net::SocketAddr; use tokio::io::AsyncWriteExt; diff --git a/err/Cargo.toml b/err/Cargo.toml index 4ae8711..59fc7d9 100644 --- a/err/Cargo.toml +++ b/err/Cargo.toml @@ -17,3 +17,4 @@ chrono = { version = "0.4.19", features = ["serde"] } nom = "6.1.2" tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } serde_cbor = "0.11.1" +regex = "1.5.4" diff --git a/err/src/lib.rs b/err/src/lib.rs index d0b7aa1..0178a69 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -231,6 +231,12 @@ impl From for Error { } } +impl From for Error { + fn from(k: regex::Error) -> Self { + Self::with_msg(k.to_string()) + } +} + pub fn todo() { todo!("TODO"); } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index ea31ca3..2a53fa3 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -808,6 +808,7 @@ impl ChannelSearchQuery { #[derive(Serialize, Deserialize)] pub struct ChannelSearchSingleResult { + pub backend: String, pub name: String, pub source: String, #[serde(rename = "type")] diff --git a/parse/Cargo.toml b/parse/Cargo.toml new file mode 100644 index 0000000..ed8bb03 --- /dev/null +++ b/parse/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "parse" +version = "0.0.1-a.0" +authors = ["Dominik Werder "] +edition = "2018" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +chrono = { version = "0.4.19", features = ["serde"] } +bytes = "1.0.1" +byteorder = "1.4.3" +hex = "0.4.3" +nom = "6.1.2" +num-traits = "0.2" +num-derive = "0.3" +err = { path = "../err" } +netpod = { path = "../netpod" } diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs new file mode 100644 index 0000000..ff43b0e --- /dev/null +++ b/parse/src/channelconfig.rs @@ -0,0 +1,331 @@ +use err::Error; +use netpod::timeunits::MS; +use netpod::{Channel, NanoRange, Nanos, Node, ScalarType, Shape}; +use nom::bytes::complete::take; +use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8}; +use nom::Needed; +//use nom::bytes::complete::{tag, take_while_m_n}; +//use nom::combinator::map_res; +//use nom::sequence::tuple; +use num_derive::{FromPrimitive, ToPrimitive}; +use num_traits::ToPrimitive; +use serde::{Deserialize, Serialize}; + +type NRes<'a, O> = nom::IResult<&'a [u8], O, err::Error>; + +fn mkerr<'a, S, O>(msg: S) -> NRes<'a, O> +where + S: Into, +{ + let e = Error::with_msg(msg); + Err(nom::Err::Error(e)) +} + +#[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)] +pub enum CompressionMethod { + BitshuffleLZ4 = 0, +} + +impl CompressionMethod { + pub fn to_i16(&self) -> i16 { + ToPrimitive::to_i16(self).unwrap() + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ConfigEntry { + pub ts: u64, + pub pulse: i64, + pub ks: i32, + pub bs: Nanos, + pub split_count: i32, + pub status: i32, + pub bb: i8, + pub modulo: i32, + pub offset: i32, + /* + Precision: + 0 'default' whatever that is + -7 f32 + -16 f64 + */ + pub precision: i16, + pub scalar_type: ScalarType, + pub is_compressed: bool, + pub is_shaped: bool, + pub is_array: bool, + pub is_big_endian: bool, + pub compression_method: Option, + pub shape: Option>, + pub source_name: Option, + unit: Option, + description: Option, + optional_fields: Option, + value_converter: Option, +} + +impl ConfigEntry { + pub fn to_shape(&self) -> Result { + let ret = match &self.shape { + Some(lens) => { + if lens.len() == 1 { + Shape::Wave(lens[0]) + } else { + return Err(Error::with_msg(format!("Channel config unsupported shape {:?}", self)))?; + } + } + None => Shape::Scalar, + }; + Ok(ret) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Config { + pub format_version: i16, + pub channel_name: String, + pub entries: Vec, +} + +fn parse_short_string(inp: &[u8]) -> NRes> { + let (inp, len1) = be_i32(inp)?; + if len1 == -1 { + return Ok((inp, None)); + } + if len1 < 4 { + return mkerr(format!("bad string len {}", len1)); + } + if len1 > 500 { + return mkerr(format!("large string len {}", len1)); + } + let (inp, snb) = take((len1 - 4) as usize)(inp)?; + match String::from_utf8(snb.to_vec()) { + Ok(s1) => Ok((inp, Some(s1))), + Err(e) => mkerr(format!("{:?}", e)), + } +} + +pub fn parse_entry(inp: &[u8]) -> NRes> { + let (inp, len1) = be_i32(inp)?; + if len1 < 0 || len1 > 4000 { + return mkerr(format!("ConfigEntry bad len1 {}", len1)); + } + if inp.len() == 0 { + return Ok((inp, None)); + } + if inp.len() < len1 as usize - 4 { + return Err(nom::Err::Incomplete(Needed::new(len1 as usize - 4))); + } + let inp_e = &inp[(len1 - 8) as usize..]; + let (inp, ts) = be_i64(inp)?; + let (inp, pulse) = be_i64(inp)?; + let (inp, ks) = be_i32(inp)?; + let (inp, bs) = be_i64(inp)?; + let bs = Nanos { ns: bs as u64 * MS }; + let (inp, split_count) = be_i32(inp)?; + let (inp, status) = be_i32(inp)?; + let (inp, bb) = be_i8(inp)?; + let (inp, modulo) = be_i32(inp)?; + let (inp, offset) = be_i32(inp)?; + let (inp, precision) = be_i16(inp)?; + let (inp, dtlen) = be_i32(inp)?; + if dtlen > 100 { + return mkerr(format!("unexpected data type len {}", dtlen)); + } + let (inp, dtmask) = be_u8(inp)?; + let is_compressed = dtmask & 0x80 != 0; + let is_array = dtmask & 0x40 != 0; + let is_big_endian = dtmask & 0x20 != 0; + let is_shaped = dtmask & 0x10 != 0; + let (inp, dtype) = be_u8(inp)?; + if dtype > 13 { + return mkerr(format!("unexpected data type {}", dtype)); + } + let scalar_type = match ScalarType::from_dtype_index(dtype) { + Ok(k) => k, + Err(e) => { + return mkerr(format!("Can not convert {} to DType {:?}", dtype, e)); + } + }; + let (inp, compression_method) = match is_compressed { + false => (inp, None), + true => { + let (inp, cm) = be_u8(inp)?; + match num_traits::FromPrimitive::from_u8(cm) { + Some(k) => (inp, Some(k)), + None => { + return mkerr(format!("unknown compression")); + } + } + } + }; + let (inp, shape) = match is_shaped { + false => (inp, None), + true => { + let (mut inp, dim) = be_u8(inp)?; + if dim > 4 { + return mkerr(format!("unexpected number of dimensions: {}", dim)); + } + let mut shape = vec![]; + for _ in 0..dim { + let t1 = be_i32(inp)?; + inp = t1.0; + shape.push(t1.1 as u32); + } + (inp, Some(shape)) + } + }; + let (inp, source_name) = parse_short_string(inp)?; + let (inp, unit) = parse_short_string(inp)?; + let (inp, description) = parse_short_string(inp)?; + let (inp, optional_fields) = parse_short_string(inp)?; + let (inp, value_converter) = parse_short_string(inp)?; + assert_eq!(inp.len(), inp_e.len()); + let (inp_e, len2) = be_i32(inp_e)?; + if len1 != len2 { + return mkerr(format!("mismatch len1 {} len2 {}", len1, len2)); + } + Ok(( + inp_e, + Some(ConfigEntry { + ts: ts as u64, + pulse, + ks, + bs, + split_count: split_count, + status, + bb, + modulo, + offset, + precision, + scalar_type, + is_compressed: is_compressed, + is_array: is_array, + is_shaped: is_shaped, + is_big_endian: is_big_endian, + compression_method: compression_method, + shape, + source_name: source_name, + unit, + description, + optional_fields: optional_fields, + value_converter: value_converter, + }), + )) +} + +/* +Parse the full configuration file. +*/ +pub fn parse_config(inp: &[u8]) -> NRes { + let (inp, ver) = be_i16(inp)?; + let (inp, len1) = be_i32(inp)?; + if len1 <= 8 || len1 > 500 { + return mkerr(format!("no channel name. len1 {}", len1)); + } + let (inp, chn) = take((len1 - 8) as usize)(inp)?; + let (inp, len2) = be_i32(inp)?; + if len1 != len2 { + return mkerr(format!("Mismatch len1 {} len2 {}", len1, len2)); + } + let mut entries = vec![]; + let mut inp_a = inp; + while inp_a.len() > 0 { + let inp = inp_a; + let (inp, e) = parse_entry(inp)?; + if let Some(e) = e { + entries.push(e); + } + inp_a = inp; + } + let channel_name = match String::from_utf8(chn.to_vec()) { + Ok(k) => k, + Err(e) => { + return mkerr(format!("channelName utf8 error {:?}", e)); + } + }; + let ret = Config { + format_version: ver, + channel_name: channel_name, + entries: entries, + }; + Ok((inp, ret)) +} + +pub async fn read_local_config(channel: &Channel, node: &Node) -> Result { + let path = node + .data_base_path + .join("config") + .join(&channel.name) + .join("latest") + .join("00000_Config"); + let buf = tokio::fs::read(&path).await?; + let config = parse_config(&buf)?; + Ok(config.1) +} + +pub enum MatchingConfigEntry<'a> { + None, + Multiple, + Entry(&'a ConfigEntry), +} + +pub fn extract_matching_config_entry<'a>( + range: &NanoRange, + channel_config: &'a Config, +) -> Result, Error> { + let mut ixs = vec![]; + for i1 in 0..channel_config.entries.len() { + let e1 = &channel_config.entries[i1]; + if i1 + 1 < channel_config.entries.len() { + let e2 = &channel_config.entries[i1 + 1]; + if e1.ts < range.end && e2.ts >= range.beg { + ixs.push(i1); + } + } else { + if e1.ts < range.end { + ixs.push(i1); + } + } + } + if ixs.len() == 0 { + Ok(MatchingConfigEntry::None) + } else if ixs.len() > 1 { + Ok(MatchingConfigEntry::Multiple) + } else { + Ok(MatchingConfigEntry::Entry(&channel_config.entries[ixs[0]])) + } +} + +#[cfg(test)] +mod test { + use super::parse_config; + + fn read_data() -> Vec { + use std::io::Read; + let path = "ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config"; + let mut f1 = std::fs::File::open(path).unwrap(); + let mut buf = vec![]; + f1.read_to_end(&mut buf).unwrap(); + buf + } + + #[test] + fn parse_dummy() { + let config = parse_config(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap(); + assert_eq!(0, config.1.format_version); + assert_eq!("abc", config.1.channel_name); + } + + #[test] + fn open_file() { + let config = parse_config(&read_data()).unwrap().1; + assert_eq!(0, config.format_version); + assert_eq!(9, config.entries.len()); + for e in &config.entries { + assert!(e.ts >= 631152000000000000); + assert!(e.ts <= 1591106812800073974); + assert!(e.shape.is_some()); + } + } +} diff --git a/parse/src/lib.rs b/parse/src/lib.rs new file mode 100644 index 0000000..9c63800 --- /dev/null +++ b/parse/src/lib.rs @@ -0,0 +1 @@ +pub mod channelconfig;