diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs
index 2372e8d..d6c1a75 100644
--- a/archapp/src/archeng.rs
+++ b/archapp/src/archeng.rs
@@ -1,12 +1,16 @@
+use crate::{EventsItem, PlainEvents, ScalarPlainEvents};
 use async_channel::{Receiver, Sender};
 use err::Error;
+use futures_core::Future;
+use futures_util::StreamExt;
+use items::eventvalues::EventValues;
 use netpod::timeunits::SEC;
-use netpod::{log::*, FilePos, Nanos};
+use netpod::{log::*, ChannelArchiver, FilePos, Nanos};
 use std::convert::TryInto;
 use std::io::{self, SeekFrom};
 use std::path::PathBuf;
 use std::time::{Duration, Instant};
-use tokio::fs::{File, OpenOptions};
+use tokio::fs::{read_dir, File, OpenOptions};
 use tokio::io::{AsyncReadExt, AsyncSeekExt};
 
 /*
@@ -32,14 +36,16 @@ impl TimedIo for File {
 */
 
 type Offset = u64;
+
 const OFFSET_SIZE: usize = std::mem::size_of::<Offset>();
+const EPICS_EPOCH_OFFSET: u64 = 631152000 * SEC;
 
 pub async fn open_read(path: PathBuf) -> io::Result<File> {
     let ts1 = Instant::now();
     let res = OpenOptions::new().read(true).open(path).await;
     let ts2 = Instant::now();
     let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3;
-    info!("timed open_read dt: {:.3} ms", dt);
+    //info!("timed open_read dt: {:.3} ms", dt);
     res
 }
 
@@ -48,7 +54,7 @@ async fn seek(file: &mut File, pos: SeekFrom) -> io::Result<u64> {
     let res = file.seek(pos).await;
     let ts2 = Instant::now();
     let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3;
-    info!("timed seek dt: {:.3} ms", dt);
+    //info!("timed seek dt: {:.3} ms", dt);
     res
 }
 
@@ -57,7 +63,7 @@ async fn read(file: &mut File, buf: &mut [u8]) -> io::Result<usize> {
     let res = file.read(buf).await;
     let ts2 = Instant::now();
     let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3;
-    info!("timed read dt: {:.3} ms res: {:?}", dt, res);
+    //info!("timed read dt: {:.3} ms res: {:?}", dt, res);
     res
 }
 
@@ -66,7 +72,7 @@ async fn read_exact(file: &mut File, buf: &mut [u8]) -> io::Result<usize> {
     let res = file.read_exact(buf).await;
     let ts2 = Instant::now();
     let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3;
-    info!("timed read_exact dt: {:.3} ms res: {:?}", dt, res);
+    //info!("timed read_exact dt: {:.3} ms res: {:?}", dt, res);
     res
 }
 
@@ -100,18 +106,6 @@ pub struct IndexFileBasics {
     name_hash_entries: Vec<NamedHashTableEntry>,
 }
 
-impl IndexFileBasics {
-    pub fn file_offset_size(&self) -> u64 {
-        if self.version == 3 {
-            64
-        } else if self.version == 2 {
-            32
-        } else {
-            panic!()
-        }
-    }
-}
-
 pub fn name_hash(s: &str, ht_len: u32) -> u32 {
     let mut h = 0;
     for ch in s.as_bytes() {
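As a reading aid for the hunks above: once `read_file_basics` has filled the anchor table, a channel lookup hashes the name into a bucket and then follows that bucket's chain. A minimal sketch (not part of the patch) using only items defined in this file:

```rust
// Sketch only. Returns the file offset of the first NamedHashChannelEntry in
// the channel's hash chain; 0 means the bucket is empty.
fn chain_start(basics: &IndexFileBasics, channel_name: &str) -> u64 {
    let bucket = name_hash(channel_name, basics.name_hash_anchor_len as u32);
    basics.name_hash_entries[bucket as usize].named_hash_channel_entry_pos
}
```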
@@ -230,10 +224,18 @@ fn readf64(buf: &[u8], pos: usize) -> f64 {
     f64::from_be_bytes(buf.as_ref()[pos..pos + 8].try_into().unwrap())
 }
 
-pub async fn read_file_basics(f1: &mut File) -> Result<IndexFileBasics, Error> {
-    let mut buf = vec![0; 0x58];
-    read_exact(f1, &mut buf).await?;
+pub async fn read_file_basics(file: &mut File) -> Result<IndexFileBasics, Error> {
+    let mut buf = vec![0; 128];
+    read_exact(file, &mut buf[0..4]).await?;
     let version = String::from_utf8(buf[3..4].to_vec())?.parse()?;
+    if version == 3 {
+        read_exact(file, &mut buf[4..88]).await?;
+    } else if version == 2 {
+        read_exact(file, &mut buf[4..48]).await?;
+    } else {
+        panic!();
+    }
+    //info!("\nread_file_basics\n{}", format_hex_block(&buf, 160));
     let b = &buf;
     if false {
         let s: String = b.iter().map(|x| format!(" {:02x}", *x)).collect();
@@ -257,32 +259,71 @@ pub async fn read_file_basics(f1: &mut File) -> Result<IndexFileBasics, Error> {
         }
         info!("{}", String::from_utf8_lossy(&b[0x2800..0x2880]));
     }
-    let mut ret = IndexFileBasics {
-        version,
-        name_hash_anchor_beg: readu64(b, 0x04),
-        name_hash_anchor_len: readu32(b, 0x0c) as u64,
-        fa_used_list_len: readu64(b, 0x10),
-        fa_used_list_beg: readu64(b, 0x18),
-        fa_used_list_end: readu64(b, 0x20),
-        fa_free_list_len: readu64(b, 0x28),
-        fa_free_list_beg: readu64(b, 0x30),
-        fa_free_list_end: readu64(b, 0x38),
-        fa_header_len: readu64(b, 0x40),
-        fa_header_prev: readu64(b, 0x48),
-        fa_header_next: readu64(b, 0x50),
-        name_hash_entries: vec![],
+    let mut ret = if version == 3 {
+        IndexFileBasics {
+            version,
+            name_hash_anchor_beg: readu64(b, 0x04),
+            name_hash_anchor_len: readu32(b, 12) as u64,
+            fa_used_list_len: readu64(b, 16),
+            fa_used_list_beg: readu64(b, 24),
+            fa_used_list_end: readu64(b, 32),
+            fa_free_list_len: readu64(b, 40),
+            fa_free_list_beg: readu64(b, 48),
+            fa_free_list_end: readu64(b, 56),
+            fa_header_len: readu64(b, 64),
+            fa_header_prev: readu64(b, 72),
+            fa_header_next: readu64(b, 80),
+            name_hash_entries: vec![],
+        }
+    } else if version == 2 {
+        IndexFileBasics {
+            version,
+            name_hash_anchor_beg: readu32(b, 4) as u64,
+            name_hash_anchor_len: readu32(b, 8) as u64,
+            fa_used_list_len: readu32(b, 12) as u64,
+            fa_used_list_beg: readu32(b, 16) as u64,
+            fa_used_list_end: readu32(b, 20) as u64,
+            fa_free_list_len: readu32(b, 24) as u64,
+            fa_free_list_beg: readu32(b, 28) as u64,
+            fa_free_list_end: readu32(b, 32) as u64,
+            fa_header_len: readu32(b, 36) as u64,
+            fa_header_prev: readu32(b, 40) as u64,
+            fa_header_next: readu32(b, 44) as u64,
+            name_hash_entries: vec![],
+        }
+    } else {
+        panic!();
     };
-    info!("IndexFileBasics: {:?}", ret);
-    if true {
-        let u = ret.name_hash_anchor_len * 8;
+    info!("read_file_basics w/o hashposs {:?}", ret);
+    {
+        if ret.name_hash_anchor_len > 2000 {
+            return Err(Error::with_msg_no_trace(format!(
+                "name_hash_anchor_len {}",
+                ret.name_hash_anchor_len
+            )));
+        }
+        let u = if version == 3 {
+            ret.name_hash_anchor_len * 8
+        } else if version == 2 {
+            ret.name_hash_anchor_len * 4
+        } else {
+            panic!()
+        };
         buf.resize(u as usize, 0);
-        read_exact(f1, &mut buf).await?;
+        read_exact(file, &mut buf).await?;
        let b = &buf;
        for i1 in 0..ret.name_hash_anchor_len {
-            let pos = readu64(b, i1 as usize * 8);
-            ret.name_hash_entries.push(NamedHashTableEntry {
+            let pos = if version == 3 {
+                readu64(b, i1 as usize * 8)
+            } else if version == 2 {
+                readu32(b, i1 as usize * 4) as u64
+            } else {
+                panic!()
+            };
+            let e = NamedHashTableEntry {
                 named_hash_channel_entry_pos: pos,
-            });
+            };
+            ret.name_hash_entries.push(e);
         }
     }
     Ok(ret)
@@ -342,8 +383,8 @@ pub async fn read_rtree_node(file: &mut File, pos: FilePos, rtree_m: usize) -> R
         let ts2b = readu32(b, off2 + 12);
         let ts1b = ts1b.min(NANO_MAX);
         let ts2b = ts2b.min(NANO_MAX);
-        let ts1 = ts1a as u64 * SEC + ts1b as u64;
-        let ts2 = ts2a as u64 * SEC + ts2b as u64;
+        let ts1 = ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET;
+        let ts2 = ts2a as u64 * SEC + ts2b as u64 + EPICS_EPOCH_OFFSET;
         let child_or_id = readu64(b, off2 + 16);
         //info!("NODE {} {} {} {} {}", ts1a, ts1b, ts2a, ts2b, child_or_id);
         if child_or_id != 0 {
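The `EPICS_EPOCH_OFFSET` applied above converts index timestamps (seconds plus nanoseconds since the EPICS epoch, 1990-01-01 UTC) into nanoseconds since the Unix epoch; 631152000 s is exactly 20 years including five leap days. A self-contained sketch of the same conversion:

```rust
const SEC: u64 = 1_000_000_000; // nanoseconds per second, as in netpod::timeunits
const EPICS_EPOCH_OFFSET: u64 = 631152000 * SEC; // 1970-01-01 .. 1990-01-01

// Convert an archiver (seconds, nanoseconds) pair to Unix-epoch nanoseconds,
// as read_rtree_node now does for both record bounds.
fn epics_to_unix_ns(secs: u32, nanos: u32) -> u64 {
    secs as u64 * SEC + nanos as u64 + EPICS_EPOCH_OFFSET
}
```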
@@ -440,59 +481,43 @@ pub struct ChannelInfoBasics {
 }
 
 pub async fn read_channel(index_file: &mut File, channel_name: &str) -> Result<Option<ChannelInfoBasics>, Error> {
-    // TODO
-    // How do I locate the correct index file?
-    // Given a channel name, how do I find the master index?
-
-    let f1 = index_file;
-    let basics = read_file_basics(f1).await?;
-    if false {
-        info!("got basics: {:?}", basics);
-    }
+    let basics = read_file_basics(index_file).await?;
+    let hver2 = HeaderVersion2;
+    let hver3 = HeaderVersion3;
+    let hver: &dyn HeaderVersion = if basics.version == 3 {
+        &hver3
+    } else if basics.version == 2 {
+        &hver2
+    } else {
+        panic!()
+    };
     let chn_hash = name_hash(channel_name, basics.name_hash_anchor_len as u32);
-    info!("channel hash: {:08x}", chn_hash);
     let epos = &basics.name_hash_entries[chn_hash as usize];
-    info!("table-entry: {:?}", epos);
     let mut entries = vec![];
-    seek(f1, SeekFrom::Start(epos.named_hash_channel_entry_pos)).await?;
     let mut rb = RingBuf::new();
+    let mut pos = epos.named_hash_channel_entry_pos;
     loop {
-        rb.fill_if_low(f1).await?;
-        if rb.len() < 20 {
-            warn!("break because not enough data");
+        rb.reset();
+        seek(index_file, SeekFrom::Start(pos)).await?;
+        let fill_min = if hver.offset_size() == 8 { 20 } else { 12 };
+        rb.fill_min(index_file, fill_min).await?;
+        if rb.len() < fill_min {
+            warn!("not enough data to continue reading channel list from name hash list");
             break;
         }
-        let p1 = 0x00;
         let buf = rb.data();
-        let next = readu64(&buf, p1 + 0);
-        let id = readu64(&buf, p1 + 8);
-        let name_len = readu16(&buf, p1 + 16);
-        let id_txt_len = readu16(&buf, p1 + 18);
-        let n0 = 20;
-        let n1 = name_len as usize;
-        let n2 = id_txt_len as usize;
-        let channel_name_found = String::from_utf8(buf[n0..n0 + n1].to_vec())?;
-        let id_txt = String::from_utf8(buf[n0 + n1..n0 + n1 + n2].to_vec())?;
-        let e = NamedHashChannelEntry {
-            next,
-            id_rtree_pos: id,
-            channel_name: channel_name_found,
-            id_txt,
-        };
+        let e = parse_name_hash_channel_entry(buf, hver)?;
+        let next = e.next;
         entries.push(e);
         if next == 0 {
-            info!("break because no next");
             break;
-        } else if next > 1024 * 1024 * 1 {
-            warn!("suspicious `next` {}", next);
-            return Err(Error::with_msg_no_trace("bad next"));
         } else {
-            rb.adv(next as usize);
+            pos = next;
        }
     }
     for e in &entries {
         if e.channel_name == channel_name {
-            let ep = read_rtree_entrypoint(f1, e.id_rtree_pos, &basics).await?;
+            let ep = read_rtree_entrypoint(index_file, e.id_rtree_pos, &basics).await?;
             let ret = ChannelInfoBasics {
                 channel_name: channel_name.into(),
                 rtree_m: ep.rtree_m,
@@ -504,6 +529,133 @@ pub async fn read_channel(index_file: &mut File, channel_name: &str) -> Result<Option<ChannelInfoBasics>, Error> {
+trait HeaderVersion {
+    fn version(&self) -> u8;
+    fn read_offset(&self, buf: &[u8], pos: usize) -> u64;
+    fn offset_size(&self) -> usize;
+}
+
+struct HeaderVersion2;
+
+impl HeaderVersion for HeaderVersion2 {
+    fn version(&self) -> u8 {
+        2
+    }
+
+    fn read_offset(&self, buf: &[u8], pos: usize) -> u64 {
+        readu32(buf, pos) as u64
+    }
+
+    fn offset_size(&self) -> usize {
+        4
+    }
+}
+
+struct HeaderVersion3;
+
+impl HeaderVersion for HeaderVersion3 {
+    fn version(&self) -> u8 {
+        3
+    }
+
+    fn read_offset(&self, buf: &[u8], pos: usize) -> u64 {
+        readu64(buf, pos)
+    }
+
+    fn offset_size(&self) -> usize {
+        8
+    }
+}
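The `HeaderVersion` trait introduced here hides the only difference between index format versions 2 and 3 that this reader cares about: whether file offsets are stored as 4 or 8 bytes. A sketch (not from the patch) of how parsing code stays version-agnostic:

```rust
// Read two consecutive offsets, e.g. the `next` and `id` fields of a hash
// table entry, without branching on the index version at the call site.
fn read_two_offsets(buf: &[u8], hver: &dyn HeaderVersion) -> (u64, u64) {
    let next = hver.read_offset(buf, 0);
    let id = hver.read_offset(buf, hver.offset_size());
    (next, id)
}
```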
+fn parse_name_hash_channel_entry(buf: &[u8], hver: &dyn HeaderVersion) -> Result<NamedHashChannelEntry, Error> {
+    let mut p1 = 0;
+    let next = hver.read_offset(&buf, p1);
+    p1 += hver.offset_size();
+    let id = hver.read_offset(&buf, p1);
+    p1 += hver.offset_size();
+    let name_len = readu16(&buf, p1);
+    p1 += 2;
+    let id_txt_len = readu16(&buf, p1);
+    p1 += 2;
+    if next > 1024 * 1024 * 1024 * 1024 || id > 1024 * 1024 * 1024 * 1024 || name_len > 128 || id_txt_len > 128 {
+        error!(
+            "something bad: parse_name_hash_channel_entry next {} id {} name_len {} id_txt_len {}",
+            next, id, name_len, id_txt_len
+        );
+        return Err(Error::with_msg_no_trace("bad hash table entry"));
+    }
+    let n1 = name_len as usize;
+    let n2 = id_txt_len as usize;
+    let channel_name_found = String::from_utf8(buf[p1..p1 + n1].to_vec())?;
+    p1 += n1;
+    let id_txt = String::from_utf8(buf[p1..p1 + n2].to_vec())?;
+    p1 += n2;
+    let _ = p1;
+    let e = NamedHashChannelEntry {
+        next,
+        id_rtree_pos: id,
+        channel_name: channel_name_found,
+        id_txt,
+    };
+    Ok(e)
+}
+
+async fn channel_list_from_index_name_hash_list(
+    file: &mut File,
+    pos: FilePos,
+    hver: &dyn HeaderVersion,
+) -> Result<Vec<NamedHashChannelEntry>, Error> {
+    let mut pos = pos;
+    let mut ret = vec![];
+    let mut rb = RingBuf::new();
+    loop {
+        rb.reset();
+        seek(file, SeekFrom::Start(pos.pos)).await?;
+        let fill_min = if hver.offset_size() == 8 { 20 } else { 12 };
+        rb.fill_min(file, fill_min).await?;
+        if rb.len() < fill_min {
+            warn!("not enough data to continue reading channel list from name hash list");
+            break;
+        }
+        let e = parse_name_hash_channel_entry(rb.data(), hver)?;
+        let next = e.next;
+        ret.push(e);
+        if next == 0 {
+            break;
+        } else {
+            pos.pos = next;
+        }
+    }
+    Ok(ret)
+}
+
+pub async fn channel_list(index_path: PathBuf) -> Result<Vec<String>, Error> {
+    let mut ret = vec![];
+    let file = &mut open_read(index_path.clone()).await?;
+    let basics = read_file_basics(file).await?;
+    let hver2 = HeaderVersion2;
+    let hver3 = HeaderVersion3;
+    let hver: &dyn HeaderVersion = if basics.version == 2 {
+        &hver2
+    } else if basics.version == 3 {
+        &hver3
+    } else {
+        panic!();
+    };
+    for (i, name_hash_entry) in basics.name_hash_entries.iter().enumerate() {
+        if name_hash_entry.named_hash_channel_entry_pos != 0 {
+            let pos = FilePos {
+                pos: name_hash_entry.named_hash_channel_entry_pos,
+            };
+            let list = channel_list_from_index_name_hash_list(file, pos, hver).await?;
+            for e in list {
+                ret.push(e.channel_name);
+            }
+        }
+    }
+    Ok(ret)
+}
+
 async fn datarange_stream_fill(channel_name: &str, tx: Sender) {
     // Search the first relevant leaf node.
     // Pipe all ranges from there, and continue with nodes.
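For reference, the on-disk layout that `parse_name_hash_channel_entry` walks, with W the offset width from `HeaderVersion::offset_size` (8 for version 3, 4 for version 2). This is a summary of the code above, not an addition to it:

```rust
// [0      .. W)         next        offset of next entry in the bucket, 0 = end of chain
// [W      .. 2W)        id          offset of the channel's RTree entry point
// [2W     .. 2W + 2)    name_len    big-endian u16
// [2W + 2 .. 2W + 4)    id_txt_len  big-endian u16
// [2W + 4 .. + name_len)            channel name (UTF-8)
// [..     .. + id_txt_len)          id text (UTF-8)
```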
@@ -543,6 +695,41 @@ async fn read_index_datablockref(file: &mut File, pos: FilePos) -> Result<Datablock, Error> {
+#[derive(Debug)]
+pub enum DbrType {
+    DbrString,
+    DbrInt,
+    DbrStsFloat,
+    DbrTimeDouble,
+}
+
+impl DbrType {
+    fn from_u16(k: u16) -> Result<Self, Error> {
+        use DbrType::*;
+        let res = match k {
+            0 => DbrString,
+            1 => DbrInt,
+            9 => DbrStsFloat,
+            20 => DbrTimeDouble,
+            _ => {
+                let msg = format!("not a valid/supported dbr type: {}", k);
+                return Err(Error::with_msg_no_trace(msg));
+            }
+        };
+        Ok(res)
+    }
+
+    fn byte_len(&self) -> usize {
+        use DbrType::*;
+        match self {
+            DbrString => 0,
+            DbrInt => 4,
+            DbrStsFloat => 1,
+            DbrTimeDouble => 16,
+        }
+    }
+}
+
 #[derive(Debug)]
 pub struct DatafileHeader {
     dir_offset: u32,
@@ -553,14 +740,16 @@ pub struct DatafileHeader {
     ctrl_info_offset: u32,
     buf_size: u32,
     buf_free: u32,
-    dbr_type: u16,
-    dbr_count: u16,
+    dbr_type: DbrType,
+    dbr_count: usize,
     period: f64,
     ts1: u64,
     ts2: u64,
     ts3: u64,
 }
 
+const DATA_HEADER_LEN_ON_DISK: usize = 72 + 40 + 40;
+
 async fn read_datafile_header(file: &mut File, pos: FilePos) -> Result<DatafileHeader, Error> {
     seek(file, SeekFrom::Start(pos.into())).await?;
     let mut rb = RingBuf::new();
@@ -574,7 +763,7 @@ async fn read_datafile_header(file: &mut File, pos: FilePos) -> Result<DatafileHeader, Error> {
-async fn read_data_1(file: &mut File, pos: FilePos) -> Result<(), Error> {
+async fn read_data_1(file: &mut File, pos: FilePos) -> Result<EventsItem, Error> {
     let datafile_header = read_datafile_header(file, pos).await?;
-    info!("datafile_header {:?}", datafile_header);
-    Ok(())
+    //info!("datafile_header {:?}", datafile_header);
+    seek(file, SeekFrom::Start(u64::from(pos) + DATA_HEADER_LEN_ON_DISK as u64)).await?;
+    let res = match &datafile_header.dbr_type {
+        DbrType::DbrTimeDouble => {
+            if datafile_header.dbr_count == 1 {
+                info!("~~~~~~~~~~~~~~~~~~~~~ read scalar DbrTimeDouble");
+                let mut evs = EventValues {
+                    tss: vec![],
+                    values: vec![],
+                };
+                let n1 = datafile_header.num_samples as usize;
+                //let n2 = datafile_header.dbr_type.byte_len();
+                let n2 = 2 + 2 + 4 + 4 + (4) + 8;
+                let n3 = n1 * n2;
+                let mut buf = vec![0; n3];
+                read_exact(file, &mut buf).await?;
+                let mut p1 = 0;
+                while p1 < n3 - n2 {
+                    let _status = u16::from_be_bytes(buf[p1..p1 + 2].try_into().unwrap());
+                    p1 += 2;
+                    let _severity = u16::from_be_bytes(buf[p1..p1 + 2].try_into().unwrap());
+                    p1 += 2;
+                    let ts1a = u32::from_be_bytes(buf[p1..p1 + 4].try_into().unwrap());
+                    p1 += 4;
+                    let ts1b = u32::from_be_bytes(buf[p1..p1 + 4].try_into().unwrap());
+                    p1 += 4;
+                    let ts1 = ts1a as u64 * SEC + ts1b as u64 + EPICS_EPOCH_OFFSET;
+                    p1 += 4;
+                    let value = f64::from_be_bytes(buf[p1..p1 + 8].try_into().unwrap());
+                    p1 += 8;
+                    //info!("read event {} {} {} {} {}", status, severity, ts1a, ts1b, value);
+                    evs.tss.push(ts1);
+                    evs.values.push(value);
+                }
+                let evs = ScalarPlainEvents::Double(evs);
+                let plain = PlainEvents::Scalar(evs);
+                let item = EventsItem::Plain(plain);
+                item
+            } else {
+                // 1d shape
+                err::todoval()
+            }
+        }
+        _ => err::todoval(),
+    };
+    Ok(res)
+}
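The stride `n2 = 2 + 2 + 4 + 4 + (4) + 8` in `read_data_1` corresponds to the on-disk layout of one scalar dbr_time_double sample (all fields big-endian), spelled out here as a reading aid:

```rust
// u16  status
// u16  severity
// u32  seconds since the EPICS epoch
// u32  nanoseconds
// 4 bytes padding (the parenthesized (4): alignment before the f64)
// f64  value
```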
+
+pub fn list_index_files(node: &ChannelArchiver) -> Receiver<Result<PathBuf, Error>> {
+    let node = node.clone();
+    let (tx, rx) = async_channel::bounded(4);
+    let tx2 = tx.clone();
+    let task = async move {
+        for bp in &node.data_base_paths {
+            let mut rd = read_dir(bp).await?;
+            while let Some(e) = rd.next_entry().await? {
+                let ft = e.file_type().await?;
+                if ft.is_dir() {
+                    let mut rd = read_dir(e.path()).await?;
+                    while let Some(e) = rd.next_entry().await? {
+                        let ft = e.file_type().await?;
+                        if false && ft.is_dir() {
+                            let mut rd = read_dir(e.path()).await?;
+                            while let Some(e) = rd.next_entry().await? {
+                                let ft = e.file_type().await?;
+                                if ft.is_file() {
+                                    if e.file_name().to_string_lossy() == "index" {
+                                        tx.send(Ok(e.path())).await?;
+                                    }
+                                }
+                            }
+                        } else if ft.is_file() {
+                            if e.file_name().to_string_lossy() == "index" {
+                                tx.send(Ok(e.path())).await?;
+                            }
+                        }
+                    }
+                } else if ft.is_file() {
+                    if e.file_name().to_string_lossy() == "index" {
+                        tx.send(Ok(e.path())).await?;
+                    }
+                }
+            }
+        }
+        Ok::<_, Error>(())
+    };
+    wrap_task(task, tx2);
+    rx
+}
+
+fn wrap_task<T, O1, O2>(task: T, tx: Sender<Result<O2, Error>>)
+where
+    T: Future<Output = Result<O1, Error>> + Send + 'static,
+    O1: Send + 'static,
+    O2: Send + 'static,
+{
+    let task = async move {
+        match task.await {
+            Ok(_) => {}
+            Err(e) => {
+                match tx.send(Err(e)).await {
+                    Ok(_) => {}
+                    Err(e) => {
+                        error!("wrap_task can not forward error: {:?}", e);
+                    }
+                };
+            }
+        }
+    };
+    taskrun::spawn(task);
+}
+
+pub fn list_all_channels(node: &ChannelArchiver) -> Receiver<Result<String, Error>> {
+    let node = node.clone();
+    let (tx, rx) = async_channel::bounded(4);
+    let tx2 = tx.clone();
+    let task = async move {
+        let mut ixf = list_index_files(&node);
+        while let Some(f) = ixf.next().await {
+            let index_path = f?;
+            if !index_path.to_str().unwrap().contains("archive_X02DA_LO/20130101/index") {
+                //continue;
+            }
+            info!("try to read for {:?}", index_path);
+            //continue;
+            let channels = channel_list(index_path).await?;
+            info!("list_all_channels emit {} channels", channels.len());
+            for ch in channels {
+                tx.send(Ok(ch)).await?;
+            }
+        }
+        info!("list_all_channels DONE");
+        Ok::<_, Error>(())
+    };
+    wrap_task(task, tx2);
+    rx
+}
+
 #[cfg(test)]
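`wrap_task` is the glue in both listing functions above: the worker future reports its failure into the same channel the consumer reads, so errors arrive in-band rather than being lost in a detached task. A minimal usage sketch under the same conventions (the `numbers` producer is hypothetical):

```rust
fn numbers() -> Receiver<Result<u32, Error>> {
    let (tx, rx) = async_channel::bounded(4);
    let tx2 = tx.clone();
    let task = async move {
        for i in 0..3 {
            tx.send(Ok(i)).await?;
        }
        Ok::<_, Error>(())
    };
    // If the task fails, wrap_task forwards the Err into tx2, so the
    // receiver observes it as a regular stream item.
    wrap_task(task, tx2);
    rx
}
```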
@@ -620,22 +945,16 @@ mod test {
     //use disk::rangefilter::RangeFilter;
     //use disk::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf};
+    use super::search_record;
+    use crate::archeng::EPICS_EPOCH_OFFSET;
     use crate::archeng::{open_read, read_channel, read_data_1, read_file_basics, read_index_datablockref};
     use err::Error;
-    use futures_util::StreamExt;
-    use items::{RangeCompletableItem, StreamItem};
-    use netpod::timeunits::{DAY, MS, SEC};
-    use netpod::{log::*, FilePos};
-    use netpod::{ByteSize, ChannelConfig, FileIoBufferSize, Nanos};
+    use netpod::log::*;
+    use netpod::timeunits::*;
+    use netpod::FilePos;
+    use netpod::Nanos;
     use std::path::PathBuf;
 
-    use super::search_record;
-
-    fn open_index_inner(path: impl Into<PathBuf>) -> Result<(), Error> {
-        let task = async move { Ok(()) };
-        Ok(taskrun::run(task).unwrap())
-    }
-
     /*
     Root node: most left record ts1 965081099942616289, most right record ts2 1002441959876114632
     */
@@ -646,9 +965,16 @@ mod test {
         let fut = async {
             let mut f1 = open_read(CHN_0_MASTER_INDEX.into()).await?;
             let res = read_file_basics(&mut f1).await?;
-            info!("got {:?}", res);
             assert_eq!(res.version, 3);
-            assert_eq!(res.file_offset_size(), 64);
+            assert_eq!(res.name_hash_anchor_beg, 88);
+            assert_eq!(res.name_hash_anchor_len, 1009);
+            // TODO makes no sense:
+            assert_eq!(res.fa_used_list_beg, 2611131);
+            assert_eq!(res.fa_used_list_end, 64);
+            assert_eq!(res.fa_used_list_len, 2136670);
+            assert_eq!(res.fa_header_next, 8160);
+            assert_eq!(res.fa_header_prev, 0);
+            assert_eq!(res.fa_header_len, 8072);
             Ok(())
         };
         Ok(taskrun::run(fut).unwrap())
@@ -660,7 +986,11 @@ mod test {
             let mut index_file = open_read(CHN_0_MASTER_INDEX.into()).await?;
             let channel_name = "X05DA-FE-WI1:TC1";
             let res = read_channel(&mut index_file, channel_name).await?;
-            info!("got {:?}", res);
+            assert_eq!(res.is_some(), true);
+            let res = res.unwrap();
+            assert_eq!(res.channel_name, channel_name);
+            assert_eq!(res.rtree_m, 50);
+            assert_eq!(res.rtree_start_pos.pos, 329750);
             Ok(())
         };
         Ok(taskrun::run(fut).unwrap())
@@ -669,6 +999,7 @@ mod test {
     #[test]
     fn search_record_middle() -> Result<(), Error> {
         /*
+        These times are still without EPICS_EPOCH_OFFSET.
         RTreeNodeRecord { ts1: 969729779686636130, ts2: 970351442331056677, child_or_id: 130731 },
         RTreeNodeRecord { ts1: 970351499684884156, ts2: 970417919634086480, child_or_id: 185074 },
         RTreeNodeRecord { ts1: 970417979635219603, ts2: 970429859806669835, child_or_id: 185015 },
         */
         let fut = async {
             let index_path: PathBuf = CHN_0_MASTER_INDEX.into();
             let mut index_file = open_read(index_path.clone()).await?;
             let channel_name = "X05DA-FE-WI1:TC1";
-            const T0: u64 = 970351442331056677 + 1;
+            const T0: u64 = 970351442331056677 + 1 + EPICS_EPOCH_OFFSET;
             let beg = Nanos { ns: T0 };
             let res = read_channel(&mut index_file, channel_name).await?;
             let cib = res.unwrap();
@@ -688,17 +1019,15 @@ mod test {
             assert_eq!(res.node.pos.pos, 8216);
             assert_eq!(res.rix, 17);
             let rec = &res.node.records[res.rix];
-            assert_eq!(rec.ts1, 970351499684884156);
-            assert_eq!(rec.ts2, 970417919634086480);
-            info!("leaf match id {}", rec.child_or_id);
+            assert_eq!(rec.ts1, 970351499684884156 + EPICS_EPOCH_OFFSET);
+            assert_eq!(rec.ts2, 970417919634086480 + EPICS_EPOCH_OFFSET);
+            assert_eq!(rec.child_or_id, 185074);
             let pos = FilePos { pos: rec.child_or_id };
             let datablock = read_index_datablockref(&mut index_file, pos).await?;
-            info!("datablock {:?}", datablock);
-            let data_path = index_path.parent().unwrap().join(datablock.fname);
-            info!("try to open {:?}", data_path);
-            let mut data_file = open_read(data_path).await?;
-            let data_pos = FilePos { pos: datablock.data };
-            read_data_1(&mut data_file, data_pos).await?;
+            assert_eq!(datablock.data, 52787);
+            assert_eq!(datablock.fname, "20201001/20201001");
+            // The data file for that period is no longer retained,
+            // but the index still points to it.
             Ok(())
         };
         Ok(taskrun::run(fut).unwrap())
     }
 
@@ -710,7 +1039,7 @@ mod test {
             let index_path: PathBuf = CHN_0_MASTER_INDEX.into();
             let mut index_file = open_read(index_path.clone()).await?;
             let channel_name = "X05DA-FE-WI1:TC1";
-            const T0: u64 = 1002000000 * SEC;
+            const T0: u64 = 1002000000 * SEC + EPICS_EPOCH_OFFSET;
             let beg = Nanos { ns: T0 };
             let res = read_channel(&mut index_file, channel_name).await?;
             let cib = res.unwrap();
@@ -721,17 +1050,18 @@ mod test {
             assert_eq!(res.node.pos.pos, 1861178);
             assert_eq!(res.rix, 41);
             let rec = &res.node.records[res.rix];
-            assert_eq!(rec.ts1, 1001993759871202919);
-            assert_eq!(rec.ts2, 1002009299596362122);
-            info!("leaf match id {}", rec.child_or_id);
+            assert_eq!(rec.ts1, 1001993759871202919 + EPICS_EPOCH_OFFSET);
+            assert_eq!(rec.ts2, 1002009299596362122 + EPICS_EPOCH_OFFSET);
+            assert_eq!(rec.child_or_id, 2501903);
             let pos = FilePos { pos: rec.child_or_id };
             let datablock = read_index_datablockref(&mut index_file, pos).await?;
-            info!("datablock {:?}", datablock);
+            assert_eq!(datablock.data, 9311367);
+            assert_eq!(datablock.fname, "20211001/20211001");
             let data_path = index_path.parent().unwrap().join(datablock.fname);
-            info!("try to open {:?}", data_path);
             let mut data_file = open_read(data_path).await?;
             let data_pos = FilePos { pos: datablock.data };
-            read_data_1(&mut data_file, data_pos).await?;
+            let events = read_data_1(&mut data_file, data_pos).await?;
+            info!("read events: {:?}", events);
             Ok(())
         };
         Ok(taskrun::run(fut).unwrap())
@@ -743,11 +1073,11 @@ mod test {
         let fut = async {
             let mut index_file = open_read(CHN_0_MASTER_INDEX.into()).await?;
             let channel_name = "X05DA-FE-WI1:TC1";
-            const T0: u64 = 965081099942616289;
+            const T0: u64 = 965081099942616289 + EPICS_EPOCH_OFFSET;
             let beg = Nanos { ns: T0 };
             let res = read_channel(&mut index_file, channel_name).await?;
             let cib = res.unwrap();
-            let (res, stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?;
+            let (res, _stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?;
             assert_eq!(res.is_some(), true);
             let res = res.unwrap();
             assert_eq!(res.node.is_leaf, true);
@@ -764,11 +1094,11 @@ mod test {
         let fut = async {
             let mut index_file = open_read(CHN_0_MASTER_INDEX.into()).await?;
             let channel_name = "X05DA-FE-WI1:TC1";
-            const T0: u64 = 1002441959876114632 - 1;
+            const T0: u64 = 1002441959876114632 - 1 + EPICS_EPOCH_OFFSET;
             let beg = Nanos { ns: T0 };
             let res = read_channel(&mut index_file, channel_name).await?;
             let cib = res.unwrap();
-            let (res, stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?;
+            let (res, _stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?;
             assert_eq!(res.is_some(), true);
             let res = res.unwrap();
             assert_eq!(res.node.pos.pos, 1861178);
@@ -784,11 +1114,11 @@ mod test {
         let fut = async {
             let mut index_file = open_read(CHN_0_MASTER_INDEX.into()).await?;
             let channel_name = "X05DA-FE-WI1:TC1";
-            const T0: u64 = 1002441959876114632 - 0;
+            const T0: u64 = 1002441959876114632 - 0 + EPICS_EPOCH_OFFSET;
             let beg = Nanos { ns: T0 };
             let res = read_channel(&mut index_file, channel_name).await?;
             let cib = res.unwrap();
-            let (res, stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?;
+            let (res, _stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?;
             assert_eq!(res.is_none(), true);
             Ok(())
         };
diff --git a/archapp_wrap/src/lib.rs b/archapp_wrap/src/lib.rs
index 7655e0f..7d8822e 100644
--- a/archapp_wrap/src/lib.rs
+++ b/archapp_wrap/src/lib.rs
@@ -9,6 +9,8 @@ use std::collections::BTreeMap;
 use std::future::Future;
 use std::pin::Pin;
 
+pub use archapp;
+
 pub fn scan_files(
     pairs: BTreeMap<String, String>,
     node_config: NodeConfigCached,
diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs
index f1ab47f..3289a00 100644
--- a/dbconn/src/search.rs
+++ b/dbconn/src/search.rs
@@ -44,8 +44,7 @@ pub async fn search_channel(
         None => vec![],
     };
     let k = ChannelSearchSingleResult {
-        backend: node_config.node.backend.clone(),
-        //backend: row.get(7),
+        backend: row.get(7),
         name: row.get(1),
         source: row.get(2),
         ty: row.get(3),
diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs
index 8205326..8a83ab0 100644
--- a/disk/src/aggtest.rs
+++ b/disk/src/aggtest.rs
@@ -17,6 +17,7 @@ pub fn make_test_node(id: u32) -> Node {
         backend: "testbackend".into(),
         splits: None,
         archiver_appliance: None,
+        channel_archiver: None,
     }
 }
 
diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs
index dbd6cfa..ec46883 100644
--- a/disk/src/dataopen.rs
+++ b/disk/src/dataopen.rs
@@ -30,9 +30,12 @@ async fn position_file(
     expand_left: bool,
     expand_right: bool,
 ) -> Result {
-    info!(
+    trace!(
         "position_file called {} {} {:?} {:?}",
-        expand_left, expand_right, range, path
+        expand_left,
+        expand_right,
+        range,
+        path
     );
     assert_eq!(expand_left && expand_right, false);
     match OpenOptions::new().read(true).open(&path).await {
diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs
index 298af19..1233876 100644
--- a/disk/src/eventchunker.rs
+++ b/disk/src/eventchunker.rs
@@ -89,6 +89,7 @@ impl EventChunker {
         expand: bool,
         do_decompress: bool,
     ) -> Self {
+        trace!("EventChunker::from_start");
         let mut inp = NeedMinBuffer::new(inp);
         inp.set_need_min(6);
         Self {
@@ -609,9 +610,12 @@ impl Stream for EventChunker {
             Ready(None)
         } else if self.final_stats_sent {
             self.sent_beyond_range = true;
+            trace!("sent_beyond_range");
             if self.seen_beyond_range {
+                trace!("sent_beyond_range RangeComplete");
                 Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
             } else {
+                trace!("sent_beyond_range non-complete");
                 continue 'outer;
             }
         } else if self.data_emit_complete {
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index f99ebbc..0b1f44c 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -12,7 +12,9 @@ use tracing::{debug, error, info, trace, warn};
 
 #[test]
 pub fn gen_test_data_test() {
-    taskrun::run(gen_test_data()).unwrap();
+    if false {
+        taskrun::run(gen_test_data()).unwrap();
+    }
 }
 
 pub async fn gen_test_data() -> Result<(), Error> {
@@ -126,6 +128,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
             backend: "testbackend".into(),
             splits: None,
             archiver_appliance: None,
+            channel_archiver: None,
         };
         ensemble.nodes.push(node);
     }
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index 58259df..e8bda3c 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -65,6 +65,8 @@ impl Stream for FileReader {
     type Item = Result<FileChunkRead, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        err::todo();
+        // TODO remove if no longer used?
         let blen = self.buffer_size as usize;
         let mut buf2 = BytesMut::with_capacity(blen);
         buf2.resize(buf2.capacity(), 0);
@@ -312,58 +314,55 @@ impl Stream for NeedMinBuffer {
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
-        if self.completed {
-            panic!("NeedMinBuffer poll_next on completed");
-        }
-        if self.errored {
-            self.completed = true;
-            return Ready(None);
-        }
         loop {
-            let mut again = false;
-            let z = match self.inp.poll_next_unpin(cx) {
-                Ready(Some(Ok(fcr))) => {
-                    self.buf_len_histo.ingest(fcr.buf.len() as u32);
-                    //info!("NeedMinBuffer got buf len {}", fcr.buf.len());
-                    match self.left.take() {
-                        Some(mut lfcr) => {
-                            // TODO measure:
-                            lfcr.buf.unsplit(fcr.buf);
-                            lfcr.duration += fcr.duration;
-                            let fcr = lfcr;
-                            if fcr.buf.len() as u32 >= self.need_min {
-                                //info!("with left ready len {} need_min {}", buf.len(), self.need_min);
-                                Ready(Some(Ok(fcr)))
-                            } else {
-                                //info!("with left not enough len {} need_min {}", buf.len(), self.need_min);
-                                self.left.replace(fcr);
-                                again = true;
-                                Pending
+            break if self.completed {
+                panic!("NeedMinBuffer poll_next on completed");
+            } else if self.errored {
+                self.completed = true;
+                return Ready(None);
+            } else {
+                match self.inp.poll_next_unpin(cx) {
+                    Ready(Some(Ok(fcr))) => {
+                        self.buf_len_histo.ingest(fcr.buf.len() as u32);
+                        //info!("NeedMinBuffer got buf len {}", fcr.buf.len());
+                        match self.left.take() {
+                            Some(mut lfcr) => {
+                                // TODO measure:
+                                lfcr.buf.unsplit(fcr.buf);
+                                lfcr.duration += fcr.duration;
+                                let fcr = lfcr;
+                                if fcr.buf.len() as u32 >= self.need_min {
+                                    //info!("with left ready len {} need_min {}", buf.len(), self.need_min);
+                                    Ready(Some(Ok(fcr)))
+                                } else {
+                                    //info!("with left not enough len {} need_min {}", buf.len(), self.need_min);
+                                    self.left.replace(fcr);
+                                    continue;
+                                }
                             }
-                        }
-                        None => {
-                            if fcr.buf.len() as u32 >= self.need_min {
-                                //info!("simply ready len {} need_min {}", buf.len(), self.need_min);
-                                Ready(Some(Ok(fcr)))
-                            } else {
-                                //info!("no previous leftover, need more len {} need_min {}", buf.len(), self.need_min);
-                                self.left.replace(fcr);
-                                again = true;
-                                Pending
+                            None => {
+                                if fcr.buf.len() as u32 >= self.need_min {
+                                    //info!("simply ready len {} need_min {}", buf.len(), self.need_min);
+                                    Ready(Some(Ok(fcr)))
+                                } else {
+                                    //info!("no previous leftover, need more len {} need_min {}", buf.len(), self.need_min);
+                                    self.left.replace(fcr);
+                                    continue;
+                                }
                             }
                         }
                     }
+                    Ready(Some(Err(e))) => {
+                        self.errored = true;
+                        Ready(Some(Err(e.into())))
+                    }
+                    Ready(None) => {
+                        info!("NeedMinBuffer histo: {:?}", self.buf_len_histo);
+                        Ready(None)
+                    }
+                    Pending => Pending,
                 }
-                Ready(Some(Err(e))) => Ready(Some(Err(e.into()))),
-                Ready(None) => {
-                    info!("NeedMinBuffer histo: {:?}", self.buf_len_histo);
-                    Ready(None)
-                }
-                Pending => Pending,
             };
-            if !again {
-                break z;
-            }
         }
     }
 }
diff --git a/disk/src/merge.rs b/disk/src/merge.rs
index cebf7b6..cfe4b32 100644
--- a/disk/src/merge.rs
+++ b/disk/src/merge.rs
@@ -56,6 +56,7 @@ where
     ITY: Appendable + Unpin,
 {
     pub fn new(inps: Vec<S>) -> Self {
+        trace!("MergedStream::new");
         let n = inps.len();
         let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect();
         Self {
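The NeedMinBuffer rewrite above (and the RangeFilter rewrite below) converge on the same control-flow shape: a `loop` whose body is a single `break <expr>`, with `continue` in the arms that need another iteration, replacing the earlier `again` flag and sentinel `Pending`. The pattern in isolation (a sketch, not code from the patch):

```rust
// Return the first non-negative item, skipping negatives.
fn first_non_negative(it: &mut impl Iterator<Item = i32>) -> Option<i32> {
    loop {
        break match it.next() {
            Some(x) if x < 0 => continue, // not usable yet: take the next one
            other => other,               // Some(non-negative) or None ends the loop
        };
    }
}
```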
diff --git a/disk/src/rangefilter.rs b/disk/src/rangefilter.rs
index 2a97582..88f9796 100644
--- a/disk/src/rangefilter.rs
+++ b/disk/src/rangefilter.rs
@@ -9,10 +9,13 @@ use std::task::{Context, Poll};
 pub struct RangeFilter<S, ITY> {
     inp: S,
     range: NanoRange,
+    range_str: String,
     expand: bool,
     prerange: ITY,
     have_pre: bool,
+    have_range_complete: bool,
     emitted_post: bool,
+    data_done: bool,
     done: bool,
     complete: bool,
 }
@@ -22,19 +25,109 @@ where
     ITY: Appendable,
 {
     pub fn new(inp: S, range: NanoRange, expand: bool) -> Self {
+        trace!("RangeFilter::new range: {:?} expand: {:?}", range, expand);
         Self {
             inp,
+            range_str: format!("{:?}", range),
             range,
             expand,
             prerange: ITY::empty(),
             have_pre: false,
+            have_range_complete: false,
             emitted_post: false,
+            data_done: false,
             done: false,
             complete: false,
         }
     }
 }
 
+impl<S, ITY> RangeFilter<S, ITY>
+where
+    S: Stream<Item = Sitemty<ITY>> + Unpin,
+    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
+{
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<<Self as Stream>::Item>> {
+        use Poll::*;
+        loop {
+            break if self.complete {
+                panic!("poll_next on complete");
+            } else if self.done {
+                self.complete = true;
+                Ready(None)
+            } else if self.data_done {
+                self.done = true;
+                if self.have_range_complete {
+                    Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
+                } else {
+                    continue;
+                }
+            } else {
+                match self.inp.poll_next_unpin(cx) {
+                    Ready(Some(item)) => match item {
+                        Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => {
+                            let mut ret = ITY::empty();
+                            for i1 in 0..item.len() {
+                                let ts = item.ts(i1);
+                                if ts < self.range.beg {
+                                    if self.expand {
+                                        self.prerange.clear();
+                                        self.prerange.push_index(&item, i1);
+                                        self.have_pre = true;
+                                    }
+                                } else if ts >= self.range.end {
+                                    self.have_range_complete = true;
+                                    if self.expand {
+                                        if self.have_pre {
+                                            ret.push_index(&self.prerange, 0);
+                                            self.prerange.clear();
+                                            self.have_pre = false;
+                                        }
+                                        if !self.emitted_post {
+                                            self.emitted_post = true;
+                                            ret.push_index(&item, i1);
+                                            //self.data_done = true;
+                                        }
+                                    } else {
+                                        //self.data_done = true;
+                                    }
+                                } else {
+                                    if self.expand {
+                                        if self.have_pre {
+                                            ret.push_index(&self.prerange, 0);
+                                            self.prerange.clear();
+                                            self.have_pre = false;
+                                        }
+                                    }
+                                    ret.push_index(&item, i1);
+                                };
+                            }
+                            Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
+                        }
+                        Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => {
+                            self.have_range_complete = true;
+                            continue;
+                        }
+                        k => Ready(Some(k)),
+                    },
+                    Ready(None) => {
+                        self.data_done = true;
+                        if self.have_pre {
+                            let mut ret = ITY::empty();
+                            ret.push_index(&self.prerange, 0);
+                            self.have_pre = false;
+                            Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
+                        } else {
+                            continue;
+                        }
+                    }
+                    Pending => Pending,
+                }
+            };
+        }
+    }
+}
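To make the `expand` flag concrete: for a range `[10, 20)` over events with timestamps 5, 8, 12, 25, the filter above behaves as follows (8 is the last event before the range, held in `prerange`; 25 is the first event at or past the end, emitted once and also marking the range complete):

```rust
// expand == false  ->  yields [12]
// expand == true   ->  yields [8, 12, 25]
```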
 
 impl<S, ITY> Stream for RangeFilter<S, ITY>
 where
     S: Stream<Item = Sitemty<ITY>> + Unpin,
@@ -42,80 +135,9 @@ where
 {
     type Item = Sitemty<ITY>;
 
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        use Poll::*;
-        if self.complete {
-            panic!("poll_next on complete");
-        } else if self.done {
-            self.complete = true;
-            Ready(None)
-        } else {
-            match self.inp.poll_next_unpin(cx) {
-                Ready(Some(item)) => match item {
-                    Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => {
-                        let mut ret = ITY::empty();
-                        for i1 in 0..item.len() {
-                            let ts = item.ts(i1);
-                            if ts < self.range.beg {
-                                if self.expand {
-                                    self.prerange.clear();
-                                    self.prerange.push_index(&item, i1);
-                                    self.have_pre = true;
-                                } else {
-                                };
-                            } else if ts >= self.range.end {
-                                if self.expand {
-                                    if self.have_pre {
-                                        ret.push_index(&self.prerange, 0);
-                                        self.prerange.clear();
-                                        self.have_pre = false;
-                                    };
-                                    if !self.emitted_post {
-                                        self.emitted_post = true;
-                                        ret.push_index(&item, i1);
-                                        self.done = true;
-                                    } else {
-                                        panic!();
-                                    };
-                                } else {
-                                    self.done = true;
-                                };
-                            } else {
-                                if self.expand {
-                                    if self.have_pre {
-                                        ret.push_index(&self.prerange, 0);
-                                        self.prerange.clear();
-                                        self.have_pre = false;
-                                    }
-                                    ret.push_index(&item, i1);
-                                } else {
-                                    ret.push_index(&item, i1);
-                                };
-                            };
-                        }
-                        Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
-                    }
-                    Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => {
-                        warn!("\n\nRangeFilter got RangeComplete\n");
-                        Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
-                    }
-                    k => Ready(Some(k)),
-                },
-                Ready(None) => {
-                    if self.have_pre {
-                        let mut ret = ITY::empty();
-                        ret.push_index(&self.prerange, 0);
-                        self.have_pre = false;
-                        self.done = true;
-                        Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
-                    } else {
-                        self.done = true;
-                        self.complete = true;
-                        Ready(None)
-                    }
-                }
-                Pending => Pending,
-            }
-        }
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let span1 = span!(Level::INFO, "RangeFilter", range = tracing::field::Empty);
+        span1.record("range", &self.range_str.as_str());
+        span1.in_scope(|| Self::poll_next(self, cx))
     }
 }
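The `Stream` impl is now only a tracing shim around the inherent `poll_next`. The span trick — declare a field `Empty` at creation, record it later, run the real work inside `in_scope` — in isolation (a sketch, assuming the `tracing` crate as used elsewhere in this codebase):

```rust
use tracing::{span, Level};

fn poll_with_span(range_str: &str) {
    let span1 = span!(Level::INFO, "RangeFilter", range = tracing::field::Empty);
    span1.record("range", &range_str);
    span1.in_scope(|| {
        // Any event emitted here carries the recorded `range` field.
        tracing::info!("polling");
    });
}
```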
diff --git a/httpret/src/channelarchiver.rs b/httpret/src/channelarchiver.rs
new file mode 100644
index 0000000..ea06182
--- /dev/null
+++ b/httpret/src/channelarchiver.rs
@@ -0,0 +1,119 @@
+use crate::response;
+use err::Error;
+use http::{header, Method, Request, Response, StatusCode};
+use hyper::Body;
+use netpod::{log::*, NodeConfigCached, APP_JSON_LINES};
+
+pub struct ListIndexFilesHttpFunction {}
+
+impl ListIndexFilesHttpFunction {
+    pub fn prefix() -> &'static str {
+        "/api/4/channelarchiver/list/indexfiles"
+    }
+
+    pub fn name() -> &'static str {
+        "ListIndexFilesHttpFunction"
+    }
+
+    pub fn should_handle(path: &str) -> Option<Self> {
+        if path.starts_with(Self::prefix()) {
+            Some(Self {})
+        } else {
+            None
+        }
+    }
+
+    pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
+        if req.method() != Method::GET {
+            return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
+        }
+        info!("{} handle uri: {:?}", Self::name(), req.uri());
+        let conf = node_config
+            .node
+            .channel_archiver
+            .as_ref()
+            .ok_or(Error::with_msg_no_trace(
+                "this node is not configured as channel archiver",
+            ))?;
+        let s = archapp_wrap::archapp::archeng::list_index_files(conf);
+        let s = futures_util::stream::unfold(s, |mut st| async move {
+            use futures_util::StreamExt;
+            let x = st.next().await;
+            match x {
+                Some(x) => match x {
+                    Ok(x) => {
+                        let mut x = serde_json::to_vec(&x).unwrap();
+                        x.push(b'\n');
+                        Some((Ok::<_, Error>(x), st))
+                    }
+                    Err(e) => {
+                        error!("{:?}", e);
+                        None
+                    }
+                },
+                None => None,
+            }
+        });
+        Ok(response(StatusCode::OK)
+            .header(header::CONTENT_TYPE, APP_JSON_LINES)
+            .body(Body::wrap_stream(s))?)
+    }
+}
+
+pub struct ListChannelsHttpFunction {}
+
+impl ListChannelsHttpFunction {
+    pub fn prefix() -> &'static str {
+        "/api/4/channelarchiver/list/channels"
+    }
+
+    pub fn name() -> &'static str {
+        "ListChannelsHttpFunction"
+    }
+
+    pub fn should_handle(path: &str) -> Option<Self> {
+        if path.starts_with(Self::prefix()) {
+            Some(Self {})
+        } else {
+            None
+        }
+    }
+
+    pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
+        if req.method() != Method::GET {
+            return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
+        }
+        info!("{} handle uri: {:?}", Self::name(), req.uri());
+        let conf = node_config
+            .node
+            .channel_archiver
+            .as_ref()
+            .ok_or(Error::with_msg_no_trace(
+                "this node is not configured as channel archiver",
+            ))?;
+
+        let s = archapp_wrap::archapp::archeng::list_all_channels(conf);
+        let s = futures_util::stream::unfold(s, |mut st| async move {
+            use futures_util::StreamExt;
+            let x = st.next().await;
+            match x {
+                Some(x) => match x {
+                    Ok(x) => {
+                        let mut x = serde_json::to_vec(&x).unwrap();
+                        x.push(b'\n');
+                        Some((Ok::<_, Error>(x), st))
+                    }
+                    Err(e) => {
+                        //Some((Err(e), st))
+                        error!("{:?}", e);
+                        None
+                    }
+                },
+                None => None,
+            }
+        });
+        Ok(response(StatusCode::OK)
+            .header(header::CONTENT_TYPE, APP_JSON_LINES)
+            .body(Body::wrap_stream(s))?)
+    }
+}
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 4a312c6..56a757d 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -29,6 +29,7 @@ use tracing::Instrument;
 use url::Url;
 
 pub mod api1;
+pub mod channelarchiver;
 pub mod gather;
 pub mod proxy;
 pub mod pulsemap;
@@ -273,6 +274,10 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
         pulsemap::MapPulseHistoHttpFunction::handle(req, &node_config).await
     } else if pulsemap::MapPulseHttpFunction::path_matches(path) {
         pulsemap::MapPulseHttpFunction::handle(req, &node_config).await
+    } else if let Some(h) = channelarchiver::ListIndexFilesHttpFunction::should_handle(path) {
+        h.handle(req, &node_config).await
+    } else if let Some(h) = channelarchiver::ListChannelsHttpFunction::should_handle(path) {
+        h.handle(req, &node_config).await
     } else if path.starts_with("/api/1/requestStatus/") {
         info!("{}", path);
         Ok(response(StatusCode::OK).body(Body::from("{}"))?)
diff --git a/httpret/src/search.rs b/httpret/src/search.rs
index 6a95d8b..b79a8d6 100644
--- a/httpret/src/search.rs
+++ b/httpret/src/search.rs
@@ -1,14 +1,15 @@
 use crate::response;
 use err::Error;
+use http::header;
 use hyper::{Body, Request, Response, StatusCode};
-use netpod::log::*;
+use netpod::{log::*, APP_JSON};
 use netpod::{ChannelSearchQuery, NodeConfigCached};
 use url::Url;
 
 pub async fn channel_search(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     let (head, _body) = req.into_parts();
-    match head.headers.get("accept") {
-        Some(v) if v == "application/json" => {
+    match head.headers.get(header::ACCEPT) {
+        Some(v) if v == APP_JSON => {
             let s1 = format!("dummy:{}", head.uri);
             info!("try to parse {:?}", s1);
             let url = Url::parse(&s1)?;
diff --git a/netfetch/src/test.rs b/netfetch/src/test.rs
index 6795e21..1954349 100644
--- a/netfetch/src/test.rs
+++ b/netfetch/src/test.rs
@@ -21,6 +21,7 @@ fn ca_connect_1() {
             ksprefix: "".into(),
             splits: None,
             archiver_appliance: None,
+            channel_archiver: None,
         },
         node_config: NodeConfig {
             name: "".into(),
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index c2508e5..6e2e4fc 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -123,6 +123,11 @@ pub struct ArchiverAppliance {
     pub data_base_paths: Vec<PathBuf>,
 }
 
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct ChannelArchiver {
+    pub data_base_paths: Vec<PathBuf>,
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct Node {
     pub host: String,
@@ -135,6 +140,7 @@ pub struct Node {
     pub backend: String,
     pub splits: Option<Vec<u64>>,
     pub archiver_appliance: Option<ArchiverAppliance>,
+    pub channel_archiver: Option<ChannelArchiver>,
 }
 
 impl Node {
@@ -150,6 +156,7 @@ impl Node {
             backend: "dummybackend".into(),
             splits: None,
             archiver_appliance: None,
+            channel_archiver: None,
         }
     }
 }
diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs
index cc27c48..a8c8d3a 100644
--- a/taskrun/src/lib.rs
+++ b/taskrun/src/lib.rs
@@ -90,7 +90,7 @@ pub fn tracing_init() {
             .with_thread_names(true)
             //.with_max_level(tracing::Level::INFO)
             .with_env_filter(tracing_subscriber::EnvFilter::new(
-                "info,daqbuffer=trace,daqbuffer::test=trace,disk::raw::conn=info",
+                ["info", "daqbuffer::test=trace"].join(","),
             ))
             .init();
         *g = 1;
@@ -120,6 +120,7 @@ pub fn test_cluster() -> netpod::Cluster {
             backend: "testbackend".into(),
             splits: None,
             archiver_appliance: None,
+            channel_archiver: None,
         })
         .collect();
     netpod::Cluster {