diff --git a/daqbufp2/src/test/archapp.rs b/daqbufp2/src/test/archapp.rs index 378505c..b9cba88 100644 --- a/daqbufp2/src/test/archapp.rs +++ b/daqbufp2/src/test/archapp.rs @@ -1,6 +1,7 @@ use super::binnedjson::ScalarEventsResponse; use super::events::get_plain_events_json; use crate::nodes::require_archapp_test_host_running; +use crate::test::events::ch_gen; use err::Error; use netpod::f64_close; use netpod::log::*; @@ -11,7 +12,7 @@ fn get_events_1() -> Result<(), Error> { let rh = require_archapp_test_host_running()?; let cluster = &rh.cluster; let res = get_plain_events_json( - "SARUN16-MQUA080:X", + ch_gen("SARUN16-MQUA080:X"), "2021-01-04T00:00:00Z", "2021-01-30T00:00:00Z", cluster, diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index a528f5b..934f261 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -19,9 +19,20 @@ use std::future::ready; use tokio::io::AsyncRead; use url::Url; -#[test] -fn get_plain_events_binary_0() { - taskrun::run(get_plain_events_binary_0_inner()).unwrap(); +fn ch_adhoc(name: &str) -> Channel { + Channel { + series: None, + backend: "testbackend".into(), + name: name.into(), + } +} + +pub fn ch_gen(name: &str) -> Channel { + Channel { + series: None, + backend: "testbackend".into(), + name: name.into(), + } } // TODO OFFENDING TEST add actual checks on result @@ -42,6 +53,11 @@ async fn get_plain_events_binary_0_inner() -> Result<(), Error> { Ok(()) } +#[test] +fn get_plain_events_binary_0() { + taskrun::run(get_plain_events_binary_0_inner()).unwrap(); +} + async fn get_plain_events_binary( channel_name: &str, beg_date: &str, @@ -218,16 +234,31 @@ where Ok(ret) } +async fn get_plain_events_json_0_inner() -> Result<(), Error> { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + get_plain_events_json( + ch_gen("scalar-i32-be"), + "1970-01-01T00:20:10.000Z", + "1970-01-01T00:20:12.000Z", + cluster, + true, + 4, + ) + .await?; + Ok(()) +} + #[test] fn get_plain_events_json_0() { taskrun::run(get_plain_events_json_0_inner()).unwrap(); } -async fn get_plain_events_json_0_inner() -> Result<(), Error> { +async fn get_plain_events_json_1_inner() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; get_plain_events_json( - "scalar-i32-be", + ch_gen("wave-f64-be-n21"), "1970-01-01T00:20:10.000Z", "1970-01-01T00:20:12.000Z", cluster, @@ -243,13 +274,13 @@ fn get_plain_events_json_1() { taskrun::run(get_plain_events_json_1_inner()).unwrap(); } -async fn get_plain_events_json_1_inner() -> Result<(), Error> { +async fn get_plain_events_json_2_inner() -> Result<(), Error> { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; get_plain_events_json( - "wave-f64-be-n21", + ch_adhoc("inmem-d0-i32"), + "1970-01-01T00:20:04.000Z", "1970-01-01T00:20:10.000Z", - "1970-01-01T00:20:12.000Z", cluster, true, 4, @@ -258,9 +289,14 @@ async fn get_plain_events_json_1_inner() -> Result<(), Error> { Ok(()) } +#[test] +fn get_plain_events_json_2() { + taskrun::run(get_plain_events_json_2_inner()).unwrap(); +} + // TODO improve by a more information-rich return type. pub async fn get_plain_events_json( - channel_name: &str, + channel: Channel, beg_date: &str, end_date: &str, cluster: &Cluster, @@ -271,19 +307,13 @@ pub async fn get_plain_events_json( let node0 = &cluster.nodes[0]; let beg_date: DateTime = beg_date.parse()?; let end_date: DateTime = end_date.parse()?; - let channel_backend = "testbackend"; - let channel = Channel { - backend: channel_backend.into(), - name: channel_name.into(), - series: None, - }; let range = NanoRange::from_date_time(beg_date, end_date); let query = PlainEventsQuery::new(channel, range, 1024 * 4, None, false); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - debug!("get_plain_events get {}", url); + info!("get_plain_events get {}", url); let req = hyper::Request::builder() .method(http::Method::GET) .uri(url.to_string()) @@ -292,12 +322,18 @@ pub async fn get_plain_events_json( .ec()?; let client = hyper::Client::new(); let res = client.request(req).await.ec()?; + + trace!("Response {res:?}"); + if res.status() != StatusCode::OK { error!("client response {:?}", res); } let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; let s = String::from_utf8_lossy(&buf); let res: JsonValue = serde_json::from_str(&s)?; + + eprintln!("res {res:?}"); + // TODO assert more let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 3dac6bf..dd1dba3 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -42,6 +42,16 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> channel.backend, ncc.node_config.cluster.backend ); } + if channel.backend() == "testbackend" { + if channel.name() == "inmem-d0-i32" { + let ret = ChConf { + series: 1, + scalar_type: ScalarType::I32, + shape: Shape::Scalar, + }; + return Ok(ret); + } + } // TODO use a common already running worker pool for these queries: let dbconf = &ncc.node_config.cluster.database; let dburl = format!( diff --git a/httpret/src/events.rs b/httpret/src/events.rs index 052c7ad..f146a24 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -47,20 +47,29 @@ async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Res .headers() .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + let url = { + let s1 = format!("dummy:{}", req.uri()); + Url::parse(&s1) + .map_err(Error::from) + .map_err(|e| e.add_public_msg(format!("Can not parse query url")))? + }; if accept == APP_JSON || accept == ACCEPT_ALL { - Ok(plain_events_json(req, node_config).await?) + Ok(plain_events_json(url, req, node_config).await?) } else if accept == APP_OCTET { - Ok(plain_events_binary(req, node_config).await?) + Ok(plain_events_binary(url, req, node_config).await?) } else { let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?; Ok(ret) } } -async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) -> Result, Error> { +async fn plain_events_binary( + url: Url, + req: Request, + node_config: &NodeConfigCached, +) -> Result, Error> { debug!("httpret plain_events_binary req: {:?}", req); - let url = Url::parse(&format!("dummy:{}", req.uri()))?; - let query = PlainEventsQuery::from_url(&url)?; + let query = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; let chconf = chconf_from_events_binary(&query, node_config).await?; // Update the series id since we don't require some unique identifier yet. @@ -88,13 +97,18 @@ async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) Ok(ret) } -async fn plain_events_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - info!("httpret plain_events_json req: {:?}", req); - let (head, _body) = req.into_parts(); - let s1 = format!("dummy:{}", head.uri); - let url = Url::parse(&s1)?; +async fn plain_events_json( + url: Url, + req: Request, + node_config: &NodeConfigCached, +) -> Result, Error> { + debug!("httpret plain_events_json req: {:?}", req); + let (_head, _body) = req.into_parts(); let query = PlainEventsQuery::from_url(&url)?; - let chconf = chconf_from_events_json(&query, node_config).await?; + let chconf = chconf_from_events_json(&query, node_config).await.map_err(|e| { + error!("chconf_from_events_json {e:?}"); + e.add_public_msg(format!("Can not get channel information")) + })?; // Update the series id since we don't require some unique identifier yet. let mut query = query; @@ -360,11 +374,13 @@ impl BinnedHandlerScylla { x }) .map_err(|e| items_2::Error::from(format!("{e}"))); - todo!(); + error!("TODO BinnedHandlerScylla::gather"); + err::todo(); type Items = Pin> + Send>>; - let data_stream = Box::pin(data_stream) as Items; - let state_stream = Box::pin(state_stream) as Items; - let merged_stream = ChannelEventsMerger::new(todo!()); + let _data_stream = Box::pin(data_stream) as Items; + let _sate_stream = Box::pin(state_stream) as Items; + let merged_stream = ChannelEventsMerger::new(err::todoval()); + let _ = merged_stream; //let merged_stream = ChannelEventsMerger::new(vec![data_stream, state_stream]); let merged_stream = Box::pin(merged_stream) as Pin + Send>>; let binned_collected = binned_collected( diff --git a/items/src/numops.rs b/items/src/numops.rs index 055f49e..d424199 100644 --- a/items/src/numops.rs +++ b/items/src/numops.rs @@ -110,7 +110,6 @@ impl PartialOrd for StringNum { pub trait NumOps: Sized - //+ Copy + Clone + AsPrimF32 + Send @@ -119,7 +118,6 @@ pub trait NumOps: + Unpin + Debug + Zero - //+ AsPrimitive + Bounded + PartialOrd + SubFrId diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index a87ca49..bb67773 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -678,6 +678,40 @@ impl Collectable for Box { } } +fn flush_binned( + binner: &mut Box, + coll: &mut Option>, + bin_count_exp: u32, + force: bool, +) -> Result<(), Error> { + trace!("flush_binned bins_ready_count: {}", binner.bins_ready_count()); + if force { + if binner.bins_ready_count() == 0 { + debug!("cycle the binner forced"); + binner.cycle(); + } else { + debug!("bins ready, do not force"); + } + } + if binner.bins_ready_count() > 0 { + let ready = binner.bins_ready(); + match ready { + Some(mut ready) => { + trace!("binned_collected ready {ready:?}"); + if coll.is_none() { + *coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); + } + let cl = coll.as_mut().unwrap(); + cl.ingest(ready.as_collectable_mut()); + Ok(()) + } + None => Err(format!("bins_ready_count but no result").into()), + } + } else { + Ok(()) + } +} + // TODO handle status information. pub async fn binned_collected( scalar_type: ScalarType, @@ -687,52 +721,24 @@ pub async fn binned_collected( timeout: Duration, inp: Pin> + Send>>, ) -> Result, Error> { - info!("binned_collected"); + event!(Level::TRACE, "binned_collected"); + if edges.len() < 2 { + return Err(format!("binned_collected but edges.len() {}", edges.len()).into()); + } + let ts_edges_max = *edges.last().unwrap(); let deadline = Instant::now() + timeout; let mut did_timeout = false; let bin_count_exp = edges.len().max(2) as u32 - 1; let do_time_weight = agg_kind.do_time_weighted(); // TODO maybe TimeBinner should take all ChannelEvents and handle this? let mut did_range_complete = false; - fn flush_binned( - binner: &mut Box, - coll: &mut Option>, - bin_count_exp: u32, - force: bool, - ) -> Result<(), Error> { - info!("flush_binned bins_ready_count: {}", binner.bins_ready_count()); - if force { - if binner.bins_ready_count() == 0 { - warn!("cycle the binner forced"); - binner.cycle(); - } else { - warn!("binner was some ready, do nothing"); - } - } - if binner.bins_ready_count() > 0 { - let ready = binner.bins_ready(); - match ready { - Some(mut ready) => { - info!("binned_collected ready {ready:?}"); - if coll.is_none() { - *coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); - } - let cl = coll.as_mut().unwrap(); - cl.ingest(ready.as_collectable_mut()); - Ok(()) - } - None => Err(format!("bins_ready_count but no result").into()), - } - } else { - Ok(()) - } - } let mut coll = None; let mut binner = None; let empty_item = empty_events_dyn_2(&scalar_type, &shape, &AggKind::TimeWeightedScalar); - let empty_stream = futures_util::stream::once(futures_util::future::ready(Ok(StreamItem::DataItem( - RangeCompletableItem::Data(ChannelEvents::Events(empty_item)), + let tmp_item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events( + empty_item, )))); + let empty_stream = futures_util::stream::once(futures_util::future::ready(tmp_item)); let mut stream = empty_stream.chain(inp); loop { let item = futures_util::select! { @@ -751,18 +757,23 @@ pub async fn binned_collected( match item { StreamItem::DataItem(k) => match k { RangeCompletableItem::RangeComplete => { - warn!("binned_collected TODO RangeComplete"); did_range_complete = true; } RangeCompletableItem::Data(k) => match k { ChannelEvents::Events(events) => { - if binner.is_none() { - let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight); - binner = Some(bb); + if events.starts_after(NanoRange { + beg: 0, + end: ts_edges_max, + }) { + } else { + if binner.is_none() { + let bb = events.as_time_binnable().time_binner_new(edges.clone(), do_time_weight); + binner = Some(bb); + } + let binner = binner.as_mut().unwrap(); + binner.ingest(events.as_time_binnable()); + flush_binned(binner, &mut coll, bin_count_exp, false)?; } - let binner = binner.as_mut().unwrap(); - binner.ingest(events.as_time_binnable()); - flush_binned(binner, &mut coll, bin_count_exp, false)?; } ChannelEvents::Status(item) => { trace!("{:?}", item); @@ -806,13 +817,14 @@ pub async fn binned_collected( match coll { Some(mut coll) => { let res = coll.result().map_err(|e| format!("{e}"))?; + tokio::time::sleep(Duration::from_millis(2000)).await; Ok(res) } None => { - error!("TODO should never happen with changed logic, remove"); - err::todo(); + error!("binned_collected nothing collected"); let item = empty_binned_dyn(&scalar_type, &shape, &AggKind::DimXBins1); let ret = item.to_box_to_json_result(); + tokio::time::sleep(Duration::from_millis(2000)).await; Ok(ret) } } diff --git a/items_2/src/test.rs b/items_2/src/test.rs index 3a59304..3da462e 100644 --- a/items_2/src/test.rs +++ b/items_2/src/test.rs @@ -324,7 +324,9 @@ fn binned_timeout_01() { fn val(ts: u64) -> f32 { 2f32 + ((ts / SEC) % 2) as f32 + 0.2 * ((ts / (MS * 100)) % 2) as f32 } + eprintln!("binned_timeout_01 ENTER"); let fut = async { + eprintln!("binned_timeout_01 IN FUT"); let mut events_vec1 = Vec::new(); let mut t = TSBASE; for _ in 0..20 { diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index fe1e370..8a28077 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -107,12 +107,12 @@ async fn events_conn_handler_inner_try( } let mut p1: Pin> + Send>> = - if evq.channel().backend() == "test-adhoc-dyn" { + if evq.channel().backend() == "testbackend" { use items_2::ChannelEvents; use items_2::Empty; use netpod::timeunits::MS; let node_ix = node_config.ix; - if evq.channel().name() == "scalar-i32" { + if evq.channel().name() == "inmem-d0-i32" { let mut item = items_2::eventsdim0::EventsDim0::::empty(); let td = MS * 10; let mut ts = MS * 17 + MS * td * node_ix as u64; diff --git a/scyllaconn/src/bincache.rs b/scyllaconn/src/bincache.rs index c749ac0..c5d90ad 100644 --- a/scyllaconn/src/bincache.rs +++ b/scyllaconn/src/bincache.rs @@ -383,8 +383,10 @@ pub async fn fetch_uncached_binned_events( ))); } }; - // TODO as soon we encounter RangeComplete we just: - // complete = true; + if false { + // TODO as soon we encounter RangeComplete we just: + complete = true; + } match item { Ok(ChannelEvents::Events(item)) => { time_binner.ingest(item.as_time_binnable()); diff --git a/taskrun/Cargo.toml b/taskrun/Cargo.toml index 571db2b..09f4504 100644 --- a/taskrun/Cargo.toml +++ b/taskrun/Cargo.toml @@ -8,9 +8,11 @@ edition = "2021" path = "src/taskrun.rs" [dependencies] -tokio = { version = "1.18.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] } -tracing = "0.1.34" -tracing-subscriber = { version = "0.3.11", features = ["fmt", "time"] } +futures-util = "0.3" +tokio = { version = "1.21.2", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] } +tracing = "0.1.37" +tracing-subscriber = { version = "0.3.16", features = ["fmt", "time"] } +#tracing-loki = { version = "0.2.1", default-features = false, features = ["compat-0-2-1"] } time = { version = "0.3", features = ["formatting"] } console-subscriber = "0.1.5" backtrace = "0.3.56" diff --git a/taskrun/src/taskrun.rs b/taskrun/src/taskrun.rs index b958170..8757069 100644 --- a/taskrun/src/taskrun.rs +++ b/taskrun/src/taskrun.rs @@ -4,10 +4,13 @@ use crate::log::*; use err::Error; use std::future::Future; use std::panic; -use std::sync::{Arc, Mutex}; +use std::sync::atomic::AtomicUsize; +use std::sync::{Arc, Mutex, Once}; use tokio::runtime::Runtime; use tokio::task::JoinHandle; +static INIT_TRACING_ONCE: Once = Once::new(); + pub mod log { #[allow(unused_imports)] pub use tracing::{debug, error, info, trace, warn}; @@ -25,7 +28,6 @@ pub fn get_runtime_opts(nworkers: usize, nblocking: usize) -> Arc { let mut g = RUNTIME.lock().unwrap(); match g.as_ref() { None => { - tracing_init(); let res = tokio::runtime::Builder::new_multi_thread() .worker_threads(nworkers) .max_blocking_threads(nblocking) @@ -56,8 +58,14 @@ pub fn get_runtime_opts(nworkers: usize, nblocking: usize) -> Arc { //old(info); })); }) - .build() - .unwrap(); + .build(); + let res = match res { + Ok(x) => x, + Err(e) => { + eprintln!("ERROR {e}"); + panic!(); + } + }; let a = Arc::new(res); *g = Some(a.clone()); a @@ -71,6 +79,12 @@ where F: std::future::Future>, { let runtime = get_runtime(); + match tracing_init() { + Ok(_) => {} + Err(e) => { + eprintln!("TRACING: {e:?}"); + } + } let res = runtime.block_on(async { f.await }); match res { Ok(k) => Ok(k), @@ -81,44 +95,86 @@ where } } -lazy_static::lazy_static! { - pub static ref INITMX: Mutex = Mutex::new(0); -} - -pub fn tracing_init() { - let mut g = INITMX.lock().unwrap(); - if *g == 0 { - //use tracing_subscriber::fmt::time::FormatTime; - let fmtstr = "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"; - //let format = tracing_subscriber::fmt::format().with_timer(timer); - let timer = tracing_subscriber::fmt::time::UtcTime::new(time::format_description::parse(fmtstr).unwrap()); - //use tracing_subscriber::prelude::*; - //let trsub = tracing_subscriber::fmt::layer(); - //let console_layer = console_subscriber::spawn(); - //tracing_subscriber::registry().with(console_layer).with(trsub).init(); - //console_subscriber::init(); - #[allow(unused)] - let log_filter = tracing_subscriber::EnvFilter::new( - [ - //"tokio=trace", - //"runtime=trace", - "warn", - "disk::binned::pbv=trace", - "[log_span_d]=debug", - "[log_span_t]=trace", - ] - .join(","), - ); - tracing_subscriber::fmt() +fn tracing_init_inner() -> Result<(), Error> { + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; + use tracing_subscriber::Layer; + let fmtstr = "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"; + let timer = tracing_subscriber::fmt::time::UtcTime::new( + time::format_description::parse(fmtstr).map_err(|e| format!("{e}"))?, + ); + if true { + let fmt_layer = tracing_subscriber::fmt::Layer::new() .with_timer(timer) .with_target(true) .with_thread_names(true) - //.with_max_level(tracing::Level::INFO) - //.with_env_filter(log_filter) - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_filter(tracing_subscriber::EnvFilter::from_default_env()); + let z = tracing_subscriber::registry().with(fmt_layer); + #[cfg(CONSOLE)] + { + let console_layer = console_subscriber::spawn(); + let z = z.with(console_layer); + } + z.try_init().map_err(|e| format!("{e}"))?; + } + #[cfg(DISABLED_LOKI)] + // TODO tracing_loki seems not well composable, try open telemetry instead. + if false { + /*let fmt_layer = tracing_subscriber::fmt::Layer::new() + .with_timer(timer) + .with_target(true) + .with_thread_names(true) + .with_filter(tracing_subscriber::EnvFilter::from_default_env());*/ + let url = "http://[::1]:6947"; + //let url = "http://127.0.0.1:6947"; + //let url = "http://[::1]:6132"; + let (loki_layer, loki_task) = tracing_loki::layer( + tracing_loki::url::Url::parse(url)?, + vec![(format!("daqbuffer"), format!("dev"))].into_iter().collect(), + [].into(), + ) + .map_err(|e| format!("{e}"))?; + //let loki_layer = loki_layer.with_filter(log_filter); + eprintln!("MADE LAYER"); + tracing_subscriber::registry() + //.with(fmt_layer) + .with(loki_layer) + //.try_init() + //.map_err(|e| format!("{e}"))?; .init(); - *g = 1; - //warn!("tracing_init done"); + eprintln!("REGISTERED"); + if true { + tokio::spawn(loki_task); + eprintln!("SPAWNED TASK"); + } + eprintln!("INFO LOKI"); + } + Ok(()) +} + +pub fn tracing_init() -> Result<(), ()> { + use std::sync::atomic::Ordering; + let is_good = Arc::new(AtomicUsize::new(0)); + { + let is_good = is_good.clone(); + INIT_TRACING_ONCE.call_once(move || match tracing_init_inner() { + Ok(_) => { + is_good.store(1, Ordering::Release); + } + Err(e) => { + is_good.store(2, Ordering::Release); + eprintln!("tracing_init_inner gave error {e}"); + } + }); + } + let n = is_good.load(Ordering::Acquire); + if n == 2 { + Err(()) + } else if n == 1 { + Ok(()) + } else { + eprintln!("ERROR Unknown tracing state"); + Err(()) } }