Refactor for in-memory data test

This commit is contained in:
Dominik Werder
2022-11-15 10:10:39 +01:00
parent 8584d36d21
commit 20e2c20697
11 changed files with 259 additions and 124 deletions

View File

@@ -1,6 +1,7 @@
use super::binnedjson::ScalarEventsResponse;
use super::events::get_plain_events_json;
use crate::nodes::require_archapp_test_host_running;
use crate::test::events::ch_gen;
use err::Error;
use netpod::f64_close;
use netpod::log::*;
@@ -11,7 +12,7 @@ fn get_events_1() -> Result<(), Error> {
let rh = require_archapp_test_host_running()?;
let cluster = &rh.cluster;
let res = get_plain_events_json(
"SARUN16-MQUA080:X",
ch_gen("SARUN16-MQUA080:X"),
"2021-01-04T00:00:00Z",
"2021-01-30T00:00:00Z",
cluster,

View File

@@ -19,9 +19,20 @@ use std::future::ready;
use tokio::io::AsyncRead;
use url::Url;
#[test]
fn get_plain_events_binary_0() {
taskrun::run(get_plain_events_binary_0_inner()).unwrap();
fn ch_adhoc(name: &str) -> Channel {
Channel {
series: None,
backend: "testbackend".into(),
name: name.into(),
}
}
pub fn ch_gen(name: &str) -> Channel {
Channel {
series: None,
backend: "testbackend".into(),
name: name.into(),
}
}
// TODO OFFENDING TEST add actual checks on result
@@ -42,6 +53,11 @@ async fn get_plain_events_binary_0_inner() -> Result<(), Error> {
Ok(())
}
#[test]
fn get_plain_events_binary_0() {
taskrun::run(get_plain_events_binary_0_inner()).unwrap();
}
async fn get_plain_events_binary<NTY>(
channel_name: &str,
beg_date: &str,
@@ -218,16 +234,31 @@ where
Ok(ret)
}
async fn get_plain_events_json_0_inner() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
get_plain_events_json(
ch_gen("scalar-i32-be"),
"1970-01-01T00:20:10.000Z",
"1970-01-01T00:20:12.000Z",
cluster,
true,
4,
)
.await?;
Ok(())
}
#[test]
fn get_plain_events_json_0() {
taskrun::run(get_plain_events_json_0_inner()).unwrap();
}
async fn get_plain_events_json_0_inner() -> Result<(), Error> {
async fn get_plain_events_json_1_inner() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
get_plain_events_json(
"scalar-i32-be",
ch_gen("wave-f64-be-n21"),
"1970-01-01T00:20:10.000Z",
"1970-01-01T00:20:12.000Z",
cluster,
@@ -243,13 +274,13 @@ fn get_plain_events_json_1() {
taskrun::run(get_plain_events_json_1_inner()).unwrap();
}
async fn get_plain_events_json_1_inner() -> Result<(), Error> {
async fn get_plain_events_json_2_inner() -> Result<(), Error> {
let rh = require_test_hosts_running()?;
let cluster = &rh.cluster;
get_plain_events_json(
"wave-f64-be-n21",
ch_adhoc("inmem-d0-i32"),
"1970-01-01T00:20:04.000Z",
"1970-01-01T00:20:10.000Z",
"1970-01-01T00:20:12.000Z",
cluster,
true,
4,
@@ -258,9 +289,14 @@ async fn get_plain_events_json_1_inner() -> Result<(), Error> {
Ok(())
}
#[test]
fn get_plain_events_json_2() {
taskrun::run(get_plain_events_json_2_inner()).unwrap();
}
// TODO improve by a more information-rich return type.
pub async fn get_plain_events_json(
channel_name: &str,
channel: Channel,
beg_date: &str,
end_date: &str,
cluster: &Cluster,
@@ -271,19 +307,13 @@ pub async fn get_plain_events_json(
let node0 = &cluster.nodes[0];
let beg_date: DateTime<Utc> = beg_date.parse()?;
let end_date: DateTime<Utc> = end_date.parse()?;
let channel_backend = "testbackend";
let channel = Channel {
backend: channel_backend.into(),
name: channel_name.into(),
series: None,
};
let range = NanoRange::from_date_time(beg_date, end_date);
let query = PlainEventsQuery::new(channel, range, 1024 * 4, None, false);
let hp = HostPort::from_node(node0);
let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?;
query.append_to_url(&mut url);
let url = url;
debug!("get_plain_events get {}", url);
info!("get_plain_events get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
@@ -292,12 +322,18 @@ pub async fn get_plain_events_json(
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
trace!("Response {res:?}");
if res.status() != StatusCode::OK {
error!("client response {:?}", res);
}
let buf = hyper::body::to_bytes(res.into_body()).await.ec()?;
let s = String::from_utf8_lossy(&buf);
let res: JsonValue = serde_json::from_str(&s)?;
eprintln!("res {res:?}");
// TODO assert more
let t2 = chrono::Utc::now();
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;

View File

@@ -42,6 +42,16 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) ->
channel.backend, ncc.node_config.cluster.backend
);
}
if channel.backend() == "testbackend" {
if channel.name() == "inmem-d0-i32" {
let ret = ChConf {
series: 1,
scalar_type: ScalarType::I32,
shape: Shape::Scalar,
};
return Ok(ret);
}
}
// TODO use a common already running worker pool for these queries:
let dbconf = &ncc.node_config.cluster.database;
let dburl = format!(

View File

@@ -47,20 +47,29 @@ async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Res
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
let url = {
let s1 = format!("dummy:{}", req.uri());
Url::parse(&s1)
.map_err(Error::from)
.map_err(|e| e.add_public_msg(format!("Can not parse query url")))?
};
if accept == APP_JSON || accept == ACCEPT_ALL {
Ok(plain_events_json(req, node_config).await?)
Ok(plain_events_json(url, req, node_config).await?)
} else if accept == APP_OCTET {
Ok(plain_events_binary(req, node_config).await?)
Ok(plain_events_binary(url, req, node_config).await?)
} else {
let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?;
Ok(ret)
}
}
async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
async fn plain_events_binary(
url: Url,
req: Request<Body>,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
debug!("httpret plain_events_binary req: {:?}", req);
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let query = PlainEventsQuery::from_url(&url)?;
let query = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?;
let chconf = chconf_from_events_binary(&query, node_config).await?;
// Update the series id since we don't require some unique identifier yet.
@@ -88,13 +97,18 @@ async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached)
Ok(ret)
}
async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
info!("httpret plain_events_json req: {:?}", req);
let (head, _body) = req.into_parts();
let s1 = format!("dummy:{}", head.uri);
let url = Url::parse(&s1)?;
async fn plain_events_json(
url: Url,
req: Request<Body>,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
debug!("httpret plain_events_json req: {:?}", req);
let (_head, _body) = req.into_parts();
let query = PlainEventsQuery::from_url(&url)?;
let chconf = chconf_from_events_json(&query, node_config).await?;
let chconf = chconf_from_events_json(&query, node_config).await.map_err(|e| {
error!("chconf_from_events_json {e:?}");
e.add_public_msg(format!("Can not get channel information"))
})?;
// Update the series id since we don't require some unique identifier yet.
let mut query = query;
@@ -360,11 +374,13 @@ impl BinnedHandlerScylla {
x
})
.map_err(|e| items_2::Error::from(format!("{e}")));
todo!();
error!("TODO BinnedHandlerScylla::gather");
err::todo();
type Items = Pin<Box<dyn Stream<Item = Result<ChannelEvents, items_2::Error>> + Send>>;
let data_stream = Box::pin(data_stream) as Items;
let state_stream = Box::pin(state_stream) as Items;
let merged_stream = ChannelEventsMerger::new(todo!());
let _data_stream = Box::pin(data_stream) as Items;
let _sate_stream = Box::pin(state_stream) as Items;
let merged_stream = ChannelEventsMerger::new(err::todoval());
let _ = merged_stream;
//let merged_stream = ChannelEventsMerger::new(vec![data_stream, state_stream]);
let merged_stream = Box::pin(merged_stream) as Pin<Box<dyn Stream<Item = _> + Send>>;
let binned_collected = binned_collected(

View File

@@ -110,7 +110,6 @@ impl PartialOrd for StringNum {
pub trait NumOps:
Sized
//+ Copy
+ Clone
+ AsPrimF32
+ Send
@@ -119,7 +118,6 @@ pub trait NumOps:
+ Unpin
+ Debug
+ Zero
//+ AsPrimitive<f32>
+ Bounded
+ PartialOrd
+ SubFrId

View File

@@ -678,6 +678,40 @@ impl Collectable for Box<dyn Collectable> {
}
}
fn flush_binned(
binner: &mut Box<dyn TimeBinner>,
coll: &mut Option<Box<dyn Collector>>,
bin_count_exp: u32,
force: bool,
) -> Result<(), Error> {
trace!("flush_binned bins_ready_count: {}", binner.bins_ready_count());
if force {
if binner.bins_ready_count() == 0 {
debug!("cycle the binner forced");
binner.cycle();
} else {
debug!("bins ready, do not force");
}
}
if binner.bins_ready_count() > 0 {
let ready = binner.bins_ready();
match ready {
Some(mut ready) => {
trace!("binned_collected ready {ready:?}");
if coll.is_none() {
*coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp));
}
let cl = coll.as_mut().unwrap();
cl.ingest(ready.as_collectable_mut());
Ok(())
}
None => Err(format!("bins_ready_count but no result").into()),
}
} else {
Ok(())
}
}
// TODO handle status information.
pub async fn binned_collected(
scalar_type: ScalarType,
@@ -687,52 +721,24 @@ pub async fn binned_collected(
timeout: Duration,
inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>,
) -> Result<Box<dyn ToJsonResult>, Error> {
info!("binned_collected");
event!(Level::TRACE, "binned_collected");
if edges.len() < 2 {
return Err(format!("binned_collected but edges.len() {}", edges.len()).into());
}
let ts_edges_max = *edges.last().unwrap();
let deadline = Instant::now() + timeout;
let mut did_timeout = false;
let bin_count_exp = edges.len().max(2) as u32 - 1;
let do_time_weight = agg_kind.do_time_weighted();
// TODO maybe TimeBinner should take all ChannelEvents and handle this?
let mut did_range_complete = false;
fn flush_binned(
binner: &mut Box<dyn TimeBinner>,
coll: &mut Option<Box<dyn Collector>>,
bin_count_exp: u32,
force: bool,
) -> Result<(), Error> {
info!("flush_binned bins_ready_count: {}", binner.bins_ready_count());
if force {
if binner.bins_ready_count() == 0 {
warn!("cycle the binner forced");
binner.cycle();
} else {
warn!("binner was some ready, do nothing");
}
}
if binner.bins_ready_count() > 0 {
let ready = binner.bins_ready();
match ready {
Some(mut ready) => {
info!("binned_collected ready {ready:?}");
if coll.is_none() {
*coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp));
}
let cl = coll.as_mut().unwrap();
cl.ingest(ready.as_collectable_mut());
Ok(())
}
None => Err(format!("bins_ready_count but no result").into()),
}
} else {
Ok(())
}
}
let mut coll = None;
let mut binner = None;
let empty_item = empty_events_dyn_2(&scalar_type, &shape, &AggKind::TimeWeightedScalar);
let empty_stream = futures_util::stream::once(futures_util::future::ready(Ok(StreamItem::DataItem(
RangeCompletableItem::Data(ChannelEvents::Events(empty_item)),
let tmp_item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events(
empty_item,
))));
let empty_stream = futures_util::stream::once(futures_util::future::ready(tmp_item));
let mut stream = empty_stream.chain(inp);
loop {
let item = futures_util::select! {
@@ -751,18 +757,23 @@ pub async fn binned_collected(
match item {
StreamItem::DataItem(k) => match k {
RangeCompletableItem::RangeComplete => {
warn!("binned_collected TODO RangeComplete");
did_range_complete = true;
}
RangeCompletableItem::Data(k) => match k {
ChannelEvents::Events(events) => {
if binner.is_none() {
let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight);
binner = Some(bb);
if events.starts_after(NanoRange {
beg: 0,
end: ts_edges_max,
}) {
} else {
if binner.is_none() {
let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight);
binner = Some(bb);
}
let binner = binner.as_mut().unwrap();
binner.ingest(events.as_time_binnable());
flush_binned(binner, &mut coll, bin_count_exp, false)?;
}
let binner = binner.as_mut().unwrap();
binner.ingest(events.as_time_binnable());
flush_binned(binner, &mut coll, bin_count_exp, false)?;
}
ChannelEvents::Status(item) => {
trace!("{:?}", item);
@@ -806,13 +817,14 @@ pub async fn binned_collected(
match coll {
Some(mut coll) => {
let res = coll.result().map_err(|e| format!("{e}"))?;
tokio::time::sleep(Duration::from_millis(2000)).await;
Ok(res)
}
None => {
error!("TODO should never happen with changed logic, remove");
err::todo();
error!("binned_collected nothing collected");
let item = empty_binned_dyn(&scalar_type, &shape, &AggKind::DimXBins1);
let ret = item.to_box_to_json_result();
tokio::time::sleep(Duration::from_millis(2000)).await;
Ok(ret)
}
}

View File

@@ -324,7 +324,9 @@ fn binned_timeout_01() {
fn val(ts: u64) -> f32 {
2f32 + ((ts / SEC) % 2) as f32 + 0.2 * ((ts / (MS * 100)) % 2) as f32
}
eprintln!("binned_timeout_01 ENTER");
let fut = async {
eprintln!("binned_timeout_01 IN FUT");
let mut events_vec1 = Vec::new();
let mut t = TSBASE;
for _ in 0..20 {

View File

@@ -107,12 +107,12 @@ async fn events_conn_handler_inner_try(
}
let mut p1: Pin<Box<dyn Stream<Item = Box<dyn Framable + Send>> + Send>> =
if evq.channel().backend() == "test-adhoc-dyn" {
if evq.channel().backend() == "testbackend" {
use items_2::ChannelEvents;
use items_2::Empty;
use netpod::timeunits::MS;
let node_ix = node_config.ix;
if evq.channel().name() == "scalar-i32" {
if evq.channel().name() == "inmem-d0-i32" {
let mut item = items_2::eventsdim0::EventsDim0::<f32>::empty();
let td = MS * 10;
let mut ts = MS * 17 + MS * td * node_ix as u64;

View File

@@ -383,8 +383,10 @@ pub async fn fetch_uncached_binned_events(
)));
}
};
// TODO as soon we encounter RangeComplete we just:
// complete = true;
if false {
// TODO as soon we encounter RangeComplete we just:
complete = true;
}
match item {
Ok(ChannelEvents::Events(item)) => {
time_binner.ingest(item.as_time_binnable());

View File

@@ -8,9 +8,11 @@ edition = "2021"
path = "src/taskrun.rs"
[dependencies]
tokio = { version = "1.18.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] }
tracing = "0.1.34"
tracing-subscriber = { version = "0.3.11", features = ["fmt", "time"] }
futures-util = "0.3"
tokio = { version = "1.21.2", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["fmt", "time"] }
#tracing-loki = { version = "0.2.1", default-features = false, features = ["compat-0-2-1"] }
time = { version = "0.3", features = ["formatting"] }
console-subscriber = "0.1.5"
backtrace = "0.3.56"

View File

@@ -4,10 +4,13 @@ use crate::log::*;
use err::Error;
use std::future::Future;
use std::panic;
use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex, Once};
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
static INIT_TRACING_ONCE: Once = Once::new();
pub mod log {
#[allow(unused_imports)]
pub use tracing::{debug, error, info, trace, warn};
@@ -25,7 +28,6 @@ pub fn get_runtime_opts(nworkers: usize, nblocking: usize) -> Arc<Runtime> {
let mut g = RUNTIME.lock().unwrap();
match g.as_ref() {
None => {
tracing_init();
let res = tokio::runtime::Builder::new_multi_thread()
.worker_threads(nworkers)
.max_blocking_threads(nblocking)
@@ -56,8 +58,14 @@ pub fn get_runtime_opts(nworkers: usize, nblocking: usize) -> Arc<Runtime> {
//old(info);
}));
})
.build()
.unwrap();
.build();
let res = match res {
Ok(x) => x,
Err(e) => {
eprintln!("ERROR {e}");
panic!();
}
};
let a = Arc::new(res);
*g = Some(a.clone());
a
@@ -71,6 +79,12 @@ where
F: std::future::Future<Output = Result<T, Error>>,
{
let runtime = get_runtime();
match tracing_init() {
Ok(_) => {}
Err(e) => {
eprintln!("TRACING: {e:?}");
}
}
let res = runtime.block_on(async { f.await });
match res {
Ok(k) => Ok(k),
@@ -81,44 +95,86 @@ where
}
}
lazy_static::lazy_static! {
pub static ref INITMX: Mutex<u32> = Mutex::new(0);
}
pub fn tracing_init() {
let mut g = INITMX.lock().unwrap();
if *g == 0 {
//use tracing_subscriber::fmt::time::FormatTime;
let fmtstr = "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z";
//let format = tracing_subscriber::fmt::format().with_timer(timer);
let timer = tracing_subscriber::fmt::time::UtcTime::new(time::format_description::parse(fmtstr).unwrap());
//use tracing_subscriber::prelude::*;
//let trsub = tracing_subscriber::fmt::layer();
//let console_layer = console_subscriber::spawn();
//tracing_subscriber::registry().with(console_layer).with(trsub).init();
//console_subscriber::init();
#[allow(unused)]
let log_filter = tracing_subscriber::EnvFilter::new(
[
//"tokio=trace",
//"runtime=trace",
"warn",
"disk::binned::pbv=trace",
"[log_span_d]=debug",
"[log_span_t]=trace",
]
.join(","),
);
tracing_subscriber::fmt()
fn tracing_init_inner() -> Result<(), Error> {
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::Layer;
let fmtstr = "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z";
let timer = tracing_subscriber::fmt::time::UtcTime::new(
time::format_description::parse(fmtstr).map_err(|e| format!("{e}"))?,
);
if true {
let fmt_layer = tracing_subscriber::fmt::Layer::new()
.with_timer(timer)
.with_target(true)
.with_thread_names(true)
//.with_max_level(tracing::Level::INFO)
//.with_env_filter(log_filter)
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_filter(tracing_subscriber::EnvFilter::from_default_env());
let z = tracing_subscriber::registry().with(fmt_layer);
#[cfg(CONSOLE)]
{
let console_layer = console_subscriber::spawn();
let z = z.with(console_layer);
}
z.try_init().map_err(|e| format!("{e}"))?;
}
#[cfg(DISABLED_LOKI)]
// TODO tracing_loki seems not well composable, try open telemetry instead.
if false {
/*let fmt_layer = tracing_subscriber::fmt::Layer::new()
.with_timer(timer)
.with_target(true)
.with_thread_names(true)
.with_filter(tracing_subscriber::EnvFilter::from_default_env());*/
let url = "http://[::1]:6947";
//let url = "http://127.0.0.1:6947";
//let url = "http://[::1]:6132";
let (loki_layer, loki_task) = tracing_loki::layer(
tracing_loki::url::Url::parse(url)?,
vec![(format!("daqbuffer"), format!("dev"))].into_iter().collect(),
[].into(),
)
.map_err(|e| format!("{e}"))?;
//let loki_layer = loki_layer.with_filter(log_filter);
eprintln!("MADE LAYER");
tracing_subscriber::registry()
//.with(fmt_layer)
.with(loki_layer)
//.try_init()
//.map_err(|e| format!("{e}"))?;
.init();
*g = 1;
//warn!("tracing_init done");
eprintln!("REGISTERED");
if true {
tokio::spawn(loki_task);
eprintln!("SPAWNED TASK");
}
eprintln!("INFO LOKI");
}
Ok(())
}
pub fn tracing_init() -> Result<(), ()> {
use std::sync::atomic::Ordering;
let is_good = Arc::new(AtomicUsize::new(0));
{
let is_good = is_good.clone();
INIT_TRACING_ONCE.call_once(move || match tracing_init_inner() {
Ok(_) => {
is_good.store(1, Ordering::Release);
}
Err(e) => {
is_good.store(2, Ordering::Release);
eprintln!("tracing_init_inner gave error {e}");
}
});
}
let n = is_good.load(Ordering::Acquire);
if n == 2 {
Err(())
} else if n == 1 {
Ok(())
} else {
eprintln!("ERROR Unknown tracing state");
Err(())
}
}