diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 5caeb3b..bb33dbf 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -324,6 +324,10 @@ macro_rules! arm1 { $sty2 ), }, + Shape::Image(..) => { + // There should be no images on archiver. + err::todoval() + } } }}; } diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 8077137..d4219d2 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -22,8 +22,8 @@ byteorder = "1.4.3" futures-core = "0.3.14" futures-util = "0.3.14" async-stream = "0.3.0" -tracing = "0.1.25" -#tracing-futures = "0.2.5" +tracing = { version = "0.1.25", features = [] } +tracing-futures = { version = "0.2.5", features = ["futures-01", "futures-03", "std-future"] } fs2 = "0.4.3" libc = "0.2.93" hex = "0.4.3" diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index 8cbcf27..6b068a1 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -112,6 +112,10 @@ where } } } + Shape::Image(..) => { + // TODO image binning/aggregation + err::todoval() + } } } diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 6930a82..4038761 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -142,6 +142,10 @@ where } } } + Shape::Image(..) => { + // TODO needed for binning or json event retrieval + err::todoval() + } } } diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index c991b9c..92db766 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -90,103 +90,120 @@ impl Stream for EventChunkerMultifile { type Item = Result>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - 'outer: loop { - break if self.completed { - panic!("EventBlobsComplete poll_next on completed"); - } else if self.errored { - self.completed = true; - return Ready(None); - } else if self.data_completed { - self.completed = true; - return Ready(None); - } else { - match &mut self.evs { - Some(evs) => match evs.poll_next_unpin(cx) { - Ready(Some(k)) => Ready(Some(k)), - Ready(None) => { - self.seen_before_range_count += evs.seen_before_range_count(); - self.evs = None; - continue 'outer; - } - Pending => Pending, - }, - None => match self.file_chan.poll_next_unpin(cx) { - Ready(Some(k)) => match k { - Ok(ofs) => { - self.files_count += ofs.files.len() as u32; - if ofs.files.len() == 1 { - let mut ofs = ofs; - let file = ofs.files.pop().unwrap(); - let path = file.path; - let item = LogItem::quick(Level::INFO, format!("handle OFS {:?}", ofs)); - match file.file { - Some(file) => { - let inp = Box::pin(file_content_stream(file, self.buffer_size as usize)); - let chunker = EventChunker::from_event_boundary( - inp, - self.channel_config.clone(), - self.range.clone(), - self.event_chunker_conf.clone(), - path, - self.max_ts.clone(), - self.expand, - ); - self.evs = Some(Box::pin(chunker)); - } - None => {} - } - Ready(Some(Ok(StreamItem::Log(item)))) - } else if ofs.files.len() > 1 { - let item = LogItem::quick(Level::INFO, format!("handle OFS MULTIPLE {:?}", ofs)); - let mut chunkers = vec![]; - for of in ofs.files { - if let Some(file) = of.file { - let inp = Box::pin(file_content_stream(file, self.buffer_size as usize)); - let chunker = EventChunker::from_event_boundary( - inp, - self.channel_config.clone(), - self.range.clone(), - self.event_chunker_conf.clone(), - of.path, - self.max_ts.clone(), - self.expand, - ); - chunkers.push(chunker); - } - } - let merged = MergedBlobsStream::new(chunkers); - self.evs = Some(Box::pin(merged)); - Ready(Some(Ok(StreamItem::Log(item)))) - } else { - let item = LogItem::quick(Level::INFO, format!("handle OFS {:?} NO FILES", ofs)); - Ready(Some(Ok(StreamItem::Log(item)))) + let span1 = span!(Level::INFO, "EventChunkerMultifile", desc = tracing::field::Empty); + span1.record("desc", &""); + span1.in_scope(|| { + use Poll::*; + 'outer: loop { + break if self.completed { + panic!("EventBlobsComplete poll_next on completed"); + } else if self.errored { + self.completed = true; + return Ready(None); + } else if self.data_completed { + self.completed = true; + return Ready(None); + } else { + match &mut self.evs { + Some(evs) => match evs.poll_next_unpin(cx) { + Ready(Some(k)) => { + if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(h))) = &k { + info!("EventChunkerMultifile emit {} events", h.tss.len()); } + Ready(Some(k)) } - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) + Ready(None) => { + self.seen_before_range_count += evs.seen_before_range_count(); + self.evs = None; + continue 'outer; } + Pending => Pending, }, - Ready(None) => { - self.data_completed = true; - let item = LogItem::quick( - Level::INFO, - format!( - "EventBlobsComplete used {} datafiles beg {} end {} node_ix {}", - self.files_count, - self.range.beg / SEC, - self.range.end / SEC, - self.node_ix - ), - ); - Ready(Some(Ok(StreamItem::Log(item)))) - } - Pending => Pending, - }, - } - }; - } + None => match self.file_chan.poll_next_unpin(cx) { + Ready(Some(k)) => match k { + Ok(ofs) => { + self.files_count += ofs.files.len() as u32; + if ofs.files.len() == 1 { + let mut ofs = ofs; + let file = ofs.files.pop().unwrap(); + let path = file.path; + let msg = format!("handle OFS {:?}", ofs); + info!("{}", msg); + let item = LogItem::quick(Level::INFO, msg); + match file.file { + Some(file) => { + let inp = + Box::pin(file_content_stream(file, self.buffer_size as usize)); + let chunker = EventChunker::from_event_boundary( + inp, + self.channel_config.clone(), + self.range.clone(), + self.event_chunker_conf.clone(), + path, + self.max_ts.clone(), + self.expand, + ); + self.evs = Some(Box::pin(chunker)); + } + None => {} + } + Ready(Some(Ok(StreamItem::Log(item)))) + } else if ofs.files.len() > 1 { + let msg = format!("handle OFS MULTIPLE {:?}", ofs); + warn!("{}", msg); + let item = LogItem::quick(Level::INFO, msg); + let mut chunkers = vec![]; + for of in ofs.files { + if let Some(file) = of.file { + let inp = + Box::pin(file_content_stream(file, self.buffer_size as usize)); + let chunker = EventChunker::from_event_boundary( + inp, + self.channel_config.clone(), + self.range.clone(), + self.event_chunker_conf.clone(), + of.path, + self.max_ts.clone(), + self.expand, + ); + chunkers.push(chunker); + } + } + let merged = MergedBlobsStream::new(chunkers); + self.evs = Some(Box::pin(merged)); + Ready(Some(Ok(StreamItem::Log(item)))) + } else { + let msg = format!("handle OFS {:?} NO FILES", ofs); + info!("{}", msg); + let item = LogItem::quick(Level::INFO, msg); + Ready(Some(Ok(StreamItem::Log(item)))) + } + } + Err(e) => { + self.errored = true; + Ready(Some(Err(e))) + } + }, + Ready(None) => { + self.data_completed = true; + let item = LogItem::quick( + Level::INFO, + format!( + "EventBlobsComplete used {} datafiles beg {} end {} node_ix {}", + self.files_count, + self.range.beg / SEC, + self.range.end / SEC, + self.node_ix + ), + ); + Ready(Some(Ok(StreamItem::Log(item)))) + } + Pending => Pending, + }, + } + }; + } + }) } } diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index d88ac4d..f3b3492 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -10,6 +10,7 @@ use items::{ use netpod::log::*; use netpod::timeunits::SEC; use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape}; +use parse::channelconfig::CompressionMethod; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::path::PathBuf; use std::pin::Pin; @@ -155,9 +156,10 @@ impl EventChunker { } } DataFileState::Event => { + let p0 = 0; let mut sl = std::io::Cursor::new(buf.as_ref()); let len = sl.read_i32::().unwrap(); - if len < 20 || len > 1024 * 1024 * 10 { + if len < 20 || len > 1024 * 1024 * 20 { Err(Error::with_msg("unexpected large event chunk"))?; } let len = len as u32; @@ -256,6 +258,8 @@ impl EventChunker { if is_shaped { if shape_dim == 1 { Shape::Wave(shape_lens[0]) + } else if shape_dim == 2 { + Shape::Image(shape_lens[0], shape_lens[1]) } else { err::todoval() } @@ -263,27 +267,38 @@ impl EventChunker { Shape::Scalar } }; + let comp_this = if is_compressed { + if compression_method == 0 { + Some(CompressionMethod::BitshuffleLZ4) + } else { + err::todoval() + } + } else { + None + }; + let p1 = sl.position(); + let k1 = len as u64 - (p1 - p0) - 4; if is_compressed { //debug!("event ts {} is_compressed {}", ts, is_compressed); let value_bytes = sl.read_u64::().unwrap(); let block_size = sl.read_u32::().unwrap(); - let p1 = sl.position() as u32; - let k1 = len as u32 - p1 - 4; //debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size); - assert!(value_bytes < 1024 * 256); - assert!(block_size < 1024 * 32); + match self.channel_config.shape { + Shape::Scalar => { + assert!(value_bytes < 1024 * 1); + } + Shape::Wave(_) => { + assert!(value_bytes < 1024 * 64); + } + Shape::Image(_, _) => { + assert!(value_bytes < 1024 * 1024 * 20); + } + } + assert!(block_size <= 1024 * 32); let type_size = scalar_type.bytes() as u32; let ele_count = value_bytes / type_size as u64; let ele_size = type_size; match self.channel_config.shape { - Shape::Wave(dim1count) => { - if dim1count != ele_count as u32 { - Err(Error::with_msg(format!( - "ChannelConfig expects {:?} but event has {:?}", - self.channel_config.shape, ele_count, - )))?; - } - } Shape::Scalar => { if is_array { Err(Error::with_msg(format!( @@ -291,28 +306,48 @@ impl EventChunker { )))?; } } + Shape::Wave(dim1count) => { + if dim1count != ele_count as u32 { + Err(Error::with_msg(format!( + "ChannelConfig expects {:?} but event has ele_count {}", + self.channel_config.shape, ele_count, + )))?; + } + } + Shape::Image(n1, n2) => { + let nt = n1 as usize * n2 as usize; + if nt != ele_count as usize { + Err(Error::with_msg(format!( + "ChannelConfig expects {:?} but event has ele_count {}", + self.channel_config.shape, ele_count, + )))?; + } + } } let decomp_bytes = (type_size * ele_count as u32) as usize; let mut decomp = BytesMut::with_capacity(decomp_bytes); unsafe { decomp.set_len(decomp_bytes); } + // TODO limit the buf slice range match bitshuffle_decompress( - &buf.as_ref()[p1 as usize..], + &buf.as_ref()[(p1 as usize + 12)..(p1 as usize + k1 as usize)], &mut decomp, ele_count as usize, ele_size as usize, 0, ) { Ok(c1) => { - assert!(c1 as u32 == k1); + assert!(c1 as u64 + 12 == k1); ret.add_event( ts, pulse, + buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(), Some(decomp), ScalarType::from_dtype_index(type_index)?, is_big_endian, shape_this, + comp_this, ); } Err(e) => { @@ -320,7 +355,6 @@ impl EventChunker { } }; } else { - let p1 = sl.position(); if len < p1 as u32 + 4 { let msg = format!("uncomp len: {} p1: {}", len, p1); Err(Error::with_msg(msg))?; @@ -330,10 +364,12 @@ impl EventChunker { ret.add_event( ts, pulse, + buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(), Some(decomp), ScalarType::from_dtype_index(type_index)?, is_big_endian, shape_this, + comp_this, ); } buf.advance(len as usize); @@ -358,11 +394,13 @@ impl EventChunker { pub struct EventFull { pub tss: Vec, pub pulses: Vec, + pub blobs: Vec>, #[serde(serialize_with = "decomps_ser", deserialize_with = "decomps_de")] pub decomps: Vec>, pub scalar_types: Vec, pub be: Vec, pub shapes: Vec, + pub comps: Vec>, } fn decomps_ser(t: &Vec>, s: S) -> Result @@ -403,10 +441,12 @@ impl EventFull { Self { tss: vec![], pulses: vec![], + blobs: vec![], decomps: vec![], scalar_types: vec![], be: vec![], shapes: vec![], + comps: vec![], } } @@ -414,17 +454,21 @@ impl EventFull { &mut self, ts: u64, pulse: u64, + blob: Vec, decomp: Option, scalar_type: ScalarType, be: bool, shape: Shape, + comp: Option, ) { self.tss.push(ts); self.pulses.push(pulse); + self.blobs.push(blob); self.decomps.push(decomp); self.scalar_types.push(scalar_type); self.be.push(be); self.shapes.push(shape); + self.comps.push(comp); } } @@ -447,10 +491,12 @@ impl Appendable for EventFull { fn append(&mut self, src: &Self) { self.tss.extend_from_slice(&src.tss); self.pulses.extend_from_slice(&src.pulses); + self.blobs.extend_from_slice(&src.blobs); self.decomps.extend_from_slice(&src.decomps); self.scalar_types.extend_from_slice(&src.scalar_types); self.be.extend_from_slice(&src.be); self.shapes.extend_from_slice(&src.shapes); + self.comps.extend_from_slice(&src.comps); } } @@ -465,10 +511,12 @@ impl PushableIndex for EventFull { fn push_index(&mut self, src: &Self, ix: usize) { self.tss.push(src.tss[ix]); self.pulses.push(src.pulses[ix]); + self.blobs.push(src.blobs[ix].clone()); self.decomps.push(src.decomps[ix].clone()); self.scalar_types.push(src.scalar_types[ix].clone()); self.be.push(src.be[ix]); self.shapes.push(src.shapes[ix].clone()); + self.comps.push(src.comps[ix].clone()); } } @@ -523,16 +571,43 @@ impl Stream for EventChunker { // TODO gather stats about this: self.inp.put_back(fcr); } - if self.need_min > 1024 * 8 { - let msg = format!("spurious EventChunker asks for need_min {}", self.need_min); - self.errored = true; - Ready(Some(Err(Error::with_msg(msg)))) - } else { - let x = self.need_min; - self.inp.set_need_min(x); - let ret = StreamItem::DataItem(RangeCompletableItem::Data(res.events)); - Ready(Some(Ok(ret))) + match self.channel_config.shape { + Shape::Scalar => { + if self.need_min > 1024 * 8 { + let msg = + format!("spurious EventChunker asks for need_min {}", self.need_min); + self.errored = true; + return Ready(Some(Err(Error::with_msg(msg)))); + } + } + Shape::Wave(_) => { + if self.need_min > 1024 * 32 { + let msg = + format!("spurious EventChunker asks for need_min {}", self.need_min); + self.errored = true; + return Ready(Some(Err(Error::with_msg(msg)))); + } + } + Shape::Image(_, _) => { + if self.need_min > 1024 * 1024 * 20 { + let msg = + format!("spurious EventChunker asks for need_min {}", self.need_min); + self.errored = true; + return Ready(Some(Err(Error::with_msg(msg)))); + } + } } + let x = self.need_min; + self.inp.set_need_min(x); + { + info!( + "EventChunker emits {} events tss {:?}", + res.events.len(), + res.events.tss + ); + }; + let ret = StreamItem::DataItem(RangeCompletableItem::Data(res.events)); + Ready(Some(Ok(ret))) } Err(e) => { self.errored = true; diff --git a/disk/src/frame/inmem.rs b/disk/src/frame/inmem.rs index 302b9e7..ca6b21d 100644 --- a/disk/src/frame/inmem.rs +++ b/disk/src/frame/inmem.rs @@ -129,15 +129,12 @@ where (Some(None), buf, wp) } else { if len > 1024 * 1024 * 50 { - error!("InMemoryFrameAsyncReadStream too long len {}", len); - return ( - Some(Some(Err(Error::with_msg(format!( - "InMemoryFrameAsyncReadStream tryparse huge buffer len {} self.inp_bytes_consumed {}", - len, self.inp_bytes_consumed - ))))), - buf, - wp, + let msg = format!( + "InMemoryFrameAsyncReadStream tryparse huge buffer len {} self.inp_bytes_consumed {}", + len, self.inp_bytes_consumed ); + error!("{}", msg); + return (Some(Some(Err(Error::with_msg(msg)))), buf, wp); } else if len > 1024 * 1024 * 1 { // TODO //warn!("InMemoryFrameAsyncReadStream big len received {}", len); diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 64590cc..8820ad9 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -249,6 +249,10 @@ async fn gen_config( buf.put_i8(1); buf.put_i32(k as i32); } + Shape::Image(_, _) => { + // TODO test data + err::todoval() + } } let len = buf.len() - p3 - 4; buf.as_mut()[p3..].as_mut().put_i32(len as i32); diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 531e0fc..f887d6f 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -503,6 +503,9 @@ impl ChannelConfigExt for ChannelConfig { Shape::Wave(_) => { ret |= SHAPE; } + Shape::Image(_, _) => { + ret |= SHAPE; + } } if self.byte_order.is_be() { ret |= BIG_ENDIAN; diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 51345e4..8938bc3 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -62,7 +62,7 @@ where range_complete_observed_all: false, range_complete_observed_all_emitted: false, data_emit_complete: false, - batch_size: 64, + batch_size: 1, logitems: VecDeque::new(), event_data_read_stats_items: VecDeque::new(), } diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs index 1615846..32b4208 100644 --- a/disk/src/merge/mergedblobsfromremotes.rs +++ b/disk/src/merge/mergedblobsfromremotes.rs @@ -27,6 +27,7 @@ impl MergedBlobsFromRemotes { pub fn new(evq: RawEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self { info!("MergedBlobsFromRemotes evq {:?}", evq); let mut tcp_establish_futs = vec![]; + for node in &cluster.nodes { let f = x_processed_event_blobs_stream_from_node(evq.clone(), perf_opts.clone(), node.clone()); let f: T002 = Box::pin(f); diff --git a/disk/src/mergeblobs.rs b/disk/src/mergeblobs.rs index 18dec52..4eb9d3e 100644 --- a/disk/src/mergeblobs.rs +++ b/disk/src/mergeblobs.rs @@ -58,7 +58,7 @@ where range_complete_observed_all: false, range_complete_observed_all_emitted: false, data_emit_complete: false, - batch_size: 64, + batch_size: 1, logitems: VecDeque::new(), event_data_read_stats_items: VecDeque::new(), } @@ -188,6 +188,13 @@ where let emp = I::empty(); let ret = std::mem::replace(&mut self.batch, emp); self.data_emit_complete = true; + { + let mut aa = vec![]; + for ii in 0..ret.len() { + aa.push(ret.ts(ii)); + } + info!("MergedBlobsStream A emits {} events tss {:?}", ret.len(), aa); + }; Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) } else { self.data_emit_complete = true; @@ -220,6 +227,13 @@ where if self.batch.len() >= self.batch_size { let emp = I::empty(); let ret = std::mem::replace(&mut self.batch, emp); + { + let mut aa = vec![]; + for ii in 0..ret.len() { + aa.push(ret.ts(ii)); + } + info!("MergedBlobsStream B emits {} events tss {:?}", ret.len(), aa); + }; Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) } else { continue 'outer; diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 7290494..56b0a94 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -3,15 +3,16 @@ use crate::decode::{ EventsDecodedStream, LittleEndian, NumFromBytes, }; use crate::eventblobs::EventChunkerMultifile; -use crate::eventchunker::EventChunkerConf; +use crate::eventchunker::{EventChunkerConf, EventFull}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use items::numops::{BoolNum, NumOps}; use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, StreamItem}; use netpod::query::RawEventsQuery; -use netpod::{AggKind, ByteOrder, ByteSize, NodeConfigCached, ScalarType, Shape}; -use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry}; +use netpod::{AggKind, ByteOrder, ByteSize, Channel, NanoRange, NodeConfigCached, ScalarType, Shape}; + +use parse::channelconfig::{extract_matching_config_entry, read_local_config, ConfigEntry, MatchingConfigEntry}; use std::pin::Pin; fn make_num_pipeline_stream_evs( @@ -96,6 +97,10 @@ macro_rules! pipe3 { $event_blobs ) } + Shape::Image(_, _) => { + // TODO not needed for python data api v3 protocol, but later for api4. + err::todoval() + } } }; } @@ -193,6 +198,94 @@ pub async fn make_event_pipe( Ok(pipe) } +pub async fn get_applicable_entry( + range: &NanoRange, + channel: Channel, + node_config: &NodeConfigCached, +) -> Result { + let channel_config = read_local_config(channel, node_config.node.clone()).await?; + let entry_res = match extract_matching_config_entry(range, &channel_config) { + Ok(k) => k, + Err(e) => return Err(e)?, + }; + let entry = match entry_res { + MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?, + MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found"))?, + MatchingConfigEntry::Entry(entry) => entry, + }; + Ok(entry.clone()) +} + +pub fn make_local_event_blobs_stream( + range: NanoRange, + channel: Channel, + entry: &ConfigEntry, + expand: bool, + event_chunker_conf: EventChunkerConf, + disk_io_buffer_size: usize, + node_config: &NodeConfigCached, +) -> Result { + let shape = match entry.to_shape() { + Ok(k) => k, + Err(e) => return Err(e)?, + }; + let channel_config = netpod::ChannelConfig { + channel, + keyspace: entry.ks as u8, + time_bin_size: entry.bs, + shape: shape, + scalar_type: entry.scalar_type.clone(), + byte_order: entry.byte_order.clone(), + array: entry.is_array, + compression: entry.is_compressed, + }; + let event_blobs = EventChunkerMultifile::new( + range, + channel_config.clone(), + node_config.node.clone(), + node_config.ix, + disk_io_buffer_size, + event_chunker_conf, + expand, + ); + Ok(event_blobs) +} + +pub fn make_remote_event_blobs_stream( + range: NanoRange, + channel: Channel, + entry: &ConfigEntry, + expand: bool, + event_chunker_conf: EventChunkerConf, + disk_io_buffer_size: usize, + node_config: &NodeConfigCached, +) -> Result>, Error> { + let shape = match entry.to_shape() { + Ok(k) => k, + Err(e) => return Err(e)?, + }; + let channel_config = netpod::ChannelConfig { + channel, + keyspace: entry.ks as u8, + time_bin_size: entry.bs, + shape: shape, + scalar_type: entry.scalar_type.clone(), + byte_order: entry.byte_order.clone(), + array: entry.is_array, + compression: entry.is_compressed, + }; + let event_blobs = EventChunkerMultifile::new( + range, + channel_config.clone(), + node_config.node.clone(), + node_config.ix, + disk_io_buffer_size, + event_chunker_conf, + expand, + ); + Ok(event_blobs) +} + pub async fn make_event_blobs_pipe( evq: &RawEventsQuery, node_config: &NodeConfigCached, @@ -203,53 +296,40 @@ pub async fn make_event_blobs_pipe( Err(e) => return Err(e)?, } } + let expand = evq.agg_kind.need_expand(); let range = &evq.range; - let channel_config = match read_local_config(evq.channel.clone(), node_config.node.clone()).await { - Ok(k) => k, - Err(e) => { - if e.msg().contains("ErrorKind::NotFound") { - let s = futures_util::stream::empty(); - return Ok(Box::pin(s)); - } else { - return Err(e)?; - } - } - }; - let entry_res = match extract_matching_config_entry(range, &channel_config) { - Ok(k) => k, - Err(e) => return Err(e)?, - }; - let entry = match entry_res { - MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?, - MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found"))?, - MatchingConfigEntry::Entry(entry) => entry, - }; - let shape = match entry.to_shape() { - Ok(k) => k, - Err(e) => return Err(e)?, - }; - let channel_config = netpod::ChannelConfig { - channel: evq.channel.clone(), - keyspace: entry.ks as u8, - time_bin_size: entry.bs, - shape: shape, - scalar_type: entry.scalar_type.clone(), - byte_order: entry.byte_order.clone(), - array: entry.is_array, - compression: entry.is_compressed, - }; + let entry = get_applicable_entry(&evq.range, evq.channel.clone(), node_config).await?; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); - let event_blobs = EventChunkerMultifile::new( - range.clone(), - channel_config.clone(), - node_config.node.clone(), - node_config.ix, - evq.disk_io_buffer_size, - event_chunker_conf, - true, - ); - let s = event_blobs.map(|item| Box::new(item) as Box); - let pipe: Pin> + Send>>; - pipe = Box::pin(s); + let pipe = if true { + let event_blobs = make_remote_event_blobs_stream( + range.clone(), + evq.channel.clone(), + &entry, + expand, + event_chunker_conf, + evq.disk_io_buffer_size, + node_config, + )?; + let s = event_blobs.map(|item| Box::new(item) as Box); + //let s = tracing_futures::Instrumented::instrument(s, tracing::info_span!("make_event_blobs_pipe")); + let pipe: Pin> + Send>>; + pipe = Box::pin(s); + pipe + } else { + let event_blobs = make_local_event_blobs_stream( + range.clone(), + evq.channel.clone(), + &entry, + expand, + event_chunker_conf, + evq.disk_io_buffer_size, + node_config, + )?; + let s = event_blobs.map(|item| Box::new(item) as Box); + //let s = tracing_futures::Instrumented::instrument(s, tracing::info_span!("make_event_blobs_pipe")); + let pipe: Pin> + Send>>; + pipe = Box::pin(s); + pipe + }; Ok(pipe) } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index ed36286..f59baa3 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -1,15 +1,16 @@ use crate::gather::{gather_get_json_generic, SubRes}; use crate::{response, BodyStream}; use bytes::{BufMut, BytesMut}; +use disk::eventchunker::{EventChunkerConf, EventFull}; use err::Error; use futures_core::Stream; use futures_util::{FutureExt, StreamExt}; use http::{Method, StatusCode}; use hyper::{Body, Client, Request, Response}; -use items::{RangeCompletableItem, StreamItem}; +use items::{RangeCompletableItem, Sitemty, StreamItem}; use itertools::Itertools; use netpod::query::RawEventsQuery; -use netpod::{log::*, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, APP_OCTET}; +use netpod::{log::*, ByteSize, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape, APP_OCTET}; use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, Config, MatchingConfigEntry}; use serde::{Deserialize, Serialize}; @@ -566,6 +567,8 @@ impl Stream for DataApiPython3DataStream { MatchingConfigEntry::Entry(entry) => entry.clone(), }; warn!("found channel_config {:?}", entry); + + // TODO pull out the performance settings let evq = RawEventsQuery { channel: self.channels[self.chan_ix - 1].clone(), range: self.range.clone(), @@ -573,11 +576,30 @@ impl Stream for DataApiPython3DataStream { disk_io_buffer_size: 1024 * 4, }; let perf_opts = PerfOpts { inmem_bufcap: 1024 * 4 }; - let s = disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes::new( - evq, - perf_opts, - self.node_config.node_config.cluster.clone(), - ); + // TODO is this a good to place decide this? + let s = if self.node_config.node_config.cluster.is_central_storage { + info!("Set up central storage stream"); + // TODO pull up this config + let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); + let s = disk::raw::conn::make_local_event_blobs_stream( + evq.range.clone(), + evq.channel.clone(), + &entry, + evq.agg_kind.need_expand(), + event_chunker_conf, + evq.disk_io_buffer_size, + &self.node_config, + )?; + Box::pin(s) as Pin> + Send>> + } else { + info!("Set up merged remote stream"); + let s = disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes::new( + evq, + perf_opts, + self.node_config.node_config.cluster.clone(), + ); + Box::pin(s) as Pin> + Send>> + }; let s = s.map({ let mut header_out = false; let mut count_events = 0; @@ -591,13 +613,15 @@ impl Stream for DataApiPython3DataStream { for i1 in 0..b.tss.len() { if count_events < 6 { info!( - "deco len {:?} BE {} scalar-type {:?} shape {:?}", + "deco len {:?} BE {} scalar-type {:?} shape {:?} comps {:?}", b.decomps[i1].as_ref().map(|x| x.len()), b.be[i1], b.scalar_types[i1], - b.shapes[i1] + b.shapes[i1], + b.comps[i1], ); } + let compression = if let (Shape::Image(..), Some(..)) = (&b.shapes[i1], &b.comps[i1]) { Some(1) } else { None }; if !header_out { let head = Api1ChannelHeader { name: channel.name.clone(), @@ -611,7 +635,7 @@ impl Stream for DataApiPython3DataStream { // The shape is inconsistent on the events. // Seems like the config is to be trusted in this case. shape: shape_to_api3proto(&entry.shape), - compression: None, + compression, }; let h = serde_json::to_string(&head)?; info!("sending channel header {}", h); @@ -623,14 +647,27 @@ impl Stream for DataApiPython3DataStream { header_out = true; } { - if let Some(deco) = &b.decomps[i1] { - let l1 = 17 + deco.len() as u32; - d.put_u32(l1); - d.put_u8(1); - d.put_u64(b.tss[i1]); - d.put_u64(b.pulses[i1]); - d.put_slice(&deco); - d.put_u32(l1); + match &b.shapes[i1] { + Shape::Image(_, _) => { + let l1 = 17 + b.blobs[i1].len() as u32; + d.put_u32(l1); + d.put_u8(1); + d.put_u64(b.tss[i1]); + d.put_u64(b.pulses[i1]); + d.put_slice(&b.blobs[i1]); + d.put_u32(l1); + } + _ => { + if let Some(deco) = &b.decomps[i1] { + let l1 = 17 + deco.len() as u32; + d.put_u32(l1); + d.put_u8(1); + d.put_u64(b.tss[i1]); + d.put_u64(b.pulses[i1]); + d.put_slice(&deco); + d.put_u32(l1); + } + } } } count_events += 1; @@ -710,7 +747,8 @@ pub async fn api1_binary_events(req: Request, node_config: &NodeConfigCach beg: beg_ns, end: end_ns, }; - let backend = "sf-databuffer"; + // TODO use the proper backend name: + let backend = "DUMMY"; let chans = qu .channels .iter() @@ -719,120 +757,10 @@ pub async fn api1_binary_events(req: Request, node_config: &NodeConfigCach name: x.clone(), }) .collect(); - if true { - let s = DataApiPython3DataStream::new(range.clone(), chans, node_config.clone()); - let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", "dummy"); - let ret = ret.body(BodyStream::wrapped(s, format!("plain_events")))?; - return Ok(ret); - } - // TODO to server multiple channels, I need to wrap the loop over channels in a Stream itself. - let channel = qu.channels[0].clone(); - let channel = Channel { - backend: backend.into(), - name: channel, - }; - let channel_config = { - let channel_config = match read_local_config(channel.clone(), node_config.node.clone()).await { - Ok(k) => k, - Err(e) => { - error!("api1_binary_events error {:?}", e); - return Err(Error::with_msg_no_trace("can not parse channel config")); - } - }; - let entry_res = match extract_matching_config_entry(&range, &channel_config) { - Ok(k) => k, - Err(e) => return Err(e)?, - }; - let entry = match entry_res { - MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?, - MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found"))?, - MatchingConfigEntry::Entry(entry) => entry, - }; - entry.clone() - }; - warn!("found channel_config {:?}", channel_config); - let evq = RawEventsQuery { - channel: channel.clone(), - range, - agg_kind: netpod::AggKind::EventBlobs, - disk_io_buffer_size: 1024 * 4, - }; - let perf_opts = PerfOpts { inmem_bufcap: 1024 * 4 }; - let s = disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes::new( - evq, - perf_opts, - node_config.node_config.cluster.clone(), - ); - let s = s.map({ - let mut header_out = false; - let mut count_events = 0; - move |b| { - let ret = match b { - Ok(b) => { - let f = match b { - StreamItem::DataItem(RangeCompletableItem::Data(b)) => { - let mut d = BytesMut::new(); - for i1 in 0..b.tss.len() { - if count_events < 6 { - info!( - "deco len {:?} BE {} scalar-type {:?} shape {:?}", - b.decomps[i1].as_ref().map(|x| x.len()), - b.be[i1], - b.scalar_types[i1], - b.shapes[i1] - ); - } - if !header_out { - let head = Api1ChannelHeader { - name: channel.name.clone(), - ty: scalar_type_to_api3proto(&b.scalar_types[i1]).into(), - byte_order: if b.be[i1] { - "BIG_ENDIAN".into() - } else { - "LITTLE_ENDIAN".into() - }, - // The shape is inconsistent on the events. - // Seems like the config is to be trusted in this case. - shape: shape_to_api3proto(&channel_config.shape), - //shape: vec![2560], - compression: None, - }; - let h = serde_json::to_string(&head)?; - info!("sending channel header {}", h); - let l1 = 1 + h.as_bytes().len() as u32; - d.put_u32(l1); - d.put_u8(0); - d.extend_from_slice(h.as_bytes()); - d.put_u32(l1); - header_out = true; - } - { - if let Some(deco) = &b.decomps[i1] { - let l1 = 17 + deco.len() as u32; - d.put_u32(l1); - d.put_u8(1); - d.put_u64(b.tss[i1]); - d.put_u64(b.pulses[i1]); - d.put_slice(&deco); - d.put_u32(l1); - } - } - count_events += 1; - } - d - } - _ => BytesMut::new(), - }; - Ok(f) - } - Err(e) => Err(e), - }; - ret - } - }); + let s = DataApiPython3DataStream::new(range.clone(), chans, node_config.clone()); let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", "dummy"); - let ret = ret.body(BodyStream::wrapped(s, format!("plain_events")))?; - Ok(ret) + let ret = ret.body(BodyStream::wrapped(s, format!("api1_binary_events")))?; + return Ok(ret); } fn scalar_type_to_api3proto(sty: &ScalarType) -> &'static str { diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 51a8e76..7bab114 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -39,7 +39,11 @@ fn proxy_mark() -> &'static str { } pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { - let _update_task = UpdateTask::new(node_config.clone()); + let _update_task = if node_config.node_config.cluster.run_map_pulse_task { + Some(UpdateTask::new(node_config.clone())) + } else { + None + }; let rawjh = taskrun::spawn(events_service(node_config.clone())); use std::str::FromStr; let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?; @@ -485,7 +489,7 @@ async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) ); let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?; let s = s.map(|item| item.make_frame()); - let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?; + let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events_binary")))?; Ok(ret) } @@ -502,7 +506,7 @@ async fn plain_events_json(req: Request, node_config: &NodeConfigCached) - query.do_log(), ); let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?; - let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?; + let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events_json")))?; Ok(ret) } diff --git a/netfetch/src/test.rs b/netfetch/src/test.rs index fc75048..b8dddd2 100644 --- a/netfetch/src/test.rs +++ b/netfetch/src/test.rs @@ -32,6 +32,8 @@ fn ca_connect_1() { user: "".into(), pass: "".into(), }, + run_map_pulse_task: false, + is_central_storage: false, }, }, ix: 0, diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 3aa432e..f4f1a0f 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -161,10 +161,18 @@ pub struct Database { pub pass: String, } +fn bool_false() -> bool { + false +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Cluster { pub nodes: Vec, pub database: Database, + #[serde(default = "bool_false")] + pub run_map_pulse_task: bool, + #[serde(default = "bool_false")] + pub is_central_storage: bool, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -359,6 +367,7 @@ pub struct ChannelConfig { pub enum Shape { Scalar, Wave(u32), + Image(u32, u32), } pub trait HasShape { @@ -479,7 +488,14 @@ fn get_patch_t_len(bin_t_len: u64) -> u64 { } } } - Shape::Wave(_) => { + Shape::Wave(..) => { + for (i1, &j) in PATCH_T_LEN_KEY.iter().enumerate() { + if bin_t_len == j { + return PATCH_T_LEN_OPTIONS_WAVE[i1]; + } + } + } + Shape::Image(..) => { for (i1, &j) in PATCH_T_LEN_KEY.iter().enumerate() { if bin_t_len == j { return PATCH_T_LEN_OPTIONS_WAVE[i1]; @@ -737,6 +753,16 @@ impl AggKind { Self::Plain => false, } } + + pub fn need_expand(&self) -> bool { + match self { + Self::EventBlobs => false, + Self::TimeWeightedScalar => true, + Self::DimXBins1 => false, + Self::DimXBinsN(_) => false, + Self::Plain => false, + } + } } pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize { @@ -749,6 +775,7 @@ pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize { match shape { Shape::Scalar => 0, Shape::Wave(n) => *n as usize, + Shape::Image(j, k) => *j as usize * *k as usize, } } else { *n as usize @@ -757,6 +784,7 @@ pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize { AggKind::Plain => match shape { Shape::Scalar => 0, Shape::Wave(n) => *n as usize, + Shape::Image(j, k) => *j as usize * *k as usize, }, } } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 86b4f90..381b40e 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -30,7 +30,7 @@ pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error> async fn events_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: NodeConfigCached) -> Result<(), Error> { //use tracing_futures::Instrument; - let span1 = span!(Level::INFO, "raw::raw_conn_handler"); + let span1 = span!(Level::INFO, "events_conn_handler"); let r = events_conn_handler_inner(stream, addr, &node_config) .instrument(span1) .await; @@ -85,7 +85,7 @@ async fn events_conn_handler_inner_try( let mut frames = vec![]; while let Some(k) = h .next() - .instrument(span!(Level::INFO, "raw_conn_handler INPUT STREAM READ")) + .instrument(span!(Level::INFO, "events_conn_handler INPUT STREAM READ")) .await { match k { @@ -139,10 +139,13 @@ async fn events_conn_handler_inner_try( //info!("conn.rs encode frame typeid {:x}", item.typeid()); let item = item.make_frame(); match item { - Ok(buf) => match netout.write_all(&buf).await { - Ok(_) => {} - Err(e) => return Err((e, netout))?, - }, + Ok(buf) => { + info!("events_conn_handler send {} bytes", buf.len()); + match netout.write_all(&buf).await { + Ok(_) => {} + Err(e) => return Err((e, netout))?, + } + } Err(e) => { return Err((e, netout))?; } diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index bf67919..a5719c8 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -73,7 +73,11 @@ impl ConfigEntry { Some(lens) => { if lens.len() == 1 { Shape::Wave(lens[0]) + } else if lens.len() == 2 { + Shape::Image(lens[0], lens[1]) } else { + // TODO + // Need a new Shape variant for images. return Err(Error::with_msg(format!("Channel config unsupported shape {:?}", self)))?; } } diff --git a/taskrun/src/append.rs b/taskrun/src/append.rs index 56f4829..2a302e2 100644 --- a/taskrun/src/append.rs +++ b/taskrun/src/append.rs @@ -1,7 +1,7 @@ use err::Error; use std::borrow::Cow; use std::fs; -use std::io::{self, BufWriter, Read, Stderr, Stdin, Write}; +use std::io::{BufWriter, Read, Seek, SeekFrom, Stderr, Stdin, Write}; use std::path::{Path, PathBuf}; pub struct Buffer { @@ -98,18 +98,59 @@ fn parse_lines(buf: &[u8]) -> Result<(Vec>, usize), Error> { Ok((ret, i2)) } -const MAX_PER_FILE: usize = 1024 * 1024 * 2; -const MAX_TOTAL_SIZE: usize = 1024 * 1024 * 20; +const MAX_PER_FILE: u64 = 1024 * 1024 * 2; +const MAX_TOTAL_SIZE: u64 = 1024 * 1024 * 20; -fn next_file(dir: &Path, append: bool, truncate: bool) -> io::Result> { +struct Fileinfo { + path: PathBuf, + name: String, + len: u64, +} + +fn file_list(dir: &Path) -> Result, Error> { + let mut ret = vec![]; + let rd = fs::read_dir(&dir)?; + for e in rd { + let e = e?; + let fnos = e.file_name(); + let fns = fnos.to_str().unwrap_or(""); + if fns.starts_with("info-20") && fns.ends_with(".log") { + let meta = e.metadata()?; + let info = Fileinfo { + path: e.path(), + name: fns.into(), + len: meta.len(), + }; + ret.push(info); + } + } + ret.sort_by(|a, b| std::cmp::Ord::cmp(&a.name, &b.name)); + Ok(ret) +} + +fn open_latest_or_new(dir: &Path) -> Result, Error> { + let list = file_list(dir)?; + if let Some(latest) = list.last() { + if latest.len < MAX_PER_FILE { + let ret = fs::OpenOptions::new().write(true).append(true).open(&latest.path)?; + let ret = BufWriter::new(ret); + return Ok(ret); + } + } + next_file(dir) +} + +fn next_file(dir: &Path) -> Result, Error> { let ts = chrono::Utc::now(); let s = ts.format("%Y-%m-%d--%H-%M-%S").to_string(); - let ret = fs::OpenOptions::new() + let mut ret = fs::OpenOptions::new() .write(true) .create(true) - .append(append) - .truncate(truncate) + .append(true) .open(dir.join(format!("info-{}.log", s)))?; + if ret.seek(SeekFrom::Current(0))? != 0 { + return Err(Error::with_msg_no_trace("new file already exists")); + } let ret = BufWriter::new(ret); Ok(ret) } @@ -117,7 +158,7 @@ fn next_file(dir: &Path, append: bool, truncate: bool) -> io::Result Result<(), Error> { let mut bytes_written = 0; let dir = PathBuf::from(dirname); - let mut fout = next_file(&dir, true, false)?; + let mut fout = open_latest_or_new(&dir)?; let mut buf = Buffer::new(); loop { // Get some more data. @@ -144,7 +185,7 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin, _stderr: Stderr) -> Result< let j = line.as_bytes(); fout.write_all(j)?; fout.write_all(b"\n")?; - bytes_written += j.len() + 1; + bytes_written += j.len() as u64 + 1; } buf.advance(n2); } @@ -154,38 +195,41 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin, _stderr: Stderr) -> Result< } } fout.flush()?; - if bytes_written >= MAX_PER_FILE { + if bytes_written >= (MAX_PER_FILE >> 3) { bytes_written = 0; - let rd = fs::read_dir(&dir)?; - let mut w = vec![]; - for e in rd { - let e = e?; - let fnos = e.file_name(); - let fns = fnos.to_str().unwrap(); - if fns.starts_with("info-20") && fns.ends_with(".log") { - let meta = e.metadata()?; - w.push((e.path(), meta.len())); + let l1 = fout.seek(SeekFrom::End(0))?; + if l1 >= MAX_PER_FILE { + let rd = fs::read_dir(&dir)?; + let mut w = vec![]; + for e in rd { + let e = e?; + let fnos = e.file_name(); + let fns = fnos.to_str().unwrap(); + if fns.starts_with("info-20") && fns.ends_with(".log") { + let meta = e.metadata()?; + w.push((e.path(), meta.len())); + } } - } - w.sort_by(|a, b| std::cmp::Ord::cmp(a, b)); - for q in &w { - write!(&mut fout, "file:::: {}\n", q.0.to_string_lossy())?; - } - let mut lentot = w.iter().map(|g| g.1).fold(0, |a, x| a + x); - write!(&mut fout, "lentot: {}\n", lentot)?; - for q in w { - if lentot <= MAX_TOTAL_SIZE as u64 { - break; + w.sort_by(|a, b| std::cmp::Ord::cmp(a, b)); + for q in &w { + write!(&mut fout, "file:::: {}\n", q.0.to_string_lossy())?; } - write!(&mut fout, "REMOVE {} {}\n", q.1, q.0.to_string_lossy())?; - fs::remove_file(q.0)?; - if q.1 < lentot { - lentot -= q.1; - } else { - lentot = 0; + let mut lentot = w.iter().map(|g| g.1).fold(0, |a, x| a + x); + write!(&mut fout, "lentot: {}\n", lentot)?; + for q in w { + if lentot <= MAX_TOTAL_SIZE as u64 { + break; + } + write!(&mut fout, "REMOVE {} {}\n", q.1, q.0.to_string_lossy())?; + fs::remove_file(q.0)?; + if q.1 < lentot { + lentot -= q.1; + } else { + lentot = 0; + } } - } - fout = next_file(&dir, true, false)?; + fout = next_file(&dir)?; + }; } } } diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 0ce7d99..228ec97 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -130,5 +130,7 @@ pub fn test_cluster() -> netpod::Cluster { user: "daqbuffer".into(), pass: "daqbuffer".into(), }, + run_map_pulse_task: false, + is_central_storage: false, } }