Refactor config entry matching
@@ -16,10 +16,11 @@ use netpod::Shape;
 /// In the future, we can even try to involve time range information for that, but backends like
 /// old archivers and sf databuffer do not support such lookup.
 pub async fn chconf_from_scylla_type_backend(channel: &SfDbChannel, ncc: &NodeConfigCached) -> Result<ChConf, Error> {
-    if channel.backend != ncc.node_config.cluster.backend {
+    if channel.backend() != ncc.node_config.cluster.backend {
         warn!(
             "mismatched backend {} vs {}",
-            channel.backend, ncc.node_config.cluster.backend
+            channel.backend(),
+            ncc.node_config.cluster.backend
         );
     }
     let backend = channel.backend().into();

@@ -74,7 +74,7 @@ pub async fn create_connection(db_config: &Database) -> Result<PgClient, Error>
 pub async fn channel_exists(channel: &SfDbChannel, node_config: &NodeConfigCached) -> Result<bool, Error> {
     let cl = create_connection(&node_config.node_config.cluster.database).await?;
     let rows = cl
-        .query("select rowid from channels where name = $1::text", &[&channel.name])
+        .query("select rowid from channels where name = $1::text", &[&channel.name()])
        .await
        .err_conv()?;
     debug!("channel_exists {} rows", rows.len());

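Note on the hunk above: only the parameter expression changes (`&channel.name` to `&channel.name()`), the parameterized-query shape stays the same. A minimal self-contained sketch of that shape with tokio-postgres — the `channels` table and query text are taken from the hunk; the bare `Client` and omitted connection setup are assumptions for the sketch:

```rust
// Sketch of the parameterized lookup above with tokio-postgres.
// Connection setup omitted; a connected Client is assumed.
use tokio_postgres::Client;

async fn channel_exists(client: &Client, name: &str) -> Result<bool, tokio_postgres::Error> {
    let rows = client
        // `$1::text` binds the channel name server-side; no string splicing.
        .query("select rowid from channels where name = $1::text", &[&name])
        .await?;
    Ok(!rows.is_empty())
}
```
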
@@ -6,21 +6,21 @@ use netpod::NodeConfigCached;
 use netpod::SfDbChannel;

 // For sf-databuffer backend, given a Channel, try to complete the information if only id is given.
-pub async fn sf_databuffer_fetch_channel_by_series(
+#[allow(unused)]
+async fn sf_databuffer_fetch_channel_by_series(
     channel: SfDbChannel,
     ncc: &NodeConfigCached,
 ) -> Result<SfDbChannel, Error> {
-    info!("sf_databuffer_fetch_channel_by_series");
+    let me = "sf_databuffer_fetch_channel_by_series";
+    info!("{me}");
     // TODO should not be needed at some point.
     if channel.backend().is_empty() || channel.name().is_empty() {
         if let Some(series) = channel.series() {
             if series < 1 {
-                error!("sf_databuffer_fetch_channel_by_series bad input: {channel:?}");
-                Err(Error::with_msg_no_trace(format!(
-                    "sf_databuffer_fetch_channel_by_series bad input: {channel:?}"
-                )))
+                error!("{me} bad input: {channel:?}");
+                Err(Error::with_msg_no_trace(format!("{me} bad input: {channel:?}")))
             } else {
-                info!("sf_databuffer_fetch_channel_by_series do the lookup");
+                info!("{me} do the lookup");
                 let series = channel
                     .series()
                     .ok_or_else(|| Error::with_msg_no_trace("no series id given"))? as i64;
@@ -30,21 +30,18 @@ pub async fn sf_databuffer_fetch_channel_by_series(
                     .await
                     .err_conv()?;
                 if let Some(row) = rows.pop() {
-                    info!("sf_databuffer_fetch_channel_by_series got a row {row:?}");
+                    info!("{me} got a row {row:?}");
                     let name: String = row.get(0);
-                    let channel =
-                        SfDbChannel::from_full(ncc.node_config.cluster.backend.clone(), channel.series(), name);
-                    info!("sf_databuffer_fetch_channel_by_series return {channel:?}");
+                    let channel = SfDbChannel::from_full(&ncc.node_config.cluster.backend, channel.series(), name);
+                    info!("{me} return {channel:?}");
                     Ok(channel)
                 } else {
-                    info!("sf_databuffer_fetch_channel_by_series nothing found");
+                    info!("{me} nothing found");
                     Err(Error::with_msg_no_trace("can not find series"))
                 }
             }
         } else {
-            Err(Error::with_msg_no_trace(format!(
-                "sf_databuffer_fetch_channel_by_series bad input: {channel:?}"
-            )))
+            Err(Error::with_msg_no_trace(format!("{me} bad input: {channel:?}")))
         }
     } else {
         Ok(channel)

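The `let me = ...` binding is the whole point of the message changes in these two hunks: the function name is spelled once and every log and error message interpolates it, so a later rename cannot leave stale text behind. A tiny sketch of the pattern — the `log` crate's macros stand in here; any tracing facade works the same way:

```rust
// Sketch of the `let me = ...` logging pattern: bind the function name
// once, interpolate it everywhere, and a rename touches a single line.
use log::{error, info};

fn fetch_by_series(series: i64) -> Result<(), String> {
    let me = "fetch_by_series";
    info!("{me}");
    if series < 1 {
        error!("{me} bad input: {series}");
        return Err(format!("{me} bad input: {series}"));
    }
    info!("{me} do the lookup");
    Ok(())
}
```
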
@@ -20,8 +20,8 @@ use tiny_keccak::Hasher;
 // No longer needed for scylla-based caching.
 pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoordEnum, channel: &SfDbChannel, cluster: &Cluster) -> u32 {
     let mut hash = tiny_keccak::Sha3::v256();
-    hash.update(channel.backend.as_bytes());
-    hash.update(channel.name.as_bytes());
+    hash.update(channel.backend().as_bytes());
+    hash.update(channel.name().as_bytes());
     /*hash.update(&patch_coord.patch_beg().to_le_bytes());
     hash.update(&patch_coord.patch_end().to_le_bytes());
     hash.update(&patch_coord.bin_t_len().to_le_bytes());

@@ -53,8 +53,8 @@ impl CacheFileDesc {
     pub fn hash(&self) -> String {
         let mut h = tiny_keccak::Sha3::v256();
         h.update(b"V000");
-        h.update(self.channel.backend.as_bytes());
-        h.update(self.channel.name.as_bytes());
+        h.update(self.channel.backend().as_bytes());
+        h.update(self.channel.name().as_bytes());
         h.update(format!("{}", self.agg_kind).as_bytes());
         //h.update(&self.patch.spec().bin_t_len().to_le_bytes());
         //h.update(&self.patch.spec().patch_t_len().to_le_bytes());

@@ -67,8 +67,8 @@ impl CacheFileDesc {
     pub fn hash_channel(&self) -> String {
         let mut h = tiny_keccak::Sha3::v256();
         h.update(b"V000");
-        h.update(self.channel.backend.as_bytes());
-        h.update(self.channel.name.as_bytes());
+        h.update(self.channel.backend().as_bytes());
+        h.update(self.channel.name().as_bytes());
         let mut buf = [0; 32];
         h.finalize(&mut buf);
         hex::encode(&buf)

@@ -83,7 +83,7 @@ impl CacheFileDesc {
             .join("cache")
             .join(&hc[0..3])
             .join(&hc[3..6])
-            .join(&self.channel.name)
+            .join(self.channel.name())
             .join(format!("{}", self.agg_kind))
             /*.join(format!(
                 "{:010}-{:010}",

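These hashing hunks only swap field access for accessors, but the surrounding scheme is worth seeing whole: channel identity is fed into SHA3-256 behind a version tag, and the hex digest's leading characters fan the cache out into subdirectories. A self-contained sketch under those assumptions — crates `tiny_keccak` (with the `sha3` feature) and `hex`; the `V000` tag and the 3+3 fan-out mirror the hunks above, the example channel name is made up:

```rust
// Sketch of the cache-path scheme around these hunks: SHA3-256 over a
// version tag plus channel identity, hex-encoded, with the first 3+3
// hex chars used as directory fan-out.
use std::path::{Path, PathBuf};
use tiny_keccak::{Hasher, Sha3};

fn hash_channel(backend: &str, name: &str) -> String {
    let mut h = Sha3::v256();
    h.update(b"V000"); // version tag lets old cache layouts be invalidated
    h.update(backend.as_bytes());
    h.update(name.as_bytes());
    let mut buf = [0u8; 32];
    h.finalize(&mut buf);
    hex::encode(buf)
}

fn cache_dir(base: &Path, backend: &str, name: &str) -> PathBuf {
    let hc = hash_channel(backend, name);
    base.join("cache").join(&hc[0..3]).join(&hc[3..6]).join(name)
}

fn main() {
    let p = cache_dir(Path::new("/data"), "sf-databuffer", "S10BC01-DBPM010:Q1");
    println!("{}", p.display());
}
```
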
@@ -6,50 +6,57 @@ use netpod::SfDbChannel;
 use parse::channelconfig::extract_matching_config_entry;
 use parse::channelconfig::read_local_config;
-use parse::channelconfig::MatchingConfigEntry;
+use parse::channelconfig::ChannelConfigs;
+use parse::channelconfig::ConfigEntry;

-pub async fn config(
-    range: NanoRange,
+pub async fn config_entry_best_match(
+    range: &NanoRange,
     channel: SfDbChannel,
     node_config: &NodeConfigCached,
-) -> Result<SfDbChConf, Error> {
-    let channel_configs = read_local_config(channel.clone(), node_config.clone()).await?;
-    let entry_res = match extract_matching_config_entry(&range, &channel_configs) {
+) -> Result<Option<ConfigEntry>, Error> {
+    let channel_config = read_local_config(channel.clone(), node_config.clone()).await?;
+    let entry_res = match extract_matching_config_entry(range, &channel_config) {
         Ok(k) => k,
         Err(e) => return Err(e)?,
     };
-    let entry = match entry_res {
-        MatchingConfigEntry::None => {
-            return Err(Error::with_public_msg(format!(
-                "disk::channelconfig no config entry found for {:?}",
-                channel
-            )))?
-        }
-        MatchingConfigEntry::Multiple => {
-            return Err(Error::with_public_msg(format!(
-                "disk::channelconfig multiple config entries in range found for {:?}",
-                channel
-            )))?
-        }
-        MatchingConfigEntry::Entry(entry) => entry,
-    };
-    let shape = match entry.to_shape() {
-        Ok(k) => k,
-        Err(e) => return Err(e)?,
-    };
-    let channel_config = SfDbChConf {
-        channel: channel.clone(),
-        keyspace: entry.ks as u8,
-        time_bin_size: entry.bs.clone(),
-        shape,
-        scalar_type: entry.scalar_type.clone(),
-        byte_order: entry.byte_order.clone(),
-        array: entry.is_array,
-        compression: entry.is_compressed,
-    };
-    Ok(channel_config)
+    match entry_res.best() {
+        None => Ok(None),
+        Some(x) => Ok(Some(x.clone())),
+    }
+}
+
+pub async fn configs(channel: SfDbChannel, node_config: &NodeConfigCached) -> Result<ChannelConfigs, Error> {
+    read_local_config(channel.clone(), node_config.clone()).await
+}
+
+pub async fn channel_config_best_match(
+    range: NanoRange,
+    channel: SfDbChannel,
+    node_config: &NodeConfigCached,
+) -> Result<Option<SfDbChConf>, Error> {
+    let best = config_entry_best_match(&range, channel.clone(), node_config).await?;
+    let channel_configs = read_local_config(channel.clone(), node_config.clone()).await?;
+    let entry_res = match extract_matching_config_entry(&range, &channel_configs) {
+        Ok(k) => k,
+        Err(e) => return Err(e),
+    };
+    match entry_res.best() {
+        None => Ok(None),
+        Some(entry) => {
+            let shape = match entry.to_shape() {
+                Ok(k) => k,
+                Err(e) => return Err(e)?,
+            };
+            let channel_config = SfDbChConf {
+                channel: channel.clone(),
+                keyspace: entry.ks as u8,
+                time_bin_size: entry.bs.clone(),
+                shape,
+                scalar_type: entry.scalar_type.clone(),
+                byte_order: entry.byte_order.clone(),
+                array: entry.is_array,
+                compression: entry.is_compressed,
+            };
+            Ok(Some(channel_config))
+        }
+    }
 }

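The shape change in this hunk: `config()` used to turn both "no entry" and "several entries" into errors, while `config_entry_best_match` and `channel_config_best_match` return `Ok(None)` for "nothing matched" and leave the reaction to the caller. A minimal sketch of that calling convention — types are stand-ins, not the crate's definitions:

```rust
// Sketch of the Result<Option<_>, _> convention the new lookups use.
// SfDbChConf and Error are stand-ins for the crate's real types.
#[derive(Debug, Clone)]
struct SfDbChConf;

#[derive(Debug)]
struct Error(String);

fn channel_config_best_match(found: bool) -> Result<Option<SfDbChConf>, Error> {
    // "No matching entry" is a normal outcome now, not an error.
    if found { Ok(Some(SfDbChConf)) } else { Ok(None) }
}

fn caller(found: bool) -> Result<SfDbChConf, Error> {
    match channel_config_best_match(found) {
        Ok(Some(config)) => Ok(config),
        // Each caller decides: an error here, an empty stream elsewhere.
        Ok(None) => Err(Error("can not find config".into())),
        Err(e) => Err(e),
    }
}

fn main() {
    assert!(caller(true).is_ok());
    assert!(caller(false).is_err());
}
```
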
@@ -152,12 +152,12 @@ async fn gen_node(split: u32, node: &Node, ensemble: &Ensemble) -> Result<(), Er

 async fn gen_channel(chn: &ChannelGenProps, split: u32, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
     let sfc = node.sf_databuffer.as_ref().unwrap();
-    let config_path = sfc.data_base_path.join("config").join(&chn.config.channel.name);
+    let config_path = sfc.data_base_path.join("config").join(chn.config.channel.name());
     let channel_path = sfc
         .data_base_path
         .join(format!("{}_{}", sfc.ksprefix, chn.config.keyspace))
         .join("byTime")
-        .join(&chn.config.channel.name);
+        .join(chn.config.channel.name());
     tokio::fs::create_dir_all(&channel_path).await?;
     gen_config(&config_path, &chn.config, node, ensemble)
         .await

@@ -200,7 +200,7 @@ async fn gen_config(config_path: &Path, config: &SfDbChConf, _node: &Node, _ense
     let mut buf = BytesMut::with_capacity(1024 * 1);
     let ver = 0;
     buf.put_i16(ver);
-    let cnenc = config.channel.name.as_bytes();
+    let cnenc = config.channel.name().as_bytes();
     let len1 = cnenc.len() + 8;
     buf.put_i32(len1 as i32);
     buf.put(cnenc);

@@ -385,7 +385,7 @@ async fn gen_timebin(

 async fn gen_datafile_header(file: &mut CountedFile, config: &SfDbChConf) -> Result<(), Error> {
     let mut buf = BytesMut::with_capacity(1024);
-    let cnenc = config.channel.name.as_bytes();
+    let cnenc = config.channel.name().as_bytes();
     let len1 = cnenc.len() + 8;
     buf.put_i16(0);
     buf.put_i32(len1 as i32);

@@ -18,7 +18,7 @@ pub fn datapath(timebin: u64, config: &SfDbChConf, split: u32, node: &Node) -> P
         config.keyspace
     ))
     .join("byTime")
-    .join(config.channel.name.clone())
+    .join(config.channel.name())
    .join(format!("{:019}", timebin))
    .join(format!("{:010}", split))
    .join(format!("{:019}_00000_Data", config.time_bin_size.ns() / MS))

@@ -36,7 +36,7 @@ pub async fn datapaths_for_timebin(timebin: u64, config: &SfDbChConf, node: &Nod
         .data_base_path
         .join(format!("{}_{}", sfc.ksprefix, config.keyspace))
         .join("byTime")
-        .join(config.channel.name.clone())
+        .join(config.channel.name())
         .join(format!("{:019}", timebin));
     let rd = tokio::fs::read_dir(timebin_path).await?;
     let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);

@@ -71,7 +71,7 @@ pub async fn datapaths_for_timebin(timebin: u64, config: &SfDbChConf, node: &Nod
         .data_base_path
         .join(format!("{}_{}", sfc.ksprefix, config.keyspace))
         .join("byTime")
-        .join(config.channel.name.clone())
+        .join(config.channel.name())
         .join(format!("{:019}", timebin))
         .join(format!("{:010}", split))
         .join(format!("{:019}_00000_Data", config.time_bin_size.ns() / MS));

@@ -86,7 +86,7 @@ pub fn channel_timebins_dir_path(channel_config: &SfDbChConf, node: &Node) -> Re
         .data_base_path
         .join(format!("{}_{}", sfc.ksprefix, channel_config.keyspace))
         .join("byTime")
-        .join(&channel_config.channel.name);
+        .join(channel_config.channel.name());
     Ok(ret)
 }

@@ -1,3 +1,4 @@
+use crate::channelconfig::config_entry_best_match;
 use crate::eventblobs::EventChunkerMultifile;
 use crate::eventchunker::EventChunkerConf;
 use crate::raw::generated::EventBlobsGeneratorI32Test00;

@@ -20,10 +21,7 @@ use netpod::ChConf;
 use netpod::DiskIoTune;
 use netpod::NodeConfigCached;
 use netpod::SfDbChannel;
-use parse::channelconfig::extract_matching_config_entry;
-use parse::channelconfig::read_local_config;
 use parse::channelconfig::ConfigEntry;
-use parse::channelconfig::MatchingConfigEntry;
 use query::api4::events::PlainEventsQuery;
 use std::pin::Pin;

@@ -63,32 +61,16 @@ pub async fn make_event_pipe(
     chconf: ChConf,
     ncc: &NodeConfigCached,
 ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
     info!("---------- disk::raw::conn::make_event_pipe");
-    if false {
-        match dbconn::channel_exists(&evq.channel(), &ncc).await {
-            Ok(_) => (),
-            Err(e) => return Err(e)?,
-        }
-    }
-    let chn = evq.channel().clone();
-    let chn = if chn.name().is_empty() {
-        if let Some(series) = chn.series() {
-            if series < 1 {
-                error!("BAD QUERY: {evq:?}");
-                return Err(Error::with_msg_no_trace(format!("BAD QUERY: {evq:?}")));
-            } else {
-                dbconn::query::sf_databuffer_fetch_channel_by_series(chn, ncc).await?
-            }
-        } else {
-            chn
-        }
-    } else {
-        chn
-    };
+    // sf-databuffer type backends identify channels by their (backend, name) only.
+    let channel = evq.channel().clone();
+    let range = evq.range().clone();
-    let channel_config = crate::channelconfig::config(evq.range().try_into()?, chn, ncc).await;
-    let channel_config = match channel_config {
-        Ok(x) => x,
+    let x = crate::channelconfig::channel_config_best_match(evq.range().try_into()?, channel, ncc).await;
+    let channel_config = match x {
+        Ok(Some(x)) => x,
+        Ok(None) => {
+            error!("make_event_pipe can not find config");
+            return Err(Error::with_msg_no_trace("make_event_pipe can not find config"));
+        }
         Err(e) => {
             error!("make_event_pipe can not find config");
             if e.msg().contains("ErrorKind::NotFound") {
@@ -96,7 +78,7 @@ pub async fn make_event_pipe(
                 let s = futures_util::stream::empty();
                 return Ok(Box::pin(s));
             } else {
-                return Err(e)?;
+                return Err(e);
             }
         }
     };

@@ -128,35 +110,6 @@ pub async fn make_event_pipe(
     Ok(pipe)
 }

-pub async fn get_applicable_entry(
-    range: &NanoRange,
-    channel: SfDbChannel,
-    node_config: &NodeConfigCached,
-) -> Result<ConfigEntry, Error> {
-    debug!("disk::raw::conn::get_applicable_entry");
-    let channel_config = read_local_config(channel.clone(), node_config.clone()).await?;
-    let entry_res = match extract_matching_config_entry(range, &channel_config) {
-        Ok(k) => k,
-        Err(e) => return Err(e)?,
-    };
-    let entry = match entry_res {
-        MatchingConfigEntry::None => {
-            return Err(Error::with_public_msg(format!(
-                "get_applicable_entry no config entry found {:?}",
-                channel
-            )))?
-        }
-        MatchingConfigEntry::Multiple => {
-            return Err(Error::with_public_msg(format!(
-                "get_applicable_entry multiple config entries found for {:?}",
-                channel
-            )))?
-        }
-        MatchingConfigEntry::Entry(entry) => entry,
-    };
-    Ok(entry.clone())
-}
-
 pub fn make_local_event_blobs_stream(
     range: NanoRange,
     channel: SfDbChannel,

@@ -264,8 +217,13 @@ pub async fn make_event_blobs_pipe_real(
     }
     let expand = evq.one_before_range();
     let range = evq.range();
-    let entry = match get_applicable_entry(&evq.range().try_into()?, evq.channel().clone(), node_config).await {
-        Ok(x) => x,
+    let entry = match config_entry_best_match(&evq.range().try_into()?, evq.channel().clone(), node_config).await {
+        Ok(Some(x)) => x,
+        Ok(None) => {
+            let e = Error::with_msg_no_trace("no config entry found");
+            error!("{e}");
+            return Err(e);
+        }
         Err(e) => {
             if e.to_public_error().msg().contains("no config entry found") {
                 let item = items_0::streamitem::LogItem {
@@ -280,7 +238,6 @@ pub async fn make_event_blobs_pipe_real(
         }
     };
     let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
-    type ItemType = Sitemty<EventFull>;
     // TODO should depend on host config
     let do_local = node_config.node_config.cluster.is_central_storage;
     let pipe = if do_local {

@@ -306,7 +263,9 @@ pub async fn make_event_blobs_pipe_real(
             DiskIoTune::default(),
             node_config,
         )?;
-        /*let s = event_blobs.map(|item: ItemType| Box::new(item) as Box<dyn Framable + Send>);
+        /*
+        type ItemType = Sitemty<EventFull>;
+        let s = event_blobs.map(|item: ItemType| Box::new(item) as Box<dyn Framable + Send>);
         //let s = tracing_futures::Instrumented::instrument(s, tracing::info_span!("make_event_blobs_pipe"));
         let pipe: Pin<Box<dyn Stream<Item = Box<dyn Framable + Send>> + Send>>;
         pipe = Box::pin(s);

@@ -50,7 +50,6 @@ use parse::channelconfig::extract_matching_config_entry;
 use parse::channelconfig::read_local_config;
 use parse::channelconfig::ChannelConfigs;
 use parse::channelconfig::ConfigEntry;
-use parse::channelconfig::MatchingConfigEntry;
 use query::api4::events::PlainEventsQuery;
 use serde::Deserialize;
 use serde::Serialize;

@@ -719,7 +718,7 @@ impl DataApiPython3DataStream {
                 let compression = if let Some(_) = &b.comps[i1] { Some(1) } else { None };
                 if !*header_out {
                     let head = Api1ChannelHeader {
-                        name: channel.name.clone(),
+                        name: channel.name().into(),
                         ty: (&b.scalar_types[i1]).into(),
                         byte_order: if b.be[i1] {
                             Api1ByteOrder::Big

@@ -792,98 +791,79 @@ impl DataApiPython3DataStream {
                     match item {
                         Ok(config) => {
                             self.config_fut = None;
-                            let entry_res = match extract_matching_config_entry(&self.range, &config) {
-                                Ok(k) => k,
-                                Err(e) => return Err(e)?,
-                            };
-                            match entry_res {
-                                MatchingConfigEntry::None => {
+                            let res = extract_matching_config_entry(&self.range, &config)?;
+                            let entry = match res.best() {
+                                Some(k) => k,
+                                None => {
                                     warn!("DataApiPython3DataStream no config entry found for {:?}", config);
                                     self.chan_stream = Some(Box::pin(stream::empty()));
-                                    Ok(())
+                                    // TODO remember the issue for status and metrics
+                                    return Ok(());
                                 }
-                                MatchingConfigEntry::Multiple => {
-                                    warn!(
-                                        "DataApiPython3DataStream multiple config entries found for {:?}",
-                                        config
-                                    );
-                                    self.chan_stream = Some(Box::pin(stream::empty()));
-                                    Ok(())
-                                }
-                                MatchingConfigEntry::Entry(entry) => {
-                                    let entry = entry.clone();
-                                    let channel = self.current_channel.as_ref().unwrap();
-                                    debug!("found channel_config for {}: {:?}", channel.name, entry);
-                                    let evq = PlainEventsQuery::new(channel.clone(), self.range.clone()).for_event_blobs();
-                                    debug!("query for event blobs retrieval: evq {evq:?}");
-                                    // TODO important TODO
-                                    debug!("TODO fix magic inmem_bufcap");
-                                    debug!("TODO add timeout option to data api3 download");
-                                    let perf_opts = PerfOpts::default();
-                                    // TODO is this a good to place decide this?
-                                    let s = if self.node_config.node_config.cluster.is_central_storage {
-                                        info!("Set up central storage stream");
-                                        // TODO pull up this config
-                                        let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
-                                        let s = make_local_event_blobs_stream(
-                                            evq.range().try_into()?,
-                                            evq.channel().clone(),
-                                            &entry,
-                                            evq.one_before_range(),
-                                            self.do_decompress,
-                                            event_chunker_conf,
-                                            self.disk_io_tune.clone(),
-                                            &self.node_config,
-                                        )?;
-                                        Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
-                                    } else {
-                                        if let Some(sh) = &entry.shape {
-                                            if sh.len() > 1 {
-                                                warn!("Remote stream fetch for shape {sh:?}");
-                                            }
-                                        }
-                                        debug!("Set up merged remote stream");
-                                        let s = MergedBlobsFromRemotes::new(
-                                            evq,
-                                            perf_opts,
-                                            self.node_config.node_config.cluster.clone(),
-                                        );
-                                        Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
-                                    };
-                                    let s = s.map({
-                                        let mut header_out = false;
-                                        let mut count_events = 0;
-                                        let channel = self.current_channel.clone().unwrap();
-                                        move |b| {
-                                            let ret = match b {
-                                                Ok(b) => {
-                                                    let f = match b {
-                                                        StreamItem::DataItem(RangeCompletableItem::Data(b)) => Self::convert_item(
-                                                            b,
-                                                            &channel,
-                                                            &entry,
-                                                            &mut header_out,
-                                                            &mut count_events,
-                                                        )?,
-                                                        _ => BytesMut::new(),
-                                                    };
-                                                    Ok(f)
-                                                }
-                                                Err(e) => Err(e),
-                                            };
-                                            ret
-                                        }
-                                    });
-                                    //let _ = Box::new(s) as Box<dyn Stream<Item = Result<BytesMut, Error>> + Unpin>;
-                                    let evm = if self.events_max == 0 {
-                                        usize::MAX
-                                    } else {
-                                        self.events_max as usize
-                                    };
-                                    self.chan_stream = Some(Box::pin(s.map_err(Error::from).take(evm)));
-                                    Ok(())
-                                }
-                            }
+                            };
+                            let entry = entry.clone();
+                            let channel = self.current_channel.as_ref().unwrap();
+                            debug!("found channel_config for {}: {:?}", channel.name(), entry);
+                            let evq = PlainEventsQuery::new(channel.clone(), self.range.clone()).for_event_blobs();
+                            debug!("query for event blobs retrieval: evq {evq:?}");
+                            // TODO important TODO
+                            debug!("TODO fix magic inmem_bufcap");
+                            debug!("TODO add timeout option to data api3 download");
+                            let perf_opts = PerfOpts::default();
+                            // TODO is this a good to place decide this?
+                            let s = if self.node_config.node_config.cluster.is_central_storage {
+                                info!("Set up central storage stream");
+                                // TODO pull up this config
+                                let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
+                                let s = make_local_event_blobs_stream(
+                                    evq.range().try_into()?,
+                                    evq.channel().clone(),
+                                    &entry,
+                                    evq.one_before_range(),
+                                    self.do_decompress,
+                                    event_chunker_conf,
+                                    self.disk_io_tune.clone(),
+                                    &self.node_config,
+                                )?;
+                                Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
+                            } else {
+                                if let Some(sh) = &entry.shape {
+                                    if sh.len() > 1 {
+                                        warn!("Remote stream fetch for shape {sh:?}");
+                                    }
+                                }
+                                debug!("Set up merged remote stream");
+                                let s = MergedBlobsFromRemotes::new(evq, perf_opts, self.node_config.node_config.cluster.clone());
+                                Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
+                            };
+                            let s = s.map({
+                                let mut header_out = false;
+                                let mut count_events = 0;
+                                let channel = self.current_channel.clone().unwrap();
+                                move |b| {
+                                    let ret = match b {
+                                        Ok(b) => {
+                                            let f = match b {
+                                                StreamItem::DataItem(RangeCompletableItem::Data(b)) => {
+                                                    Self::convert_item(b, &channel, &entry, &mut header_out, &mut count_events)?
+                                                }
+                                                _ => BytesMut::new(),
+                                            };
+                                            Ok(f)
+                                        }
+                                        Err(e) => Err(e),
+                                    };
+                                    ret
+                                }
+                            });
+                            //let _ = Box::new(s) as Box<dyn Stream<Item = Result<BytesMut, Error>> + Unpin>;
+                            let evm = if self.events_max == 0 {
+                                usize::MAX
+                            } else {
+                                self.events_max as usize
+                            };
+                            self.chan_stream = Some(Box::pin(s.map_err(Error::from).take(evm)));
+                            Ok(())
                         }
                         Err(e) => Err(Error::with_msg_no_trace(format!("can not parse channel config {e}"))),
                     }

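Both versions of the conversion loop above use the same adapter idiom: mutable state (`header_out`, `count_events`) is created in a block expression and moved into the closure handed to `map`, so it lives exactly as long as the stream adapter. A self-contained sketch of that idiom with futures-util (the `futures-executor` crate is pulled in only to drive the demo):

```rust
// Sketch of the stateful map({ ... move |x| ... }) idiom from the code
// above: state is initialized once in the block and owned by the closure.
use futures_util::{stream, StreamExt};

fn main() {
    let items = stream::iter(["a", "b", "c"]);
    let numbered = items.map({
        // Plays the role of `header_out` / `count_events` above.
        let mut count = 0usize;
        move |x| {
            count += 1;
            (count, x)
        }
    });
    let out: Vec<(usize, &str)> = futures_executor::block_on(numbered.collect());
    assert_eq!(out, vec![(1, "a"), (2, "b"), (3, "c")]);
}
```
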
@@ -605,8 +605,8 @@ pub struct SfDbChannel {
     // "backend" is currently used in the existing systems for multiple purposes:
     // it can indicate the facility (eg. sf-databuffer, hipa, ...) but also
     // some special subsystem (eg. sf-rf-databuffer).
-    pub backend: String,
-    pub name: String,
+    backend: String,
+    name: String,
 }

 impl SfDbChannel {

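With the fields private, everything else in this commit goes through accessors. A sketch of the surface the call sites imply — `backend()`, `name()`, `series()`, `set_series()`, and `from_full(&backend, series, name)`; signatures and the series type are inferred from usage in this diff, and the real netpod impl may differ:

```rust
// Sketch of the accessor surface the rest of this diff relies on.
// Inferred from call sites in this commit; netpod's real impl may differ.
#[derive(Debug, Clone)]
pub struct SfDbChannel {
    backend: String,
    name: String,
    // sf-databuffer channels may carry only (backend, name);
    // the exact series-id type is an assumption here.
    series: Option<u64>,
}

impl SfDbChannel {
    pub fn from_full(backend: impl Into<String>, series: Option<u64>, name: impl Into<String>) -> Self {
        Self {
            backend: backend.into(),
            name: name.into(),
            series,
        }
    }

    pub fn backend(&self) -> &str {
        &self.backend
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    pub fn series(&self) -> Option<u64> {
        self.series
    }

    pub fn set_series(&mut self, series: u64) {
        self.series = Some(series);
    }
}

fn main() {
    let ch = SfDbChannel::from_full("sf-databuffer", None, "S10BC01-DBPM010:Q1");
    assert_eq!(ch.backend(), "sf-databuffer");
    assert!(ch.series().is_none());
}
```
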
@@ -1,4 +1,3 @@
-use dbconn::query::sf_databuffer_fetch_channel_by_series;
 use err::Error;
 use netpod::log::*;
 use netpod::range::evrange::NanoRange;

@@ -81,26 +80,17 @@ pub async fn channel_config(range: NanoRange, channel: SfDbChannel, ncc: &NodeCo
             .map_err(Error::from)?;
         Ok(ret)
     } else if ncc.node.sf_databuffer.is_some() {
-        debug!("channel_config BEFORE {channel:?}");
-        // TODO in the future we should not need this:
-        let mut channel = sf_databuffer_fetch_channel_by_series(channel, ncc).await?;
-        if channel.series().is_none() {
-            let pgclient = dbconn::create_connection(&ncc.node_config.cluster.database).await?;
-            let pgclient = std::sync::Arc::new(pgclient);
-            let series = dbconn::find_series_sf_databuffer(&channel, pgclient).await?;
-            channel.set_series(series);
-        }
-        let channel = channel;
-        debug!("channel_config AFTER {channel:?}");
-        let c1 = disk::channelconfig::config(range, channel.clone(), ncc).await?;
-        debug!("channel_config THEN {c1:?}");
+        debug!("try to get ChConf for sf-databuffer type backend");
+        debug!("channel_config channel {channel:?}");
+        let config = disk::channelconfig::channel_config_best_match(range, channel.clone(), ncc)
+            .await?
+            .ok_or_else(|| Error::with_msg_no_trace("config entry not found"))?;
+        debug!("channel_config config {config:?}");
         let ret = ChConf {
-            backend: c1.channel.backend,
+            backend: config.channel.backend().into(),
             series: channel.series(),
-            name: c1.channel.name,
-            scalar_type: c1.scalar_type,
-            shape: c1.shape,
+            name: config.channel.name().into(),
+            scalar_type: config.scalar_type,
+            shape: config.shape,
         };
         Ok(ret)
     } else {

@@ -23,6 +23,7 @@ use num_derive::ToPrimitive;
 use num_traits::ToPrimitive;
 use serde::Deserialize;
 use serde::Serialize;
 use std::cmp;
 use std::fmt;
 use tokio::io::ErrorKind;

@@ -268,9 +269,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
     ))
 }

-/**
-Parse a complete configuration file from given in-memory input buffer.
-*/
+/// Parse a complete configuration file from given in-memory input buffer.
 pub fn parse_config(inp: &[u8]) -> NRes<ChannelConfigs> {
     let (inp, ver) = be_i16(inp)?;
     let (inp, len1) = be_i32(inp)?;

@@ -298,29 +297,9 @@ pub fn parse_config(inp: &[u8]) -> NRes<ChannelConfigs> {
         }
         inp_a = inp;
     }
-    // TODO hack to accommodate for parts of databuffer nodes failing:
-    if false && channel_name == "SARFE10-PSSS059:SPECTRUM_X" {
-        warn!("apply hack for {channel_name}");
-        entries = entries
-            .into_iter()
-            .map(|mut e| {
-                e.is_array = true;
-                e.shape = Some(vec![2560]);
-                e
-            })
-            .collect();
-    }
-    if false && channel_name == "SARFE10-PSSS059:SPECTRUM_Y" {
-        warn!("apply hack for {channel_name}");
-        entries = entries
-            .into_iter()
-            .map(|mut e| {
-                e.is_array = true;
-                e.shape = Some(vec![2560]);
-                e
-            })
-            .collect();
-    }
+    // Do not sort the parsed config entries.
+    // We want to deliver the actual order which is found on disk.
+    // Important for troubleshooting.
     let ret = ChannelConfigs {
         format_version: ver,
         channel_name,

@@ -334,8 +313,8 @@ pub async fn channel_config(q: &ChannelConfigQuery, ncc: &NodeConfigCached) -> R
     let entry_res = extract_matching_config_entry(&q.range, &conf)?;
     let entry = match entry_res {
         MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found")),
-        MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found")),
-        MatchingConfigEntry::Entry(entry) => entry,
+        MatchingConfigEntry::Single(entry) => entry,
+        MatchingConfigEntry::Multiple(_) => return Err(Error::with_public_msg("multiple config entries found")),
     };
     let ret = ChannelConfigResponse {
         channel: q.channel.clone(),

@@ -354,7 +333,7 @@ async fn read_local_config_real(channel: SfDbChannel, ncc: &NodeConfigCached) ->
         .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))?
         .data_base_path
         .join("config")
-        .join(&channel.name)
+        .join(channel.name())
         .join("latest")
         .join("00000_Config");
     // TODO use commonio here to wrap the error conversion

@@ -363,7 +342,7 @@ async fn read_local_config_real(channel: SfDbChannel, ncc: &NodeConfigCached) ->
         Err(e) => match e.kind() {
             ErrorKind::NotFound => {
                 let bt = err::bt::Backtrace::new();
-                netpod::log::error!("{bt:?}");
+                error!("{bt:?}");
                 return Err(Error::with_public_msg(format!(
                     "databuffer channel config file not found for channel {channel:?} at {path:?}"
                 )));

@@ -458,43 +437,77 @@ pub async fn read_local_config(channel: SfDbChannel, ncc: NodeConfigCached) -> R
 #[derive(Clone)]
 pub enum MatchingConfigEntry<'a> {
     None,
-    Multiple,
-    Entry(&'a ConfigEntry),
+    Single(&'a ConfigEntry),
+    // In this case, we only return the entry which best matches to the time range
+    Multiple(&'a ConfigEntry),
+}
+
+impl<'a> MatchingConfigEntry<'a> {
+    pub fn best(&self) -> Option<&ConfigEntry> {
+        match self {
+            MatchingConfigEntry::None => None,
+            MatchingConfigEntry::Single(e) => Some(e),
+            MatchingConfigEntry::Multiple(e) => Some(e),
+        }
+    }
 }

 pub fn extract_matching_config_entry<'a>(
     range: &NanoRange,
     channel_config: &'a ChannelConfigs,
 ) -> Result<MatchingConfigEntry<'a>, Error> {
     // TODO remove temporary
     if channel_config.channel_name == "SARFE10-PSSS059:SPECTRUM_X" {
         debug!("found config {:?}", channel_config);
     }
-    let mut ixs = Vec::new();
-    for i1 in 0..channel_config.entries.len() {
-        let e1 = &channel_config.entries[i1];
-        if i1 + 1 < channel_config.entries.len() {
-            let e2 = &channel_config.entries[i1 + 1];
-            if e1.ts.ns() < range.end && e2.ts.ns() >= range.beg {
-                ixs.push(i1);
-            }
-        } else {
-            if e1.ts.ns() < range.end {
-                ixs.push(i1);
-            }
-        }
-    }
+    let mut a: Vec<_> = channel_config.entries.iter().enumerate().map(|(i, x)| (i, x)).collect();
+    a.sort_unstable_by_key(|(_, x)| x.ts.ns());
+
+    let b: Vec<_> = a
+        .into_iter()
+        .rev()
+        .map({
+            let mut nx = None;
+            move |(i, x)| {
+                let k = nx.clone();
+                nx = Some(x.ts.clone());
+                (i, x, k)
+            }
+        })
+        .collect();
+
+    let mut c: Vec<_> = b
+        .into_iter()
+        .map(|(i, e, tsn)| {
+            if let Some(ts2) = tsn {
+                if e.ts.ns() < range.end {
+                    let p = if e.ts.ns() < range.beg { range.beg } else { e.ts.ns() };
+                    let q = if ts2.ns() < range.end { ts2.ns() } else { range.end };
+                    (i, TsNano(q - p), e)
+                } else {
+                    (i, TsNano(0), e)
+                }
+            } else {
+                if e.ts.ns() < range.end {
+                    if e.ts.ns() < range.beg {
+                        (i, TsNano(range.delta()), e)
+                    } else {
+                        (i, TsNano(range.end - e.ts.ns()), e)
+                    }
+                } else {
+                    (i, TsNano(0), e)
+                }
+            }
+        })
+        .collect();
+
+    c.sort_unstable_by_key(|x| u64::MAX - x.1.ns());
+
+    for (i, ts, e) in &c[..c.len().min(3)] {
+        info!("FOUND CONFIG IN ORDER {} {:?} {:?}", i, ts, e.ts);
+    }
-    if ixs.len() == 0 {
-        Ok(MatchingConfigEntry::None)
-    } else if ixs.len() > 1 {
-        Ok(MatchingConfigEntry::Multiple)
-    } else {
-        let e = &channel_config.entries[ixs[0]];
-        // TODO remove temporary
-        if channel_config.channel_name == "SARFE10-PSSS059:SPECTRUM_X" {
-            debug!("found matching entry {:?}", e);
-        }
-        Ok(MatchingConfigEntry::Entry(e))
+
+    if let Some(&(i, _, _)) = c.first() {
+        Ok(MatchingConfigEntry::Single(&channel_config.entries[i]))
+    } else {
+        Ok(MatchingConfigEntry::None)
     }
 }

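The heart of the rewrite above: instead of erroring when several config entries touch the range, each entry is treated as valid from its `ts` until the next entry's `ts` (the newest is open-ended), that interval is clamped to the query range, and the entry with the longest overlap wins — `Multiple` now carries that best entry rather than being a dead end. A self-contained sketch of the same selection on plain nanosecond values; entries are assumed sorted ascending by `ts`, which the code above enforces first:

```rust
// Sketch of the best-overlap selection: entry i is valid over
// [ts[i], ts[i+1]) (the last one is open-ended); clamp that interval
// to [beg, end) and pick the longest overlap.
fn best_entry_index(entry_ts: &[u64], beg: u64, end: u64) -> Option<usize> {
    let mut best: Option<(usize, u64)> = None;
    for (i, &ts) in entry_ts.iter().enumerate() {
        let until = entry_ts.get(i + 1).copied().unwrap_or(u64::MAX);
        // Clamp the validity interval to the query range.
        let p = ts.max(beg);
        let q = until.min(end);
        let overlap = q.saturating_sub(p);
        // Strict > keeps the earlier entry on ties.
        if best.map_or(true, |(_, o)| overlap > o) {
            best = Some((i, overlap));
        }
    }
    best.map(|(i, _)| i)
}

fn main() {
    // Entries start at ts 0, 100, 200; query range is [150, 400).
    // Overlaps: 0 ns, 50 ns (150..200), 200 ns (200..400).
    assert_eq!(best_entry_index(&[0, 100, 200], 150, 400), Some(2));
    // Empty config: nothing to pick.
    assert_eq!(best_entry_index(&[], 150, 400), None);
}
```

As in the code above, a candidate is still returned when every overlap clamps to zero; callers see it as the best available entry rather than an error.
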
@@ -144,7 +144,7 @@ impl BinnedQuery {

 impl HasBackend for BinnedQuery {
     fn backend(&self) -> &str {
-        &self.channel.backend
+        self.channel.backend()
     }
 }

@@ -164,7 +164,7 @@ impl PlainEventsQuery {

 impl HasBackend for PlainEventsQuery {
     fn backend(&self) -> &str {
-        &self.channel.backend
+        self.channel.backend()
     }
 }

@@ -1,8 +1,12 @@
 use crate::errconv::ErrConv;
 use err::Error;
 use futures_util::StreamExt;
-use netpod::{log::*, ScalarType, SfDbChannel, Shape};
-use netpod::{ChannelConfigQuery, ChannelConfigResponse};
+use netpod::log::*;
+use netpod::ChannelConfigQuery;
+use netpod::ChannelConfigResponse;
+use netpod::ScalarType;
+use netpod::SfDbChannel;
+use netpod::Shape;
 use scylla::Session as ScySession;
 use std::sync::Arc;