Rename SfDbChannel

This commit is contained in:
Dominik Werder
2023-06-13 16:17:08 +02:00
parent 9c0062d27c
commit 7c77b07db5
40 changed files with 733 additions and 1024 deletions

View File

@@ -1,3 +1,5 @@
pub mod configquorum;
use crate::err::Error;
use crate::gather::gather_get_json_generic;
use crate::gather::SubRes;
@@ -32,7 +34,6 @@ use netpod::query::api1::Api1Query;
use netpod::range::evrange::NanoRange;
use netpod::timeunits::SEC;
use netpod::ByteSize;
use netpod::Channel;
use netpod::ChannelSearchQuery;
use netpod::ChannelSearchResult;
use netpod::DiskIoTune;
@@ -40,6 +41,7 @@ use netpod::NodeConfigCached;
use netpod::PerfOpts;
use netpod::ProxyConfig;
use netpod::ScalarType;
use netpod::SfDbChannel;
use netpod::Shape;
use netpod::ACCEPT_ALL;
use netpod::APP_JSON;
@@ -53,6 +55,7 @@ use query::api4::events::PlainEventsQuery;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
@@ -639,9 +642,9 @@ impl Api1ChannelHeader {
pub struct DataApiPython3DataStream {
range: NanoRange,
channels: Vec<Channel>,
channels: VecDeque<SfDbChannel>,
current_channel: Option<SfDbChannel>,
node_config: NodeConfigCached,
chan_ix: usize,
chan_stream: Option<Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>>,
config_fut: Option<Pin<Box<dyn Future<Output = Result<ChannelConfigs, Error>> + Send>>>,
disk_io_tune: DiskIoTune,
@@ -658,7 +661,7 @@ pub struct DataApiPython3DataStream {
impl DataApiPython3DataStream {
pub fn new(
range: NanoRange,
channels: Vec<Channel>,
channels: Vec<SfDbChannel>,
disk_io_tune: DiskIoTune,
do_decompress: bool,
events_max: u64,
@@ -667,9 +670,9 @@ impl DataApiPython3DataStream {
) -> Self {
Self {
range,
channels,
channels: channels.into_iter().collect(),
current_channel: None,
node_config,
chan_ix: 0,
chan_stream: None,
config_fut: None,
disk_io_tune,
@@ -685,7 +688,7 @@ impl DataApiPython3DataStream {
fn convert_item(
b: EventFull,
channel: &Channel,
channel: &SfDbChannel,
entry: &ConfigEntry,
header_out: &mut bool,
count_events: &mut usize,
@@ -755,6 +758,136 @@ impl DataApiPython3DataStream {
}
Ok(d)
}
/// Handle one item yielded by the currently active per-channel byte stream.
///
/// On `Ok`, refreshes the status-board liveness mark (throttled to at most
/// once every 2 s) and forwards the bytes unchanged. On `Err`, drops the
/// stream, flags the whole response as done, records the error on the
/// status board, and returns `None` to terminate the output stream.
fn handle_chan_stream_ready(&mut self, item: Result<BytesMut, Error>) -> Option<Result<BytesMut, Error>> {
match item {
Ok(k) => {
// Throttled keep-alive: mark this request alive at most every 2 s
// so the status board does not get hammered on every chunk.
let n = Instant::now();
if n.duration_since(self.ping_last) >= Duration::from_millis(2000) {
let mut sb = crate::status_board().unwrap();
sb.mark_alive(&self.status_id);
self.ping_last = n;
}
Some(Ok(k))
}
Err(e) => {
error!("DataApiPython3DataStream emit error: {e:?}");
// Tear down the active stream and end the response; the error is
// only surfaced via the status board, not the byte stream itself.
self.chan_stream = None;
self.data_done = true;
let mut sb = crate::status_board().unwrap();
sb.add_error(&self.status_id, e);
if false {
// TODO format as python data api error frame:
let mut buf = BytesMut::with_capacity(1024);
buf.put_slice("".as_bytes());
Some(Ok(buf))
} else {
None
}
}
}
}
/// Handle completion of the per-channel config lookup future.
///
/// Selects the config entry matching `self.range` and installs the next
/// value of `self.chan_stream`:
/// - no entry / multiple entries: installs an empty stream (channel skipped
///   with a warning);
/// - exactly one entry: builds an event-blob stream — read locally when the
///   cluster is flagged as central storage, otherwise merged from remote
///   nodes — converts each `EventFull` batch to api1 binary frames via
///   `convert_item`, and caps it at `events_max` items (0 means unlimited).
///
/// Returns `Err` if the config could not be obtained or no valid stream
/// could be constructed; the caller decides how to surface that.
fn handle_config_fut_ready(&mut self, item: Result<ChannelConfigs, Error>) -> Result<(), Error> {
match item {
Ok(config) => {
self.config_fut = None;
let entry_res = match extract_matching_config_entry(&self.range, &config) {
Ok(k) => k,
// `Err(e)?` converts the error type via `From` on the way out.
Err(e) => return Err(e)?,
};
match entry_res {
MatchingConfigEntry::None => {
warn!("DataApiPython3DataStream no config entry found for {:?}", config);
// Empty stream: the poll loop will advance to the next channel.
self.chan_stream = Some(Box::pin(stream::empty()));
Ok(())
}
MatchingConfigEntry::Multiple => {
warn!(
"DataApiPython3DataStream multiple config entries found for {:?}",
config
);
self.chan_stream = Some(Box::pin(stream::empty()));
Ok(())
}
MatchingConfigEntry::Entry(entry) => {
let entry = entry.clone();
// NOTE(review): current_channel is set before the config future is
// created, so unwrap is assumed safe here — confirm against caller.
let channel = self.current_channel.as_ref().unwrap();
debug!("found channel_config for {}: {:?}", channel.name, entry);
let evq = PlainEventsQuery::new(channel.clone(), self.range.clone()).for_event_blobs();
debug!("query for event blobs retrieval: evq {evq:?}");
// TODO important TODO
debug!("TODO fix magic inmem_bufcap");
debug!("TODO add timeout option to data api3 download");
let perf_opts = PerfOpts::default();
// TODO is this a good to place decide this?
let s = if self.node_config.node_config.cluster.is_central_storage {
info!("Set up central storage stream");
// TODO pull up this config
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
let s = make_local_event_blobs_stream(
evq.range().try_into()?,
evq.channel().clone(),
&entry,
evq.one_before_range(),
self.do_decompress,
event_chunker_conf,
self.disk_io_tune.clone(),
&self.node_config,
)?;
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
} else {
if let Some(sh) = &entry.shape {
if sh.len() > 1 {
warn!("Remote stream fetch for shape {sh:?}");
}
}
debug!("Set up merged remote stream");
let s = MergedBlobsFromRemotes::new(
evq,
perf_opts,
self.node_config.node_config.cluster.clone(),
);
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
};
// Map event batches to api1 binary frames; `header_out` and
// `count_events` are owned by the closure across items so the
// header is emitted exactly once per channel.
let s = s.map({
let mut header_out = false;
let mut count_events = 0;
let channel = self.current_channel.clone().unwrap();
move |b| {
let ret = match b {
Ok(b) => {
let f = match b {
StreamItem::DataItem(RangeCompletableItem::Data(b)) => Self::convert_item(
b,
&channel,
&entry,
&mut header_out,
&mut count_events,
)?,
// Non-data items (log/stats/range markers) become empty frames.
_ => BytesMut::new(),
};
Ok(f)
}
Err(e) => Err(e),
};
ret
}
});
//let _ = Box::new(s) as Box<dyn Stream<Item = Result<BytesMut, Error>> + Unpin>;
// events_max == 0 is the sentinel for "no limit".
let evm = if self.events_max == 0 {
usize::MAX
} else {
self.events_max as usize
};
self.chan_stream = Some(Box::pin(s.map_err(Error::from).take(evm)));
Ok(())
}
}
}
Err(e) => Err(Error::with_msg_no_trace(format!("can not parse channel config {e}"))),
}
}
}
impl Stream for DataApiPython3DataStream {
@@ -771,148 +904,33 @@ impl Stream for DataApiPython3DataStream {
} else {
if let Some(stream) = &mut self.chan_stream {
match stream.poll_next_unpin(cx) {
Ready(k) => match k {
Some(k) => match k {
Ok(k) => {
let n = Instant::now();
if n.duration_since(self.ping_last) >= Duration::from_millis(2000) {
let mut sb = crate::status_board().unwrap();
sb.mark_alive(&self.status_id);
self.ping_last = n;
}
Ready(Some(Ok(k)))
}
Err(e) => {
error!("DataApiPython3DataStream emit error: {e:?}");
self.data_done = true;
let mut sb = crate::status_board().unwrap();
sb.add_error(&self.status_id, e);
if false {
// TODO format as python data api error frame:
let mut buf = BytesMut::with_capacity(1024);
buf.put_slice("".as_bytes());
Ready(Some(Ok(buf)))
} else {
self.data_done = true;
Ready(None)
}
}
},
None => {
self.chan_stream = None;
continue;
}
},
Ready(Some(k)) => Ready(self.handle_chan_stream_ready(k)),
Ready(None) => {
self.chan_stream = None;
continue;
}
Pending => Pending,
}
} else if let Some(fut) = &mut self.config_fut {
match fut.poll_unpin(cx) {
Ready(Ok(config)) => {
self.config_fut = None;
let entry_res = match extract_matching_config_entry(&self.range, &config) {
Ok(k) => k,
Err(e) => return Err(e)?,
};
match entry_res {
MatchingConfigEntry::None => {
warn!("DataApiPython3DataStream no config entry found for {:?}", config);
self.chan_stream = Some(Box::pin(stream::empty()));
continue;
}
MatchingConfigEntry::Multiple => {
warn!(
"DataApiPython3DataStream multiple config entries found for {:?}",
config
);
self.chan_stream = Some(Box::pin(stream::empty()));
continue;
}
MatchingConfigEntry::Entry(entry) => {
let entry = entry.clone();
let channel = self.channels[self.chan_ix - 1].clone();
debug!("found channel_config for {}: {:?}", channel.name, entry);
let evq = PlainEventsQuery::new(channel, self.range.clone()).for_event_blobs();
info!("query for event blobs retrieval: evq {evq:?}");
warn!("TODO fix magic inmem_bufcap");
warn!("TODO add timeout option to data api3 download");
let perf_opts = PerfOpts::default();
// TODO is this a good to place decide this?
let s = if self.node_config.node_config.cluster.is_central_storage {
info!("Set up central storage stream");
// TODO pull up this config
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
let s = make_local_event_blobs_stream(
evq.range().try_into()?,
evq.channel().clone(),
&entry,
evq.one_before_range(),
self.do_decompress,
event_chunker_conf,
self.disk_io_tune.clone(),
&self.node_config,
)?;
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
} else {
if let Some(sh) = &entry.shape {
if sh.len() > 1 {
warn!("Remote stream fetch for shape {sh:?}");
}
}
debug!("Set up merged remote stream");
let s = MergedBlobsFromRemotes::new(
evq,
perf_opts,
self.node_config.node_config.cluster.clone(),
);
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
};
let s = s.map({
let mut header_out = false;
let mut count_events = 0;
let channel = self.channels[self.chan_ix - 1].clone();
move |b| {
let ret = match b {
Ok(b) => {
let f = match b {
StreamItem::DataItem(RangeCompletableItem::Data(b)) => {
Self::convert_item(
b,
&channel,
&entry,
&mut header_out,
&mut count_events,
)?
}
_ => BytesMut::new(),
};
Ok(f)
}
Err(e) => Err(e),
};
ret
}
});
//let _ = Box::new(s) as Box<dyn Stream<Item = Result<BytesMut, Error>> + Unpin>;
let evm = if self.events_max == 0 {
usize::MAX
} else {
self.events_max as usize
};
self.chan_stream = Some(Box::pin(s.map_err(Error::from).take(evm)));
continue;
}
Ready(k) => match self.handle_config_fut_ready(k) {
Ok(()) => continue,
Err(e) => {
self.config_fut = None;
self.data_done = true;
error!("api1_binary_events error {:?}", e);
Ready(Some(Err(e)))
}
}
Ready(Err(e)) => {
self.config_fut = None;
self.data_done = true;
error!("api1_binary_events error {:?}", e);
Ready(Some(Err(Error::with_msg_no_trace("can not parse channel config"))))
}
},
Pending => Pending,
}
} else {
if self.chan_ix >= self.channels.len() {
if let Some(channel) = self.channels.pop_front() {
self.current_channel = Some(channel.clone());
let fut = read_local_config(channel, self.node_config.clone()).map_err(Error::from);
self.config_fut = Some(Box::pin(fut));
continue;
} else {
self.data_done = true;
{
let n = Instant::now();
@@ -922,12 +940,6 @@ impl Stream for DataApiPython3DataStream {
sb.mark_ok(&self.status_id);
}
continue;
} else {
let channel = self.channels[self.chan_ix].clone();
self.chan_ix += 1;
let fut = read_local_config(channel.clone(), self.node_config.clone()).map_err(Error::from);
self.config_fut = Some(Box::pin(fut));
continue;
}
}
};
@@ -1034,7 +1046,7 @@ impl Api1EventsBinaryHandler {
let chans = qu
.channels()
.iter()
.map(|ch| Channel {
.map(|ch| SfDbChannel {
backend: backend.into(),
name: ch.name().into(),
series: None,
@@ -1058,9 +1070,9 @@ impl Api1EventsBinaryHandler {
Ok(ret)
} else {
// TODO set the public error code and message and return Err(e).
let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept));
error!("{e:?}");
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
let e = Error::with_public_msg(format!("Unsupported Accept: {}", accept));
error!("{e}");
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
}
}
}

View File

@@ -0,0 +1,4 @@
/// Placeholder for a future config-quorum lookup; currently a no-op.
pub async fn find_config_quorum() {
// TODO create new endpoint which only returns the most matching config entry
// for some given channel and time range.
}

View File

@@ -13,12 +13,12 @@ use netpod::log::*;
use netpod::query::prebinned::PreBinnedQuery;
use netpod::timeunits::*;
use netpod::ChConf;
use netpod::Channel;
use netpod::ChannelConfigQuery;
use netpod::ChannelConfigResponse;
use netpod::FromUrl;
use netpod::NodeConfigCached;
use netpod::ScalarType;
use netpod::SfDbChannel;
use netpod::Shape;
use netpod::ACCEPT_ALL;
use netpod::APP_JSON;
@@ -100,7 +100,7 @@ impl ChannelConfigHandler {
let conf = if let Some(_scyco) = &node_config.node_config.cluster.scylla {
let c = nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), node_config).await?;
ChannelConfigResponse {
channel: Channel {
channel: SfDbChannel {
series: c.series.clone(),
backend: q.channel.backend().into(),
name: c.name,
@@ -110,9 +110,9 @@ impl ChannelConfigHandler {
shape: c.shape,
}
} else if let Some(_) = &node_config.node.channel_archiver {
return Err(Error::with_msg_no_trace("no archiver"));
return Err(Error::with_msg_no_trace("channel archiver not supported"));
} else if let Some(_) = &node_config.node.archiver_appliance {
return Err(Error::with_msg_no_trace("no archapp"));
return Err(Error::with_msg_no_trace("archiver appliance not supported"));
} else {
parse::channelconfig::channel_config(&q, node_config).await?
};
@@ -332,7 +332,7 @@ impl FromUrl for ChannelsWithTypeQuery {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChannelListWithType {
channels: Vec<Channel>,
channels: Vec<SfDbChannel>,
}
pub struct ScyllaChannelsWithType {}
@@ -392,7 +392,7 @@ impl ScyllaChannelsWithType {
let mut list = Vec::new();
for row in res.rows_typed_or_empty::<(String, i64)>() {
let (channel_name, series) = row.err_conv()?;
let ch = Channel {
let ch = SfDbChannel {
backend: backend.into(),
name: channel_name,
series: Some(series as u64),

View File

@@ -290,6 +290,25 @@ async fn http_service_inner(
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path.starts_with("/api/4/private/logtest/") {
if req.method() == Method::GET {
if path.ends_with("/trace") {
trace!("test trace log output");
} else if path.ends_with("/debug") {
debug!("test debug log output");
} else if path.ends_with("/info") {
info!("test info log output");
} else if path.ends_with("/warn") {
warn!("test warn log output");
} else if path.ends_with("/error") {
error!("test error log output");
} else {
error!("test unknown log output");
}
Ok(response(StatusCode::OK).body(Body::empty())?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if let Some(h) = api4::status::StatusNodesRecursive::handler(&req) {
h.handle(req, ctx, &node_config).await
} else if let Some(h) = StatusBoardAllHandler::handler(&req) {