diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 56dadee..c694315 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -75,9 +75,9 @@ checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544" [[package]] name = "async-channel" -version = "1.7.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e14485364214912d3b19cc3435dde4df66065127f05fa0d75c712f36f12c2f28" +checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" dependencies = [ "concurrent-queue", "event-listener", @@ -107,26 +107,15 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.58" +version = "0.1.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c" +checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364" dependencies = [ "proc-macro2", "quote", "syn", ] -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi", - "libc", - "winapi", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -135,9 +124,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.5.17" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acee9fd5073ab6b045a275b3e709c163dd36c90685219cb21804a147b58dba43" +checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48" dependencies = [ "async-trait", "axum-core", @@ -153,9 +142,9 @@ dependencies = [ "mime", "percent-encoding", "pin-project-lite", + "rustversion", "serde", "sync_wrapper", - "tokio", "tower", "tower-http", "tower-layer", @@ -164,9 +153,9 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.2.9" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37e5939e02c56fecd5c017c37df4238c0a839fa76b7f97acdd7efb804fd181cc" +checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92" dependencies = [ "async-trait", "bytes", @@ -174,6 +163,7 @@ dependencies = [ "http", "http-body", "mime", + "rustversion", "tower-layer", "tower-service", ] @@ -188,7 +178,7 @@ dependencies = [ "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.5.4", "object", "rustc-demangle", ] @@ -288,12 +278,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" -[[package]] -name = "cache-padded" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c" - [[package]] name = "cc" version = "1.0.77" @@ -317,7 +301,7 @@ dependencies = [ "num-integer", "num-traits", "serde", - "time 0.1.44", + "time 0.1.45", "wasm-bindgen", "winapi", ] @@ -351,14 +335,14 @@ dependencies = [ [[package]] name = "clap" -version = "4.0.26" +version = "4.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2148adefda54e14492fb9bddcc600b4344c5d1a3123bd666dcb939c6f0e0e57e" +checksum = "4d63b9e9c07271b9957ad22c173bae2a4d9a81127680962039296abcd2f8251d" dependencies = [ - "atty", "bitflags", "clap_derive", "clap_lex", + "is-terminal", "once_cell", "strsim", "termcolor", @@ -421,11 +405,11 @@ dependencies = [ [[package]] name 
= "concurrent-queue" -version = "1.2.4" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af4780a44ab5696ea9e28294517f1fffb421a83a25af521333c838635509db9c" +checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b" dependencies = [ - "cache-padded", + "crossbeam-utils", ] [[package]] @@ -583,9 +567,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.82" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4a41a86530d0fe7f5d9ea779916b7cadd2d4f9add748b99c2c029cbbdfaf453" +checksum = "bdf07d07d6531bfcdbe9b8b739b104610c6508dcc4d63b410585faf338241daf" dependencies = [ "cc", "cxxbridge-flags", @@ -595,9 +579,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.82" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06416d667ff3e3ad2df1cd8cd8afae5da26cf9cec4d0825040f88b5ca659a2f0" +checksum = "d2eb5b96ecdc99f72657332953d4d9c50135af1bac34277801cc3937906ebd39" dependencies = [ "cc", "codespan-reporting", @@ -610,15 +594,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.82" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "820a9a2af1669deeef27cb271f476ffd196a2c4b6731336011e0ba63e2c7cf71" +checksum = "ac040a39517fd1674e0f32177648334b0f4074625b5588a64519804ba0553b12" [[package]] name = "cxxbridge-macro" -version = "1.0.82" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a08a6e2fcc370a089ad3b4aaf54db3b1b4cee38ddabce5896b33eb693275f470" +checksum = "1362b0ddcfc4eb0a1f57b68bd77dd99f0e826958a96abd0ae9bd092e114ffed6" dependencies = [ "proc-macro2", "quote", @@ -665,6 +649,8 @@ dependencies = [ "httpret", "hyper", "items", + "items_0", + "items_2", "lazy_static", "netpod", "nom", @@ -690,7 +676,7 @@ dependencies = [ "hashbrown", "lock_api", "once_cell", - "parking_lot_core 0.9.4", + "parking_lot_core 0.9.5", ] [[package]] @@ -763,6 +749,8 @@ dependencies = [ "httpclient", "hyper", "items", + "items_0", + "items_2", "libc", "netpod", "num-derive", @@ -822,6 +810,7 @@ dependencies = [ "backtrace", "chrono", "erased-serde", + "http", "regex", "rmp-serde", "serde", @@ -830,6 +819,27 @@ dependencies = [ "url", ] +[[package]] +name = "errno" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1" +dependencies = [ + "errno-dragonfly", + "libc", + "winapi", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -853,12 +863,12 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.24" +version = "1.0.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" +checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" dependencies = [ "crc32fast", - "miniz_oxide", + "miniz_oxide 0.6.2", ] [[package]] @@ -1126,6 +1136,15 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" 
+dependencies = [ + "libc", +] + [[package]] name = "hex" version = "0.4.3" @@ -1219,12 +1238,12 @@ dependencies = [ "dbconn", "disk", "err", - "futures-core", "futures-util", "http", "hyper", "hyper-tls", "items", + "items_0", "items_2", "itertools", "md-5 0.9.1", @@ -1353,6 +1372,28 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "io-lifetimes" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46112a93252b123d31a119a8d1a1ac19deac4fac6e0e8b0df58f0d4e5870e63c" +dependencies = [ + "libc", + "windows-sys 0.42.0", +] + +[[package]] +name = "is-terminal" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "927609f78c2913a6f6ac3c27a4fe87f43e2a35367c0c4b0f8265e8f49a104330" +dependencies = [ + "hermit-abi 0.2.6", + "io-lifetimes", + "rustix", + "windows-sys 0.42.0", +] + [[package]] name = "items" version = "0.0.1-a.dev.4" @@ -1366,6 +1407,7 @@ dependencies = [ "erased-serde", "err", "futures-util", + "items_0", "items_proc", "netpod", "num-traits", @@ -1376,6 +1418,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "items_0" +version = "0.0.1" +dependencies = [ + "erased-serde", + "err", + "netpod", + "serde", + "serde_json", +] + [[package]] name = "items_2" version = "0.0.1" @@ -1387,6 +1440,7 @@ dependencies = [ "err", "futures-util", "items", + "items_0", "items_proc", "netpod", "num-traits", @@ -1436,9 +1490,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.137" +version = "0.2.138" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc7fcc620a3bff7cdd7a365be3376c97191aeaccc2a603e600951e452615bf89" +checksum = "db6d7e329c562c5dfab7a46a2afabc8b987ab9a4834c9d1ca04dc54c1546cef8" [[package]] name = "link-cplusplus" @@ -1449,6 +1503,12 @@ dependencies = [ "cc", ] +[[package]] +name = "linux-raw-sys" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f9f08d8963a6c613f4b1a78f4f4a4dbfadf8e6545b2d72861731e4858b8b47f" + [[package]] name = "lock_api" version = "0.4.9" @@ -1497,9 +1557,9 @@ dependencies = [ [[package]] name = "matchit" -version = "0.5.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" +checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" [[package]] name = "md-5" @@ -1557,6 +1617,15 @@ dependencies = [ "adler", ] +[[package]] +name = "miniz_oxide" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.8.5" @@ -1616,10 +1685,10 @@ dependencies = [ "dbconn", "disk", "err", - "futures-core", "futures-util", "hex", "items", + "items_0", "items_2", "netpod", "scylla", @@ -1701,7 +1770,7 @@ version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6058e64324c71e02bc2b150e4f3bc8286db6c83092132ffa3f6b1eab0f9def5" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", ] @@ -1749,9 +1818,9 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openssl" -version = "0.10.42" +version = "0.10.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12fc0523e3bd51a692c8850d075d74dc062ccf251c0110668cbd921917118a13" 
+checksum = "29d971fd5722fec23977260f6e81aa67d2f22cadbdc2aa049f1022d9a3be1566" dependencies = [ "bitflags", "cfg-if", @@ -1781,9 +1850,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.77" +version = "0.9.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b03b84c3b2d099b81f0953422b4d4ad58761589d0229b5506356afca05a3670a" +checksum = "5454462c0eced1e97f2ec09036abc8da362e66802f66fd20f86854d9d8cbcbc4" dependencies = [ "autocfg", "cc", @@ -1822,7 +1891,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.4", + "parking_lot_core 0.9.5", ] [[package]] @@ -1841,9 +1910,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dc9e0dc2adc1c69d09143aff38d3d30c5c3f0df0dad82e6d25547af174ebec0" +checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba" dependencies = [ "cfg-if", "libc", @@ -2022,9 +2091,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0841812012b2d4a6145fae9a6af1534873c32aa67fff26bd09f8fa42c83f95a" +checksum = "c0b18e655c21ff5ac2084a5ad0611e827b3f92badf79f4910b5a5c58f4d87ff0" dependencies = [ "bytes", "prost-derive", @@ -2164,6 +2233,20 @@ version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" +[[package]] +name = "rustix" +version = "0.36.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3807b5d10909833d3e9acd1eb5fb988f79376ff10fce42937de71a449c4c588" +dependencies = [ + "bitflags", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys", + "windows-sys 0.42.0", +] + [[package]] name = "rustversion" version = "1.0.9" @@ -2271,6 +2354,7 @@ dependencies = [ "erased-serde", "err", "futures-util", + "items_0", "items_2", "netpod", "num-traits", @@ -2307,9 +2391,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.147" +version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d193d69bae983fc11a79df82342761dfbf28a99fc8d203dca4c3c1b590948965" +checksum = "256b9932320c590e707b94576e3cc1f7c9024d0ee6612dfbcf1cb106cbe8e055" dependencies = [ "serde_derive", ] @@ -2335,9 +2419,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.147" +version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f1d362ca8fc9c3e3a7484440752472d68a6caa98f1ab81d99b5dfe517cec852" +checksum = "b4eae9b04cbffdfd550eb462ed33bc6a1b68c935127d008b27444d08380f94e4" dependencies = [ "proc-macro2", "quote", @@ -2433,6 +2517,7 @@ dependencies = [ "err", "futures-util", "items", + "items_0", "items_2", "netpod", "parse", @@ -2487,9 +2572,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.103" +version = "1.0.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a864042229133ada95abf3b54fdc62ef5ccabe9515b64717bcb9a1919e59445d" +checksum = "60b9b43d45702de4c839cb9b51d9f529c5dd26a4aff255b42b1ebc03e88ee908" dependencies = [ "proc-macro2", "quote", @@ -2572,9 +2657,9 @@ 
dependencies = [ [[package]] name = "time" -version = "0.1.44" +version = "0.1.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" +checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" dependencies = [ "libc", "wasi 0.10.0+wasi-snapshot-preview1", @@ -2634,9 +2719,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.22.0" +version = "1.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d76ce4a75fb488c605c54bf610f221cea8b0dafb53333c1a67e8ee199dcd2ae3" +checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46" dependencies = [ "autocfg", "bytes", @@ -2648,7 +2733,7 @@ dependencies = [ "socket2", "tokio-macros", "tracing", - "winapi", + "windows-sys 0.42.0", ] [[package]] @@ -2663,9 +2748,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "1.8.0" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" dependencies = [ "proc-macro2", "quote", @@ -2742,9 +2827,9 @@ dependencies = [ [[package]] name = "tonic" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55b9af819e54b8f33d453655bef9b9acc171568fb49523078d0cc4e7484200ec" +checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" dependencies = [ "async-stream", "async-trait", @@ -2794,9 +2879,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" +checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858" dependencies = [ "bitflags", "bytes", @@ -2950,9 +3035,9 @@ dependencies = [ [[package]] name = "typenum" -version = "1.15.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" +checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" [[package]] name = "unicode-bidi" diff --git a/daqbufp2/src/test/api4/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs index 1a6c025..9588b99 100644 --- a/daqbufp2/src/test/api4/eventsjson.rs +++ b/daqbufp2/src/test/api4/eventsjson.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; use crate::test::f32_iter_cmp_near; @@ -5,9 +7,9 @@ use chrono::{DateTime, Utc}; use err::Error; use http::StatusCode; use hyper::Body; -use netpod::log::*; use netpod::query::PlainEventsQuery; use netpod::APP_JSON; +use netpod::{log::*, AggKind}; use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange}; use serde_json::Value as JsonValue; use url::Url; @@ -103,7 +105,14 @@ async fn events_plain_json( let beg_date: DateTime<Utc> = beg_date.parse()?; let end_date: DateTime<Utc> = end_date.parse()?; let range = NanoRange::from_date_time(beg_date, end_date); - let query = PlainEventsQuery::new(channel, range, 1024 * 4, None, false); + let query = PlainEventsQuery::new( + channel, + range, + AggKind::TimeWeightedScalar, + Duration::from_millis(10000), + None, + true, + ); let hp = HostPort::from_node(node0); let mut url
= Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index c576482..5f0b6d5 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -10,12 +10,13 @@ use hyper::Body; use items::numops::NumOps; use items::scalarevents::ScalarEvents; use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen}; -use netpod::log::*; use netpod::query::PlainEventsQuery; +use netpod::{log::*, AggKind}; use netpod::{AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_JSON, APP_OCTET}; use serde_json::Value as JsonValue; use std::fmt::Debug; use std::future::ready; +use std::time::Duration; use streams::frames::inmem::InMemoryFrameAsyncReadStream; use tokio::io::AsyncRead; use url::Url; @@ -82,7 +83,14 @@ where series: None, }; let range = NanoRange::from_date_time(beg_date, end_date); - let query = PlainEventsQuery::new(channel, range, 1024 * 4, None, false); + let query = PlainEventsQuery::new( + channel, + range, + AggKind::TimeWeightedScalar, + Duration::from_millis(10000), + None, + true, + ); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); @@ -309,7 +317,14 @@ pub async fn get_plain_events_json( let beg_date: DateTime<Utc> = beg_date.parse()?; let end_date: DateTime<Utc> = end_date.parse()?; let range = NanoRange::from_date_time(beg_date, end_date); - let query = PlainEventsQuery::new(channel, range, 1024 * 4, None, false); + let query = PlainEventsQuery::new( + channel, + range, + AggKind::TimeWeightedScalar, + Duration::from_millis(10000), + None, + true, + ); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); diff --git a/dbconn/src/bincache.rs b/dbconn/src/bincache.rs index f1621df..7b054b8 100644 --- a/dbconn/src/bincache.rs +++ b/dbconn/src/bincache.rs @@ -5,7 +5,7 @@ use futures_util::{Future, Stream, StreamExt}; use items::binsdim0::MinMaxAvgDim0Bins; use items::{empty_binned_dyn, empty_events_dyn, RangeCompletableItem, StreamItem, TimeBinned}; use netpod::log::*; -use netpod::query::{CacheUsage, RawEventsQuery}; +use netpod::query::{CacheUsage, PlainEventsQuery}; use netpod::timeunits::*; use netpod::{ AggKind, ChannelTyped, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, ScalarType, ScyllaConfig, @@ -348,7 +348,14 @@ pub async fn fetch_uncached_binned_events( let deadline = deadline .checked_add(Duration::from_millis(6000)) .ok_or_else(|| Error::with_msg_no_trace(format!("deadline overflow")))?; - let evq = RawEventsQuery::new(chn.channel.clone(), coord.patch_range(), agg_kind); + let evq = PlainEventsQuery::new( + chn.channel.clone(), + coord.patch_range(), + agg_kind, + Duration::from_millis(6000), + None, + true, + ); let mut events_dyn = EventsStreamScylla::new(series, &evq, chn.scalar_type.clone(), chn.shape.clone(), scy, false); let mut complete = false; loop { diff --git a/dbconn/src/channelconfig.rs b/dbconn/src/channelconfig.rs new file mode 100644 index 0000000..744dfd9 --- /dev/null +++ b/dbconn/src/channelconfig.rs @@ -0,0 +1,137 @@ +use err::Error; +use netpod::log::*; +use netpod::{Channel, NodeConfigCached, ScalarType, Shape}; + +use crate::ErrConv; + +pub struct ChConf { + pub series: u64, + pub scalar_type: ScalarType, + pub shape: Shape, +} + +/// It is an unsolved question as to how we want to uniquely
address channels. +/// Currently, the usual (backend, channelname) works in 99% of the cases, but the edge-cases +/// are not solved. At the same time, it is desirable to avoid complicating things for users. +/// Current state: +/// If the series id is given, we take that. +/// Otherwise we try to uniquely identify the series id from the given information. +/// In the future, we can even try to involve time range information for that, but backends like +/// old archivers and sf databuffer do not support such lookup. +pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> Result<ChConf, Error> { + if channel.backend != ncc.node_config.cluster.backend { + warn!( + "mismatched backend {} vs {}", + channel.backend, ncc.node_config.cluster.backend + ); + } + if channel.backend() == "test-inmem" { + let ret = if channel.name() == "inmem-d0-i32" { + let ret = ChConf { + series: 1, + scalar_type: ScalarType::I32, + shape: Shape::Scalar, + }; + Ok(ret) + } else { + error!("no test information"); + Err(Error::with_msg_no_trace(format!("no test information")) + .add_public_msg(format!("No channel config for test channel {:?}", channel))) + }; + return ret; + } + if channel.backend() == "test-disk-databuffer" { + // TODO the series-ids here are just random. Need to integrate with better test setup. + let ret = if channel.name() == "scalar-i32-be" { + let ret = ChConf { + series: 1, + scalar_type: ScalarType::I32, + shape: Shape::Scalar, + }; + Ok(ret) + } else if channel.name() == "wave-f64-be-n21" { + let ret = ChConf { + series: 2, + scalar_type: ScalarType::F64, + shape: Shape::Wave(21), + }; + Ok(ret) + } else if channel.name() == "const-regular-scalar-i32-be" { + let ret = ChConf { + series: 3, + scalar_type: ScalarType::I32, + shape: Shape::Scalar, + }; + Ok(ret) + } else { + error!("no test information"); + Err(Error::with_msg_no_trace(format!("no test information")) + .add_public_msg(format!("No channel config for test channel {:?}", channel))) + }; + return ret; + } + // TODO use a common already running worker pool for these queries: + let dbconf = &ncc.node_config.cluster.database; + let dburl = format!( + "postgresql://{}:{}@{}:{}/{}", + dbconf.user, dbconf.pass, dbconf.host, dbconf.port, dbconf.name + ); + let (pgclient, pgconn) = tokio_postgres::connect(&dburl, tokio_postgres::NoTls) + .await + .err_conv()?; + tokio::spawn(pgconn); + if let Some(series) = channel.series() { + let res = pgclient + .query( + "select scalar_type, shape_dims from series_by_channel where series = $1", + &[&(series as i64)], + ) + .await + .err_conv()?; + if res.len() < 1 { + warn!("can not find channel information for series {series} given through {channel:?}"); + let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?}")); + Err(e) + } else { + let row = res.first().unwrap(); + let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(0) as u8)?; + // TODO can I get a slice from psql driver?
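+ // Assumption: shape_dims holds the dimension lengths of the stored data, with an empty list encoding a scalar (cf. Shape::Scalar vs Shape::Wave above).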
+ let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(1))?; + let ret = ChConf { + series, + scalar_type, + shape, + }; + Ok(ret) + } + } else { + let res = pgclient + .query( + "select series, scalar_type, shape_dims from series_by_channel where facility = $1 and channel = $2", + &[&channel.backend(), &channel.name()], + ) + .await + .err_conv()?; + if res.len() < 1 { + warn!("can not find channel information for {channel:?}"); + let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?}")); + Err(e) + } else if res.len() > 1 { + warn!("ambiguous channel {channel:?}"); + let e = Error::with_public_msg_no_trace(format!("ambiguous channel {channel:?}")); + Err(e) + } else { + let row = res.first().unwrap(); + let series = row.get::<_, i64>(0) as u64; + let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(1) as u8)?; + // TODO can I get a slice from psql driver? + let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(2))?; + let ret = ChConf { + series, + scalar_type, + shape, + }; + Ok(ret) + } + } +} diff --git a/dbconn/src/dbconn.rs b/dbconn/src/dbconn.rs index 048185f..69bd27e 100644 --- a/dbconn/src/dbconn.rs +++ b/dbconn/src/dbconn.rs @@ -5,14 +5,17 @@ pub mod search; pub mod pg { pub use tokio_postgres::{Client, Error}; } +pub mod channelconfig; + use err::Error; -use netpod::log::*; +use netpod::{log::*, ScalarType, Shape}; use netpod::{Channel, Database, NodeConfigCached, ScyllaConfig}; use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; use scylla::Session as ScySession; +use std::sync::Arc; use std::time::Duration; -use tokio_postgres::{Client, NoTls}; +use tokio_postgres::{Client, Client as PgClient, NoTls}; trait ErrConv { fn err_conv(self) -> Result; } @@ -188,3 +191,40 @@ pub async fn insert_channel(name: String, facility: i64, dbc: &Client) -> Result } Ok(()) } + +pub async fn find_series(channel: &Channel, pgclient: Arc<PgClient>) -> Result<(u64, ScalarType, Shape), Error> { + info!("find_series channel {:?}", channel); + let rows = if let Some(series) = channel.series() { + let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where series = $1"; + pgclient.query(q, &[&(series as i64)]).await.err_conv()? + } else { + let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where facility = $1 and channel = $2"; + pgclient + .query(q, &[&channel.backend(), &channel.name()]) + .await + .err_conv()?
+ }; + if rows.len() < 1 { + return Err(Error::with_public_msg_no_trace(format!( + "No series found for {channel:?}" + ))); + } + if rows.len() > 1 { + error!("Multiple series found for {channel:?}"); + return Err(Error::with_public_msg_no_trace( + "Multiple series found for channel, can not return data for ambiguous series", + )); + } + let row = rows + .into_iter() + .next() + .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?; + let series = row.get::<_, i64>(0) as u64; + let _facility: String = row.get(1); + let _channel: String = row.get(2); + let a: i32 = row.get(3); + let scalar_type = ScalarType::from_scylla_i32(a)?; + let a: Vec<i32> = row.get(4); + let shape = Shape::from_scylla_shape_dims(&a)?; + Ok((series, scalar_type, shape)) +} diff --git a/dbconn/src/events_scylla.rs b/dbconn/src/events_scylla.rs index f4658e1..52fc8ff 100644 --- a/dbconn/src/events_scylla.rs +++ b/dbconn/src/events_scylla.rs @@ -5,9 +5,9 @@ use items::scalarevents::ScalarEvents; use items::waveevents::WaveEvents; use items::{EventsDyn, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; -use netpod::query::{ChannelStateEventsQuery, RawEventsQuery}; +use netpod::query::{ChannelStateEventsQuery, PlainEventsQuery}; use netpod::timeunits::DAY; -use netpod::{Database, NanoRange, ScalarType, ScyllaConfig, Shape}; +use netpod::{NanoRange, ScalarType, ScyllaConfig, Shape}; use scylla::Session as ScySession; use std::collections::VecDeque; use std::pin::Pin; @@ -126,8 +126,6 @@ enum FrState { pub struct EventsStreamScylla { state: FrState, series: u64, - #[allow(unused)] - evq: RawEventsQuery, scalar_type: ScalarType, shape: Shape, range: NanoRange, @@ -139,7 +137,7 @@ pub struct EventsStreamScylla { impl EventsStreamScylla { pub fn new( series: u64, - evq: &RawEventsQuery, + evq: &PlainEventsQuery, scalar_type: ScalarType, shape: Shape, scy: Arc<ScySession>, @@ -148,10 +146,9 @@ impl EventsStreamScylla { Self { state: FrState::New, series, - evq: evq.clone(), scalar_type, shape, - range: evq.range.clone(), + range: evq.range().clone(), ts_msps: VecDeque::new(), scy, do_test_stream_error, @@ -476,18 +473,6 @@ read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16"); -// TODO remove -#[allow(unused)] -async fn make_scylla_stream( - _evq: &RawEventsQuery, - _scyco: &ScyllaConfig, - _dbconf: Database, - _do_test_stream_error: bool, -) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn EventsDyn>>> + Send>>, Error> { - error!("forward call to crate scyllaconn"); - err::todoval() -} - pub async fn channel_state_events( evq: &ChannelStateEventsQuery, scyco: &ScyllaConfig, diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs index 331d032..2a714bd 100644 --- a/disk/src/merge/mergedblobsfromremotes.rs +++ b/disk/src/merge/mergedblobsfromremotes.rs @@ -5,7 +5,7 @@ use futures_util::{pin_mut, StreamExt}; use items::eventfull::EventFull; use items::Sitemty; use netpod::log::*; -use netpod::query::RawEventsQuery; +use netpod::query::PlainEventsQuery; use netpod::{Cluster, PerfOpts}; use std::future::Future; use std::pin::Pin; @@ -24,7 +24,7 @@ pub struct MergedBlobsFromRemotes { } impl MergedBlobsFromRemotes { - pub fn new(evq: RawEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self { + pub fn new(evq: PlainEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self { debug!("MergedBlobsFromRemotes evq {:?}", evq); let mut tcp_establish_futs = vec![]; for node in &cluster.nodes { diff
--git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 09f4d6c..fe94d40 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -10,7 +10,7 @@ use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, Stream use items_2::channelevents::ChannelEvents; use items_2::eventsdim0::EventsDim0; use netpod::log::*; -use netpod::query::RawEventsQuery; +use netpod::query::PlainEventsQuery; use netpod::{AggKind, ByteOrder, ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, ConfigEntry, MatchingConfigEntry}; use std::collections::VecDeque; @@ -182,17 +182,17 @@ macro_rules! pipe1 { } pub async fn make_event_pipe( - evq: &RawEventsQuery, + evq: &PlainEventsQuery, node_config: &NodeConfigCached, ) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> { if false { - match dbconn::channel_exists(&evq.channel, &node_config).await { + match dbconn::channel_exists(&evq.channel(), &node_config).await { Ok(_) => (), Err(e) => return Err(e)?, } } - let range = &evq.range; - let channel_config = match read_local_config(evq.channel.clone(), node_config.node.clone()).await { + let range = evq.range().clone(); + let channel_config = match read_local_config(evq.channel().clone(), node_config.node.clone()).await { Ok(k) => k, Err(e) => { // TODO introduce detailed error type @@ -204,7 +204,7 @@ pub async fn make_event_pipe( } } }; - let entry_res = match extract_matching_config_entry(range, &channel_config) { + let entry_res = match extract_matching_config_entry(&range, &channel_config) { Ok(k) => k, Err(e) => return Err(e)?, }; @@ -218,7 +218,7 @@ pub async fn make_event_pipe( Err(e) => return Err(e)?, }; let channel_config = netpod::ChannelConfig { - channel: evq.channel.clone(), + channel: evq.channel().clone(), keyspace: entry.ks as u8, time_bin_size: entry.bs, shape: shape, @@ -229,7 +229,7 @@ pub async fn make_event_pipe( }; trace!( "make_event_pipe need_expand {need_expand} {evq:?}", - need_expand = evq.agg_kind.need_expand() + need_expand = evq.agg_kind().need_expand() ); let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); let event_blobs = EventChunkerMultifile::new( @@ -237,9 +237,9 @@ pub async fn make_event_pipe( channel_config.clone(), node_config.node.clone(), node_config.ix, - evq.disk_io_tune.clone(), + DiskIoTune::default(), event_chunker_conf, - evq.agg_kind.need_expand(), + evq.agg_kind().need_expand(), true, ); let shape = entry.to_shape()?; @@ -247,7 +247,7 @@ pub async fn make_event_pipe( entry.scalar_type, entry.byte_order, shape, - evq.agg_kind.clone(), + evq.agg_kind().clone(), event_blobs ); Ok(pipe) @@ -350,30 +350,30 @@ pub fn make_remote_event_blobs_stream( } pub async fn make_event_blobs_pipe( - evq: &RawEventsQuery, + evq: &PlainEventsQuery, node_config: &NodeConfigCached, ) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> { if false { - match dbconn::channel_exists(&evq.channel, &node_config).await { + match dbconn::channel_exists(evq.channel(), &node_config).await { Ok(_) => (), Err(e) => return Err(e)?, } } - let expand = evq.agg_kind.need_expand(); - let range = &evq.range; - let entry = get_applicable_entry(&evq.range, evq.channel.clone(), node_config).await?; + let expand = evq.agg_kind().need_expand(); + let range = evq.range(); + let entry = get_applicable_entry(evq.range(), evq.channel().clone(), node_config).await?; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); type ItemType = Sitemty<EventFull>; // TODO should depend on host config let pipe = if true { let
event_blobs = make_remote_event_blobs_stream( range.clone(), - evq.channel.clone(), + evq.channel().clone(), &entry, expand, - evq.do_decompress, + true, event_chunker_conf, - evq.disk_io_tune.clone(), + DiskIoTune::default(), node_config, )?; let s = event_blobs.map(|item: ItemType| Box::new(item) as Box<dyn Framable>); @@ -384,12 +384,12 @@ pub async fn make_event_blobs_pipe( } else { let event_blobs = make_local_event_blobs_stream( range.clone(), - evq.channel.clone(), + evq.channel().clone(), &entry, expand, - evq.do_decompress, + true, event_chunker_conf, - evq.disk_io_tune.clone(), + DiskIoTune::default(), node_config, )?; let s = event_blobs.map(|item: ItemType| Box::new(item) as Box<dyn Framable>); diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 953369c..b4081b9 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -9,7 +9,7 @@ use items::eventfull::EventFull; use items::{RangeCompletableItem, Sitemty, StreamItem}; use itertools::Itertools; use netpod::query::api1::Api1Query; -use netpod::query::RawEventsQuery; +use netpod::query::PlainEventsQuery; use netpod::timeunits::SEC; use netpod::{log::*, ScalarType}; use netpod::{ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, PerfOpts, Shape}; @@ -773,7 +773,14 @@ impl Stream for DataApiPython3DataStream { }; let channel = self.channels[self.chan_ix - 1].clone(); debug!("found channel_config for {}: {:?}", channel.name, entry); - let evq = RawEventsQuery::new(channel, self.range.clone(), netpod::AggKind::EventBlobs); + let evq = PlainEventsQuery::new( + channel, + self.range.clone(), + netpod::AggKind::EventBlobs, + Duration::from_millis(10000), + None, + true, + ); let perf_opts = PerfOpts { inmem_bufcap: 1024 * 4 }; // TODO is this a good to place decide this? let s = if self.node_config.node_config.cluster.is_central_storage { // TODO pull up this config let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); let s = disk::raw::conn::make_local_event_blobs_stream( - evq.range.clone(), - evq.channel.clone(), + evq.range().clone(), + evq.channel().clone(), &entry, - evq.agg_kind.need_expand(), - evq.do_decompress, + evq.agg_kind().need_expand(), + true, event_chunker_conf, self.disk_io_tune.clone(), &self.node_config, diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 3f58c9d..465014b 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -1,5 +1,6 @@ use crate::err::Error; use crate::{response, ToPublicResponse}; +use dbconn::channelconfig::{chconf_from_database, ChConf}; use dbconn::{create_connection, create_scylla_connection}; use futures_util::StreamExt; use http::{Method, Request, Response, StatusCode}; @@ -21,144 +22,12 @@ use std::convert::TryInto; use std::time::{Duration, Instant}; use url::Url; -pub struct ChConf { - pub series: u64, - pub scalar_type: ScalarType, - pub shape: Shape, -} - -/// It is an unsolved question as to how we want to uniquely address channels. -/// Currently, the usual (backend, channelname) works in 99% of the cases, but the edge-cases -/// are not solved. At the same time, it is desirable to avoid to complicate things for users. -/// Current state: -/// If the series id is given, we take that. -/// Otherwise we try to uniquely identify the series id from the given information. -/// In the future, we can even try to involve time range information for that, but backends like -/// old archivers and sf databuffer do not support such lookup.
-pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> Result<ChConf, Error> { - if channel.backend != ncc.node_config.cluster.backend { - warn!( - "mismatched backend {} vs {}", - channel.backend, ncc.node_config.cluster.backend - ); - } - if channel.backend() == "test-inmem" { - let ret = if channel.name() == "inmem-d0-i32" { - let ret = ChConf { - series: 1, - scalar_type: ScalarType::I32, - shape: Shape::Scalar, - }; - Ok(ret) - } else { - error!("no test information"); - Err(Error::with_msg_no_trace(format!("no test information")) - .add_public_msg("No channel config for test channel {:?}")) - }; - return ret; - } - if channel.backend() == "test-disk-databuffer" { - // TODO the series-ids here are just random. Need to integrate with better test setup. - let ret = if channel.name() == "scalar-i32-be" { - let ret = ChConf { - series: 1, - scalar_type: ScalarType::I32, - shape: Shape::Scalar, - }; - Ok(ret) - } else if channel.name() == "wave-f64-be-n21" { - let ret = ChConf { - series: 2, - scalar_type: ScalarType::F64, - shape: Shape::Wave(21), - }; - Ok(ret) - } else if channel.name() == "const-regular-scalar-i32-be" { - let ret = ChConf { - series: 3, - scalar_type: ScalarType::I32, - shape: Shape::Scalar, - }; - Ok(ret) - } else { - error!("no test information"); - Err(Error::with_msg_no_trace(format!("no test information")) - .add_public_msg("No channel config for test channel {:?}")) - }; - return ret; - } - // TODO use a common already running worker pool for these queries: - let dbconf = &ncc.node_config.cluster.database; - let dburl = format!( - "postgresql://{}:{}@{}:{}/{}", - dbconf.user, dbconf.pass, dbconf.host, dbconf.port, dbconf.name - ); - let (pgclient, pgconn) = tokio_postgres::connect(&dburl, tokio_postgres::NoTls) - .await - .err_conv()?; - tokio::spawn(pgconn); - if let Some(series) = channel.series() { - let res = pgclient - .query( - "select scalar_type, shape_dims from series_by_channel where series = $1", - &[&(series as i64)], - ) - .await - .err_conv()?; - if res.len() < 1 { - warn!("can not find channel information for series {series} given through {channel:?}"); - let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?}")); - Err(e) - } else { - let row = res.first().unwrap(); - let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(0) as u8)?; - // TODO can I get a slice from psql driver? - let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(1))?; - let ret = ChConf { - series, - scalar_type, - shape, - }; - Ok(ret) - } - } else { - let res = pgclient - .query( - "select series, scalar_type, shape_dims from series_by_channel where facility = $1 and channel = $2", - &[&channel.backend(), &channel.name()], - ) - .await - .err_conv()?; - if res.len() < 1 { - warn!("can not find channel information for {channel:?}"); - let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?}")); - Err(e) - } else if res.len() > 1 { - warn!("ambigious channel {channel:?}"); - let e = Error::with_public_msg_no_trace(format!("ambigious channel {channel:?}")); - Err(e) - } else { - let row = res.first().unwrap(); - let series = row.get::<_, i64>(0) as u64; - let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(1) as u8)?; - // TODO can I get a slice from psql driver?
- let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(2))?; - let ret = ChConf { - series, - scalar_type, - shape, - }; - Ok(ret) - } - } -} - pub async fn chconf_from_events_binary(q: &PlainEventsQuery, ncc: &NodeConfigCached) -> Result<ChConf, Error> { - chconf_from_database(q.channel(), ncc).await + chconf_from_database(q.channel(), ncc).await.map_err(Into::into) } pub async fn chconf_from_events_json(q: &PlainEventsQuery, ncc: &NodeConfigCached) -> Result<ChConf, Error> { - chconf_from_database(q.channel(), ncc).await + chconf_from_database(q.channel(), ncc).await.map_err(Into::into) } pub async fn chconf_from_prebinned(q: &PreBinnedQuery, _ncc: &NodeConfigCached) -> Result<ChConf, Error> { @@ -174,7 +43,7 @@ pub async fn chconf_from_prebinned(q: &PreBinnedQuery, _ncc: &NodeConfigCached) } pub async fn chconf_from_binned(q: &BinnedQuery, ncc: &NodeConfigCached) -> Result<ChConf, Error> { - chconf_from_database(q.channel(), ncc).await + chconf_from_database(q.channel(), ncc).await.map_err(Into::into) } pub struct ChannelConfigHandler {} diff --git a/httpret/src/events.rs b/httpret/src/events.rs index 099eeac..fe2d22d 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -8,15 +8,15 @@ use items_2::channelevents::ChannelEvents; use items_2::merger_cev::ChannelEventsMerger; use items_2::{binned_collected, empty_events_dyn, empty_events_dyn_2}; use netpod::log::*; -use netpod::query::{BinnedQuery, ChannelStateEventsQuery, PlainEventsQuery, RawEventsQuery}; +use netpod::query::{BinnedQuery, ChannelStateEventsQuery, PlainEventsQuery}; use netpod::{AggKind, BinnedRange, FromUrl, NodeConfigCached}; use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET}; use scyllaconn::create_scy_session; use scyllaconn::errconv::ErrConv; -use scyllaconn::events::{channel_state_events, find_series, make_scylla_stream}; +use scyllaconn::events::{channel_state_events, make_scylla_stream}; use std::pin::Pin; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use url::Url; pub struct EventsHandler {} @@ -103,8 +103,15 @@ async fn plain_events_json( query.set_series_id(chconf.series); let query = query; // --- - let query = RawEventsQuery::new(query.channel().clone(), query.range().clone(), AggKind::Plain); - let item = streams::plaineventsjson::plain_events_json(query, &node_config.node_config.cluster).await?; + //let query = RawEventsQuery::new(query.channel().clone(), query.range().clone(), AggKind::Plain); + let item = streams::plaineventsjson::plain_events_json(&query, &node_config.node_config.cluster).await; + let item = match item { + Ok(item) => item, + Err(e) => { + error!("got error from streams::plaineventsjson::plain_events_json {e:?}"); + return Err(e.into()); + } + }; let buf = serde_json::to_vec(&item)?; let ret = response(StatusCode::OK).body(Body::from(buf))?; Ok(ret) @@ -188,8 +195,8 @@ impl EventsHandlerScylla { return Err(Error::with_public_msg(format!("no scylla configured"))); }; let scy = create_scy_session(scyco).await?; - let do_one_before_range = evq.do_one_before_range(); - let (series, scalar_type, shape) = find_series(evq.channel(), pgclient.clone()).await?; + let do_one_before_range = evq.agg_kind().need_expand(); + let (series, scalar_type, shape) = dbconn::find_series(evq.channel(), pgclient.clone()).await?; let empty_item = empty_events_dyn_2(&scalar_type, &shape, &AggKind::TimeWeightedScalar); let empty_stream = futures_util::stream::once(futures_util::future::ready(Ok(ChannelEvents::Events(empty_item)))); @@ -342,10 +349,17 @@ impl BinnedHandlerScylla { let scy = create_scy_session(scyco).await?;
let covering = BinnedRange::covering_range(evq.range().clone(), evq.bin_count())?; let range = covering.full_range(); - let mut query2 = PlainEventsQuery::new(evq.channel().clone(), range.clone(), 0, None, false); + let mut query2 = PlainEventsQuery::new( + evq.channel().clone(), + range.clone(), + evq.agg_kind().clone(), + Duration::from_millis(6000), + None, + false, + ); query2.set_timeout(evq.timeout()); let query2 = query2; - let (series, scalar_type, shape) = find_series(evq.channel(), pgclient.clone()).await?; + let (series, scalar_type, shape) = dbconn::find_series(evq.channel(), pgclient.clone()).await?; let stream = make_scylla_stream( &query2, do_one_before_range, diff --git a/netpod/src/query.rs b/netpod/src/query.rs index 5f7338e..ded8915 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -4,7 +4,6 @@ pub mod prebinned; use crate::get_url_query_pairs; use crate::log::*; -use crate::DiskIoTune; use crate::{AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos}; use chrono::{DateTime, TimeZone, Utc}; use err::Error; @@ -66,50 +65,15 @@ impl fmt::Display for CacheUsage { } } -/// Query parameters to request (optionally) X-processed, but not T-processed events. -// TODO maybe merge with PlainEventsQuery? #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct RawEventsQuery { - pub channel: Channel, - pub range: NanoRange, - pub agg_kind: AggKind, - #[serde(default)] - pub disk_io_tune: DiskIoTune, - #[serde(default)] - pub do_decompress: bool, - #[serde(default)] - pub do_test_main_error: bool, - #[serde(default)] - pub do_test_stream_error: bool, -} - -impl RawEventsQuery { - pub fn new(channel: Channel, range: NanoRange, agg_kind: AggKind) -> Self { - Self { - channel, - range, - agg_kind, - disk_io_tune: DiskIoTune::default(), - do_decompress: false, - do_test_main_error: false, - do_test_stream_error: false, - } - } - - pub fn channel(&self) -> &Channel { - &self.channel - } -} - -#[derive(Clone, Debug, Serialize)] pub struct PlainEventsQuery { channel: Channel, range: NanoRange, - do_one_before_range: bool, - disk_io_buffer_size: usize, - report_error: bool, + agg_kind: AggKind, timeout: Duration, events_max: Option, + stream_batch_len: Option, + report_error: bool, do_log: bool, do_test_main_error: bool, do_test_stream_error: bool, } impl PlainEventsQuery { pub fn new( channel: Channel, range: NanoRange, - disk_io_buffer_size: usize, + agg_kind: AggKind, + timeout: Duration, events_max: Option, do_log: bool, ) -> Self { Self { channel, range, - do_one_before_range: false, - disk_io_buffer_size, - report_error: false, - timeout: Duration::from_millis(10000), + agg_kind, + timeout, events_max, + stream_batch_len: None, + report_error: false, do_log, do_test_main_error: false, do_test_stream_error: false, @@ -145,12 +110,16 @@ impl PlainEventsQuery { &self.range } + pub fn agg_kind(&self) -> &AggKind { + &self.agg_kind + } pub fn report_error(&self) -> bool { self.report_error } pub fn disk_io_buffer_size(&self) -> usize { - self.disk_io_buffer_size + 1024 * 8 } pub fn timeout(&self) -> Duration { @@ -173,10 +142,6 @@ impl PlainEventsQuery { self.do_test_stream_error } - pub fn do_one_before_range(&self) -> bool { - self.do_one_before_range - } pub fn set_series_id(&mut self, series: u64) { self.channel.series = Some(series); } @@ -221,16 +186,7 @@ impl FromUrl for PlainEventsQuery { beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(), end: end_date.parse::<DateTime<Utc>>()?.to_nanos(), }, - disk_io_buffer_size: pairs
.get("diskIoBufferSize") - .map_or("4096", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse diskIoBufferSize {:?}", e)))?, - report_error: pairs - .get("reportError") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse reportError {:?}", e)))?, + agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::TimeWeightedScalar), timeout: pairs .get("timeout") .map_or("10000", |k| k) @@ -240,6 +196,14 @@ impl FromUrl for PlainEventsQuery { events_max: pairs .get("eventsMax") .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + stream_batch_len: pairs + .get("streamBatchLen") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + report_error: pairs + .get("reportError") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_public_msg(format!("can not parse reportError {:?}", e)))?, do_log: pairs .get("doLog") .map_or("false", |k| k) @@ -255,11 +219,6 @@ impl FromUrl for PlainEventsQuery { .map_or("false", |k| k) .parse() .map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError {:?}", e)))?, - do_one_before_range: pairs - .get("getOneBeforeRange") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse getOneBeforeRange {:?}", e)))?, }; Ok(ret) } @@ -269,6 +228,7 @@ impl AppendToUrl for PlainEventsQuery { fn append_to_url(&self, url: &mut Url) { let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; self.channel.append_to_url(url); + binning_scheme_append_to_url(&self.agg_kind, url); let mut g = url.query_pairs_mut(); g.append_pair( "begDate", @@ -278,13 +238,14 @@ impl AppendToUrl for PlainEventsQuery { "endDate", &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), ); - g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); if let Some(x) = self.events_max.as_ref() { g.append_pair("eventsMax", &format!("{}", x)); } + if let Some(x) = self.stream_batch_len.as_ref() { + g.append_pair("streamBatchLen", &format!("{}", x)); + } g.append_pair("doLog", &format!("{}", self.do_log)); - g.append_pair("getOneBeforeRange", &format!("{}", self.do_one_before_range)); } } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index f244f59..991d300 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -1,12 +1,12 @@ use err::Error; -use futures_util::{stream, Stream, StreamExt}; +use futures_util::{Stream, StreamExt}; use items::frame::{decode_frame, make_term_frame}; use items::{EventQueryJsonStringFrame, Framable, RangeCompletableItem, Sitemty, StreamItem}; use items_0::Empty; use items_2::channelevents::ChannelEvents; use netpod::histo::HistoLog2; use netpod::log::*; -use netpod::query::{PlainEventsQuery, RawEventsQuery}; +use netpod::query::PlainEventsQuery; use netpod::AggKind; use netpod::{NodeConfigCached, PerfOpts}; use std::net::SocketAddr; @@ -104,7 +104,7 @@ async fn events_conn_handler_inner_try( }, Err(e) => return Err((e, netout).into()), }; - let res: Result = serde_json::from_str(&qitem.0); + let res: Result = serde_json::from_str(&qitem.0); let evq = match res { Ok(k) => k, Err(e) => { @@ -114,7 +114,7 @@ async fn events_conn_handler_inner_try( }; info!("events_conn_handler_inner_try evq {:?}", evq); - if evq.do_test_main_error { + if evq.channel().name() == "test-do-trigger-main-error" { let e = Error::with_msg(format!("Test error private message.")) .add_public_msg(format!("Test error PUBLIC message.")); return Err((e, 
netout).into()); } @@ -158,19 +158,24 @@ async fn events_conn_handler_inner_try( // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. let do_one_before_range = false; // TODO use better builder pattern with shortcuts for production and dev defaults - let qu = PlainEventsQuery::new(evq.channel, evq.range, 1024 * 8, None, true); + let f = dbconn::channelconfig::chconf_from_database(evq.channel(), node_config) + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))); + let f = match f { + Ok(k) => k, + Err(e) => return Err((e, netout))?, + }; let scyco = conf; - let _dbconf = node_config.node_config.cluster.database.clone(); let scy = match scyllaconn::create_scy_session(scyco).await { Ok(k) => k, Err(e) => return Err((e, netout))?, }; - let series = err::todoval(); - let scalar_type = err::todoval(); - let shape = err::todoval(); + let series = f.series; + let scalar_type = f.scalar_type; + let shape = f.shape; let do_test_stream_error = false; let stream = match scyllaconn::events::make_scylla_stream( - &qu, + &evq, do_one_before_range, series, scalar_type, @@ -183,28 +188,25 @@ async fn events_conn_handler_inner_try( Ok(k) => k, Err(e) => return Err((e, netout))?, }; - let e = Error::with_msg_no_trace("TODO scylla events"); - if true { - return Err((e, netout))?; - } - let _s = stream.map(|item| { + let stream = stream.map(|item| { let item = match item { Ok(item) => match item { - ChannelEvents::Events(_item) => { - // TODO - let item = items_2::eventsdim0::EventsDim0::::empty(); - let item = ChannelEvents::Events(Box::new(item)); + ChannelEvents::Events(item) => { + let item = ChannelEvents::Events(item); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + item + } + ChannelEvents::Status(item) => { + let item = ChannelEvents::Status(item); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + item + } - ChannelEvents::Status(_item) => todo!(), }, Err(e) => Err(e), }; - Box::new(item) as Box<dyn Framable> + item }); - let s = stream::empty(); - Box::pin(s) + Box::pin(stream) } else if let Some(_) = &node_config.node.channel_archiver { let e = Error::with_msg_no_trace("archapp not built"); return Err((e, netout))?; @@ -212,7 +214,7 @@ async fn events_conn_handler_inner_try( let e = Error::with_msg_no_trace("archapp not built"); return Err((e, netout))?; } else { - let stream = match evq.agg_kind { + let stream = match evq.agg_kind() { AggKind::EventBlobs => match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await { Ok(_stream) => { let e = Error::with_msg_no_trace("TODO make_event_blobs_pipe"); @@ -245,6 +247,7 @@ } } Err(e) => { + error!("events_conn_handler_inner_try sees error in stream: {e:?}"); return Err((e, netout))?; } } @@ -273,6 +276,7 @@ async fn events_conn_handler_inner( match events_conn_handler_inner_try(stream, addr, node_config).await { Ok(_) => (), Err(ce) => { + error!("events_conn_handler_inner sees error {:?}", ce.err); // Try to pass the error over the network. // If that fails, give error to the caller.
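// Note: ce is assumed to bundle the caught error together with the still-open network handle (ce.err and ce.netout above and below).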
let mut out = ce.netout; diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs index 8ab3655..3be36e4 100644 --- a/nodenet/src/conn/test.rs +++ b/nodenet/src/conn/test.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use super::*; use items::frame::make_frame; use items::Sitemty; @@ -58,7 +60,7 @@ fn raw_data_00() { beg: SEC, end: SEC * 10, }; - let qu = RawEventsQuery::new(channel, range, AggKind::Plain); + let qu = PlainEventsQuery::new(channel, range, AggKind::Plain, Duration::from_millis(10000), None, true); let query = EventQueryJsonStringFrame(serde_json::to_string(&qu).unwrap()); let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(query))); let frame = make_frame(&item).unwrap(); diff --git a/scyllaconn/src/bincache.rs b/scyllaconn/src/bincache.rs index 8fa40af..aaed907 100644 --- a/scyllaconn/src/bincache.rs +++ b/scyllaconn/src/bincache.rs @@ -7,7 +7,7 @@ use items_2::binsdim0::BinsDim0; use items_2::channelevents::ChannelEvents; use items_2::{empty_binned_dyn, empty_events_dyn}; use netpod::log::*; -use netpod::query::{CacheUsage, PlainEventsQuery, RawEventsQuery}; +use netpod::query::{CacheUsage, PlainEventsQuery}; use netpod::timeunits::*; use netpod::{AggKind, ChannelTyped, ScalarType, Shape}; use netpod::{PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange}; @@ -356,13 +356,20 @@ pub async fn fetch_uncached_binned_events( // We must produce some result with correct types even if upstream delivers nothing at all. let bin0 = empty_events_dyn(&chn.scalar_type, &chn.shape, &agg_kind); let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight); + // TODO handle deadline better let deadline = Instant::now(); let deadline = deadline .checked_add(Duration::from_millis(6000)) .ok_or_else(|| Error::with_msg_no_trace(format!("deadline overflow")))?; let do_one_before_range = agg_kind.need_expand(); - let _evq = RawEventsQuery::new(chn.channel.clone(), coord.patch_range(), agg_kind); - let evq = PlainEventsQuery::new(chn.channel.clone(), coord.patch_range(), 4096, None, true); + let evq = PlainEventsQuery::new( + chn.channel.clone(), + coord.patch_range(), + AggKind::TimeWeightedScalar, + Duration::from_millis(8000), + None, + true, + ); let mut events_dyn = EventsStreamScylla::new( series, evq.range().clone(), diff --git a/scyllaconn/src/events.rs b/scyllaconn/src/events.rs index c28e8fb..6230931 100644 --- a/scyllaconn/src/events.rs +++ b/scyllaconn/src/events.rs @@ -7,13 +7,12 @@ use items_2::eventsdim0::EventsDim0; use netpod::log::*; use netpod::query::{ChannelStateEventsQuery, PlainEventsQuery}; use netpod::timeunits::*; -use netpod::{Channel, NanoRange, ScalarType, Shape}; +use netpod::{NanoRange, ScalarType, Shape}; use scylla::Session as ScySession; use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use tokio_postgres::Client as PgClient; async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc<ScySession>) -> Result<VecDeque<u64>, Error> { info!("find_ts_msp series {} {:?}", series, range); @@ -559,43 +558,6 @@ impl Stream for EventsStreamScylla { } } -pub async fn find_series(channel: &Channel, pgclient: Arc<PgClient>) -> Result<(u64, ScalarType, Shape), Error> { - info!("find_series channel {:?}", channel); - let rows = if let Some(series) = channel.series() { - let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where series = $1"; - pgclient.query(q, &[&(series as i64)]).await.err_conv()?
- } else { - let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where facility = $1 and channel = $2"; - pgclient - .query(q, &[&channel.backend(), &channel.name()]) - .await - .err_conv()? - }; - if rows.len() < 1 { - return Err(Error::with_public_msg_no_trace(format!( - "No series found for {channel:?}" - ))); - } - if rows.len() > 1 { - error!("Multiple series found for {channel:?}"); - return Err(Error::with_public_msg_no_trace( - "Multiple series found for channel, can not return data for ambiguous series", - )); - } - let row = rows - .into_iter() - .next() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?; - let series = row.get::<_, i64>(0) as u64; - let _facility: String = row.get(1); - let _channel: String = row.get(2); - let a: i32 = row.get(3); - let scalar_type = ScalarType::from_scylla_i32(a)?; - let a: Vec<i32> = row.get(4); - let shape = Shape::from_scylla_shape_dims(&a)?; - Ok((series, scalar_type, shape)) -} - pub async fn make_scylla_stream( evq: &PlainEventsQuery, do_one_before_range: bool, diff --git a/scyllaconn/src/scyllaconn.rs b/scyllaconn/src/scyllaconn.rs index 6493f60..9299e46 100644 --- a/scyllaconn/src/scyllaconn.rs +++ b/scyllaconn/src/scyllaconn.rs @@ -5,6 +5,7 @@ pub mod events; use err::Error; use errconv::ErrConv; use netpod::ScyllaConfig; +use scylla::statement::Consistency; use scylla::Session as ScySession; use std::sync::Arc; @@ -12,6 +13,7 @@ pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs -pub async fn plain_events_json<SER>(query: SER, cluster: &Cluster) -> Result<JsonValue, Error> -where - SER: Serialize, -{ +pub async fn plain_events_json(query: &PlainEventsQuery, cluster: &Cluster) -> Result<JsonValue, Error> { + let deadline = Instant::now() + query.timeout(); + let events_max = query.events_max().unwrap_or(1024 * 32); // TODO should be able to ask for data-events only, instead of mixed data and status events.
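// Note: open_tcp_streams below is assumed to yield one ChannelEvents stream per cluster node; Merger then combines them in time order (512 is taken to be its output batch size).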
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&query, cluster).await?; //let inps = open_tcp_streams::<_, Box>(&query, cluster).await?; @@ -24,9 +23,7 @@ where let stream = inp0.chain(inp1).chain(inp2); stream }; - let stream = { items_2::merger::Merger::new(inps, 1) }; - let deadline = Instant::now() + Duration::from_millis(3500); - let events_max = 100; + let stream = { items_2::merger::Merger::new(inps, 512) }; let collected = crate::collect::collect(stream, deadline, events_max).await?; let jsval = serde_json::to_value(&collected)?; Ok(jsval) diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs index 86efe23..e53b70a 100644 --- a/streams/src/tcprawclient.rs +++ b/streams/src/tcprawclient.rs @@ -14,7 +14,7 @@ use items::frame::{make_frame, make_term_frame}; use items::sitem_data; use items::{EventQueryJsonStringFrame, EventsNodeProcessor, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; -use netpod::query::RawEventsQuery; +use netpod::query::PlainEventsQuery; use netpod::Cluster; use netpod::{Node, PerfOpts}; use std::pin::Pin; @@ -22,7 +22,7 @@ use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; pub async fn x_processed_stream_from_node<ENP>( - query: RawEventsQuery, + query: PlainEventsQuery, perf_opts: PerfOpts, node: Node, ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Send>>, Error> where @@ -49,7 +49,7 @@ } pub async fn x_processed_event_blobs_stream_from_node( - query: RawEventsQuery, + query: PlainEventsQuery, perf_opts: PerfOpts, node: Node, ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> { diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index 744b20a..6c59849 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -3,7 +3,7 @@ use err::Error; use futures_util::StreamExt; #[allow(unused)] use netpod::log::*; -use netpod::query::{BinnedQuery, RawEventsQuery}; +use netpod::query::{BinnedQuery, PlainEventsQuery}; use netpod::{BinnedRange, Cluster}; use serde_json::Value as JsonValue; use std::time::{Duration, Instant}; @@ -12,8 +12,16 @@ pub async fn timebinned_json(query: &BinnedQuery, cluster: &Cluster) -> Result(&rawquery, cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader: let stream = { items_2::merger::Merger::new(inps, 1) };