Can retrieve the problematic channel with split on wrong node
This commit is contained in:
@@ -53,6 +53,7 @@ where
|
||||
Shape::Scalar => {
|
||||
let evs = EventValuesDim0Case::new();
|
||||
match agg_kind {
|
||||
AggKind::EventBlobs => panic!(),
|
||||
AggKind::TimeWeightedScalar | AggKind::DimXBins1 => {
|
||||
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, _>(
|
||||
@@ -83,6 +84,7 @@ where
|
||||
Shape::Wave(n) => {
|
||||
let evs = EventValuesDim1Case::new(n);
|
||||
match agg_kind {
|
||||
AggKind::EventBlobs => panic!(),
|
||||
AggKind::TimeWeightedScalar | AggKind::DimXBins1 => {
|
||||
let events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
make_num_pipeline_nty_end_evs_enp::<NTY, END, _, _>(
|
||||
|
||||
@@ -382,6 +382,7 @@ impl AppendToUrl for BinnedQuery {
|
||||
fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) {
|
||||
let mut g = url.query_pairs_mut();
|
||||
match agg_kind {
|
||||
AggKind::EventBlobs => panic!(),
|
||||
AggKind::TimeWeightedScalar => {
|
||||
g.append_pair("binningScheme", "timeWeightedScalar");
|
||||
}
|
||||
@@ -403,7 +404,9 @@ fn agg_kind_from_binning_scheme(pairs: &BTreeMap<String, String>) -> Result<AggK
|
||||
let s = pairs
|
||||
.get(key)
|
||||
.map_or(Err(Error::with_msg(format!("can not find {}", key))), |k| Ok(k))?;
|
||||
let ret = if s == "fullValue" {
|
||||
let ret = if s == "eventBlobs" {
|
||||
AggKind::EventBlobs
|
||||
} else if s == "fullValue" {
|
||||
AggKind::Plain
|
||||
} else if s == "timeWeightedScalar" {
|
||||
AggKind::TimeWeightedScalar
|
||||
|
||||
@@ -93,6 +93,7 @@ where
|
||||
Shape::Scalar => {
|
||||
//
|
||||
match agg_kind {
|
||||
AggKind::EventBlobs => panic!(),
|
||||
AggKind::Plain => {
|
||||
let evs = EventValuesDim0Case::new();
|
||||
let events_node_proc = <<EventValuesDim0Case<NTY> as EventValueShape<NTY, END>>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
@@ -118,6 +119,7 @@ where
|
||||
Shape::Wave(n) => {
|
||||
//
|
||||
match agg_kind {
|
||||
AggKind::EventBlobs => panic!(),
|
||||
AggKind::Plain => {
|
||||
let evs = EventValuesDim1Case::new(n);
|
||||
let events_node_proc = <<EventValuesDim1Case<NTY> as EventValueShape<NTY, END>>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone());
|
||||
|
||||
@@ -113,20 +113,28 @@ async fn position_file(
|
||||
match OpenOptions::new().read(true).open(&index_path).await {
|
||||
Ok(mut index_file) => {
|
||||
let meta = index_file.metadata().await?;
|
||||
if meta.len() > 1024 * 1024 * 80 {
|
||||
if meta.len() > 1024 * 1024 * 120 {
|
||||
let msg = format!(
|
||||
"too large index file {} bytes for {}",
|
||||
meta.len(),
|
||||
channel_config.channel.name
|
||||
);
|
||||
error!("{}", msg);
|
||||
return Err(Error::with_msg(msg));
|
||||
} else if meta.len() > 1024 * 1024 * 80 {
|
||||
let msg = format!(
|
||||
"very large index file {} bytes for {}",
|
||||
meta.len(),
|
||||
channel_config.channel.name
|
||||
);
|
||||
warn!("{}", msg);
|
||||
} else if meta.len() > 1024 * 1024 * 20 {
|
||||
let msg = format!(
|
||||
"large index file {} bytes for {}",
|
||||
meta.len(),
|
||||
channel_config.channel.name
|
||||
);
|
||||
warn!("{}", msg);
|
||||
info!("{}", msg);
|
||||
}
|
||||
if meta.len() < 2 {
|
||||
return Err(Error::with_msg(format!(
|
||||
|
||||
@@ -4,10 +4,13 @@ use bytes::{Buf, BytesMut};
|
||||
use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items::{Appendable, PushableIndex, RangeCompletableItem, StatsItem, StreamItem, WithLen, WithTimestamps};
|
||||
use items::{
|
||||
Appendable, PushableIndex, RangeCompletableItem, SitemtyFrameType, StatsItem, StreamItem, WithLen, WithTimestamps,
|
||||
};
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
@@ -33,6 +36,7 @@ pub struct EventChunker {
|
||||
expand: bool,
|
||||
seen_before_range_count: usize,
|
||||
seen_after_range_count: usize,
|
||||
unordered_warn_count: usize,
|
||||
}
|
||||
|
||||
enum DataFileState {
|
||||
@@ -87,6 +91,7 @@ impl EventChunker {
|
||||
expand,
|
||||
seen_before_range_count: 0,
|
||||
seen_after_range_count: 0,
|
||||
unordered_warn_count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,15 +173,20 @@ impl EventChunker {
|
||||
let pulse = sl.read_i64::<BE>().unwrap() as u64;
|
||||
let max_ts = self.max_ts.load(Ordering::SeqCst);
|
||||
if ts < max_ts {
|
||||
Err(Error::with_msg(format!(
|
||||
"unordered event ts: {}.{} max_ts {}.{} config {:?} path {:?}",
|
||||
ts / SEC,
|
||||
ts % SEC,
|
||||
max_ts / SEC,
|
||||
max_ts % SEC,
|
||||
self.channel_config.shape,
|
||||
self.path
|
||||
)))?;
|
||||
if self.unordered_warn_count < 20 {
|
||||
let msg = format!(
|
||||
"unordered event no {} ts: {}.{} max_ts {}.{} config {:?} path {:?}",
|
||||
self.unordered_warn_count,
|
||||
ts / SEC,
|
||||
ts % SEC,
|
||||
max_ts / SEC,
|
||||
max_ts % SEC,
|
||||
self.channel_config.shape,
|
||||
self.path
|
||||
);
|
||||
warn!("{}", msg);
|
||||
self.unordered_warn_count += 1;
|
||||
}
|
||||
}
|
||||
self.max_ts.store(ts, Ordering::SeqCst);
|
||||
if ts >= self.range.end {
|
||||
@@ -242,6 +252,17 @@ impl EventChunker {
|
||||
for i1 in 0..shape_dim {
|
||||
shape_lens[i1 as usize] = sl.read_u32::<BE>().unwrap();
|
||||
}
|
||||
let shape_this = {
|
||||
if is_shaped {
|
||||
if shape_dim == 1 {
|
||||
Shape::Wave(shape_lens[0])
|
||||
} else {
|
||||
err::todoval()
|
||||
}
|
||||
} else {
|
||||
Shape::Scalar
|
||||
}
|
||||
};
|
||||
if is_compressed {
|
||||
//debug!("event ts {} is_compressed {}", ts, is_compressed);
|
||||
let value_bytes = sl.read_u64::<BE>().unwrap();
|
||||
@@ -291,6 +312,7 @@ impl EventChunker {
|
||||
Some(decomp),
|
||||
ScalarType::from_dtype_index(type_index)?,
|
||||
is_big_endian,
|
||||
shape_this,
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -311,6 +333,7 @@ impl EventChunker {
|
||||
Some(decomp),
|
||||
ScalarType::from_dtype_index(type_index)?,
|
||||
is_big_endian,
|
||||
shape_this,
|
||||
);
|
||||
}
|
||||
buf.advance(len as usize);
|
||||
@@ -331,13 +354,48 @@ impl EventChunker {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct EventFull {
|
||||
pub tss: Vec<u64>,
|
||||
pub pulses: Vec<u64>,
|
||||
#[serde(serialize_with = "decomps_ser", deserialize_with = "decomps_de")]
|
||||
pub decomps: Vec<Option<BytesMut>>,
|
||||
pub scalar_types: Vec<ScalarType>,
|
||||
pub be: Vec<bool>,
|
||||
pub shapes: Vec<Shape>,
|
||||
}
|
||||
|
||||
fn decomps_ser<S>(t: &Vec<Option<BytesMut>>, s: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let a: Vec<_> = t
|
||||
.iter()
|
||||
.map(|k| match k {
|
||||
None => None,
|
||||
Some(j) => Some(j[..].to_vec()),
|
||||
})
|
||||
.collect();
|
||||
Serialize::serialize(&a, s)
|
||||
}
|
||||
|
||||
fn decomps_de<'de, D>(d: D) -> Result<Vec<Option<BytesMut>>, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
let a: Vec<Option<Vec<u8>>> = Deserialize::deserialize(d)?;
|
||||
let a = a
|
||||
.iter()
|
||||
.map(|k| match k {
|
||||
None => None,
|
||||
Some(j) => {
|
||||
let mut a = BytesMut::new();
|
||||
a.extend_from_slice(&j);
|
||||
Some(a)
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
Ok(a)
|
||||
}
|
||||
|
||||
impl EventFull {
|
||||
@@ -348,18 +406,32 @@ impl EventFull {
|
||||
decomps: vec![],
|
||||
scalar_types: vec![],
|
||||
be: vec![],
|
||||
shapes: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option<BytesMut>, scalar_type: ScalarType, be: bool) {
|
||||
fn add_event(
|
||||
&mut self,
|
||||
ts: u64,
|
||||
pulse: u64,
|
||||
decomp: Option<BytesMut>,
|
||||
scalar_type: ScalarType,
|
||||
be: bool,
|
||||
shape: Shape,
|
||||
) {
|
||||
self.tss.push(ts);
|
||||
self.pulses.push(pulse);
|
||||
self.decomps.push(decomp);
|
||||
self.scalar_types.push(scalar_type);
|
||||
self.be.push(be);
|
||||
self.shapes.push(shape);
|
||||
}
|
||||
}
|
||||
|
||||
impl SitemtyFrameType for EventFull {
|
||||
const FRAME_TYPE_ID: u32 = items::EVENT_FULL_FRAME_TYPE_ID;
|
||||
}
|
||||
|
||||
impl WithLen for EventFull {
|
||||
fn len(&self) -> usize {
|
||||
self.tss.len()
|
||||
@@ -378,6 +450,7 @@ impl Appendable for EventFull {
|
||||
self.decomps.extend_from_slice(&src.decomps);
|
||||
self.scalar_types.extend_from_slice(&src.scalar_types);
|
||||
self.be.extend_from_slice(&src.be);
|
||||
self.shapes.extend_from_slice(&src.shapes);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -395,6 +468,7 @@ impl PushableIndex for EventFull {
|
||||
self.decomps.push(src.decomps[ix].clone());
|
||||
self.scalar_types.push(src.scalar_types[ix].clone());
|
||||
self.be.push(src.be[ix]);
|
||||
self.shapes.push(src.shapes[ix].clone());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
pub mod mergedblobsfromremotes;
|
||||
pub mod mergedfromremotes;
|
||||
|
||||
enum MergedCurVal<T> {
|
||||
|
||||
106
disk/src/merge/mergedblobsfromremotes.rs
Normal file
106
disk/src/merge/mergedblobsfromremotes.rs
Normal file
@@ -0,0 +1,106 @@
|
||||
use crate::eventchunker::EventFull;
|
||||
use crate::mergeblobs::MergedBlobsStream;
|
||||
use crate::raw::client::x_processed_event_blobs_stream_from_node;
|
||||
use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::{pin_mut, StreamExt};
|
||||
use items::Sitemty;
|
||||
use netpod::log::*;
|
||||
use netpod::query::RawEventsQuery;
|
||||
use netpod::{Cluster, PerfOpts};
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
type T001<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
|
||||
type T002<T> = Pin<Box<dyn Future<Output = Result<T001<T>, Error>> + Send>>;
|
||||
|
||||
pub struct MergedBlobsFromRemotes {
|
||||
tcp_establish_futs: Vec<T002<EventFull>>,
|
||||
nodein: Vec<Option<T001<EventFull>>>,
|
||||
merged: Option<T001<EventFull>>,
|
||||
completed: bool,
|
||||
errored: bool,
|
||||
}
|
||||
|
||||
impl MergedBlobsFromRemotes {
|
||||
pub fn new(evq: RawEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self {
|
||||
info!("MergedBlobsFromRemotes evq {:?}", evq);
|
||||
let mut tcp_establish_futs = vec![];
|
||||
for node in &cluster.nodes {
|
||||
let f = x_processed_event_blobs_stream_from_node(evq.clone(), perf_opts.clone(), node.clone());
|
||||
let f: T002<EventFull> = Box::pin(f);
|
||||
tcp_establish_futs.push(f);
|
||||
}
|
||||
let n = tcp_establish_futs.len();
|
||||
Self {
|
||||
tcp_establish_futs,
|
||||
nodein: (0..n).into_iter().map(|_| None).collect(),
|
||||
merged: None,
|
||||
completed: false,
|
||||
errored: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for MergedBlobsFromRemotes {
|
||||
type Item = Sitemty<EventFull>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
'outer: loop {
|
||||
break if self.completed {
|
||||
panic!("poll_next on completed");
|
||||
} else if self.errored {
|
||||
self.completed = true;
|
||||
return Ready(None);
|
||||
} else if let Some(fut) = &mut self.merged {
|
||||
match fut.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(k))) => Ready(Some(Ok(k))),
|
||||
Ready(Some(Err(e))) => {
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Ready(None) => {
|
||||
self.completed = true;
|
||||
Ready(None)
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
} else {
|
||||
let mut pend = false;
|
||||
let mut c1 = 0;
|
||||
for i1 in 0..self.tcp_establish_futs.len() {
|
||||
if self.nodein[i1].is_none() {
|
||||
let f = &mut self.tcp_establish_futs[i1];
|
||||
pin_mut!(f);
|
||||
match f.poll(cx) {
|
||||
Ready(Ok(k)) => {
|
||||
self.nodein[i1] = Some(k);
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
self.errored = true;
|
||||
return Ready(Some(Err(e)));
|
||||
}
|
||||
Pending => {
|
||||
pend = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
c1 += 1;
|
||||
}
|
||||
}
|
||||
if pend {
|
||||
Pending
|
||||
} else {
|
||||
if c1 == self.tcp_establish_futs.len() {
|
||||
let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
|
||||
let s1 = MergedBlobsStream::new(inps);
|
||||
self.merged = Some(Box::pin(s1));
|
||||
}
|
||||
continue 'outer;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@ Delivers event data (not yet time-binned) from local storage and provides client
|
||||
to request such data from nodes.
|
||||
*/
|
||||
|
||||
use crate::eventchunker::EventFull;
|
||||
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
|
||||
use crate::raw::eventsfromframes::EventsFromFrames;
|
||||
use err::Error;
|
||||
@@ -41,3 +42,27 @@ where
|
||||
let items = EventsFromFrames::new(frames);
|
||||
Ok(Box::pin(items))
|
||||
}
|
||||
|
||||
pub async fn x_processed_event_blobs_stream_from_node(
|
||||
query: RawEventsQuery,
|
||||
perf_opts: PerfOpts,
|
||||
node: Node,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
|
||||
netpod::log::info!(
|
||||
"x_processed_event_blobs_stream_from_node to: {}:{}",
|
||||
node.host,
|
||||
node.port_raw
|
||||
);
|
||||
let net = TcpStream::connect(format!("{}:{}", node.host, node.port_raw)).await?;
|
||||
let qjs = serde_json::to_string(&query)?;
|
||||
let (netin, mut netout) = net.into_split();
|
||||
let buf = make_frame(&EventQueryJsonStringFrame(qjs))?;
|
||||
netout.write_all(&buf).await?;
|
||||
let buf = make_term_frame();
|
||||
netout.write_all(&buf).await?;
|
||||
netout.flush().await?;
|
||||
netout.forget();
|
||||
let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
|
||||
let items = EventsFromFrames::new(frames);
|
||||
Ok(Box::pin(items))
|
||||
}
|
||||
|
||||
@@ -49,6 +49,7 @@ where
|
||||
macro_rules! pipe4 {
|
||||
($nty:ident, $end:ident, $shape:expr, $evs:ident, $evsv:expr, $agg_kind:expr, $event_blobs:expr) => {
|
||||
match $agg_kind {
|
||||
AggKind::EventBlobs => panic!(),
|
||||
AggKind::TimeWeightedScalar | AggKind::DimXBins1 => {
|
||||
make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>(
|
||||
$evsv,
|
||||
@@ -191,3 +192,64 @@ pub async fn make_event_pipe(
|
||||
);
|
||||
Ok(pipe)
|
||||
}
|
||||
|
||||
pub async fn make_event_blobs_pipe(
|
||||
evq: &RawEventsQuery,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
|
||||
if false {
|
||||
match dbconn::channel_exists(&evq.channel, &node_config).await {
|
||||
Ok(_) => (),
|
||||
Err(e) => return Err(e)?,
|
||||
}
|
||||
}
|
||||
let range = &evq.range;
|
||||
let channel_config = match read_local_config(&evq.channel, &node_config.node).await {
|
||||
Ok(k) => k,
|
||||
Err(e) => {
|
||||
if e.msg().contains("ErrorKind::NotFound") {
|
||||
let s = futures_util::stream::empty();
|
||||
return Ok(Box::pin(s));
|
||||
} else {
|
||||
return Err(e)?;
|
||||
}
|
||||
}
|
||||
};
|
||||
let entry_res = match extract_matching_config_entry(range, &channel_config) {
|
||||
Ok(k) => k,
|
||||
Err(e) => return Err(e)?,
|
||||
};
|
||||
let entry = match entry_res {
|
||||
MatchingConfigEntry::None => return Err(Error::with_msg("no config entry found"))?,
|
||||
MatchingConfigEntry::Multiple => return Err(Error::with_msg("multiple config entries found"))?,
|
||||
MatchingConfigEntry::Entry(entry) => entry,
|
||||
};
|
||||
let shape = match entry.to_shape() {
|
||||
Ok(k) => k,
|
||||
Err(e) => return Err(e)?,
|
||||
};
|
||||
let channel_config = netpod::ChannelConfig {
|
||||
channel: evq.channel.clone(),
|
||||
keyspace: entry.ks as u8,
|
||||
time_bin_size: entry.bs,
|
||||
shape: shape,
|
||||
scalar_type: entry.scalar_type.clone(),
|
||||
byte_order: entry.byte_order.clone(),
|
||||
array: entry.is_array,
|
||||
compression: entry.is_compressed,
|
||||
};
|
||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
|
||||
let event_blobs = EventChunkerMultifile::new(
|
||||
range.clone(),
|
||||
channel_config.clone(),
|
||||
node_config.node.clone(),
|
||||
node_config.ix,
|
||||
evq.disk_io_buffer_size,
|
||||
event_chunker_conf,
|
||||
true,
|
||||
);
|
||||
let s = event_blobs.map(|item| Box::new(item) as Box<dyn Framable>);
|
||||
let pipe: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>;
|
||||
pipe = Box::pin(s);
|
||||
Ok(pipe)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user