Find start position in large files via binary search
This commit is contained in:
+204
-3
@@ -1,3 +1,5 @@
|
|||||||
|
use crate::generated::EPICSEvent::PayloadType;
|
||||||
|
use crate::parse::multi::parse_all_ts;
|
||||||
use crate::parse::PbFileReader;
|
use crate::parse::PbFileReader;
|
||||||
use crate::{
|
use crate::{
|
||||||
EventsItem, MultiBinWaveEvents, PlainEvents, ScalarPlainEvents, SingleBinWaveEvents, WavePlainEvents, XBinnedEvents,
|
EventsItem, MultiBinWaveEvents, PlainEvents, ScalarPlainEvents, SingleBinWaveEvents, WavePlainEvents, XBinnedEvents,
|
||||||
@@ -19,10 +21,12 @@ use netpod::timeunits::{DAY, SEC};
|
|||||||
use netpod::{AggKind, ArchiverAppliance, Channel, ChannelInfo, HasScalarType, HasShape, NanoRange, ScalarType, Shape};
|
use netpod::{AggKind, ArchiverAppliance, Channel, ChannelInfo, HasScalarType, HasShape, NanoRange, ScalarType, Shape};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json::Value as JsonValue;
|
use serde_json::Value as JsonValue;
|
||||||
|
use std::io::SeekFrom;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use tokio::fs::{read_dir, File};
|
use tokio::fs::{read_dir, File};
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncSeekExt};
|
||||||
|
|
||||||
pub struct DataFilename {
|
pub struct DataFilename {
|
||||||
year: u32,
|
year: u32,
|
||||||
@@ -403,13 +407,31 @@ pub async fn make_single_event_pipe(
|
|||||||
info!("•••••••••••••••••••••••••• file matches requested range");
|
info!("•••••••••••••••••••••••••• file matches requested range");
|
||||||
let f1 = File::open(de.path()).await?;
|
let f1 = File::open(de.path()).await?;
|
||||||
info!("opened {:?}", de.path());
|
info!("opened {:?}", de.path());
|
||||||
|
|
||||||
|
let z = position_file_for_evq(f1, evq.clone(), df.year).await?;
|
||||||
|
let mut f1 = if let PositionState::Positioned = z.state {
|
||||||
|
z.file
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO could avoid some seeks if position_file_for_evq would return the position instead of
|
||||||
|
// positioning the file.
|
||||||
|
let pos1 = f1.stream_position().await?;
|
||||||
|
f1.seek(SeekFrom::Start(0)).await?;
|
||||||
let mut pbr = PbFileReader::new(f1).await;
|
let mut pbr = PbFileReader::new(f1).await;
|
||||||
pbr.read_header().await?;
|
pbr.read_header().await?;
|
||||||
info!("✓ read header {:?}", pbr.payload_type());
|
info!("✓ read header {:?}", pbr.payload_type());
|
||||||
|
|
||||||
|
// TODO this is ugly:
|
||||||
|
pbr.file().seek(SeekFrom::Start(pos1)).await?;
|
||||||
|
pbr.reset_io(pos1);
|
||||||
|
|
||||||
let mut i1 = 0;
|
let mut i1 = 0;
|
||||||
'evread: loop {
|
'evread: loop {
|
||||||
match pbr.read_msg().await {
|
match pbr.read_msg().await {
|
||||||
Ok(ei) => {
|
Ok(Some(ei)) => {
|
||||||
|
let ei = ei.item;
|
||||||
let tslast = if ei.len() > 0 { Some(ei.ts(ei.len() - 1)) } else { None };
|
let tslast = if ei.len() > 0 { Some(ei.ts(ei.len() - 1)) } else { None };
|
||||||
i1 += 1;
|
i1 += 1;
|
||||||
if i1 % 1000 == 0 {
|
if i1 % 1000 == 0 {
|
||||||
@@ -425,6 +447,10 @@ pub async fn make_single_event_pipe(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
info!("reached end of file");
|
||||||
|
break;
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("error while reading msg {:?}", e);
|
error!("error while reading msg {:?}", e);
|
||||||
break;
|
break;
|
||||||
@@ -455,6 +481,177 @@ pub async fn make_single_event_pipe(
|
|||||||
Ok(Box::pin(rx))
|
Ok(Box::pin(rx))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Outcome of trying to position a data file at the begin of a requested time range.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PositionState {
    /// No event at or after the begin of the requested range was found in the file.
    NothingFound,
    /// An event at or after the begin of the requested range was found and the
    /// returned file is meant to be positioned on it.
    Positioned,
}
|
||||||
|
|
||||||
|
pub struct PositionResult {
|
||||||
|
file: File,
|
||||||
|
state: PositionState,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn position_file_for_evq(mut file: File, evq: RawEventsQuery, year: u32) -> Result<PositionResult, Error> {
|
||||||
|
let flen = file.seek(SeekFrom::End(0)).await?;
|
||||||
|
file.seek(SeekFrom::Start(0)).await?;
|
||||||
|
if flen < 1024 * 512 {
|
||||||
|
position_file_for_evq_linear(file, evq, year).await
|
||||||
|
} else {
|
||||||
|
position_file_for_evq_binary(file, evq, year).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn position_file_for_evq_linear(mut file: File, evq: RawEventsQuery, year: u32) -> Result<PositionResult, Error> {
|
||||||
|
let mut pbr = PbFileReader::new(file).await;
|
||||||
|
pbr.read_header().await?;
|
||||||
|
loop {
|
||||||
|
let res = pbr.read_msg().await?;
|
||||||
|
let res = if let Some(k) = res {
|
||||||
|
k
|
||||||
|
} else {
|
||||||
|
let ret = PositionResult {
|
||||||
|
file: pbr.into_file(),
|
||||||
|
state: PositionState::NothingFound,
|
||||||
|
};
|
||||||
|
return Ok(ret);
|
||||||
|
};
|
||||||
|
if res.item.len() < 1 {
|
||||||
|
return Err(Error::with_msg_no_trace("no event read from file"));
|
||||||
|
}
|
||||||
|
if res.item.ts(res.item.len() - 1) >= evq.range.beg {
|
||||||
|
let ret = PositionResult {
|
||||||
|
file: pbr.into_file(),
|
||||||
|
state: PositionState::Positioned,
|
||||||
|
};
|
||||||
|
return Ok(ret);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn position_file_for_evq_binary(mut file: File, evq: RawEventsQuery, year: u32) -> Result<PositionResult, Error> {
|
||||||
|
info!("position_file_for_evq_binary");
|
||||||
|
let flen = file.seek(SeekFrom::End(0)).await?;
|
||||||
|
file.seek(SeekFrom::Start(0)).await?;
|
||||||
|
let mut pbr = PbFileReader::new(file).await;
|
||||||
|
pbr.read_header().await?;
|
||||||
|
let payload_type = pbr.payload_type().clone();
|
||||||
|
let res = pbr.read_msg().await?;
|
||||||
|
let mut file = pbr.into_file();
|
||||||
|
let res = if let Some(res) = res {
|
||||||
|
res
|
||||||
|
} else {
|
||||||
|
return Err(Error::with_msg_no_trace("no event read from file"));
|
||||||
|
};
|
||||||
|
if res.item.len() < 1 {
|
||||||
|
return Err(Error::with_msg_no_trace("no event read from file"));
|
||||||
|
}
|
||||||
|
let events_begin_pos = res.pos;
|
||||||
|
|
||||||
|
// * the search invariant is that the ts1 < beg and ts2 >= end
|
||||||
|
// * read some data from the end.
|
||||||
|
// * read some data from the begin.
|
||||||
|
// * extract events from begin and end.
|
||||||
|
// * check if the binary search invariant is already violated, in that case return.
|
||||||
|
// * otherwise, choose some spot in the middle, read there the next chunk.
|
||||||
|
// Then use the actual position of the found item!
|
||||||
|
let mut buf1 = vec![0; 1024 * 16];
|
||||||
|
let mut buf2 = vec![0; 1024 * 16];
|
||||||
|
let mut buf3 = vec![0; 1024 * 16];
|
||||||
|
|
||||||
|
let mut p1 = events_begin_pos;
|
||||||
|
let mut p2 = flen - buf2.len() as u64;
|
||||||
|
|
||||||
|
file.seek(SeekFrom::Start(p1 - 1)).await?;
|
||||||
|
file.read_exact(&mut buf1).await?;
|
||||||
|
file.seek(SeekFrom::Start(p2)).await?;
|
||||||
|
file.read_exact(&mut buf2).await?;
|
||||||
|
|
||||||
|
let evs1 = parse_all_ts(p1 - 1, &buf1, payload_type.clone(), year)?;
|
||||||
|
let evs2 = parse_all_ts(p2, &buf2, payload_type.clone(), year)?;
|
||||||
|
|
||||||
|
info!("...............................................................");
|
||||||
|
info!("evs1: {:?}", evs1);
|
||||||
|
info!("evs2: {:?}", evs2);
|
||||||
|
info!("p1: {}", p1);
|
||||||
|
info!("p2: {}", p2);
|
||||||
|
|
||||||
|
let tgt = evq.range.beg;
|
||||||
|
|
||||||
|
{
|
||||||
|
let ev = evs1.first().unwrap();
|
||||||
|
if ev.ts >= tgt {
|
||||||
|
file.seek(SeekFrom::Start(ev.pos)).await?;
|
||||||
|
let ret = PositionResult {
|
||||||
|
state: PositionState::Positioned,
|
||||||
|
file,
|
||||||
|
};
|
||||||
|
return Ok(ret);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let ev = evs2.last().unwrap();
|
||||||
|
if ev.ts < tgt {
|
||||||
|
file.seek(SeekFrom::Start(0)).await?;
|
||||||
|
let ret = PositionResult {
|
||||||
|
state: PositionState::NothingFound,
|
||||||
|
file,
|
||||||
|
};
|
||||||
|
return Ok(ret);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
p2 = evs2.last().unwrap().pos;
|
||||||
|
|
||||||
|
// TODO make sure that NL-delimited chunks have a max size.
|
||||||
|
loop {
|
||||||
|
info!("bsearch loop p1 {} p2 {}", p1, p2);
|
||||||
|
if p2 - p1 < 1024 * 128 {
|
||||||
|
// TODO switch here to linear search...
|
||||||
|
info!("switch to linear search in pos {}..{}", p1, p2);
|
||||||
|
return linear_search_2(file, evq, year, p1, p2, payload_type).await;
|
||||||
|
}
|
||||||
|
let p3 = (p2 + p1) / 2;
|
||||||
|
file.seek(SeekFrom::Start(p3)).await?;
|
||||||
|
file.read_exact(&mut buf3).await?;
|
||||||
|
let evs3 = parse_all_ts(p3, &buf3, payload_type.clone(), year)?;
|
||||||
|
let ev = evs3.first().unwrap();
|
||||||
|
if ev.ts < tgt {
|
||||||
|
info!("p3 {} ts: {} pos: {} branch A", p3, ev.ts, ev.pos);
|
||||||
|
p1 = ev.pos;
|
||||||
|
} else {
|
||||||
|
info!("p3 {} ts: {} pos: {} branch B", p3, ev.ts, ev.pos);
|
||||||
|
p2 = ev.pos;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn linear_search_2(
|
||||||
|
mut file: File,
|
||||||
|
evq: RawEventsQuery,
|
||||||
|
year: u32,
|
||||||
|
p1: u64,
|
||||||
|
p2: u64,
|
||||||
|
payload_type: PayloadType,
|
||||||
|
) -> Result<PositionResult, Error> {
|
||||||
|
eprintln!("linear_search_2");
|
||||||
|
file.seek(SeekFrom::Start(p1 - 1)).await?;
|
||||||
|
let mut buf = vec![0; (p2 - p1) as usize];
|
||||||
|
file.read_exact(&mut buf).await?;
|
||||||
|
let evs1 = parse_all_ts(p1 - 1, &buf, payload_type.clone(), year)?;
|
||||||
|
for ev in evs1 {
|
||||||
|
if ev.ts >= evq.range.beg {
|
||||||
|
info!("FOUND {:?}", ev);
|
||||||
|
file.seek(SeekFrom::Start(ev.pos)).await?;
|
||||||
|
let ret = PositionResult {
|
||||||
|
file,
|
||||||
|
state: PositionState::Positioned,
|
||||||
|
};
|
||||||
|
return Ok(ret);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(Error::with_msg_no_trace("linear_search_2 failed"))
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
fn events_item_to_framable(ei: EventsItem) -> Result<Box<dyn Framable + Send>, Error> {
|
fn events_item_to_framable(ei: EventsItem) -> Result<Box<dyn Framable + Send>, Error> {
|
||||||
match ei {
|
match ei {
|
||||||
@@ -527,15 +724,19 @@ pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result<C
|
|||||||
msgs.push(format!("got header {}", pbr.channel_name()));
|
msgs.push(format!("got header {}", pbr.channel_name()));
|
||||||
let ev = pbr.read_msg().await;
|
let ev = pbr.read_msg().await;
|
||||||
match ev {
|
match ev {
|
||||||
Ok(item) => {
|
Ok(Some(item)) => {
|
||||||
|
let item = item.item;
|
||||||
msgs.push(format!("got event {:?}", item));
|
msgs.push(format!("got event {:?}", item));
|
||||||
shape = Some(item.shape());
|
shape = Some(item.shape());
|
||||||
// These type mappings are defined by the protobuffer schema.
|
// These type mappings are defined by the protobuffer schema.
|
||||||
scalar_type = Some(item.scalar_type());
|
scalar_type = Some(item.scalar_type());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
msgs.push(format!("can not read event"));
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
msgs.push(format!("can not read event! {:?}", e));
|
msgs.push(format!("can not read event {:?}", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
msgs.push(format!("got header {}", pbr.channel_name()));
|
msgs.push(format!("got header {}", pbr.channel_name()));
|
||||||
|
|||||||
+58
-29
@@ -1,3 +1,5 @@
|
|||||||
|
pub mod multi;
|
||||||
|
|
||||||
use crate::events::parse_data_filename;
|
use crate::events::parse_data_filename;
|
||||||
use crate::generated::EPICSEvent::PayloadType;
|
use crate::generated::EPICSEvent::PayloadType;
|
||||||
use crate::{unescape_archapp_msg, EventsItem, PlainEvents, ScalarPlainEvents, WavePlainEvents};
|
use crate::{unescape_archapp_msg, EventsItem, PlainEvents, ScalarPlainEvents, WavePlainEvents};
|
||||||
@@ -27,6 +29,7 @@ pub struct PbFileReader {
|
|||||||
escbuf: Vec<u8>,
|
escbuf: Vec<u8>,
|
||||||
wp: usize,
|
wp: usize,
|
||||||
rp: usize,
|
rp: usize,
|
||||||
|
off: u64,
|
||||||
channel_name: String,
|
channel_name: String,
|
||||||
payload_type: PayloadType,
|
payload_type: PayloadType,
|
||||||
year: u32,
|
year: u32,
|
||||||
@@ -85,6 +88,11 @@ macro_rules! wave_parse {
|
|||||||
|
|
||||||
const MIN_BUF_FILL: usize = 1024 * 64;
|
const MIN_BUF_FILL: usize = 1024 * 64;
|
||||||
|
|
||||||
|
pub struct ReadMessageResult {
|
||||||
|
pub pos: u64,
|
||||||
|
pub item: EventsItem,
|
||||||
|
}
|
||||||
|
|
||||||
impl PbFileReader {
|
impl PbFileReader {
|
||||||
pub async fn new(file: File) -> Self {
|
pub async fn new(file: File) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@@ -93,12 +101,27 @@ impl PbFileReader {
|
|||||||
escbuf: vec![],
|
escbuf: vec![],
|
||||||
wp: 0,
|
wp: 0,
|
||||||
rp: 0,
|
rp: 0,
|
||||||
|
off: 0,
|
||||||
channel_name: String::new(),
|
channel_name: String::new(),
|
||||||
payload_type: PayloadType::V4_GENERIC_BYTES,
|
payload_type: PayloadType::V4_GENERIC_BYTES,
|
||||||
year: 0,
|
year: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn into_file(self) -> File {
|
||||||
|
self.file
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn file(&mut self) -> &mut File {
|
||||||
|
&mut self.file
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reset_io(&mut self, off: u64) {
|
||||||
|
self.wp = 0;
|
||||||
|
self.rp = 0;
|
||||||
|
self.off = off;
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn read_header(&mut self) -> Result<(), Error> {
|
pub async fn read_header(&mut self) -> Result<(), Error> {
|
||||||
self.fill_buf().await?;
|
self.fill_buf().await?;
|
||||||
let k = self.find_next_nl()?;
|
let k = self.find_next_nl()?;
|
||||||
@@ -110,62 +133,73 @@ impl PbFileReader {
|
|||||||
self.channel_name = payload_info.get_pvname().into();
|
self.channel_name = payload_info.get_pvname().into();
|
||||||
self.payload_type = payload_info.get_field_type();
|
self.payload_type = payload_info.get_field_type();
|
||||||
self.year = payload_info.get_year() as u32;
|
self.year = payload_info.get_year() as u32;
|
||||||
|
self.off += k as u64 + 1 - self.rp as u64;
|
||||||
self.rp = k + 1;
|
self.rp = k + 1;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn read_msg(&mut self) -> Result<EventsItem, Error> {
|
pub async fn read_msg(&mut self) -> Result<Option<ReadMessageResult>, Error> {
|
||||||
self.fill_buf().await?;
|
self.fill_buf().await?;
|
||||||
let k = self.find_next_nl()?;
|
let k = if let Ok(k) = self.find_next_nl() {
|
||||||
|
k
|
||||||
|
} else {
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
let buf = &mut self.buf;
|
let buf = &mut self.buf;
|
||||||
let m = mem::replace(&mut self.escbuf, vec![]);
|
let m = mem::replace(&mut self.escbuf, vec![]);
|
||||||
let m = unescape_archapp_msg(&buf[self.rp..k], m)?;
|
let m = unescape_archapp_msg(&buf[self.rp..k], m)?;
|
||||||
self.escbuf = m;
|
self.escbuf = m;
|
||||||
let m = &self.escbuf;
|
let ei = Self::parse_buffer(&self.escbuf, self.payload_type.clone(), self.year)?;
|
||||||
|
let ret = ReadMessageResult {
|
||||||
|
pos: self.off,
|
||||||
|
item: ei,
|
||||||
|
};
|
||||||
|
self.off += k as u64 + 1 - self.rp as u64;
|
||||||
|
self.rp = k + 1;
|
||||||
|
Ok(Some(ret))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn parse_buffer(m: &[u8], payload_type: PayloadType, year: u32) -> Result<EventsItem, Error> {
|
||||||
use PayloadType::*;
|
use PayloadType::*;
|
||||||
let ei = match self.payload_type {
|
let ei = match payload_type {
|
||||||
SCALAR_BYTE => parse_scalar_byte(m, self.year)?,
|
SCALAR_BYTE => parse_scalar_byte(m, year)?,
|
||||||
SCALAR_ENUM => {
|
SCALAR_ENUM => {
|
||||||
scalar_parse!(m, self.year, ScalarEnum, Int, i32)
|
scalar_parse!(m, year, ScalarEnum, Int, i32)
|
||||||
}
|
}
|
||||||
SCALAR_SHORT => {
|
SCALAR_SHORT => {
|
||||||
scalar_parse!(m, self.year, ScalarShort, Short, i16)
|
scalar_parse!(m, year, ScalarShort, Short, i16)
|
||||||
}
|
}
|
||||||
SCALAR_INT => {
|
SCALAR_INT => {
|
||||||
scalar_parse!(m, self.year, ScalarInt, Int, i32)
|
scalar_parse!(m, year, ScalarInt, Int, i32)
|
||||||
}
|
}
|
||||||
SCALAR_FLOAT => {
|
SCALAR_FLOAT => {
|
||||||
scalar_parse!(m, self.year, ScalarFloat, Float, f32)
|
scalar_parse!(m, year, ScalarFloat, Float, f32)
|
||||||
}
|
}
|
||||||
SCALAR_DOUBLE => {
|
SCALAR_DOUBLE => {
|
||||||
scalar_parse!(m, self.year, ScalarDouble, Double, f64)
|
scalar_parse!(m, year, ScalarDouble, Double, f64)
|
||||||
}
|
}
|
||||||
WAVEFORM_BYTE => {
|
WAVEFORM_BYTE => {
|
||||||
wave_parse!(m, self.year, VectorChar, Byte, i8)
|
wave_parse!(m, year, VectorChar, Byte, i8)
|
||||||
}
|
}
|
||||||
WAVEFORM_SHORT => {
|
WAVEFORM_SHORT => {
|
||||||
wave_parse!(m, self.year, VectorShort, Short, i16)
|
wave_parse!(m, year, VectorShort, Short, i16)
|
||||||
}
|
}
|
||||||
WAVEFORM_ENUM => {
|
WAVEFORM_ENUM => {
|
||||||
wave_parse!(m, self.year, VectorEnum, Int, i32)
|
wave_parse!(m, year, VectorEnum, Int, i32)
|
||||||
}
|
}
|
||||||
WAVEFORM_INT => {
|
WAVEFORM_INT => {
|
||||||
wave_parse!(m, self.year, VectorInt, Int, i32)
|
wave_parse!(m, year, VectorInt, Int, i32)
|
||||||
}
|
}
|
||||||
WAVEFORM_FLOAT => {
|
WAVEFORM_FLOAT => {
|
||||||
wave_parse!(m, self.year, VectorFloat, Float, f32)
|
wave_parse!(m, year, VectorFloat, Float, f32)
|
||||||
}
|
}
|
||||||
WAVEFORM_DOUBLE => {
|
WAVEFORM_DOUBLE => {
|
||||||
wave_parse!(m, self.year, VectorDouble, Double, f64)
|
wave_parse!(m, year, VectorDouble, Double, f64)
|
||||||
}
|
}
|
||||||
SCALAR_STRING | WAVEFORM_STRING | V4_GENERIC_BYTES => {
|
SCALAR_STRING | WAVEFORM_STRING | V4_GENERIC_BYTES => {
|
||||||
return Err(Error::with_msg_no_trace(format!(
|
return Err(Error::with_msg_no_trace(format!("not supported: {:?}", payload_type)));
|
||||||
"not supported: {:?}",
|
|
||||||
self.payload_type
|
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
self.rp = k + 1;
|
|
||||||
Ok(ei)
|
Ok(ei)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -176,9 +210,6 @@ impl PbFileReader {
|
|||||||
if self.rp + MIN_BUF_FILL >= self.buf.len() {
|
if self.rp + MIN_BUF_FILL >= self.buf.len() {
|
||||||
let n = self.wp - self.rp;
|
let n = self.wp - self.rp;
|
||||||
self.buf.copy_within(self.rp..self.rp + n, 0);
|
self.buf.copy_within(self.rp..self.rp + n, 0);
|
||||||
//for i in 0..n {
|
|
||||||
// self.buf[i] = self.buf[self.rp + i];
|
|
||||||
//}
|
|
||||||
self.rp = 0;
|
self.rp = 0;
|
||||||
self.wp = n;
|
self.wp = n;
|
||||||
}
|
}
|
||||||
@@ -205,6 +236,7 @@ impl PbFileReader {
|
|||||||
k += 1;
|
k += 1;
|
||||||
}
|
}
|
||||||
if k == self.wp {
|
if k == self.wp {
|
||||||
|
// TODO test whether with_msg_no_trace makes difference.
|
||||||
return Err(Error::with_msg("no nl in pb file"));
|
return Err(Error::with_msg("no nl in pb file"));
|
||||||
}
|
}
|
||||||
Ok(k)
|
Ok(k)
|
||||||
@@ -442,7 +474,8 @@ pub async fn scan_files_inner(
|
|||||||
if false {
|
if false {
|
||||||
dbconn::insert_channel(channel_path.into(), ndi.facility, &dbc).await?;
|
dbconn::insert_channel(channel_path.into(), ndi.facility, &dbc).await?;
|
||||||
}
|
}
|
||||||
if let Ok(msg) = pbr.read_msg().await {
|
if let Ok(Some(msg)) = pbr.read_msg().await {
|
||||||
|
let msg = msg.item;
|
||||||
lru.insert(channel_path);
|
lru.insert(channel_path);
|
||||||
{
|
{
|
||||||
tx.send(Ok(Box::new(serde_json::to_value(format!(
|
tx.send(Ok(Box::new(serde_json::to_value(format!(
|
||||||
@@ -451,10 +484,6 @@ pub async fn scan_files_inner(
|
|||||||
msg.variant_name()
|
msg.variant_name()
|
||||||
))?) as ItemSerBox))
|
))?) as ItemSerBox))
|
||||||
.await?;
|
.await?;
|
||||||
/*waves_found += 1;
|
|
||||||
if waves_found >= 20 {
|
|
||||||
break;
|
|
||||||
}*/
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,50 @@
|
|||||||
|
use crate::generated::EPICSEvent::PayloadType;
|
||||||
|
use crate::parse::PbFileReader;
|
||||||
|
use err::Error;
|
||||||
|
use items::{WithLen, WithTimestamps};
|
||||||
|
|
||||||
|
/// Position and timestamp of a single event found by `parse_all_ts`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PosTs {
    /// Offset of the first byte of the event: the `off` given to
    /// `parse_all_ts` plus the index of the event within the buffer.
    pub pos: u64,
    /// Timestamp of the event.
    pub ts: u64,
}
|
||||||
|
|
||||||
|
pub fn parse_all_ts(off: u64, buf: &[u8], payload_type: PayloadType, year: u32) -> Result<Vec<PosTs>, Error> {
|
||||||
|
let mut ret = vec![];
|
||||||
|
let mut i1 = 0;
|
||||||
|
let mut i2 = usize::MAX;
|
||||||
|
loop {
|
||||||
|
if i1 >= buf.len() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if buf[i1] == 10 {
|
||||||
|
if i2 == usize::MAX {
|
||||||
|
i2 = i1;
|
||||||
|
} else {
|
||||||
|
// Have a chunk from i2..i1
|
||||||
|
match PbFileReader::parse_buffer(&buf[i2 + 1..i1], payload_type.clone(), year) {
|
||||||
|
Ok(k) => {
|
||||||
|
if k.len() != 1 {
|
||||||
|
return Err(Error::with_msg_no_trace(format!(
|
||||||
|
"parsed buffer contained {} events",
|
||||||
|
k.len()
|
||||||
|
)));
|
||||||
|
} else {
|
||||||
|
let h = PosTs {
|
||||||
|
pos: off + i2 as u64 + 1,
|
||||||
|
ts: k.ts(0),
|
||||||
|
};
|
||||||
|
ret.push(h);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
// TODO ignore except if it's the last chunk.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
i2 = i1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
i1 += 1;
|
||||||
|
}
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
@@ -40,6 +40,7 @@ fn test_cluster() -> Cluster {
|
|||||||
port: 8360 + id as u16,
|
port: 8360 + id as u16,
|
||||||
port_raw: 8360 + id as u16 + 100,
|
port_raw: 8360 + id as u16 + 100,
|
||||||
data_base_path: format!("../tmpdata/node{:02}", id).into(),
|
data_base_path: format!("../tmpdata/node{:02}", id).into(),
|
||||||
|
cache_base_path: format!("../tmpdata/node{:02}", id).into(),
|
||||||
ksprefix: "ks".into(),
|
ksprefix: "ks".into(),
|
||||||
split: id,
|
split: id,
|
||||||
backend: "testbackend".into(),
|
backend: "testbackend".into(),
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ pub fn make_test_node(id: u32) -> Node {
|
|||||||
port: 8800 + id as u16,
|
port: 8800 + id as u16,
|
||||||
port_raw: 8800 + id as u16 + 100,
|
port_raw: 8800 + id as u16 + 100,
|
||||||
data_base_path: format!("../tmpdata/node{:02}", id).into(),
|
data_base_path: format!("../tmpdata/node{:02}", id).into(),
|
||||||
|
cache_base_path: format!("../tmpdata/node{:02}", id).into(),
|
||||||
split: id,
|
split: id,
|
||||||
ksprefix: "ks".into(),
|
ksprefix: "ks".into(),
|
||||||
backend: "testbackend".into(),
|
backend: "testbackend".into(),
|
||||||
|
|||||||
+1
-1
@@ -138,7 +138,7 @@ impl CacheFileDesc {
|
|||||||
let hc = self.hash_channel();
|
let hc = self.hash_channel();
|
||||||
node_config
|
node_config
|
||||||
.node
|
.node
|
||||||
.data_base_path
|
.cache_base_path
|
||||||
.join("cache")
|
.join("cache")
|
||||||
.join(&hc[0..3])
|
.join(&hc[0..3])
|
||||||
.join(&hc[3..6])
|
.join(&hc[3..6])
|
||||||
|
|||||||
@@ -78,6 +78,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
|
|||||||
port_raw: 7780 + i1 as u16 + 100,
|
port_raw: 7780 + i1 as u16 + 100,
|
||||||
split: i1,
|
split: i1,
|
||||||
data_base_path: data_base_path.join(format!("node{:02}", i1)),
|
data_base_path: data_base_path.join(format!("node{:02}", i1)),
|
||||||
|
cache_base_path: data_base_path.join(format!("node{:02}", i1)),
|
||||||
ksprefix: ksprefix.clone(),
|
ksprefix: ksprefix.clone(),
|
||||||
backend: "testbackend".into(),
|
backend: "testbackend".into(),
|
||||||
bin_grain_kind: 0,
|
bin_grain_kind: 0,
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ fn ca_connect_1() {
|
|||||||
backend: "".into(),
|
backend: "".into(),
|
||||||
split: 0,
|
split: 0,
|
||||||
data_base_path: "".into(),
|
data_base_path: "".into(),
|
||||||
|
cache_base_path: "".into(),
|
||||||
listen: "".into(),
|
listen: "".into(),
|
||||||
ksprefix: "".into(),
|
ksprefix: "".into(),
|
||||||
archiver_appliance: None,
|
archiver_appliance: None,
|
||||||
|
|||||||
@@ -129,6 +129,7 @@ pub struct Node {
|
|||||||
pub port_raw: u16,
|
pub port_raw: u16,
|
||||||
pub split: u32,
|
pub split: u32,
|
||||||
pub data_base_path: PathBuf,
|
pub data_base_path: PathBuf,
|
||||||
|
pub cache_base_path: PathBuf,
|
||||||
pub ksprefix: String,
|
pub ksprefix: String,
|
||||||
pub backend: String,
|
pub backend: String,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
|||||||
Reference in New Issue
Block a user