Download small image sample

Author: Dominik Werder
Date: 2021-09-14 21:59:17 +02:00
parent 8f33b894a8
commit 22ba7bb0d3
21 changed files with 576 additions and 358 deletions

View File

@@ -324,6 +324,10 @@ macro_rules! arm1 {
$sty2
),
},
Shape::Image(..) => {
// There should be no images on the archiver.
err::todoval()
}
}
}};
}

View File

@@ -22,8 +22,8 @@ byteorder = "1.4.3"
futures-core = "0.3.14"
futures-util = "0.3.14"
async-stream = "0.3.0"
tracing = "0.1.25"
#tracing-futures = "0.2.5"
tracing = { version = "0.1.25", features = [] }
tracing-futures = { version = "0.2.5", features = ["futures-01", "futures-03", "std-future"] }
fs2 = "0.4.3"
libc = "0.2.93"
hex = "0.4.3"
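
tracing-futures is re-enabled here with the futures-01/03 and std-future features, which is what makes the `.instrument(...)` calls later in this commit compile. A minimal sketch of that combinator, assuming the crate versions above (the span and function names are illustrative):

use tracing::info_span;
use tracing_futures::Instrument;

async fn read_chunk() -> usize {
    // Stand-in for instrumented work.
    42
}

async fn run() {
    // All events emitted while read_chunk is polled carry this span's context.
    let n = read_chunk().instrument(info_span!("read_chunk")).await;
    tracing::info!(n, "read_chunk done");
}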

View File

@@ -112,6 +112,10 @@ where
}
}
}
Shape::Image(..) => {
// TODO image binning/aggregation
err::todoval()
}
}
}

View File

@@ -142,6 +142,10 @@ where
}
}
}
Shape::Image(..) => {
// TODO needed for binning or json event retrieval
err::todoval()
}
}
}

View File

@@ -90,103 +90,120 @@ impl Stream for EventChunkerMultifile {
type Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
break if self.completed {
panic!("EventBlobsComplete poll_next on completed");
} else if self.errored {
self.completed = true;
return Ready(None);
} else if self.data_completed {
self.completed = true;
return Ready(None);
} else {
match &mut self.evs {
Some(evs) => match evs.poll_next_unpin(cx) {
Ready(Some(k)) => Ready(Some(k)),
Ready(None) => {
self.seen_before_range_count += evs.seen_before_range_count();
self.evs = None;
continue 'outer;
}
Pending => Pending,
},
None => match self.file_chan.poll_next_unpin(cx) {
Ready(Some(k)) => match k {
Ok(ofs) => {
self.files_count += ofs.files.len() as u32;
if ofs.files.len() == 1 {
let mut ofs = ofs;
let file = ofs.files.pop().unwrap();
let path = file.path;
let item = LogItem::quick(Level::INFO, format!("handle OFS {:?}", ofs));
match file.file {
Some(file) => {
let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
let chunker = EventChunker::from_event_boundary(
inp,
self.channel_config.clone(),
self.range.clone(),
self.event_chunker_conf.clone(),
path,
self.max_ts.clone(),
self.expand,
);
self.evs = Some(Box::pin(chunker));
}
None => {}
}
Ready(Some(Ok(StreamItem::Log(item))))
} else if ofs.files.len() > 1 {
let item = LogItem::quick(Level::INFO, format!("handle OFS MULTIPLE {:?}", ofs));
let mut chunkers = vec![];
for of in ofs.files {
if let Some(file) = of.file {
let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
let chunker = EventChunker::from_event_boundary(
inp,
self.channel_config.clone(),
self.range.clone(),
self.event_chunker_conf.clone(),
of.path,
self.max_ts.clone(),
self.expand,
);
chunkers.push(chunker);
}
}
let merged = MergedBlobsStream::new(chunkers);
self.evs = Some(Box::pin(merged));
Ready(Some(Ok(StreamItem::Log(item))))
} else {
let item = LogItem::quick(Level::INFO, format!("handle OFS {:?} NO FILES", ofs));
Ready(Some(Ok(StreamItem::Log(item))))
let span1 = span!(Level::INFO, "EventChunkerMultifile", desc = tracing::field::Empty);
span1.record("desc", &"");
span1.in_scope(|| {
use Poll::*;
'outer: loop {
break if self.completed {
panic!("EventBlobsComplete poll_next on completed");
} else if self.errored {
self.completed = true;
return Ready(None);
} else if self.data_completed {
self.completed = true;
return Ready(None);
} else {
match &mut self.evs {
Some(evs) => match evs.poll_next_unpin(cx) {
Ready(Some(k)) => {
if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(h))) = &k {
info!("EventChunkerMultifile emit {} events", h.tss.len());
}
Ready(Some(k))
}
Err(e) => {
self.errored = true;
Ready(Some(Err(e)))
Ready(None) => {
self.seen_before_range_count += evs.seen_before_range_count();
self.evs = None;
continue 'outer;
}
Pending => Pending,
},
Ready(None) => {
self.data_completed = true;
let item = LogItem::quick(
Level::INFO,
format!(
"EventBlobsComplete used {} datafiles beg {} end {} node_ix {}",
self.files_count,
self.range.beg / SEC,
self.range.end / SEC,
self.node_ix
),
);
Ready(Some(Ok(StreamItem::Log(item))))
}
Pending => Pending,
},
}
};
}
None => match self.file_chan.poll_next_unpin(cx) {
Ready(Some(k)) => match k {
Ok(ofs) => {
self.files_count += ofs.files.len() as u32;
if ofs.files.len() == 1 {
let mut ofs = ofs;
let file = ofs.files.pop().unwrap();
let path = file.path;
let msg = format!("handle OFS {:?}", ofs);
info!("{}", msg);
let item = LogItem::quick(Level::INFO, msg);
match file.file {
Some(file) => {
let inp =
Box::pin(file_content_stream(file, self.buffer_size as usize));
let chunker = EventChunker::from_event_boundary(
inp,
self.channel_config.clone(),
self.range.clone(),
self.event_chunker_conf.clone(),
path,
self.max_ts.clone(),
self.expand,
);
self.evs = Some(Box::pin(chunker));
}
None => {}
}
Ready(Some(Ok(StreamItem::Log(item))))
} else if ofs.files.len() > 1 {
let msg = format!("handle OFS MULTIPLE {:?}", ofs);
warn!("{}", msg);
let item = LogItem::quick(Level::INFO, msg);
let mut chunkers = vec![];
for of in ofs.files {
if let Some(file) = of.file {
let inp =
Box::pin(file_content_stream(file, self.buffer_size as usize));
let chunker = EventChunker::from_event_boundary(
inp,
self.channel_config.clone(),
self.range.clone(),
self.event_chunker_conf.clone(),
of.path,
self.max_ts.clone(),
self.expand,
);
chunkers.push(chunker);
}
}
let merged = MergedBlobsStream::new(chunkers);
self.evs = Some(Box::pin(merged));
Ready(Some(Ok(StreamItem::Log(item))))
} else {
let msg = format!("handle OFS {:?} NO FILES", ofs);
info!("{}", msg);
let item = LogItem::quick(Level::INFO, msg);
Ready(Some(Ok(StreamItem::Log(item))))
}
}
Err(e) => {
self.errored = true;
Ready(Some(Err(e)))
}
},
Ready(None) => {
self.data_completed = true;
let item = LogItem::quick(
Level::INFO,
format!(
"EventBlobsComplete used {} datafiles beg {} end {} node_ix {}",
self.files_count,
self.range.beg / SEC,
self.range.end / SEC,
self.node_ix
),
);
Ready(Some(Ok(StreamItem::Log(item))))
}
Pending => Pending,
},
}
};
}
})
}
}
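
The rewritten poll body keeps the guard pattern that other streams in this commit also use: panic if polled after completion, fuse after an error, and only then poll the inner source. A stripped-down sketch of that skeleton, with illustrative types not taken from the repository:

use futures_core::Stream;
use futures_util::StreamExt;
use std::pin::Pin;
use std::task::{Context, Poll};

struct Guarded<S> {
    inner: S,
    errored: bool,
    completed: bool,
}

impl<S, T, E> Stream for Guarded<S>
where
    S: Stream<Item = Result<T, E>> + Unpin,
{
    type Item = Result<T, E>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.completed {
            // Polling again after Ready(None) is a caller bug.
            panic!("poll_next on completed stream");
        }
        if this.errored {
            this.completed = true;
            return Poll::Ready(None);
        }
        match this.inner.poll_next_unpin(cx) {
            Poll::Ready(Some(Err(e))) => {
                // Emit the error once, then terminate on the next poll.
                this.errored = true;
                Poll::Ready(Some(Err(e)))
            }
            Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(Ok(item))),
            Poll::Ready(None) => {
                this.completed = true;
                Poll::Ready(None)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}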

View File

@@ -10,6 +10,7 @@ use items::{
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape};
use parse::channelconfig::CompressionMethod;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::path::PathBuf;
use std::pin::Pin;
@@ -155,9 +156,10 @@ impl EventChunker {
}
}
DataFileState::Event => {
let p0 = 0;
let mut sl = std::io::Cursor::new(buf.as_ref());
let len = sl.read_i32::<BE>().unwrap();
if len < 20 || len > 1024 * 1024 * 10 {
if len < 20 || len > 1024 * 1024 * 20 {
Err(Error::with_msg("unexpected large event chunk"))?;
}
let len = len as u32;
@@ -256,6 +258,8 @@ impl EventChunker {
if is_shaped {
if shape_dim == 1 {
Shape::Wave(shape_lens[0])
} else if shape_dim == 2 {
Shape::Image(shape_lens[0], shape_lens[1])
} else {
err::todoval()
}
@@ -263,27 +267,38 @@ impl EventChunker {
Shape::Scalar
}
};
let comp_this = if is_compressed {
if compression_method == 0 {
Some(CompressionMethod::BitshuffleLZ4)
} else {
err::todoval()
}
} else {
None
};
let p1 = sl.position();
let k1 = len as u64 - (p1 - p0) - 4;
if is_compressed {
//debug!("event ts {} is_compressed {}", ts, is_compressed);
let value_bytes = sl.read_u64::<BE>().unwrap();
let block_size = sl.read_u32::<BE>().unwrap();
let p1 = sl.position() as u32;
let k1 = len as u32 - p1 - 4;
//debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size);
assert!(value_bytes < 1024 * 256);
assert!(block_size < 1024 * 32);
match self.channel_config.shape {
Shape::Scalar => {
assert!(value_bytes < 1024 * 1);
}
Shape::Wave(_) => {
assert!(value_bytes < 1024 * 64);
}
Shape::Image(_, _) => {
assert!(value_bytes < 1024 * 1024 * 20);
}
}
assert!(block_size <= 1024 * 32);
let type_size = scalar_type.bytes() as u32;
let ele_count = value_bytes / type_size as u64;
let ele_size = type_size;
match self.channel_config.shape {
Shape::Wave(dim1count) => {
if dim1count != ele_count as u32 {
Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but event has {:?}",
self.channel_config.shape, ele_count,
)))?;
}
}
Shape::Scalar => {
if is_array {
Err(Error::with_msg(format!(
@@ -291,28 +306,48 @@ impl EventChunker {
)))?;
}
}
Shape::Wave(dim1count) => {
if dim1count != ele_count as u32 {
Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but event has ele_count {}",
self.channel_config.shape, ele_count,
)))?;
}
}
Shape::Image(n1, n2) => {
let nt = n1 as usize * n2 as usize;
if nt != ele_count as usize {
Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but event has ele_count {}",
self.channel_config.shape, ele_count,
)))?;
}
}
}
let decomp_bytes = (type_size * ele_count as u32) as usize;
let mut decomp = BytesMut::with_capacity(decomp_bytes);
unsafe {
decomp.set_len(decomp_bytes);
}
// TODO limit the buf slice range
match bitshuffle_decompress(
&buf.as_ref()[p1 as usize..],
&buf.as_ref()[(p1 as usize + 12)..(p1 as usize + k1 as usize)],
&mut decomp,
ele_count as usize,
ele_size as usize,
0,
) {
Ok(c1) => {
assert!(c1 as u32 == k1);
assert!(c1 as u64 + 12 == k1);
ret.add_event(
ts,
pulse,
buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(),
Some(decomp),
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
shape_this,
comp_this,
);
}
Err(e) => {
@@ -320,7 +355,6 @@ impl EventChunker {
}
};
} else {
let p1 = sl.position();
if len < p1 as u32 + 4 {
let msg = format!("uncomp len: {} p1: {}", len, p1);
Err(Error::with_msg(msg))?;
@@ -330,10 +364,12 @@ impl EventChunker {
ret.add_event(
ts,
pulse,
buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(),
Some(decomp),
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
shape_this,
comp_this,
);
}
buf.advance(len as usize);
@@ -358,11 +394,13 @@ impl EventChunker {
pub struct EventFull {
pub tss: Vec<u64>,
pub pulses: Vec<u64>,
pub blobs: Vec<Vec<u8>>,
#[serde(serialize_with = "decomps_ser", deserialize_with = "decomps_de")]
pub decomps: Vec<Option<BytesMut>>,
pub scalar_types: Vec<ScalarType>,
pub be: Vec<bool>,
pub shapes: Vec<Shape>,
pub comps: Vec<Option<CompressionMethod>>,
}
fn decomps_ser<S>(t: &Vec<Option<BytesMut>>, s: S) -> Result<S::Ok, S::Error>
@@ -403,10 +441,12 @@ impl EventFull {
Self {
tss: vec![],
pulses: vec![],
blobs: vec![],
decomps: vec![],
scalar_types: vec![],
be: vec![],
shapes: vec![],
comps: vec![],
}
}
@@ -414,17 +454,21 @@ impl EventFull {
&mut self,
ts: u64,
pulse: u64,
blob: Vec<u8>,
decomp: Option<BytesMut>,
scalar_type: ScalarType,
be: bool,
shape: Shape,
comp: Option<CompressionMethod>,
) {
self.tss.push(ts);
self.pulses.push(pulse);
self.blobs.push(blob);
self.decomps.push(decomp);
self.scalar_types.push(scalar_type);
self.be.push(be);
self.shapes.push(shape);
self.comps.push(comp);
}
}
@@ -447,10 +491,12 @@ impl Appendable for EventFull {
fn append(&mut self, src: &Self) {
self.tss.extend_from_slice(&src.tss);
self.pulses.extend_from_slice(&src.pulses);
self.blobs.extend_from_slice(&src.blobs);
self.decomps.extend_from_slice(&src.decomps);
self.scalar_types.extend_from_slice(&src.scalar_types);
self.be.extend_from_slice(&src.be);
self.shapes.extend_from_slice(&src.shapes);
self.comps.extend_from_slice(&src.comps);
}
}
@@ -465,10 +511,12 @@ impl PushableIndex for EventFull {
fn push_index(&mut self, src: &Self, ix: usize) {
self.tss.push(src.tss[ix]);
self.pulses.push(src.pulses[ix]);
self.blobs.push(src.blobs[ix].clone());
self.decomps.push(src.decomps[ix].clone());
self.scalar_types.push(src.scalar_types[ix].clone());
self.be.push(src.be[ix]);
self.shapes.push(src.shapes[ix].clone());
self.comps.push(src.comps[ix].clone());
}
}
@@ -523,16 +571,43 @@ impl Stream for EventChunker {
// TODO gather stats about this:
self.inp.put_back(fcr);
}
if self.need_min > 1024 * 8 {
let msg = format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
Ready(Some(Err(Error::with_msg(msg))))
} else {
let x = self.need_min;
self.inp.set_need_min(x);
let ret = StreamItem::DataItem(RangeCompletableItem::Data(res.events));
Ready(Some(Ok(ret)))
match self.channel_config.shape {
Shape::Scalar => {
if self.need_min > 1024 * 8 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
Shape::Wave(_) => {
if self.need_min > 1024 * 32 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
Shape::Image(_, _) => {
if self.need_min > 1024 * 1024 * 20 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
}
let x = self.need_min;
self.inp.set_need_min(x);
{
info!(
"EventChunker emits {} events tss {:?}",
res.events.len(),
res.events.tss
);
};
let ret = StreamItem::DataItem(RangeCompletableItem::Data(res.events));
Ready(Some(Ok(ret)))
}
Err(e) => {
self.errored = true;
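
The shape-dependent limits added in this file are easier to read side by side; a sketch with the constants copied from the diff (the enum is a local stand-in for netpod::Shape, and the helper names are made up):

enum Shape {
    Scalar,
    Wave(u32),
    Image(u32, u32),
}

// Upper bound on the value payload of a single compressed event.
fn max_value_bytes(shape: &Shape) -> u64 {
    match shape {
        Shape::Scalar => 1024,
        Shape::Wave(_) => 1024 * 64,
        Shape::Image(_, _) => 1024 * 1024 * 20,
    }
}

// Upper bound on how much buffered input the chunker may request
// before the request is treated as spurious.
fn max_need_min(shape: &Shape) -> usize {
    match shape {
        Shape::Scalar => 1024 * 8,
        Shape::Wave(_) => 1024 * 32,
        Shape::Image(_, _) => 1024 * 1024 * 20,
    }
}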

View File

@@ -129,15 +129,12 @@ where
(Some(None), buf, wp)
} else {
if len > 1024 * 1024 * 50 {
error!("InMemoryFrameAsyncReadStream too long len {}", len);
return (
Some(Some(Err(Error::with_msg(format!(
"InMemoryFrameAsyncReadStream tryparse huge buffer len {} self.inp_bytes_consumed {}",
len, self.inp_bytes_consumed
))))),
buf,
wp,
let msg = format!(
"InMemoryFrameAsyncReadStream tryparse huge buffer len {} self.inp_bytes_consumed {}",
len, self.inp_bytes_consumed
);
error!("{}", msg);
return (Some(Some(Err(Error::with_msg(msg)))), buf, wp);
} else if len > 1024 * 1024 * 1 {
// TODO
//warn!("InMemoryFrameAsyncReadStream big len received {}", len);

View File

@@ -249,6 +249,10 @@ async fn gen_config(
buf.put_i8(1);
buf.put_i32(k as i32);
}
Shape::Image(_, _) => {
// TODO test data
err::todoval()
}
}
let len = buf.len() - p3 - 4;
buf.as_mut()[p3..].as_mut().put_i32(len as i32);
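
The two lines after the match use a reserve-then-backpatch pattern for the section length; a hypothetical helper showing the same idea with the bytes crate:

use bytes::{BufMut, BytesMut};

// Reserve a 4-byte length slot, append the body, then patch the real
// length (excluding the slot itself) back into the reserved bytes.
fn write_section(buf: &mut BytesMut, body: &[u8]) {
    let p3 = buf.len();
    buf.put_i32(0); // placeholder for the length
    buf.put_slice(body);
    let len = (buf.len() - p3 - 4) as i32;
    buf[p3..p3 + 4].copy_from_slice(&len.to_be_bytes());
}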

View File

@@ -503,6 +503,9 @@ impl ChannelConfigExt for ChannelConfig {
Shape::Wave(_) => {
ret |= SHAPE;
}
Shape::Image(_, _) => {
ret |= SHAPE;
}
}
if self.byte_order.is_be() {
ret |= BIG_ENDIAN;

View File

@@ -62,7 +62,7 @@ where
range_complete_observed_all: false,
range_complete_observed_all_emitted: false,
data_emit_complete: false,
batch_size: 64,
batch_size: 1,
logitems: VecDeque::new(),
event_data_read_stats_items: VecDeque::new(),
}

View File

@@ -27,6 +27,7 @@ impl MergedBlobsFromRemotes {
pub fn new(evq: RawEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self {
info!("MergedBlobsFromRemotes evq {:?}", evq);
let mut tcp_establish_futs = vec![];
for node in &cluster.nodes {
let f = x_processed_event_blobs_stream_from_node(evq.clone(), perf_opts.clone(), node.clone());
let f: T002<EventFull> = Box::pin(f);

View File

@@ -58,7 +58,7 @@ where
range_complete_observed_all: false,
range_complete_observed_all_emitted: false,
data_emit_complete: false,
batch_size: 64,
batch_size: 1,
logitems: VecDeque::new(),
event_data_read_stats_items: VecDeque::new(),
}
@@ -188,6 +188,13 @@ where
let emp = I::empty();
let ret = std::mem::replace(&mut self.batch, emp);
self.data_emit_complete = true;
{
let mut aa = vec![];
for ii in 0..ret.len() {
aa.push(ret.ts(ii));
}
info!("MergedBlobsStream A emits {} events tss {:?}", ret.len(), aa);
};
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
} else {
self.data_emit_complete = true;
@@ -220,6 +227,13 @@ where
if self.batch.len() >= self.batch_size {
let emp = I::empty();
let ret = std::mem::replace(&mut self.batch, emp);
{
let mut aa = vec![];
for ii in 0..ret.len() {
aa.push(ret.ts(ii));
}
info!("MergedBlobsStream B emits {} events tss {:?}", ret.len(), aa);
};
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
} else {
continue 'outer;

View File

@@ -3,15 +3,16 @@ use crate::decode::{
EventsDecodedStream, LittleEndian, NumFromBytes,
};
use crate::eventblobs::EventChunkerMultifile;
use crate::eventchunker::EventChunkerConf;
use crate::eventchunker::{EventChunkerConf, EventFull};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::numops::{BoolNum, NumOps};
use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, StreamItem};
use netpod::query::RawEventsQuery;
use netpod::{AggKind, ByteOrder, ByteSize, NodeConfigCached, ScalarType, Shape};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, MatchingConfigEntry};
use netpod::{AggKind, ByteOrder, ByteSize, Channel, NanoRange, NodeConfigCached, ScalarType, Shape};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, ConfigEntry, MatchingConfigEntry};
use std::pin::Pin;
fn make_num_pipeline_stream_evs<NTY, END, EVS, ENP>(
@@ -96,6 +97,10 @@ macro_rules! pipe3 {
$event_blobs
)
}
Shape::Image(_, _) => {
// TODO not needed for python data api v3 protocol, but later for api4.
err::todoval()
}
}
};
}
@@ -193,6 +198,94 @@ pub async fn make_event_pipe(
Ok(pipe)
}
pub async fn get_applicable_entry(
range: &NanoRange,
channel: Channel,
node_config: &NodeConfigCached,
) -> Result<ConfigEntry, Error> {
let channel_config = read_local_config(channel, node_config.node.clone()).await?;
let entry_res = match extract_matching_config_entry(range, &channel_config) {
Ok(k) => k,
Err(e) => return Err(e)?,
};
let entry = match entry_res {
MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?,
MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found"))?,
MatchingConfigEntry::Entry(entry) => entry,
};
Ok(entry.clone())
}
pub fn make_local_event_blobs_stream(
range: NanoRange,
channel: Channel,
entry: &ConfigEntry,
expand: bool,
event_chunker_conf: EventChunkerConf,
disk_io_buffer_size: usize,
node_config: &NodeConfigCached,
) -> Result<EventChunkerMultifile, Error> {
let shape = match entry.to_shape() {
Ok(k) => k,
Err(e) => return Err(e)?,
};
let channel_config = netpod::ChannelConfig {
channel,
keyspace: entry.ks as u8,
time_bin_size: entry.bs,
shape: shape,
scalar_type: entry.scalar_type.clone(),
byte_order: entry.byte_order.clone(),
array: entry.is_array,
compression: entry.is_compressed,
};
let event_blobs = EventChunkerMultifile::new(
range,
channel_config.clone(),
node_config.node.clone(),
node_config.ix,
disk_io_buffer_size,
event_chunker_conf,
expand,
);
Ok(event_blobs)
}
pub fn make_remote_event_blobs_stream(
range: NanoRange,
channel: Channel,
entry: &ConfigEntry,
expand: bool,
event_chunker_conf: EventChunkerConf,
disk_io_buffer_size: usize,
node_config: &NodeConfigCached,
) -> Result<impl Stream<Item = Sitemty<EventFull>>, Error> {
let shape = match entry.to_shape() {
Ok(k) => k,
Err(e) => return Err(e)?,
};
let channel_config = netpod::ChannelConfig {
channel,
keyspace: entry.ks as u8,
time_bin_size: entry.bs,
shape: shape,
scalar_type: entry.scalar_type.clone(),
byte_order: entry.byte_order.clone(),
array: entry.is_array,
compression: entry.is_compressed,
};
let event_blobs = EventChunkerMultifile::new(
range,
channel_config.clone(),
node_config.node.clone(),
node_config.ix,
disk_io_buffer_size,
event_chunker_conf,
expand,
);
Ok(event_blobs)
}
pub async fn make_event_blobs_pipe(
evq: &RawEventsQuery,
node_config: &NodeConfigCached,
@@ -203,53 +296,40 @@ pub async fn make_event_blobs_pipe(
Err(e) => return Err(e)?,
}
}
let expand = evq.agg_kind.need_expand();
let range = &evq.range;
let channel_config = match read_local_config(evq.channel.clone(), node_config.node.clone()).await {
Ok(k) => k,
Err(e) => {
if e.msg().contains("ErrorKind::NotFound") {
let s = futures_util::stream::empty();
return Ok(Box::pin(s));
} else {
return Err(e)?;
}
}
};
let entry_res = match extract_matching_config_entry(range, &channel_config) {
Ok(k) => k,
Err(e) => return Err(e)?,
};
let entry = match entry_res {
MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?,
MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found"))?,
MatchingConfigEntry::Entry(entry) => entry,
};
let shape = match entry.to_shape() {
Ok(k) => k,
Err(e) => return Err(e)?,
};
let channel_config = netpod::ChannelConfig {
channel: evq.channel.clone(),
keyspace: entry.ks as u8,
time_bin_size: entry.bs,
shape: shape,
scalar_type: entry.scalar_type.clone(),
byte_order: entry.byte_order.clone(),
array: entry.is_array,
compression: entry.is_compressed,
};
let entry = get_applicable_entry(&evq.range, evq.channel.clone(), node_config).await?;
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
let event_blobs = EventChunkerMultifile::new(
range.clone(),
channel_config.clone(),
node_config.node.clone(),
node_config.ix,
evq.disk_io_buffer_size,
event_chunker_conf,
true,
);
let s = event_blobs.map(|item| Box::new(item) as Box<dyn Framable>);
let pipe: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>;
pipe = Box::pin(s);
let pipe = if true {
let event_blobs = make_remote_event_blobs_stream(
range.clone(),
evq.channel.clone(),
&entry,
expand,
event_chunker_conf,
evq.disk_io_buffer_size,
node_config,
)?;
let s = event_blobs.map(|item| Box::new(item) as Box<dyn Framable>);
//let s = tracing_futures::Instrumented::instrument(s, tracing::info_span!("make_event_blobs_pipe"));
let pipe: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>;
pipe = Box::pin(s);
pipe
} else {
let event_blobs = make_local_event_blobs_stream(
range.clone(),
evq.channel.clone(),
&entry,
expand,
event_chunker_conf,
evq.disk_io_buffer_size,
node_config,
)?;
let s = event_blobs.map(|item| Box::new(item) as Box<dyn Framable>);
//let s = tracing_futures::Instrumented::instrument(s, tracing::info_span!("make_event_blobs_pipe"));
let pipe: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>;
pipe = Box::pin(s);
pipe
};
Ok(pipe)
}
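
Taken together, the new get_applicable_entry and make_local_event_blobs_stream compose as in this hypothetical call site (imports as in the file above; the argument values are examples, not defaults from the code):

async fn open_local_blobs(
    range: NanoRange,
    channel: Channel,
    node_config: &NodeConfigCached,
) -> Result<EventChunkerMultifile, Error> {
    let entry = get_applicable_entry(&range, channel.clone(), node_config).await?;
    let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
    make_local_event_blobs_stream(
        range,
        channel,
        &entry,
        false,    // expand
        event_chunker_conf,
        1024 * 4, // disk_io_buffer_size
        node_config,
    )
}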

View File

@@ -1,15 +1,16 @@
use crate::gather::{gather_get_json_generic, SubRes};
use crate::{response, BodyStream};
use bytes::{BufMut, BytesMut};
use disk::eventchunker::{EventChunkerConf, EventFull};
use err::Error;
use futures_core::Stream;
use futures_util::{FutureExt, StreamExt};
use http::{Method, StatusCode};
use hyper::{Body, Client, Request, Response};
use items::{RangeCompletableItem, StreamItem};
use items::{RangeCompletableItem, Sitemty, StreamItem};
use itertools::Itertools;
use netpod::query::RawEventsQuery;
use netpod::{log::*, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, APP_OCTET};
use netpod::{log::*, ByteSize, Channel, NanoRange, NodeConfigCached, PerfOpts, ScalarType, Shape, APP_OCTET};
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, Config, MatchingConfigEntry};
use serde::{Deserialize, Serialize};
@@ -566,6 +567,8 @@ impl Stream for DataApiPython3DataStream {
MatchingConfigEntry::Entry(entry) => entry.clone(),
};
warn!("found channel_config {:?}", entry);
// TODO pull out the performance settings
let evq = RawEventsQuery {
channel: self.channels[self.chan_ix - 1].clone(),
range: self.range.clone(),
@@ -573,11 +576,30 @@ impl Stream for DataApiPython3DataStream {
disk_io_buffer_size: 1024 * 4,
};
let perf_opts = PerfOpts { inmem_bufcap: 1024 * 4 };
let s = disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes::new(
evq,
perf_opts,
self.node_config.node_config.cluster.clone(),
);
// TODO is this a good place to decide this?
let s = if self.node_config.node_config.cluster.is_central_storage {
info!("Set up central storage stream");
// TODO pull up this config
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
let s = disk::raw::conn::make_local_event_blobs_stream(
evq.range.clone(),
evq.channel.clone(),
&entry,
evq.agg_kind.need_expand(),
event_chunker_conf,
evq.disk_io_buffer_size,
&self.node_config,
)?;
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
} else {
info!("Set up merged remote stream");
let s = disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes::new(
evq,
perf_opts,
self.node_config.node_config.cluster.clone(),
);
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
};
let s = s.map({
let mut header_out = false;
let mut count_events = 0;
@@ -591,13 +613,15 @@ impl Stream for DataApiPython3DataStream {
for i1 in 0..b.tss.len() {
if count_events < 6 {
info!(
"deco len {:?} BE {} scalar-type {:?} shape {:?}",
"deco len {:?} BE {} scalar-type {:?} shape {:?} comps {:?}",
b.decomps[i1].as_ref().map(|x| x.len()),
b.be[i1],
b.scalar_types[i1],
b.shapes[i1]
b.shapes[i1],
b.comps[i1],
);
}
let compression = if let (Shape::Image(..), Some(..)) = (&b.shapes[i1], &b.comps[i1]) { Some(1) } else { None };
if !header_out {
let head = Api1ChannelHeader {
name: channel.name.clone(),
@@ -611,7 +635,7 @@ impl Stream for DataApiPython3DataStream {
// The shape is inconsistent across the events.
// Seems like the config is to be trusted in this case.
shape: shape_to_api3proto(&entry.shape),
compression: None,
compression,
};
let h = serde_json::to_string(&head)?;
info!("sending channel header {}", h);
@@ -623,14 +647,27 @@ impl Stream for DataApiPython3DataStream {
header_out = true;
}
{
if let Some(deco) = &b.decomps[i1] {
let l1 = 17 + deco.len() as u32;
d.put_u32(l1);
d.put_u8(1);
d.put_u64(b.tss[i1]);
d.put_u64(b.pulses[i1]);
d.put_slice(&deco);
d.put_u32(l1);
match &b.shapes[i1] {
Shape::Image(_, _) => {
let l1 = 17 + b.blobs[i1].len() as u32;
d.put_u32(l1);
d.put_u8(1);
d.put_u64(b.tss[i1]);
d.put_u64(b.pulses[i1]);
d.put_slice(&b.blobs[i1]);
d.put_u32(l1);
}
_ => {
if let Some(deco) = &b.decomps[i1] {
let l1 = 17 + deco.len() as u32;
d.put_u32(l1);
d.put_u8(1);
d.put_u64(b.tss[i1]);
d.put_u64(b.pulses[i1]);
d.put_slice(&deco);
d.put_u32(l1);
}
}
}
}
count_events += 1;
@@ -710,7 +747,8 @@ pub async fn api1_binary_events(req: Request<Body>, node_config: &NodeConfigCach
beg: beg_ns,
end: end_ns,
};
let backend = "sf-databuffer";
// TODO use the proper backend name:
let backend = "DUMMY";
let chans = qu
.channels
.iter()
@@ -719,120 +757,10 @@ pub async fn api1_binary_events(req: Request<Body>, node_config: &NodeConfigCach
name: x.clone(),
})
.collect();
if true {
let s = DataApiPython3DataStream::new(range.clone(), chans, node_config.clone());
let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", "dummy");
let ret = ret.body(BodyStream::wrapped(s, format!("plain_events")))?;
return Ok(ret);
}
// TODO to serve multiple channels, I need to wrap the loop over channels in a Stream itself.
let channel = qu.channels[0].clone();
let channel = Channel {
backend: backend.into(),
name: channel,
};
let channel_config = {
let channel_config = match read_local_config(channel.clone(), node_config.node.clone()).await {
Ok(k) => k,
Err(e) => {
error!("api1_binary_events error {:?}", e);
return Err(Error::with_msg_no_trace("can not parse channel config"));
}
};
let entry_res = match extract_matching_config_entry(&range, &channel_config) {
Ok(k) => k,
Err(e) => return Err(e)?,
};
let entry = match entry_res {
MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?,
MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found"))?,
MatchingConfigEntry::Entry(entry) => entry,
};
entry.clone()
};
warn!("found channel_config {:?}", channel_config);
let evq = RawEventsQuery {
channel: channel.clone(),
range,
agg_kind: netpod::AggKind::EventBlobs,
disk_io_buffer_size: 1024 * 4,
};
let perf_opts = PerfOpts { inmem_bufcap: 1024 * 4 };
let s = disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes::new(
evq,
perf_opts,
node_config.node_config.cluster.clone(),
);
let s = s.map({
let mut header_out = false;
let mut count_events = 0;
move |b| {
let ret = match b {
Ok(b) => {
let f = match b {
StreamItem::DataItem(RangeCompletableItem::Data(b)) => {
let mut d = BytesMut::new();
for i1 in 0..b.tss.len() {
if count_events < 6 {
info!(
"deco len {:?} BE {} scalar-type {:?} shape {:?}",
b.decomps[i1].as_ref().map(|x| x.len()),
b.be[i1],
b.scalar_types[i1],
b.shapes[i1]
);
}
if !header_out {
let head = Api1ChannelHeader {
name: channel.name.clone(),
ty: scalar_type_to_api3proto(&b.scalar_types[i1]).into(),
byte_order: if b.be[i1] {
"BIG_ENDIAN".into()
} else {
"LITTLE_ENDIAN".into()
},
// The shape is inconsistent across the events.
// Seems like the config is to be trusted in this case.
shape: shape_to_api3proto(&channel_config.shape),
//shape: vec![2560],
compression: None,
};
let h = serde_json::to_string(&head)?;
info!("sending channel header {}", h);
let l1 = 1 + h.as_bytes().len() as u32;
d.put_u32(l1);
d.put_u8(0);
d.extend_from_slice(h.as_bytes());
d.put_u32(l1);
header_out = true;
}
{
if let Some(deco) = &b.decomps[i1] {
let l1 = 17 + deco.len() as u32;
d.put_u32(l1);
d.put_u8(1);
d.put_u64(b.tss[i1]);
d.put_u64(b.pulses[i1]);
d.put_slice(&deco);
d.put_u32(l1);
}
}
count_events += 1;
}
d
}
_ => BytesMut::new(),
};
Ok(f)
}
Err(e) => Err(e),
};
ret
}
});
let s = DataApiPython3DataStream::new(range.clone(), chans, node_config.clone());
let ret = response(StatusCode::OK).header("x-daqbuffer-request-id", "dummy");
let ret = ret.body(BodyStream::wrapped(s, format!("plain_events")))?;
Ok(ret)
let ret = ret.body(BodyStream::wrapped(s, format!("api1_binary_events")))?;
return Ok(ret);
}
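
Both the Image branch and the decompressed branch above write the same symmetric, length-delimited record; a sketch of that layout, with the 17-byte header size from the code (1 tag byte + 8 bytes ts + 8 bytes pulse):

use bytes::{BufMut, BytesMut};

// [u32 len][u8 tag = 1][u64 ts][u64 pulse][payload][u32 len]
fn put_event_frame(d: &mut BytesMut, ts: u64, pulse: u64, payload: &[u8]) {
    let l1 = 17 + payload.len() as u32;
    d.put_u32(l1);
    d.put_u8(1);
    d.put_u64(ts);
    d.put_u64(pulse);
    d.put_slice(payload);
    d.put_u32(l1);
}
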
fn scalar_type_to_api3proto(sty: &ScalarType) -> &'static str {

View File

@@ -39,7 +39,11 @@ fn proxy_mark() -> &'static str {
}
pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
let _update_task = UpdateTask::new(node_config.clone());
let _update_task = if node_config.node_config.cluster.run_map_pulse_task {
Some(UpdateTask::new(node_config.clone()))
} else {
None
};
let rawjh = taskrun::spawn(events_service(node_config.clone()));
use std::str::FromStr;
let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?;
@@ -485,7 +489,7 @@ async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached)
);
let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
let s = s.map(|item| item.make_frame());
let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?;
let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events_binary")))?;
Ok(ret)
}
@@ -502,7 +506,7 @@ async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -
query.do_log(),
);
let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?;
let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events_json")))?;
Ok(ret)
}

View File

@@ -32,6 +32,8 @@ fn ca_connect_1() {
user: "".into(),
pass: "".into(),
},
run_map_pulse_task: false,
is_central_storage: false,
},
},
ix: 0,

View File

@@ -161,10 +161,18 @@ pub struct Database {
pub pass: String,
}
fn bool_false() -> bool {
false
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Cluster {
pub nodes: Vec<Node>,
pub database: Database,
#[serde(default = "bool_false")]
pub run_map_pulse_task: bool,
#[serde(default = "bool_false")]
pub is_central_storage: bool,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -359,6 +367,7 @@ pub struct ChannelConfig {
pub enum Shape {
Scalar,
Wave(u32),
Image(u32, u32),
}
pub trait HasShape {
@@ -479,7 +488,14 @@ fn get_patch_t_len(bin_t_len: u64) -> u64 {
}
}
}
Shape::Wave(_) => {
Shape::Wave(..) => {
for (i1, &j) in PATCH_T_LEN_KEY.iter().enumerate() {
if bin_t_len == j {
return PATCH_T_LEN_OPTIONS_WAVE[i1];
}
}
}
Shape::Image(..) => {
for (i1, &j) in PATCH_T_LEN_KEY.iter().enumerate() {
if bin_t_len == j {
return PATCH_T_LEN_OPTIONS_WAVE[i1];
@@ -737,6 +753,16 @@ impl AggKind {
Self::Plain => false,
}
}
pub fn need_expand(&self) -> bool {
match self {
Self::EventBlobs => false,
Self::TimeWeightedScalar => true,
Self::DimXBins1 => false,
Self::DimXBinsN(_) => false,
Self::Plain => false,
}
}
}
pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize {
@@ -749,6 +775,7 @@ pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize {
match shape {
Shape::Scalar => 0,
Shape::Wave(n) => *n as usize,
Shape::Image(j, k) => *j as usize * *k as usize,
}
} else {
*n as usize
@@ -757,6 +784,7 @@ pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize {
AggKind::Plain => match shape {
Shape::Scalar => 0,
Shape::Wave(n) => *n as usize,
Shape::Image(j, k) => *j as usize * *k as usize,
},
}
}
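
Because the two new Cluster flags are declared with serde defaults, config files written before this commit keep deserializing; a minimal check of that behavior (struct trimmed to just the new fields, serde_json used for illustration):

use serde::Deserialize;

fn bool_false() -> bool {
    false
}

#[derive(Debug, Deserialize)]
struct ClusterFlags {
    #[serde(default = "bool_false")]
    run_map_pulse_task: bool,
    #[serde(default = "bool_false")]
    is_central_storage: bool,
}

fn main() {
    // A config written before this commit has neither key; both default to false.
    let c: ClusterFlags = serde_json::from_str("{}").unwrap();
    assert!(!c.run_map_pulse_task);
    assert!(!c.is_central_storage);
}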

View File

@@ -30,7 +30,7 @@ pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error>
async fn events_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: NodeConfigCached) -> Result<(), Error> {
//use tracing_futures::Instrument;
let span1 = span!(Level::INFO, "raw::raw_conn_handler");
let span1 = span!(Level::INFO, "events_conn_handler");
let r = events_conn_handler_inner(stream, addr, &node_config)
.instrument(span1)
.await;
@@ -85,7 +85,7 @@ async fn events_conn_handler_inner_try(
let mut frames = vec![];
while let Some(k) = h
.next()
.instrument(span!(Level::INFO, "raw_conn_handler INPUT STREAM READ"))
.instrument(span!(Level::INFO, "events_conn_handler INPUT STREAM READ"))
.await
{
match k {
@@ -139,10 +139,13 @@ async fn events_conn_handler_inner_try(
//info!("conn.rs encode frame typeid {:x}", item.typeid());
let item = item.make_frame();
match item {
Ok(buf) => match netout.write_all(&buf).await {
Ok(_) => {}
Err(e) => return Err((e, netout))?,
},
Ok(buf) => {
info!("events_conn_handler send {} bytes", buf.len());
match netout.write_all(&buf).await {
Ok(_) => {}
Err(e) => return Err((e, netout))?,
}
}
Err(e) => {
return Err((e, netout))?;
}

View File

@@ -73,7 +73,11 @@ impl ConfigEntry {
Some(lens) => {
if lens.len() == 1 {
Shape::Wave(lens[0])
} else if lens.len() == 2 {
Shape::Image(lens[0], lens[1])
} else {
// TODO
// Support shapes with more than two dimensions.
return Err(Error::with_msg(format!("Channel config unsupported shape {:?}", self)))?;
}
}
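
The branch above maps the stored dimension list to a Shape; the same logic as a standalone sketch (Shape mirrors netpod::Shape, the function name is illustrative):

#[derive(Debug)]
enum Shape {
    Scalar,
    Wave(u32),
    Image(u32, u32),
}

fn shape_from_lens(lens: Option<&[u32]>) -> Result<Shape, String> {
    match lens {
        None => Ok(Shape::Scalar),
        Some([n]) => Ok(Shape::Wave(*n)),
        Some([j, k]) => Ok(Shape::Image(*j, *k)),
        Some(other) => Err(format!("unsupported shape {:?}", other)),
    }
}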

View File

@@ -1,7 +1,7 @@
use err::Error;
use std::borrow::Cow;
use std::fs;
use std::io::{self, BufWriter, Read, Stderr, Stdin, Write};
use std::io::{BufWriter, Read, Seek, SeekFrom, Stderr, Stdin, Write};
use std::path::{Path, PathBuf};
pub struct Buffer {
@@ -98,18 +98,59 @@ fn parse_lines(buf: &[u8]) -> Result<(Vec<Cow<str>>, usize), Error> {
Ok((ret, i2))
}
const MAX_PER_FILE: usize = 1024 * 1024 * 2;
const MAX_TOTAL_SIZE: usize = 1024 * 1024 * 20;
const MAX_PER_FILE: u64 = 1024 * 1024 * 2;
const MAX_TOTAL_SIZE: u64 = 1024 * 1024 * 20;
fn next_file(dir: &Path, append: bool, truncate: bool) -> io::Result<BufWriter<fs::File>> {
struct Fileinfo {
path: PathBuf,
name: String,
len: u64,
}
fn file_list(dir: &Path) -> Result<Vec<Fileinfo>, Error> {
let mut ret = vec![];
let rd = fs::read_dir(&dir)?;
for e in rd {
let e = e?;
let fnos = e.file_name();
let fns = fnos.to_str().unwrap_or("");
if fns.starts_with("info-20") && fns.ends_with(".log") {
let meta = e.metadata()?;
let info = Fileinfo {
path: e.path(),
name: fns.into(),
len: meta.len(),
};
ret.push(info);
}
}
ret.sort_by(|a, b| std::cmp::Ord::cmp(&a.name, &b.name));
Ok(ret)
}
fn open_latest_or_new(dir: &Path) -> Result<BufWriter<fs::File>, Error> {
let list = file_list(dir)?;
if let Some(latest) = list.last() {
if latest.len < MAX_PER_FILE {
let ret = fs::OpenOptions::new().write(true).append(true).open(&latest.path)?;
let ret = BufWriter::new(ret);
return Ok(ret);
}
}
next_file(dir)
}
fn next_file(dir: &Path) -> Result<BufWriter<fs::File>, Error> {
let ts = chrono::Utc::now();
let s = ts.format("%Y-%m-%d--%H-%M-%S").to_string();
let ret = fs::OpenOptions::new()
let mut ret = fs::OpenOptions::new()
.write(true)
.create(true)
.append(append)
.truncate(truncate)
.append(true)
.open(dir.join(format!("info-{}.log", s)))?;
if ret.seek(SeekFrom::Current(0))? != 0 {
return Err(Error::with_msg_no_trace("new file already exists"));
}
let ret = BufWriter::new(ret);
Ok(ret)
}
@@ -117,7 +158,7 @@ fn next_file(dir: &Path, append: bool, truncate: bool) -> io::Result<BufWriter<f
pub fn append_inner(dirname: &str, mut stdin: Stdin, _stderr: Stderr) -> Result<(), Error> {
let mut bytes_written = 0;
let dir = PathBuf::from(dirname);
let mut fout = next_file(&dir, true, false)?;
let mut fout = open_latest_or_new(&dir)?;
let mut buf = Buffer::new();
loop {
// Get some more data.
@@ -144,7 +185,7 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin, _stderr: Stderr) -> Result<
let j = line.as_bytes();
fout.write_all(j)?;
fout.write_all(b"\n")?;
bytes_written += j.len() + 1;
bytes_written += j.len() as u64 + 1;
}
buf.advance(n2);
}
@@ -154,38 +195,41 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin, _stderr: Stderr) -> Result<
}
}
fout.flush()?;
if bytes_written >= MAX_PER_FILE {
if bytes_written >= (MAX_PER_FILE >> 3) {
bytes_written = 0;
let rd = fs::read_dir(&dir)?;
let mut w = vec![];
for e in rd {
let e = e?;
let fnos = e.file_name();
let fns = fnos.to_str().unwrap();
if fns.starts_with("info-20") && fns.ends_with(".log") {
let meta = e.metadata()?;
w.push((e.path(), meta.len()));
let l1 = fout.seek(SeekFrom::End(0))?;
if l1 >= MAX_PER_FILE {
let rd = fs::read_dir(&dir)?;
let mut w = vec![];
for e in rd {
let e = e?;
let fnos = e.file_name();
let fns = fnos.to_str().unwrap();
if fns.starts_with("info-20") && fns.ends_with(".log") {
let meta = e.metadata()?;
w.push((e.path(), meta.len()));
}
}
}
w.sort_by(|a, b| std::cmp::Ord::cmp(a, b));
for q in &w {
write!(&mut fout, "file:::: {}\n", q.0.to_string_lossy())?;
}
let mut lentot = w.iter().map(|g| g.1).fold(0, |a, x| a + x);
write!(&mut fout, "lentot: {}\n", lentot)?;
for q in w {
if lentot <= MAX_TOTAL_SIZE as u64 {
break;
w.sort_by(|a, b| std::cmp::Ord::cmp(a, b));
for q in &w {
write!(&mut fout, "file:::: {}\n", q.0.to_string_lossy())?;
}
write!(&mut fout, "REMOVE {} {}\n", q.1, q.0.to_string_lossy())?;
fs::remove_file(q.0)?;
if q.1 < lentot {
lentot -= q.1;
} else {
lentot = 0;
let mut lentot = w.iter().map(|g| g.1).fold(0, |a, x| a + x);
write!(&mut fout, "lentot: {}\n", lentot)?;
for q in w {
if lentot <= MAX_TOTAL_SIZE as u64 {
break;
}
write!(&mut fout, "REMOVE {} {}\n", q.1, q.0.to_string_lossy())?;
fs::remove_file(q.0)?;
if q.1 < lentot {
lentot -= q.1;
} else {
lentot = 0;
}
}
}
fout = next_file(&dir, true, false)?;
fout = next_file(&dir)?;
};
}
}
}
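
The rotation block does two jobs: start a new file once the current one reaches MAX_PER_FILE, and prune oldest-first until the directory is back under MAX_TOTAL_SIZE. The pruning pass in isolation, assuming a (path, len) list sorted oldest first as file_list produces:

use std::fs;
use std::path::PathBuf;

const MAX_TOTAL_SIZE: u64 = 1024 * 1024 * 20;

fn prune_oldest(files: Vec<(PathBuf, u64)>) -> std::io::Result<()> {
    let mut lentot: u64 = files.iter().map(|g| g.1).sum();
    for (path, len) in files {
        if lentot <= MAX_TOTAL_SIZE {
            break;
        }
        fs::remove_file(&path)?;
        lentot = lentot.saturating_sub(len);
    }
    Ok(())
}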

View File

@@ -130,5 +130,7 @@ pub fn test_cluster() -> netpod::Cluster {
user: "daqbuffer".into(),
pass: "daqbuffer".into(),
},
run_map_pulse_task: false,
is_central_storage: false,
}
}