From 5e624bb2ca375874549902bb774749807e1d5140 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 30 Jul 2021 22:14:07 +0200 Subject: [PATCH] Find start position in large files via binary search --- archapp/src/events.rs | 207 ++++++++++++++++++++++++++++++++++++- archapp/src/parse.rs | 87 ++++++++++------ archapp/src/parse/multi.rs | 50 +++++++++ daqbufp2/src/nodes.rs | 1 + disk/src/aggtest.rs | 1 + disk/src/cache.rs | 2 +- disk/src/gen.rs | 1 + netfetch/src/test.rs | 1 + netpod/src/lib.rs | 1 + 9 files changed, 318 insertions(+), 33 deletions(-) create mode 100644 archapp/src/parse/multi.rs diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 8e058a8..dd0526e 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -1,3 +1,5 @@ +use crate::generated::EPICSEvent::PayloadType; +use crate::parse::multi::parse_all_ts; use crate::parse::PbFileReader; use crate::{ EventsItem, MultiBinWaveEvents, PlainEvents, ScalarPlainEvents, SingleBinWaveEvents, WavePlainEvents, XBinnedEvents, @@ -19,10 +21,12 @@ use netpod::timeunits::{DAY, SEC}; use netpod::{AggKind, ArchiverAppliance, Channel, ChannelInfo, HasScalarType, HasShape, NanoRange, ScalarType, Shape}; use serde::Serialize; use serde_json::Value as JsonValue; +use std::io::SeekFrom; use std::path::PathBuf; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::fs::{read_dir, File}; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; pub struct DataFilename { year: u32, @@ -403,13 +407,31 @@ pub async fn make_single_event_pipe( info!("•••••••••••••••••••••••••• file matches requested range"); let f1 = File::open(de.path()).await?; info!("opened {:?}", de.path()); + + let z = position_file_for_evq(f1, evq.clone(), df.year).await?; + let mut f1 = if let PositionState::Positioned = z.state { + z.file + } else { + continue; + }; + + // TODO could avoid some seeks if position_file_for_evq would return the position instead of + // positioning the file. + let pos1 = f1.stream_position().await?; + f1.seek(SeekFrom::Start(0)).await?; let mut pbr = PbFileReader::new(f1).await; pbr.read_header().await?; info!("✓ read header {:?}", pbr.payload_type()); + + // TODO this is ugly: + pbr.file().seek(SeekFrom::Start(pos1)).await?; + pbr.reset_io(pos1); + let mut i1 = 0; 'evread: loop { match pbr.read_msg().await { - Ok(ei) => { + Ok(Some(ei)) => { + let ei = ei.item; let tslast = if ei.len() > 0 { Some(ei.ts(ei.len() - 1)) } else { None }; i1 += 1; if i1 % 1000 == 0 { @@ -425,6 +447,10 @@ pub async fn make_single_event_pipe( } } } + Ok(None) => { + info!("reached end of file"); + break; + } Err(e) => { error!("error while reading msg {:?}", e); break; @@ -455,6 +481,177 @@ pub async fn make_single_event_pipe( Ok(Box::pin(rx)) } +pub enum PositionState { + NothingFound, + Positioned, +} + +pub struct PositionResult { + file: File, + state: PositionState, +} + +async fn position_file_for_evq(mut file: File, evq: RawEventsQuery, year: u32) -> Result { + let flen = file.seek(SeekFrom::End(0)).await?; + file.seek(SeekFrom::Start(0)).await?; + if flen < 1024 * 512 { + position_file_for_evq_linear(file, evq, year).await + } else { + position_file_for_evq_binary(file, evq, year).await + } +} + +async fn position_file_for_evq_linear(mut file: File, evq: RawEventsQuery, year: u32) -> Result { + let mut pbr = PbFileReader::new(file).await; + pbr.read_header().await?; + loop { + let res = pbr.read_msg().await?; + let res = if let Some(k) = res { + k + } else { + let ret = PositionResult { + file: pbr.into_file(), + state: PositionState::NothingFound, + }; + return Ok(ret); + }; + if res.item.len() < 1 { + return Err(Error::with_msg_no_trace("no event read from file")); + } + if res.item.ts(res.item.len() - 1) >= evq.range.beg { + let ret = PositionResult { + file: pbr.into_file(), + state: PositionState::Positioned, + }; + return Ok(ret); + } + } +} + +async fn position_file_for_evq_binary(mut file: File, evq: RawEventsQuery, year: u32) -> Result { + info!("position_file_for_evq_binary"); + let flen = file.seek(SeekFrom::End(0)).await?; + file.seek(SeekFrom::Start(0)).await?; + let mut pbr = PbFileReader::new(file).await; + pbr.read_header().await?; + let payload_type = pbr.payload_type().clone(); + let res = pbr.read_msg().await?; + let mut file = pbr.into_file(); + let res = if let Some(res) = res { + res + } else { + return Err(Error::with_msg_no_trace("no event read from file")); + }; + if res.item.len() < 1 { + return Err(Error::with_msg_no_trace("no event read from file")); + } + let events_begin_pos = res.pos; + + // * the search invariant is that the ts1 < beg and ts2 >= end + // * read some data from the end. + // * read some data from the begin. + // * extract events from begin and end. + // * check if the binary search invariant is already violated, in that case return. + // * otherwise, choose some spot in the middle, read there the next chunk. + // Then use the actual position of the found item! + let mut buf1 = vec![0; 1024 * 16]; + let mut buf2 = vec![0; 1024 * 16]; + let mut buf3 = vec![0; 1024 * 16]; + + let mut p1 = events_begin_pos; + let mut p2 = flen - buf2.len() as u64; + + file.seek(SeekFrom::Start(p1 - 1)).await?; + file.read_exact(&mut buf1).await?; + file.seek(SeekFrom::Start(p2)).await?; + file.read_exact(&mut buf2).await?; + + let evs1 = parse_all_ts(p1 - 1, &buf1, payload_type.clone(), year)?; + let evs2 = parse_all_ts(p2, &buf2, payload_type.clone(), year)?; + + info!("..............................................................."); + info!("evs1: {:?}", evs1); + info!("evs2: {:?}", evs2); + info!("p1: {}", p1); + info!("p2: {}", p2); + + let tgt = evq.range.beg; + + { + let ev = evs1.first().unwrap(); + if ev.ts >= tgt { + file.seek(SeekFrom::Start(ev.pos)).await?; + let ret = PositionResult { + state: PositionState::Positioned, + file, + }; + return Ok(ret); + } + } + { + let ev = evs2.last().unwrap(); + if ev.ts < tgt { + file.seek(SeekFrom::Start(0)).await?; + let ret = PositionResult { + state: PositionState::NothingFound, + file, + }; + return Ok(ret); + } + } + + p2 = evs2.last().unwrap().pos; + + // TODO make sure that NL-delimited chunks have a max size. + loop { + info!("bsearch loop p1 {} p2 {}", p1, p2); + if p2 - p1 < 1024 * 128 { + // TODO switch here to linear search... + info!("switch to linear search in pos {}..{}", p1, p2); + return linear_search_2(file, evq, year, p1, p2, payload_type).await; + } + let p3 = (p2 + p1) / 2; + file.seek(SeekFrom::Start(p3)).await?; + file.read_exact(&mut buf3).await?; + let evs3 = parse_all_ts(p3, &buf3, payload_type.clone(), year)?; + let ev = evs3.first().unwrap(); + if ev.ts < tgt { + info!("p3 {} ts: {} pos: {} branch A", p3, ev.ts, ev.pos); + p1 = ev.pos; + } else { + info!("p3 {} ts: {} pos: {} branch B", p3, ev.ts, ev.pos); + p2 = ev.pos; + } + } +} + +async fn linear_search_2( + mut file: File, + evq: RawEventsQuery, + year: u32, + p1: u64, + p2: u64, + payload_type: PayloadType, +) -> Result { + eprintln!("linear_search_2"); + file.seek(SeekFrom::Start(p1 - 1)).await?; + let mut buf = vec![0; (p2 - p1) as usize]; + file.read_exact(&mut buf).await?; + let evs1 = parse_all_ts(p1 - 1, &buf, payload_type.clone(), year)?; + for ev in evs1 { + if ev.ts >= evq.range.beg { + info!("FOUND {:?}", ev); + file.seek(SeekFrom::Start(ev.pos)).await?; + let ret = PositionResult { + file, + state: PositionState::Positioned, + }; + return Ok(ret); + } + } + Err(Error::with_msg_no_trace("linear_search_2 failed")) +} + #[allow(unused)] fn events_item_to_framable(ei: EventsItem) -> Result, Error> { match ei { @@ -527,15 +724,19 @@ pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result { + Ok(Some(item)) => { + let item = item.item; msgs.push(format!("got event {:?}", item)); shape = Some(item.shape()); // These type mappings are defined by the protobuffer schema. scalar_type = Some(item.scalar_type()); break; } + Ok(None) => { + msgs.push(format!("can not read event")); + } Err(e) => { - msgs.push(format!("can not read event! {:?}", e)); + msgs.push(format!("can not read event {:?}", e)); } } msgs.push(format!("got header {}", pbr.channel_name())); diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs index 41b7a88..f2069a2 100644 --- a/archapp/src/parse.rs +++ b/archapp/src/parse.rs @@ -1,3 +1,5 @@ +pub mod multi; + use crate::events::parse_data_filename; use crate::generated::EPICSEvent::PayloadType; use crate::{unescape_archapp_msg, EventsItem, PlainEvents, ScalarPlainEvents, WavePlainEvents}; @@ -27,6 +29,7 @@ pub struct PbFileReader { escbuf: Vec, wp: usize, rp: usize, + off: u64, channel_name: String, payload_type: PayloadType, year: u32, @@ -85,6 +88,11 @@ macro_rules! wave_parse { const MIN_BUF_FILL: usize = 1024 * 64; +pub struct ReadMessageResult { + pub pos: u64, + pub item: EventsItem, +} + impl PbFileReader { pub async fn new(file: File) -> Self { Self { @@ -93,12 +101,27 @@ impl PbFileReader { escbuf: vec![], wp: 0, rp: 0, + off: 0, channel_name: String::new(), payload_type: PayloadType::V4_GENERIC_BYTES, year: 0, } } + pub fn into_file(self) -> File { + self.file + } + + pub fn file(&mut self) -> &mut File { + &mut self.file + } + + pub fn reset_io(&mut self, off: u64) { + self.wp = 0; + self.rp = 0; + self.off = off; + } + pub async fn read_header(&mut self) -> Result<(), Error> { self.fill_buf().await?; let k = self.find_next_nl()?; @@ -110,62 +133,73 @@ impl PbFileReader { self.channel_name = payload_info.get_pvname().into(); self.payload_type = payload_info.get_field_type(); self.year = payload_info.get_year() as u32; + self.off += k as u64 + 1 - self.rp as u64; self.rp = k + 1; Ok(()) } - pub async fn read_msg(&mut self) -> Result { + pub async fn read_msg(&mut self) -> Result, Error> { self.fill_buf().await?; - let k = self.find_next_nl()?; + let k = if let Ok(k) = self.find_next_nl() { + k + } else { + return Ok(None); + }; let buf = &mut self.buf; let m = mem::replace(&mut self.escbuf, vec![]); let m = unescape_archapp_msg(&buf[self.rp..k], m)?; self.escbuf = m; - let m = &self.escbuf; + let ei = Self::parse_buffer(&self.escbuf, self.payload_type.clone(), self.year)?; + let ret = ReadMessageResult { + pos: self.off, + item: ei, + }; + self.off += k as u64 + 1 - self.rp as u64; + self.rp = k + 1; + Ok(Some(ret)) + } + + pub fn parse_buffer(m: &[u8], payload_type: PayloadType, year: u32) -> Result { use PayloadType::*; - let ei = match self.payload_type { - SCALAR_BYTE => parse_scalar_byte(m, self.year)?, + let ei = match payload_type { + SCALAR_BYTE => parse_scalar_byte(m, year)?, SCALAR_ENUM => { - scalar_parse!(m, self.year, ScalarEnum, Int, i32) + scalar_parse!(m, year, ScalarEnum, Int, i32) } SCALAR_SHORT => { - scalar_parse!(m, self.year, ScalarShort, Short, i16) + scalar_parse!(m, year, ScalarShort, Short, i16) } SCALAR_INT => { - scalar_parse!(m, self.year, ScalarInt, Int, i32) + scalar_parse!(m, year, ScalarInt, Int, i32) } SCALAR_FLOAT => { - scalar_parse!(m, self.year, ScalarFloat, Float, f32) + scalar_parse!(m, year, ScalarFloat, Float, f32) } SCALAR_DOUBLE => { - scalar_parse!(m, self.year, ScalarDouble, Double, f64) + scalar_parse!(m, year, ScalarDouble, Double, f64) } WAVEFORM_BYTE => { - wave_parse!(m, self.year, VectorChar, Byte, i8) + wave_parse!(m, year, VectorChar, Byte, i8) } WAVEFORM_SHORT => { - wave_parse!(m, self.year, VectorShort, Short, i16) + wave_parse!(m, year, VectorShort, Short, i16) } WAVEFORM_ENUM => { - wave_parse!(m, self.year, VectorEnum, Int, i32) + wave_parse!(m, year, VectorEnum, Int, i32) } WAVEFORM_INT => { - wave_parse!(m, self.year, VectorInt, Int, i32) + wave_parse!(m, year, VectorInt, Int, i32) } WAVEFORM_FLOAT => { - wave_parse!(m, self.year, VectorFloat, Float, f32) + wave_parse!(m, year, VectorFloat, Float, f32) } WAVEFORM_DOUBLE => { - wave_parse!(m, self.year, VectorDouble, Double, f64) + wave_parse!(m, year, VectorDouble, Double, f64) } SCALAR_STRING | WAVEFORM_STRING | V4_GENERIC_BYTES => { - return Err(Error::with_msg_no_trace(format!( - "not supported: {:?}", - self.payload_type - ))); + return Err(Error::with_msg_no_trace(format!("not supported: {:?}", payload_type))); } }; - self.rp = k + 1; Ok(ei) } @@ -176,9 +210,6 @@ impl PbFileReader { if self.rp + MIN_BUF_FILL >= self.buf.len() { let n = self.wp - self.rp; self.buf.copy_within(self.rp..self.rp + n, 0); - //for i in 0..n { - // self.buf[i] = self.buf[self.rp + i]; - //} self.rp = 0; self.wp = n; } @@ -205,6 +236,7 @@ impl PbFileReader { k += 1; } if k == self.wp { + // TODO test whether with_msg_no_trace makes difference. return Err(Error::with_msg("no nl in pb file")); } Ok(k) @@ -442,7 +474,8 @@ pub async fn scan_files_inner( if false { dbconn::insert_channel(channel_path.into(), ndi.facility, &dbc).await?; } - if let Ok(msg) = pbr.read_msg().await { + if let Ok(Some(msg)) = pbr.read_msg().await { + let msg = msg.item; lru.insert(channel_path); { tx.send(Ok(Box::new(serde_json::to_value(format!( @@ -451,10 +484,6 @@ pub async fn scan_files_inner( msg.variant_name() ))?) as ItemSerBox)) .await?; - /*waves_found += 1; - if waves_found >= 20 { - break; - }*/ } } } diff --git a/archapp/src/parse/multi.rs b/archapp/src/parse/multi.rs new file mode 100644 index 0000000..f82e74c --- /dev/null +++ b/archapp/src/parse/multi.rs @@ -0,0 +1,50 @@ +use crate::generated::EPICSEvent::PayloadType; +use crate::parse::PbFileReader; +use err::Error; +use items::{WithLen, WithTimestamps}; + +#[derive(Debug)] +pub struct PosTs { + pub pos: u64, + pub ts: u64, +} + +pub fn parse_all_ts(off: u64, buf: &[u8], payload_type: PayloadType, year: u32) -> Result, Error> { + let mut ret = vec![]; + let mut i1 = 0; + let mut i2 = usize::MAX; + loop { + if i1 >= buf.len() { + break; + } + if buf[i1] == 10 { + if i2 == usize::MAX { + i2 = i1; + } else { + // Have a chunk from i2..i1 + match PbFileReader::parse_buffer(&buf[i2 + 1..i1], payload_type.clone(), year) { + Ok(k) => { + if k.len() != 1 { + return Err(Error::with_msg_no_trace(format!( + "parsed buffer contained {} events", + k.len() + ))); + } else { + let h = PosTs { + pos: off + i2 as u64 + 1, + ts: k.ts(0), + }; + ret.push(h); + } + } + Err(e) => { + // TODO ignore except if it's the last chunk. + } + } + i2 = i1; + } + } + i1 += 1; + } + Ok(ret) +} diff --git a/daqbufp2/src/nodes.rs b/daqbufp2/src/nodes.rs index 4c15fa0..4ac29aa 100644 --- a/daqbufp2/src/nodes.rs +++ b/daqbufp2/src/nodes.rs @@ -40,6 +40,7 @@ fn test_cluster() -> Cluster { port: 8360 + id as u16, port_raw: 8360 + id as u16 + 100, data_base_path: format!("../tmpdata/node{:02}", id).into(), + cache_base_path: format!("../tmpdata/node{:02}", id).into(), ksprefix: "ks".into(), split: id, backend: "testbackend".into(), diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 6857d38..46645e3 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -12,6 +12,7 @@ pub fn make_test_node(id: u32) -> Node { port: 8800 + id as u16, port_raw: 8800 + id as u16 + 100, data_base_path: format!("../tmpdata/node{:02}", id).into(), + cache_base_path: format!("../tmpdata/node{:02}", id).into(), split: id, ksprefix: "ks".into(), backend: "testbackend".into(), diff --git a/disk/src/cache.rs b/disk/src/cache.rs index aba778f..efec22e 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -138,7 +138,7 @@ impl CacheFileDesc { let hc = self.hash_channel(); node_config .node - .data_base_path + .cache_base_path .join("cache") .join(&hc[0..3]) .join(&hc[3..6]) diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 8fccfa3..311762e 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -78,6 +78,7 @@ pub async fn gen_test_data() -> Result<(), Error> { port_raw: 7780 + i1 as u16 + 100, split: i1, data_base_path: data_base_path.join(format!("node{:02}", i1)), + cache_base_path: data_base_path.join(format!("node{:02}", i1)), ksprefix: ksprefix.clone(), backend: "testbackend".into(), bin_grain_kind: 0, diff --git a/netfetch/src/test.rs b/netfetch/src/test.rs index b682e85..bde3c7b 100644 --- a/netfetch/src/test.rs +++ b/netfetch/src/test.rs @@ -18,6 +18,7 @@ fn ca_connect_1() { backend: "".into(), split: 0, data_base_path: "".into(), + cache_base_path: "".into(), listen: "".into(), ksprefix: "".into(), archiver_appliance: None, diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 945448c..f7582db 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -129,6 +129,7 @@ pub struct Node { pub port_raw: u16, pub split: u32, pub data_base_path: PathBuf, + pub cache_base_path: PathBuf, pub ksprefix: String, pub backend: String, #[serde(default)]