WIP on sf databuffer channel config decision
This commit is contained in:
@@ -37,3 +37,4 @@ nodenet = { path = "../nodenet" }
|
||||
commonio = { path = "../commonio" }
|
||||
taskrun = { path = "../taskrun" }
|
||||
scyllaconn = { path = "../scyllaconn" }
|
||||
httpclient = { path = "../httpclient" }
|
||||
|
||||
@@ -35,7 +35,6 @@ use netpod::ChannelSearchResult;
|
||||
use netpod::ChannelTypeConfigGen;
|
||||
use netpod::DiskIoTune;
|
||||
use netpod::NodeConfigCached;
|
||||
use netpod::PerfOpts;
|
||||
use netpod::ProxyConfig;
|
||||
use netpod::SfChFetchInfo;
|
||||
use netpod::SfDbChannel;
|
||||
@@ -45,7 +44,10 @@ use netpod::APP_JSON;
|
||||
use netpod::APP_OCTET;
|
||||
use parse::api1_parse::Api1ByteOrder;
|
||||
use parse::api1_parse::Api1ChannelHeader;
|
||||
use query::api4::events::PlainEventsQuery;
|
||||
use query::api4::events::EventsSubQuery;
|
||||
use query::api4::events::EventsSubQuerySelect;
|
||||
use query::api4::events::EventsSubQuerySettings;
|
||||
use query::transform::TransformQuery;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value as JsonValue;
|
||||
@@ -523,12 +525,12 @@ async fn find_ch_conf(
|
||||
|
||||
pub struct DataApiPython3DataStream {
|
||||
range: NanoRange,
|
||||
channels: VecDeque<SfDbChannel>,
|
||||
current_channel: Option<SfDbChannel>,
|
||||
channels: VecDeque<ChannelTypeConfigGen>,
|
||||
settings: EventsSubQuerySettings,
|
||||
current_channel: Option<ChannelTypeConfigGen>,
|
||||
node_config: NodeConfigCached,
|
||||
chan_stream: Option<Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>>,
|
||||
config_fut: Option<Pin<Box<dyn Future<Output = Result<ChannelTypeConfigGen, Error>> + Send>>>,
|
||||
ch_conf: Option<ChannelTypeConfigGen>,
|
||||
disk_io_tune: DiskIoTune,
|
||||
do_decompress: bool,
|
||||
#[allow(unused)]
|
||||
@@ -543,7 +545,8 @@ pub struct DataApiPython3DataStream {
|
||||
impl DataApiPython3DataStream {
|
||||
pub fn new(
|
||||
range: NanoRange,
|
||||
channels: Vec<SfDbChannel>,
|
||||
channels: Vec<ChannelTypeConfigGen>,
|
||||
settings: EventsSubQuerySettings,
|
||||
disk_io_tune: DiskIoTune,
|
||||
do_decompress: bool,
|
||||
events_max: u64,
|
||||
@@ -553,11 +556,11 @@ impl DataApiPython3DataStream {
|
||||
Self {
|
||||
range,
|
||||
channels: channels.into_iter().collect(),
|
||||
settings,
|
||||
current_channel: None,
|
||||
node_config,
|
||||
chan_stream: None,
|
||||
config_fut: None,
|
||||
ch_conf: None,
|
||||
disk_io_tune,
|
||||
do_decompress,
|
||||
event_count: 0,
|
||||
@@ -571,7 +574,7 @@ impl DataApiPython3DataStream {
|
||||
|
||||
fn convert_item(
|
||||
b: EventFull,
|
||||
channel: &SfDbChannel,
|
||||
channel: &ChannelTypeConfigGen,
|
||||
fetch_info: &SfChFetchInfo,
|
||||
header_out: &mut bool,
|
||||
count_events: &mut usize,
|
||||
@@ -674,22 +677,26 @@ impl DataApiPython3DataStream {
|
||||
fn handle_config_fut_ready(&mut self, fetch_info: SfChFetchInfo) -> Result<(), Error> {
|
||||
self.config_fut = None;
|
||||
debug!("found channel_config {:?}", fetch_info);
|
||||
let channel = SfDbChannel::from_name(fetch_info.backend(), fetch_info.name());
|
||||
let evq = PlainEventsQuery::new(channel.clone(), self.range.clone()).for_event_blobs();
|
||||
debug!("query for event blobs retrieval: evq {evq:?}");
|
||||
let select = EventsSubQuerySelect::new(
|
||||
ChannelTypeConfigGen::SfDatabuffer(fetch_info.clone()),
|
||||
self.range.clone().into(),
|
||||
TransformQuery::for_event_blobs(),
|
||||
);
|
||||
let subq = EventsSubQuery::from_parts(select, self.settings.clone());
|
||||
let one_before = subq.transform().need_one_before_range();
|
||||
debug!("query for event blobs retrieval subq {subq:?}");
|
||||
// TODO important TODO
|
||||
debug!("TODO fix magic inmem_bufcap");
|
||||
debug!("TODO add timeout option to data api3 download");
|
||||
let perf_opts = PerfOpts::default();
|
||||
// TODO is this a good to place decide this?
|
||||
let s = if self.node_config.node_config.cluster.is_central_storage {
|
||||
info!("Set up central storage stream");
|
||||
// TODO pull up this config
|
||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
|
||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024));
|
||||
let s = make_local_event_blobs_stream(
|
||||
evq.range().try_into()?,
|
||||
self.range.clone(),
|
||||
&fetch_info,
|
||||
evq.one_before_range(),
|
||||
one_before,
|
||||
self.do_decompress,
|
||||
event_chunker_conf,
|
||||
self.disk_io_tune.clone(),
|
||||
@@ -698,8 +705,7 @@ impl DataApiPython3DataStream {
|
||||
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
|
||||
} else {
|
||||
debug!("Set up merged remote stream");
|
||||
let ch_conf: ChannelTypeConfigGen = fetch_info.clone().into();
|
||||
let s = MergedBlobsFromRemotes::new(evq, perf_opts, ch_conf, self.node_config.node_config.cluster.clone());
|
||||
let s = MergedBlobsFromRemotes::new(subq, self.node_config.node_config.cluster.clone());
|
||||
Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
|
||||
};
|
||||
let s = s.map({
|
||||
@@ -782,11 +788,14 @@ impl Stream for DataApiPython3DataStream {
|
||||
} else {
|
||||
if let Some(channel) = self.channels.pop_front() {
|
||||
self.current_channel = Some(channel.clone());
|
||||
self.config_fut = Some(Box::pin(find_ch_conf(
|
||||
self.range.clone(),
|
||||
channel,
|
||||
self.node_config.clone(),
|
||||
)));
|
||||
if false {
|
||||
self.config_fut = Some(Box::pin(find_ch_conf(
|
||||
self.range.clone(),
|
||||
err::todoval(),
|
||||
self.node_config.clone(),
|
||||
)));
|
||||
}
|
||||
self.config_fut = Some(Box::pin(futures_util::future::ready(Ok(channel))));
|
||||
continue;
|
||||
} else {
|
||||
self.data_done = true;
|
||||
@@ -805,6 +814,7 @@ impl Stream for DataApiPython3DataStream {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn shape_to_api3proto(sh: &Option<Vec<u32>>) -> Vec<u32> {
|
||||
match sh {
|
||||
None => vec![],
|
||||
@@ -883,7 +893,7 @@ impl Api1EventsBinaryHandler {
|
||||
qu: Api1Query,
|
||||
accept: String,
|
||||
span: tracing::Span,
|
||||
node_config: &NodeConfigCached,
|
||||
ncc: &NodeConfigCached,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
// TODO this should go to usage statistics:
|
||||
info!(
|
||||
@@ -892,6 +902,7 @@ impl Api1EventsBinaryHandler {
|
||||
qu.channels().len(),
|
||||
qu.channels().first()
|
||||
);
|
||||
let settings = EventsSubQuerySettings::from(&qu);
|
||||
let beg_date = qu.range().beg().clone();
|
||||
let end_date = qu.range().end().clone();
|
||||
trace!("Api1Query beg_date {:?} end_date {:?}", beg_date, end_date);
|
||||
@@ -902,22 +913,27 @@ impl Api1EventsBinaryHandler {
|
||||
let end = end_date.timestamp() as u64 * SEC + end_date.timestamp_subsec_nanos() as u64;
|
||||
let range = NanoRange { beg, end };
|
||||
// TODO check for valid given backend name:
|
||||
let backend = &node_config.node_config.cluster.backend;
|
||||
let chans = qu
|
||||
.channels()
|
||||
.iter()
|
||||
.map(|ch| SfDbChannel::from_name(backend, ch.name()))
|
||||
.collect();
|
||||
let backend = &ncc.node_config.cluster.backend;
|
||||
// TODO ask for channel config quorum for all channels up front.
|
||||
//httpclient::http_get(url, accept);
|
||||
let mut chans = Vec::new();
|
||||
for ch in qu.channels() {
|
||||
let ch = SfDbChannel::from_name(backend, ch.name());
|
||||
let ch_conf = nodenet::configquorum::find_config_basics_quorum(ch, range.clone().into(), ncc).await?;
|
||||
chans.push(ch_conf);
|
||||
}
|
||||
// TODO use a better stream protocol with built-in error delivery.
|
||||
let status_id = super::status_board()?.new_status_id();
|
||||
let s = DataApiPython3DataStream::new(
|
||||
range.clone(),
|
||||
chans,
|
||||
qu.disk_io_tune().clone(),
|
||||
// TODO carry those settings from the query again
|
||||
settings,
|
||||
DiskIoTune::default(),
|
||||
qu.decompress(),
|
||||
qu.events_max().unwrap_or(u64::MAX),
|
||||
status_id.clone(),
|
||||
node_config.clone(),
|
||||
ncc.clone(),
|
||||
);
|
||||
let s = s.instrument(span);
|
||||
let body = BodyStream::wrapped(s, format!("Api1EventsBinaryHandler"));
|
||||
|
||||
@@ -37,17 +37,18 @@ pub async fn chconf_from_events_v1(
|
||||
q: &PlainEventsQuery,
|
||||
ncc: &NodeConfigCached,
|
||||
) -> Result<ChannelTypeConfigGen, Error> {
|
||||
let ret = nodenet::configquorum::find_config_basics_quorum(q.channel(), ncc).await?;
|
||||
let ret = nodenet::configquorum::find_config_basics_quorum(q.channel().clone(), q.range().clone(), ncc).await?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn chconf_from_prebinned(q: &PreBinnedQuery, ncc: &NodeConfigCached) -> Result<ChannelTypeConfigGen, Error> {
|
||||
let ret = nodenet::configquorum::find_config_basics_quorum(q.channel(), ncc).await?;
|
||||
let ret =
|
||||
nodenet::configquorum::find_config_basics_quorum(q.channel().clone(), q.patch().patch_range(), ncc).await?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn ch_conf_from_binned(q: &BinnedQuery, ncc: &NodeConfigCached) -> Result<ChannelTypeConfigGen, Error> {
|
||||
let ret = nodenet::configquorum::find_config_basics_quorum(q.channel(), ncc).await?;
|
||||
let ret = nodenet::configquorum::find_config_basics_quorum(q.channel().clone(), q.range().clone(), ncc).await?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
@@ -142,10 +143,10 @@ impl ChannelConfigsHandler {
|
||||
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
||||
let q = ChannelConfigQuery::from_url(&url)?;
|
||||
info!("channel_configs for q {q:?}");
|
||||
let ch_conf = nodenet::channelconfig::channel_config(q.range.clone(), q.channel, ncc).await?;
|
||||
let ch_confs = nodenet::channelconfig::channel_configs(q.channel, ncc).await?;
|
||||
let ret = response(StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, APP_JSON)
|
||||
.body(Body::from(serde_json::to_string(&ch_conf)?))?;
|
||||
.body(Body::from(serde_json::to_string(&ch_confs)?))?;
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user