Scan indexfiles and channels
archapp/src/archeng/datablock.rs (new file, 208 lines added)
@@ -0,0 +1,208 @@
use crate::archeng::{
    read_exact, read_string, readf64, readu16, readu32, seek, RingBuf, StatsChannel, EPICS_EPOCH_OFFSET,
};
use crate::eventsitem::EventsItem;
use crate::plainevents::{PlainEvents, ScalarPlainEvents};
use err::Error;
use items::eventvalues::EventValues;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{NanoRange, Nanos};
use std::convert::TryInto;
use std::io::SeekFrom;
use tokio::fs::File;

use super::indextree::DataheaderPos;

#[derive(Debug)]
enum DbrType {
    DbrString = 0,
    DbrInt = 1,
    DbrStsFloat = 9,
    DbrTimeDouble = 20,
}

impl DbrType {
    fn from_u16(k: u16) -> Result<Self, Error> {
        use DbrType::*;
        let res = match k {
            0 => DbrString,
            1 => DbrInt,
            9 => DbrStsFloat,
            20 => DbrTimeDouble,
            _ => {
                let msg = format!("not a valid/supported dbr type: {}", k);
                return Err(Error::with_msg_no_trace(msg));
            }
        };
        Ok(res)
    }

    #[allow(dead_code)]
    fn byte_len(&self) -> usize {
        use DbrType::*;
        match self {
            DbrString => 0,
            DbrInt => 4,
            DbrStsFloat => 1,
            DbrTimeDouble => 16,
        }
    }
}

#[derive(Debug)]
pub struct DatafileHeader {
    pos: DataheaderPos,
    dir_offset: u32,
    // Should be absolute file position of the next data header
    // together with `fname_next`.
    // But unfortunately not always set?
    next_offset: u32,
    prev_offset: u32,
    curr_offset: u32,
    num_samples: u32,
    ctrl_info_offset: u32,
    buf_size: u32,
    buf_free: u32,
    dbr_type: DbrType,
    dbr_count: usize,
    period: f64,
    ts_beg: Nanos,
    ts_end: Nanos,
    ts_next_file: Nanos,
    fname_next: String,
    fname_prev: String,
}

const DATA_HEADER_LEN_ON_DISK: usize = 72 + 40 + 40;

pub async fn read_datafile_header(
    file: &mut File,
    pos: DataheaderPos,
    stats: &StatsChannel,
) -> Result<DatafileHeader, Error> {
    seek(file, SeekFrom::Start(pos.0), stats).await?;
    let mut rb = RingBuf::new();
    rb.fill_min(file, DATA_HEADER_LEN_ON_DISK, stats).await?;
    let buf = rb.data();
    let dir_offset = readu32(buf, 0);
    let next_offset = readu32(buf, 4);
    let prev_offset = readu32(buf, 8);
    let curr_offset = readu32(buf, 12);
    let num_samples = readu32(buf, 16);
    let ctrl_info_offset = readu32(buf, 20);
    let buf_size = readu32(buf, 24);
    let buf_free = readu32(buf, 28);
    let dbr_type = DbrType::from_u16(readu16(buf, 32))?;
    let dbr_count = readu16(buf, 34);
    // 4 bytes padding.
    let period = readf64(buf, 40);
    let ts1a = readu32(buf, 48);
    let ts1b = readu32(buf, 52);
    let ts2a = readu32(buf, 56);
    let ts2b = readu32(buf, 60);
    let ts3a = readu32(buf, 64);
    let ts3b = readu32(buf, 68);
    let ts_beg = if ts1a != 0 || ts1b != 0 {
        ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET
    } else {
        0
    };
    let ts_end = if ts3a != 0 || ts3b != 0 {
        ts3a as u64 * SEC + ts3b as u64 + EPICS_EPOCH_OFFSET
    } else {
        0
    };
    let ts_next_file = if ts2a != 0 || ts2b != 0 {
        ts2a as u64 * SEC + ts2b as u64 + EPICS_EPOCH_OFFSET
    } else {
        0
    };
    let fname_prev = read_string(&buf[72..112])?;
    let fname_next = read_string(&buf[112..152])?;
    let ret = DatafileHeader {
        pos,
        dir_offset,
        next_offset,
        prev_offset,
        curr_offset,
        num_samples,
        ctrl_info_offset,
        buf_size,
        buf_free,
        dbr_type,
        dbr_count: dbr_count as usize,
        period,
        ts_beg: Nanos { ns: ts_beg },
        ts_end: Nanos { ns: ts_end },
        ts_next_file: Nanos { ns: ts_next_file },
        fname_next,
        fname_prev,
    };
    Ok(ret)
}

pub async fn read_data_1(
    file: &mut File,
    datafile_header: &DatafileHeader,
    range: NanoRange,
    _expand: bool,
    stats: &StatsChannel,
) -> Result<EventsItem, Error> {
    // TODO handle expand mode
    let dhpos = datafile_header.pos.0 + DATA_HEADER_LEN_ON_DISK as u64;
    seek(file, SeekFrom::Start(dhpos), stats).await?;
    let res = match &datafile_header.dbr_type {
        DbrType::DbrTimeDouble => {
            if datafile_header.dbr_count == 1 {
                trace!("~~~~~~~~~~~~~~~~~~~~~ read scalar DbrTimeDouble");
                let mut evs = EventValues {
                    tss: vec![],
                    values: vec![],
                };
                let n1 = datafile_header.num_samples as usize;
                //let n2 = datafile_header.dbr_type.byte_len();
                let n2 = 2 + 2 + 4 + 4 + (4) + 8;
                let n3 = n1 * n2;
                let mut buf = vec![0; n3];
                read_exact(file, &mut buf, stats).await?;
                let mut p1 = 0;
                let mut ntot = 0;
                while p1 < n3 - n2 {
                    let _status = u16::from_be_bytes(buf[p1..p1 + 2].try_into().unwrap());
                    p1 += 2;
                    let _severity = u16::from_be_bytes(buf[p1..p1 + 2].try_into().unwrap());
                    p1 += 2;
                    let ts1a = u32::from_be_bytes(buf[p1..p1 + 4].try_into().unwrap());
                    p1 += 4;
                    let ts1b = u32::from_be_bytes(buf[p1..p1 + 4].try_into().unwrap());
                    p1 += 4;
                    let ts1 = ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET;
                    p1 += 4;
                    let value = f64::from_be_bytes(buf[p1..p1 + 8].try_into().unwrap());
                    p1 += 8;
                    ntot += 1;
                    if ts1 >= range.beg && ts1 < range.end {
                        evs.tss.push(ts1);
                        evs.values.push(value);
                    }
                }
                info!("parsed block with {} / {} events", ntot, evs.tss.len());
                let evs = ScalarPlainEvents::Double(evs);
                let plain = PlainEvents::Scalar(evs);
                let item = EventsItem::Plain(plain);
                item
            } else {
                let msg = format!("dbr_count {:?} not yet supported", datafile_header.dbr_count);
                error!("{}", msg);
                return Err(Error::with_msg_no_trace(msg));
            }
        }
        _ => {
            let msg = format!("Type {:?} not yet supported", datafile_header.dbr_type);
            error!("{}", msg);
            return Err(Error::with_msg_no_trace(msg));
        }
    };
    Ok(res)
}
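(Reading aid, not part of the commit: a minimal sketch of how the two new entry points above might be driven for a single block. The header position would come from an index-tree lookup, as the stream code in the following hunks does; the helper name read_one_block is made up.)

    // Hypothetical driver, assuming a DataheaderPos obtained from the index tree
    // and an already constructed StatsChannel; not part of this commit.
    async fn read_one_block(
        data_path: std::path::PathBuf,
        header_pos: DataheaderPos,
        range: NanoRange,
        stats: &StatsChannel,
    ) -> Result<EventsItem, Error> {
        let mut file = crate::archeng::open_read(data_path, stats).await?;
        // The data header is 152 bytes on disk: 72 bytes of fixed fields plus
        // two 40-byte file names (previous/next datafile).
        let header = read_datafile_header(&mut file, header_pos, stats).await?;
        // Decode the sample buffer that follows the header; only events whose
        // timestamp falls inside `range` are kept.
        read_data_1(&mut file, &header, range, false, stats).await
    }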
@@ -1,7 +1,7 @@
use crate::archeng::{
    index_file_path_list, open_read, read_channel, read_data_1, read_datafile_header, read_index_datablockref,
    search_record, search_record_expand, StatsChannel,
};
use crate::archeng::datablock::{read_data_1, read_datafile_header};
use crate::archeng::indexfiles::index_file_path_list;
use crate::archeng::indextree::{read_channel, read_datablockref, search_record, search_record_expand, DataheaderPos};
use crate::archeng::{open_read, StatsChannel};
use crate::eventsitem::EventsItem;
use crate::storagemerge::StorageMerge;
use crate::timed::Timed;
@@ -10,8 +10,9 @@ use err::Error;
use futures_core::{Future, Stream};
use futures_util::{FutureExt, StreamExt};
use items::{inspect_timestamps, RangeCompletableItem, Sitemty, StreamItem, WithLen};
use netpod::{log::*, DataHeaderPos, FilePos, Nanos};
use netpod::log::*;
use netpod::{Channel, NanoRange};
use netpod::{FilePos, Nanos};
use std::collections::VecDeque;
use std::path::PathBuf;
use std::pin::Pin;
@@ -63,129 +64,106 @@ async fn datablock_stream_inner_single_index(
let mut events_tot = 0;
let stats = &StatsChannel::new(tx.clone());
debug!("try to open index file: {:?}", index_path);
let res = open_read(index_path.clone(), stats).await;
debug!("opened index file: {:?} {:?}", index_path, res);
match res {
Ok(mut index_file) => {
if let Some(basics) = read_channel(&mut index_file, channel.name(), stats).await? {
let beg = Nanos { ns: range.beg };
let mut expand_beg = expand;
let mut index_ts_max = 0;
let mut search_ts = beg.clone();
let mut last_data_file_path = PathBuf::new();
let mut last_data_file_pos = DataHeaderPos(0);
loop {
let timed_search = Timed::new("search next record");
let (res, _stats) = if expand_beg {
// TODO even though this is an entry in the index, it may reference
// non-existent blocks.
// Therefore, lower expand_beg flag at some later stage only if we've really
// found at least one event in the block.
expand_beg = false;
search_record_expand(
&mut index_file,
basics.rtree_m,
basics.rtree_start_pos,
search_ts,
stats,
)
.await?
} else {
search_record(
&mut index_file,
basics.rtree_m,
basics.rtree_start_pos,
search_ts,
stats,
)
.await?
};
drop(timed_search);
if let Some(nrec) = res {
let rec = nrec.rec();
trace!("found record: {:?}", rec);
let pos = FilePos { pos: rec.child_or_id };
// TODO rename Datablock? → IndexNodeDatablock
trace!("READ Datablock FROM {:?}\n", pos);
let datablock = read_index_datablockref(&mut index_file, pos, stats).await?;
trace!("Datablock: {:?}\n", datablock);
let data_path = index_path.parent().unwrap().join(datablock.file_name());
if data_path == last_data_file_path && datablock.data_header_pos() == last_data_file_pos {
debug!("skipping because it is the same block");
} else {
trace!("try to open data_path: {:?}", data_path);
match open_read(data_path.clone(), stats).await {
Ok(mut data_file) => {
let datafile_header =
read_datafile_header(&mut data_file, datablock.data_header_pos(), stats)
.await?;
trace!("datafile_header -------------- HEADER\n{:?}", datafile_header);
let events =
read_data_1(&mut data_file, &datafile_header, range.clone(), expand_beg, stats)
.await?;
if false {
let msg = inspect_timestamps(&events, range.clone());
trace!("datablock_stream_inner_single_index read_data_1\n{}", msg);
}
{
let mut ts_max = 0;
use items::WithTimestamps;
for i in 0..events.len() {
let ts = events.ts(i);
if ts < ts_max {
error!("unordered event within block at ts {}", ts);
break;
} else {
ts_max = ts;
}
if ts < index_ts_max {
error!(
"unordered event in index branch ts {} index_ts_max {}",
ts, index_ts_max
);
break;
} else {
index_ts_max = ts;
}
}
}
trace!("Was able to read data: {} events", events.len());
events_tot += events.len() as u64;
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(events)));
tx.send(item).await?;
}
Err(e) => {
// That's fine. The index mentions lots of datafiles which got purged already.
trace!("can not find file mentioned in index: {:?} {}", data_path, e);
}
};
}
if datablock.next != 0 {
warn!("MAYBE TODO? datablock.next != 0: {:?}", datablock);
}
last_data_file_path = data_path;
last_data_file_pos = datablock.data_header_pos();
// TODO anything special to do in expand mode?
search_ts.ns = rec.ts2.ns;
} else {
warn!("nothing found, break");
break;
}
if events_tot >= max_events {
warn!("reached events_tot {} max_events {}", events_tot, max_events);
break;
}
}
let index_file = open_read(index_path.clone(), stats).await?;
let mut file2 = open_read(index_path.clone(), stats).await?;
debug!("opened index file: {:?} {:?}", index_path, index_file);
if let Some(basics) = read_channel(index_path.clone(), index_file, channel.name(), stats).await? {
let beg = Nanos { ns: range.beg };
let mut expand_beg = expand;
let mut index_ts_max = 0;
let mut search_ts = beg.clone();
let mut last_data_file_path = PathBuf::new();
let mut last_data_file_pos = DataheaderPos(0);
loop {
let timed_search = Timed::new("search next record");
let (res, _stats) = if expand_beg {
// TODO even though this is an entry in the index, it may reference
// non-existent blocks.
// Therefore, lower expand_beg flag at some later stage only if we've really
// found at least one event in the block.
expand_beg = false;
search_record_expand(&mut file2, basics.rtree_m, basics.rtree_start_pos, search_ts, stats).await?
} else {
warn!("can not read channel basics from {:?}", index_path);
search_record(&mut file2, basics.rtree_m, basics.rtree_start_pos, search_ts, stats).await?
};
drop(timed_search);
if let Some(nrec) = res {
let rec = nrec.rec();
trace!("found record: {:?}", rec);
let pos = FilePos { pos: rec.child_or_id };
// TODO rename Datablock? → IndexNodeDatablock
trace!("READ Datablock FROM {:?}\n", pos);
let datablock = read_datablockref(&mut file2, pos, basics.hver(), stats).await?;
trace!("Datablock: {:?}\n", datablock);
let data_path = index_path.parent().unwrap().join(datablock.file_name());
if data_path == last_data_file_path && datablock.data_header_pos() == last_data_file_pos {
debug!("skipping because it is the same block");
} else {
trace!("try to open data_path: {:?}", data_path);
match open_read(data_path.clone(), stats).await {
Ok(mut data_file) => {
let datafile_header =
read_datafile_header(&mut data_file, datablock.data_header_pos(), stats).await?;
trace!("datafile_header -------------- HEADER\n{:?}", datafile_header);
let events =
read_data_1(&mut data_file, &datafile_header, range.clone(), expand_beg, stats).await?;
if false {
let msg = inspect_timestamps(&events, range.clone());
trace!("datablock_stream_inner_single_index read_data_1\n{}", msg);
}
{
let mut ts_max = 0;
use items::WithTimestamps;
for i in 0..events.len() {
let ts = events.ts(i);
if ts < ts_max {
error!("unordered event within block at ts {}", ts);
break;
} else {
ts_max = ts;
}
if ts < index_ts_max {
error!(
"unordered event in index branch ts {} index_ts_max {}",
ts, index_ts_max
);
break;
} else {
index_ts_max = ts;
}
}
}
trace!("Was able to read data: {} events", events.len());
events_tot += events.len() as u64;
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(events)));
tx.send(item).await?;
}
Err(e) => {
// That's fine. The index mentions lots of datafiles which got purged already.
trace!("can not find file mentioned in index: {:?} {}", data_path, e);
}
};
}
if datablock.next().0 != 0 {
warn!("MAYBE TODO? datablock.next != 0: {:?}", datablock);
}
last_data_file_path = data_path;
last_data_file_pos = datablock.data_header_pos();
// TODO anything special to do in expand mode?
search_ts.ns = rec.ts2.ns;
} else {
warn!("nothing found, break");
break;
}
if events_tot >= max_events {
warn!("reached events_tot {} max_events {}", events_tot, max_events);
break;
}
Ok(())
}
Err(e) => {
warn!("can not find index file at {:?}", index_path);
Err(Error::with_msg_no_trace(format!("can not open index file: {}", e)))
}
} else {
warn!("can not read channel basics from {:?}", index_path);
}
Ok(())
}

async fn datablock_stream_inner(

archapp/src/archeng/diskio.rs (new file, 0 lines added)
@@ -1,17 +1,19 @@
use crate::archeng::{open_read, read, StatsChannel};
use crate::timed::Timed;
use crate::wrap_task;
use async_channel::Receiver;
use err::Error;
use futures_core::Future;
use futures_core::Stream;
use futures_core::{Future, Stream};
use futures_util::stream::unfold;
use futures_util::FutureExt;
use netpod::log::*;
use netpod::ChannelArchiver;
use netpod::Database;
use netpod::{Channel, ChannelArchiver, Database};
use regex::Regex;
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::read_dir;
use tokio::sync::Mutex;
use tokio_postgres::Client as PgClient;

pub fn list_index_files(node: &ChannelArchiver) -> Receiver<Result<PathBuf, Error>> {
@@ -174,8 +176,36 @@ impl ScanIndexFiles {
info!("collected {} level 1 paths", paths.len());
let dbc = database_connect(&self.conf.database).await?;
for p in paths {
let sql = "insert into indexfiles (path) values ($1) on conflict do nothing";
dbc.query(sql, &[&p.to_string_lossy()]).await?;
let ps = p.to_string_lossy();
let rows = dbc
.query("select rowid from indexfiles where path = $1", &[&ps])
.await?;
let rid: i64 = if rows.len() == 0 {
let rows = dbc
.query(
"insert into indexfiles (path) values ($1) on conflict do nothing returning rowid",
&[&ps],
)
.await?;
if rows.len() == 0 {
error!("insert failed, maybe concurrent insert?");
// TODO try this channel again? or the other process handled it?
err::todoval()
} else if rows.len() == 1 {
let rid = rows[0].try_get(0)?;
info!("insert done: {}", rid);
rid
} else {
return Err(Error::with_msg("not unique"));
}
} else if rows.len() == 1 {
let rid = rows[0].try_get(0)?;
info!("select done: {}", rid);
rid
} else {
return Err(Error::with_msg("not unique"));
};
let _ = rid;
}
self.steps = ScanIndexFilesSteps::Done;
let item = format!("level 1 done");
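(The row-id lookup above, and the identical one for the channels table further below, follows a select-then-insert-returning pattern. A hedged sketch of the intent, with a made-up helper name, not part of the commit:)

    // Hypothetical helper: return the rowid for a path, inserting the row if it
    // does not exist yet, and surfacing a lost insert race to the caller.
    async fn indexfile_rowid(dbc: &PgClient, path: &str) -> Result<i64, Error> {
        let rows = dbc.query("select rowid from indexfiles where path = $1", &[&path]).await?;
        if rows.len() == 1 {
            return Ok(rows[0].try_get(0)?);
        }
        let rows = dbc
            .query(
                "insert into indexfiles (path) values ($1) on conflict do nothing returning rowid",
                &[&path],
            )
            .await?;
        if rows.len() == 1 {
            Ok(rows[0].try_get(0)?)
        } else {
            // A concurrent writer won the race; the caller could retry the select.
            Err(Error::with_msg("concurrent insert, retry"))
        }
    }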
@@ -256,6 +286,7 @@ pub fn unfold2(_conf: ChannelArchiver) -> () {
enum ScanChannelsSteps {
Start,
SelectIndexFile,
ReadChannels(Vec<String>),
Done,
}

@@ -288,8 +319,66 @@ impl ScanChannels {
for row in rows {
paths.push(row.get::<_, String>(0));
}
let item = format!("SelectIndexFile {:?}", paths);
self.steps = ReadChannels(paths);
Ok(Some((item, self)))
}
ReadChannels(mut paths) => {
// TODO stats
let stats = &StatsChannel::dummy();
let dbc = database_connect(&self.conf.database).await?;
if let Some(path) = paths.pop() {
let rows = dbc
.query("select rowid from indexfiles where path = $1", &[&path])
.await?;
if rows.len() == 1 {
let indexfile_rid: i64 = rows[0].try_get(0)?;
let mut basics = super::indextree::IndexFileBasics::from_path(path, stats).await?;
let entries = basics.all_channel_entries(stats).await?;
for entry in entries {
let rows = dbc
.query("select rowid from channels where name = $1", &[&entry.channel_name()])
.await?;
let rid: i64 = if rows.len() == 0 {
let rows = dbc
.query(
"insert into channels (name) values ($1) on conflict do nothing returning rowid",
&[&entry.channel_name()],
)
.await?;
if rows.len() == 0 {
error!("insert failed, maybe concurrent insert?");
// TODO try this channel again? or the other process handled it?
err::todoval()
} else if rows.len() == 1 {
let rid = rows[0].try_get(0)?;
info!("insert done: {}", rid);
rid
} else {
return Err(Error::with_msg("not unique"));
}
} else if rows.len() == 1 {
let rid = rows[0].try_get(0)?;
info!("select done: {}", rid);
rid
} else {
return Err(Error::with_msg("not unique"));
};
dbc.query(
"insert into channel_index_map (channel, index) values ($1, $2) on conflict do nothing",
&[&rid, &indexfile_rid],
)
.await?;
}
dbc.query(
"update indexfiles set ts_last_channel_search = now() where rowid = $1",
&[&indexfile_rid],
)
.await?;
}
}
self.steps = Done;
Ok(Some((format!("SelectIndexFile {:?}", paths), self)))
Ok(Some((format!("ReadChannels"), self)))
}
Done => Ok(None),
}
@@ -310,3 +399,186 @@ impl UnfoldExec for ScanChannels {
pub fn scan_channels(conf: ChannelArchiver) -> impl Stream<Item = Result<String, Error>> {
unfold_stream(ScanChannels::new(conf.clone()))
}

#[derive(Debug)]
enum RetClass {
Long,
Medium,
Short,
#[allow(unused)]
PostMortem,
}

#[derive(Debug)]
enum IndexCat {
Machine { rc: RetClass },
Beamline { rc: RetClass, name: String },
}

#[derive(Debug)]
struct IndexFile {
path: PathBuf,
cat: IndexCat,
}

// Try to make sense of historical conventions how the epics channel archiver engines are configured.
fn categorize_index_files(list: &Vec<String>) -> Result<Vec<IndexFile>, Error> {
let re_m = Regex::new(r"/archive_(ST|MT|LT)/index").unwrap();
let re_b = Regex::new(r"/archive_(X([0-9]+)[^_]*)_(SH|LO)/index").unwrap();
let mut ret = vec![];
for p in list {
match re_m.captures(p) {
Some(cap) => {
let rc = cap.get(1).unwrap().as_str();
let rc = match rc {
"ST" => Some(RetClass::Short),
"MT" => Some(RetClass::Medium),
"LT" => Some(RetClass::Long),
_ => {
warn!("categorize_index_files no idea about RC for {}", p);
None
}
};
if let Some(rc) = rc {
let f = IndexFile {
path: p.into(),
cat: IndexCat::Machine { rc },
};
ret.push(f);
}
}
None => match re_b.captures(p) {
Some(cap) => {
let name = cap.get(1).unwrap().as_str();
let rc = cap.get(3).unwrap().as_str();
let rc = match rc {
"SH" => Some(RetClass::Short),
"LO" => Some(RetClass::Long),
_ => {
warn!("categorize_index_files no idea about RC for {}", p);
None
}
};
if let Some(rc) = rc {
let f = IndexFile {
path: p.into(),
cat: IndexCat::Beamline { name: name.into(), rc },
};
ret.push(f);
}
}
None => {
warn!("categorize_index_files no idea at all about {}", p);
}
},
}
}
let is_machine = {
let mut k = false;
for x in &ret {
if let IndexCat::Machine { .. } = &x.cat {
k = true;
break;
}
}
k
};
// TODO by default, filter post-mortem.
let is_beamline = !is_machine;
if is_beamline {
let mut ret: Vec<_> = ret
.into_iter()
.filter_map(|k| {
if let IndexCat::Machine { rc, .. } = &k.cat {
let prio = match rc {
&RetClass::Short => 4,
&RetClass::Medium => 6,
&RetClass::Long => 8,
&RetClass::PostMortem => 0,
};
Some((k, prio))
} else {
None
}
})
.collect();
ret.sort_by_key(|x| x.1);
let ret = ret.into_iter().map(|k| k.0).collect();
Ok(ret)
} else if is_machine {
let mut ret: Vec<_> = ret
.into_iter()
.filter_map(|k| {
if let IndexCat::Machine { rc, .. } = &k.cat {
let prio = match rc {
&RetClass::Short => 4,
&RetClass::Medium => 6,
&RetClass::Long => 8,
&RetClass::PostMortem => 0,
};
Some((k, prio))
} else {
None
}
})
.collect();
ret.sort_by_key(|x| x.1);
let ret = ret.into_iter().map(|k| k.0).collect();
Ok(ret)
} else {
err::todoval()
}
}

static INDEX_JSON: Mutex<Option<BTreeMap<String, Vec<String>>>> = Mutex::const_new(None);

pub async fn index_files_index_ref<P: Into<PathBuf> + Send>(
key: &str,
index_files_index_path: P,
stats: &StatsChannel,
) -> Result<Option<Vec<String>>, Error> {
let mut g = INDEX_JSON.lock().await;
match &*g {
Some(j) => Ok(j.get(key).map(|x| x.clone())),
None => {
let timed1 = Timed::new("slurp_index_json");
let index_files_index_path = index_files_index_path.into();
let index_files_index = {
let timed1 = Timed::new("slurp_index_bytes");
let mut index_files_index = open_read(index_files_index_path, stats).await?;
let mut buf = vec![0; 1024 * 1024 * 50];
let mut ntot = 0;
loop {
let n = read(&mut index_files_index, &mut buf[ntot..], stats).await?;
if n == 0 {
break;
}
ntot += n;
}
buf.truncate(ntot);
drop(timed1);
serde_json::from_slice::<BTreeMap<String, Vec<String>>>(&buf)?
};
drop(timed1);
let ret = index_files_index.get(key).map(|x| x.clone());
*g = Some(index_files_index);
Ok(ret)
}
}
}

pub async fn index_file_path_list(
channel: Channel,
index_files_index_path: PathBuf,
stats: &StatsChannel,
) -> Result<Vec<PathBuf>, Error> {
let timed1 = Timed::new("categorize index files");
let index_paths = index_files_index_ref(channel.name(), &index_files_index_path, stats)
.await?
.ok_or(Error::with_msg_no_trace("can not find channel"))?;
let list = categorize_index_files(&index_paths)?;
info!("GOT CATEGORIZED:\n{:?}", list);
let ret = list.into_iter().map(|k| k.path).collect();
drop(timed1);
Ok(ret)
}

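(For orientation, a hedged illustration of the path conventions that categorize_index_files recognizes; the paths are made up and the snippet is a fragment, not part of the commit:)

    // Hypothetical input mixing beamline and machine index paths. Because a
    // machine-class index (archive_ST/MT/LT) is present, the whole set is treated
    // as machine-categorized: only the ST and LT entries remain, short-term first.
    let paths = vec![
        "/data/archive_X05DA_SH/index".to_string(),
        "/data/archive_LT/index".to_string(),
        "/data/archive_ST/index".to_string(),
    ];
    let files = categorize_index_files(&paths)?;
    assert!(files.iter().all(|f| matches!(f.cat, IndexCat::Machine { .. })));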
archapp/src/archeng/indextree.rs (new file, 1272 lines added)
File diff suppressed because it is too large
@@ -55,7 +55,6 @@ impl ListIndexFilesHttpFunction {
))?;
let s = archapp_wrap::archapp::archeng::indexfiles::list_index_files(conf);
let s = futures_util::stream::unfold(s, |mut st| async move {
use futures_util::StreamExt;
let x = st.next().await;
match x {
Some(x) => match x {
@@ -190,7 +189,6 @@ impl ListChannelsHttpFunction {

let s = archapp_wrap::archapp::archeng::list_all_channels(conf);
let s = futures_util::stream::unfold(s, |mut st| async move {
use futures_util::StreamExt;
let x = st.next().await;
match x {
Some(x) => match x {

@@ -278,15 +278,6 @@ impl From<FilePos> for u64 {
}
}

#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct DataHeaderPos(pub u64);

impl PartialEq for DataHeaderPos {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum TimeRange {
Time { beg: DateTime<Utc>, end: DateTime<Utc> },
@@ -294,7 +285,7 @@ pub enum TimeRange {
Nano { beg: u64, end: u64 },
}

#[derive(Clone, Copy, Serialize, Deserialize)]
#[derive(Clone, Copy, PartialEq, PartialOrd, Serialize, Deserialize)]
pub struct Nanos {
pub ns: u64,
}

@@ -94,6 +94,7 @@ pub fn tracing_init() {
"info",
"archapp::archeng=info",
"archapp::archeng::datablockstream=info",
"archapp::archeng::indextree=trace",
"archapp::storagemerge=info",
"daqbuffer::test=trace",
]