Status events with human-readable time, bool set events

Dominik Werder
2023-01-27 15:40:20 +01:00
parent fd3f22fccb
commit f20765ec56
21 changed files with 2230 additions and 599 deletions

File diff suppressed because it is too large.

View File

@@ -108,7 +108,7 @@ async fn go() -> Result<(), Error> {
}
SubCmd::Logappend(k) => {
let jh = tokio::task::spawn_blocking(move || {
taskrun::append::append(&k.dir, std::io::stdin()).unwrap();
taskrun::append::append(&k.dir, k.total_size_max_bytes(), std::io::stdin()).unwrap();
});
jh.await.map_err(Error::from_string)?;
}

View File

@@ -78,4 +78,12 @@ pub struct BinnedClient {
pub struct Logappend {
#[arg(long)]
pub dir: String,
#[arg(long)]
pub total_mb: Option<u64>,
}
impl Logappend {
pub fn total_size_max_bytes(&self) -> u64 {
1024 * 1024 * self.total_mb.unwrap_or(20)
}
}
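As a minimal sketch (assuming only the struct above), the new --total-mb flag resolves to a byte budget like this, with 20 MiB as the default:

struct Logappend {
    dir: String,
    total_mb: Option<u64>,
}

impl Logappend {
    fn total_size_max_bytes(&self) -> u64 {
        1024 * 1024 * self.total_mb.unwrap_or(20)
    }
}

fn main() {
    // --total-mb omitted: the budget defaults to 20 MiB.
    let k = Logappend { dir: "logs".into(), total_mb: None };
    assert_eq!(k.total_size_max_bytes(), 20 * 1024 * 1024);
    // --total-mb 50: the budget is 50 MiB.
    let k = Logappend { dir: k.dir, total_mb: Some(50) };
    assert_eq!(k.total_size_max_bytes(), 50 * 1024 * 1024);
}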

View File

@@ -1,18 +1,42 @@
use crate::decode::{BigEndian, Endianness, LittleEndian};
use crate::decode::{EventValueFromBytes, EventValueShape, EventsDecodedStream, NumFromBytes};
use crate::decode::{EventValuesDim0Case, EventValuesDim1Case};
use crate::decode::BigEndian;
use crate::decode::Endianness;
use crate::decode::EventValueFromBytes;
use crate::decode::EventValueShape;
use crate::decode::EventValuesDim0Case;
use crate::decode::EventValuesDim1Case;
use crate::decode::EventsDecodedStream;
use crate::decode::LittleEndian;
use crate::decode::NumFromBytes;
use crate::eventblobs::EventChunkerMultifile;
use err::Error;
use futures_util::{Stream, StreamExt};
use futures_util::Stream;
use futures_util::StreamExt;
use items::eventfull::EventFull;
use items::numops::{BoolNum, NumOps, StringNum};
use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, StreamItem};
use items::numops::BoolNum;
use items::numops::NumOps;
use items::numops::StringNum;
use items::EventsNodeProcessor;
use items::Framable;
use items::RangeCompletableItem;
use items::Sitemty;
use items::StreamItem;
use items_2::channelevents::ChannelEvents;
use items_2::eventsdim0::EventsDim0;
use netpod::log::*;
use netpod::query::PlainEventsQuery;
use netpod::{AggKind, ByteOrder, ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, ScalarType, Shape};
use parse::channelconfig::{extract_matching_config_entry, read_local_config, ConfigEntry, MatchingConfigEntry};
use netpod::AggKind;
use netpod::ByteOrder;
use netpod::ByteSize;
use netpod::Channel;
use netpod::DiskIoTune;
use netpod::NanoRange;
use netpod::NodeConfigCached;
use netpod::ScalarType;
use netpod::Shape;
use parse::channelconfig::extract_matching_config_entry;
use parse::channelconfig::read_local_config;
use parse::channelconfig::ConfigEntry;
use parse::channelconfig::MatchingConfigEntry;
use std::collections::VecDeque;
use std::pin::Pin;
use streams::eventchunker::EventChunkerConf;
@@ -322,6 +346,7 @@ pub fn make_remote_event_blobs_stream(
disk_io_tune: DiskIoTune,
node_config: &NodeConfigCached,
) -> Result<impl Stream<Item = Sitemty<EventFull>>, Error> {
info!("make_remote_event_blobs_stream");
let shape = match entry.to_shape() {
Ok(k) => k,
Err(e) => return Err(e)?,
@@ -352,7 +377,7 @@ pub fn make_remote_event_blobs_stream(
pub async fn make_event_blobs_pipe(
evq: &PlainEventsQuery,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable + Send>> + Send>>, Error> {
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
info!("make_event_blobs_pipe {evq:?}");
if false {
match dbconn::channel_exists(evq.channel(), &node_config).await {
@@ -377,11 +402,12 @@ pub async fn make_event_blobs_pipe(
DiskIoTune::default(),
node_config,
)?;
let s = event_blobs.map(|item: ItemType| Box::new(item) as Box<dyn Framable + Send>);
/*let s = event_blobs.map(|item: ItemType| Box::new(item) as Box<dyn Framable + Send>);
//let s = tracing_futures::Instrumented::instrument(s, tracing::info_span!("make_event_blobs_pipe"));
let pipe: Pin<Box<dyn Stream<Item = Box<dyn Framable + Send>> + Send>>;
pipe = Box::pin(s);
pipe
pipe*/
Box::pin(event_blobs) as _
} else {
let event_blobs = make_local_event_blobs_stream(
range.clone(),
@@ -393,11 +419,12 @@ pub async fn make_event_blobs_pipe(
DiskIoTune::default(),
node_config,
)?;
let s = event_blobs.map(|item: ItemType| Box::new(item) as Box<dyn Framable + Send>);
/*let s = event_blobs.map(|item: ItemType| Box::new(item) as Box<dyn Framable + Send>);
//let s = tracing_futures::Instrumented::instrument(s, tracing::info_span!("make_event_blobs_pipe"));
let pipe: Pin<Box<dyn Stream<Item = Box<dyn Framable + Send>> + Send>>;
pipe = Box::pin(s);
pipe
pipe*/
Box::pin(event_blobs) as _
};
Ok(pipe)
}

View File

@@ -7,6 +7,7 @@ use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use items_2::channelevents::ChannelStatusEvent;
use items_2::channelevents::ConnStatusEvent;
use netpod::log::*;
use netpod::query::ChannelStateEventsQuery;
@@ -72,17 +73,20 @@ impl ConnectionStatusEvents {
.scylla
.as_ref()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?;
let scy = scyllaconn::create_scy_session(scyco).await?;
let _scy = scyllaconn::create_scy_session(scyco).await?;
let chconf = dbconn::channelconfig::chconf_from_database(q.channel(), node_config).await?;
let series = chconf.series;
let do_one_before_range = true;
let mut stream =
let _series = chconf.series;
let _do_one_before_range = true;
let ret = Vec::new();
if true {
return Err(Error::with_msg_no_trace("TODO channel_status fetch_data"));
}
/*let mut stream =
scyllaconn::status::StatusStreamScylla::new(series, q.range().clone(), do_one_before_range, scy);
let mut ret = Vec::new();
while let Some(item) = stream.next().await {
let item = item?;
ret.push(item);
}
}*/
Ok(ret)
}
}
@@ -136,7 +140,7 @@ impl ChannelStatusEvents {
&self,
q: &ChannelStateEventsQuery,
node_config: &NodeConfigCached,
) -> Result<Vec<ConnStatusEvent>, Error> {
) -> Result<Vec<ChannelStatusEvent>, Error> {
let scyco = node_config
.node_config
.cluster

View File

@@ -88,6 +88,7 @@ async fn plain_events_json(
info!("httpret plain_events_json req: {:?}", req);
let (_head, _body) = req.into_parts();
let query = PlainEventsQuery::from_url(&url)?;
info!("plain_events_json query {query:?}");
let chconf = chconf_from_events_json(&query, node_config)
.await
.map_err(Error::from)?;

View File

@@ -1,32 +1,159 @@
use crate::err::Error;
use crate::response;
use async_channel::Receiver;
use async_channel::Sender;
use bytes::Buf;
use bytes::BufMut;
use bytes::{Buf, BytesMut};
use futures_util::stream::{FuturesOrdered, FuturesUnordered};
use bytes::BytesMut;
use futures_util::stream::FuturesOrdered;
use futures_util::stream::FuturesUnordered;
use futures_util::FutureExt;
use http::{Method, StatusCode, Uri};
use hyper::{Body, Request, Response};
use http::Method;
use http::StatusCode;
use http::Uri;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use netpod::log::*;
use netpod::AppendToUrl;
use netpod::FromUrl;
use netpod::HasBackend;
use netpod::HasTimeout;
use netpod::NodeConfigCached;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use serde::Serialize;
use std::collections::BTreeMap;
use std::future::Future;
use std::io::SeekFrom;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{io::SeekFrom, path::PathBuf};
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use tokio::task::JoinHandle;
use tokio::time::error::Elapsed;
use url::Url;
struct Dummy;
enum CachePortal<V> {
Fresh,
Existing(Receiver<Dummy>),
Known(V),
}
impl<V> CachePortal<V> {}
enum CacheEntry<V> {
Waiting(SystemTime, Sender<Dummy>, Receiver<Dummy>),
Known(SystemTime, V),
}
impl<V> CacheEntry<V> {
fn ts(&self) -> &SystemTime {
match self {
CacheEntry::Waiting(ts, _, _) => ts,
CacheEntry::Known(ts, _) => ts,
}
}
}
struct CacheInner<K, V> {
map: BTreeMap<K, CacheEntry<V>>,
}
impl<K, V> CacheInner<K, V>
where
K: Ord,
{
const fn new() -> Self {
Self { map: BTreeMap::new() }
}
fn housekeeping(&mut self) {
if self.map.len() > 200 {
info!("trigger housekeeping with len {}", self.map.len());
let mut v: Vec<_> = self.map.iter().map(|(k, v)| (v.ts(), k)).collect();
v.sort();
let ts0 = v[v.len() / 2].0.clone();
//let tsnow = SystemTime::now();
//let tscut = tsnow.checked_sub(Duration::from_secs(60 * 10)).unwrap_or(tsnow);
self.map.retain(|_k, v| v.ts() >= &ts0);
info!("housekeeping kept len {}", self.map.len());
}
}
}
struct Cache<K, V> {
inner: Mutex<CacheInner<K, V>>,
}
impl<K, V> Cache<K, V>
where
K: Ord,
V: Clone,
{
const fn new() -> Self {
Self {
inner: Mutex::new(CacheInner::new()),
}
}
fn housekeeping(&self) {
let mut g = self.inner.lock().unwrap();
g.housekeeping();
}
fn portal(&self, key: K) -> CachePortal<V> {
use std::collections::btree_map::Entry;
let mut g = self.inner.lock().unwrap();
g.housekeeping();
match g.map.entry(key) {
Entry::Vacant(e) => {
let (tx, rx) = async_channel::bounded(16);
let ret = CachePortal::Fresh;
let v = CacheEntry::Waiting(SystemTime::now(), tx, rx);
e.insert(v);
ret
}
Entry::Occupied(e) => match e.get() {
CacheEntry::Waiting(_ts, _tx, rx) => CachePortal::Existing(rx.clone()),
CacheEntry::Known(_ts, v) => CachePortal::Known(v.clone()),
},
}
}
fn set_value(&self, key: K, val: V) {
let mut g = self.inner.lock().unwrap();
if let Some(e) = g.map.get_mut(&key) {
match e {
CacheEntry::Waiting(ts, tx, _rx) => {
let tx = tx.clone();
*e = CacheEntry::Known(*ts, val);
tx.close();
}
CacheEntry::Known(_ts, _val) => {
error!("set_value already known");
}
}
} else {
error!("set_value no entry for key");
}
}
}
static CACHE: Cache<u64, u64> = Cache::new();
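The cache above is a single-flight portal: the first caller for a key sees CachePortal::Fresh and computes the value; concurrent callers get Existing(rx) and block on rx.recv(), which returns Err once set_value closes the channel; later callers get Known(v) immediately. A minimal usage sketch, assuming the Cache/CachePortal definitions above and a hypothetical expensive_map helper (a stand-in for the histogram lookup done by the handlers below):

// Sketch only; assumes: async fn expensive_map(pulse: u64) -> Option<u64>.
async fn lookup(pulse: u64) -> Option<u64> {
    match CACHE.portal(pulse) {
        CachePortal::Fresh => {
            // First caller: compute, publish, and wake all waiters.
            let val = expensive_map(pulse).await?;
            CACHE.set_value(pulse, val);
            Some(val)
        }
        CachePortal::Existing(rx) => {
            // Another task is computing. recv() only returns Err after
            // set_value closes the sender, so Ok here would be a logic error.
            let _ = rx.recv().await;
            match CACHE.portal(pulse) {
                CachePortal::Known(val) => Some(val),
                // If the first caller never calls set_value (e.g. the
                // NO_CONTENT path), the Waiting entry lingers until
                // housekeeping evicts it and drops the sender.
                _ => None,
            }
        }
        CachePortal::Known(val) => Some(val),
    }
}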
pub struct MapPulseHisto {
_pulse: u64,
_tss: Vec<u64>,
@@ -41,6 +168,9 @@ const MAP_PULSE_LOCAL_URL_PREFIX: &'static str = "/api/1/map/pulse/local/";
const MAP_PULSE_MARK_CLOSED_URL_PREFIX: &'static str = "/api/1/map/pulse/mark/closed/";
const API_4_MAP_PULSE_URL_PREFIX: &'static str = "/api/4/map/pulse/";
const MAP_PULSE_LOCAL_TIMEOUT: Duration = Duration::from_millis(8000);
const MAP_PULSE_QUERY_TIMEOUT: Duration = Duration::from_millis(10000);
async fn make_tables(node_config: &NodeConfigCached) -> Result<(), Error> {
let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
let sql = "set client_min_messages = 'warning'";
@@ -389,7 +519,7 @@ impl IndexFullHttpFunction {
let n1 = files.len().min(3);
let m1 = files.len() - n1;
for ch in &files[m1..] {
info!(" index over {:?}", ch);
trace!(" index over {:?}", ch);
}
for mp in files[m1..].into_iter() {
match mp {
@@ -463,7 +593,12 @@ impl IndexFullHttpFunction {
}
}
}
info!("latest for {channel_name} {latest_pair:?}");
if channel_name.contains("SAT-CVME-TIFALL5:EvtSet")
|| channel_name.contains("SINSB04")
|| channel_name.contains("SINSB03")
{
info!("latest for {channel_name} {latest_pair:?}");
}
Ok(msg)
}
@@ -500,7 +635,7 @@ impl UpdateTaskGuard {
pub async fn abort_wait(&mut self) -> Result<(), Error> {
if let Some(jh) = self.jh.take() {
info!("UpdateTaskGuard::abort_wait");
let fut = tokio::time::timeout(Duration::from_millis(6000), async { jh.await });
let fut = tokio::time::timeout(Duration::from_millis(20000), async { jh.await });
Ok(fut.await???)
} else {
Ok(())
@@ -527,6 +662,7 @@ async fn update_task(do_abort: Arc<AtomicUsize>, node_config: NodeConfigCached)
break;
}
let ts1 = Instant::now();
CACHE.housekeeping();
match IndexFullHttpFunction::index(&node_config).await {
Ok(_) => {}
Err(e) => {
@@ -536,7 +672,7 @@ async fn update_task(do_abort: Arc<AtomicUsize>, node_config: NodeConfigCached)
}
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3;
info!("Done update task {:.0} ms", dt);
info!("Done update task {:.0}ms", dt);
}
Ok(())
}
@@ -721,7 +857,7 @@ impl HasBackend for MapPulseQuery {
impl HasTimeout for MapPulseQuery {
fn timeout(&self) -> Duration {
Duration::from_millis(2000)
MAP_PULSE_QUERY_TIMEOUT
}
}
@@ -860,6 +996,7 @@ impl MapPulseLocalHttpFunction {
let pulse: u64 = urls[MAP_PULSE_LOCAL_URL_PREFIX.len()..]
.parse()
.map_err(|_| Error::with_public_msg_no_trace(format!("can not understand pulse map url: {}", req.uri())))?;
let req_from = req.headers().get("x-req-from").map_or(None, |x| Some(format!("{x:?}")));
let ts1 = Instant::now();
let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
let sql = "select channel, hostname, timebin, split, ks from map_pulse_files where hostname = $1 and pulse_min <= $2 and (pulse_max >= $2 or closed = 0)";
@@ -875,9 +1012,10 @@ impl MapPulseLocalHttpFunction {
(channel, hostname, timebin as u32, split as u32, ks as u32)
})
.collect();
trace!(
"database query took {}s",
Instant::now().duration_since(ts1).as_secs_f32()
info!(
"map pulse local req-from {:?} candidate list in {:.0}ms",
req_from,
Instant::now().duration_since(ts1).as_secs_f32() * 1e3
);
//let mut msg = String::new();
//use std::fmt::Write;
@@ -1016,23 +1154,46 @@ impl MapPulseHistoHttpFunction {
node.host, node.port, MAP_PULSE_LOCAL_URL_PREFIX, pulse
);
let uri: Uri = s.parse()?;
let fut = hyper::Client::new().get(uri);
let fut = tokio::time::timeout(Duration::from_millis(1000), fut);
let req = Request::get(uri)
.header("x-req-from", &node_config.node.host)
.body(Body::empty())?;
let fut = hyper::Client::new().request(req);
//let fut = hyper::Client::new().get(uri);
let fut = tokio::time::timeout(MAP_PULSE_LOCAL_TIMEOUT, fut);
futs.push_back(fut);
}
use futures_util::stream::StreamExt;
let mut map = BTreeMap::new();
while let Some(Ok(Ok(res))) = futs.next().await {
if let Ok(b) = hyper::body::to_bytes(res.into_body()).await {
if let Ok(lm) = serde_json::from_slice::<LocalMap>(&b) {
for ts in lm.tss {
let a = map.get(&ts);
if let Some(&j) = a {
map.insert(ts, j + 1);
} else {
map.insert(ts, 1);
while let Some(futres) = futs.next().await {
match futres {
Ok(res) => match res {
Ok(res) => match hyper::body::to_bytes(res.into_body()).await {
Ok(body) => match serde_json::from_slice::<LocalMap>(&body) {
Ok(lm) => {
for ts in lm.tss {
let a = map.get(&ts);
if let Some(&j) = a {
map.insert(ts, j + 1);
} else {
map.insert(ts, 1);
}
}
}
Err(e) => {
error!("pulse map sub request pulse {pulse} serde error {e}");
}
},
Err(e) => {
error!("pulse map sub request pulse {pulse} body error {e}");
}
},
Err(e) => {
error!("pulse map sub request pulse {pulse} error {e}");
}
},
Err(e) => {
let _: Elapsed = e;
error!("pulse map sub request timed out pulse {pulse}");
}
}
}
@@ -1063,19 +1224,56 @@ impl MapPulseHttpFunction {
info!("MapPulseHttpFunction handle uri: {:?}", req.uri());
let urls = format!("{}", req.uri());
let pulse: u64 = urls[MAP_PULSE_URL_PREFIX.len()..].parse()?;
let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?;
let mut i1 = 0;
let mut max = 0;
for i2 in 0..histo.tss.len() {
if histo.counts[i2] > max {
max = histo.counts[i2];
i1 = i2;
match CACHE.portal(pulse) {
CachePortal::Fresh => {
info!("value not yet in cache pulse {pulse}");
let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?;
let mut i1 = 0;
let mut max = 0;
for i2 in 0..histo.tss.len() {
if histo.counts[i2] > max {
max = histo.counts[i2];
i1 = i2;
}
}
if max > 0 {
let val = histo.tss[i1];
CACHE.set_value(pulse, val);
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?)
} else {
Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?)
}
}
CachePortal::Existing(rx) => {
info!("waiting for already running pulse map pulse {pulse}");
match rx.recv().await {
Ok(_) => {
error!("should never recv from existing operation pulse {pulse}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
}
Err(_e) => {
info!("woken up while value wait pulse {pulse}");
match CACHE.portal(pulse) {
CachePortal::Known(val) => {
info!("good, value after wakeup pulse {pulse}");
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?)
}
CachePortal::Fresh => {
error!("woken up, but portal fresh pulse {pulse}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
}
CachePortal::Existing(..) => {
error!("woken up, but portal existing pulse {pulse}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
}
}
}
}
}
CachePortal::Known(val) => {
info!("value already in cache pulse {pulse} ts {val}");
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?)
}
}
if max > 0 {
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&histo.tss[i1])?))?)
} else {
Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?)
}
}
}
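Both handlers resolve a pulse to the candidate timestamp with the most votes across nodes; the inner loop is a plain argmax over (tss, counts). A self-contained sketch of that selection:

// Sketch: pick the timestamp with the highest vote count, None if empty.
fn pick_ts(tss: &[u64], counts: &[u64]) -> Option<u64> {
    let mut best: Option<(u64, u64)> = None; // (count, ts)
    for (&ts, &count) in tss.iter().zip(counts) {
        if count > best.map(|(c, _)| c).unwrap_or(0) {
            best = Some((count, ts));
        }
    }
    best.map(|(_, ts)| ts)
}

fn main() {
    // Two nodes voted for ts 200, one for ts 100: the pulse maps to 200.
    assert_eq!(pick_ts(&[100, 200], &[1, 2]), Some(200));
    // No candidates: corresponds to the NO_CONTENT response.
    assert_eq!(pick_ts(&[], &[]), None);
}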
@@ -1103,7 +1301,69 @@ impl Api4MapPulseHttpFunction {
info!("Api4MapPulseHttpFunction handle uri: {:?}", req.uri());
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = MapPulseQuery::from_url(&url)?;
let histo = MapPulseHistoHttpFunction::histo(q.pulse, node_config).await?;
let pulse = q.pulse;
let ret = match CACHE.portal(pulse) {
CachePortal::Fresh => {
info!("value not yet in cache pulse {pulse}");
let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?;
let mut i1 = 0;
let mut max = 0;
for i2 in 0..histo.tss.len() {
if histo.counts[i2] > max {
max = histo.counts[i2];
i1 = i2;
}
}
if histo.tss.len() > 1 {
warn!("Ambigious pulse map pulse {} histo {:?}", pulse, histo);
}
if max > 0 {
let val = histo.tss[i1];
CACHE.set_value(pulse, val);
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?)
} else {
Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?)
}
}
CachePortal::Existing(rx) => {
info!("waiting for already running pulse map pulse {pulse}");
match rx.recv().await {
Ok(_) => {
error!("should never recv from existing operation pulse {pulse}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
}
Err(_e) => {
info!("woken up while value wait pulse {pulse}");
match CACHE.portal(pulse) {
CachePortal::Known(val) => {
info!("good, value after wakeup pulse {pulse}");
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?)
}
CachePortal::Fresh => {
error!("woken up, but portal fresh pulse {pulse}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
}
CachePortal::Existing(..) => {
error!("woken up, but portal existing pulse {pulse}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
}
}
}
}
}
CachePortal::Known(val) => {
info!("value already in cache pulse {pulse} ts {val}");
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?)
}
};
let ts2 = Instant::now();
info!(
"Api4MapPulseHttpFunction took {:.2}s",
ts2.duration_since(ts1).as_secs_f32()
);
ret
/*let histo = MapPulseHistoHttpFunction::histo(q.pulse, node_config).await?;
let mut i1 = 0;
let mut max = 0;
for i2 in 0..histo.tss.len() {
@@ -1124,7 +1384,7 @@ impl Api4MapPulseHttpFunction {
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&histo.tss[i1])?))?)
} else {
Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?)
}
}*/
}
}

View File

@@ -55,6 +55,7 @@ pub const STATS_EVENTS_FRAME_TYPE_ID: u32 = 0x2400;
pub const ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID: u32 = 0x2500;
pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800;
pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x8900;
pub const DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID: u32 = 0x8a00;
pub fn bool_is_false(j: &bool) -> bool {
*j == false
@@ -525,7 +526,17 @@ pub trait TimeBinnableDyn:
}
pub trait TimeBinnableDynStub:
fmt::Debug + FramableInner + FrameType + FrameTypeInnerDyn + WithLen + RangeOverlapInfo + Any + AsAnyRef + Sync + Send + 'static
fmt::Debug
+ FramableInner
+ FrameType
+ FrameTypeInnerDyn
+ WithLen
+ RangeOverlapInfo
+ Any
+ AsAnyRef
+ Sync
+ Send
+ 'static
{
}
@@ -748,10 +759,16 @@ pub fn empty_events_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggK
I64 => Box::new(K::<i64>::empty()),
F32 => Box::new(K::<f32>::empty()),
F64 => Box::new(K::<f64>::empty()),
_ => err::todoval(),
_ => {
error!("TODO for {:?} {:?} {:?}", scalar_type, shape, agg_kind);
err::todoval()
}
}
}
_ => err::todoval(),
_ => {
error!("TODO for {:?} {:?} {:?}", scalar_type, shape, agg_kind);
err::todoval()
}
},
Shape::Wave(_n) => match agg_kind {
AggKind::DimXBins1 => {
@@ -761,12 +778,36 @@ pub fn empty_events_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggK
U8 => Box::new(K::<u8>::empty()),
F32 => Box::new(K::<f32>::empty()),
F64 => Box::new(K::<f64>::empty()),
_ => err::todoval(),
BOOL => Box::new(K::<bool>::empty()),
_ => {
error!("TODO for {:?} {:?} {:?}", scalar_type, shape, agg_kind);
err::todoval()
}
}
}
_ => err::todoval(),
AggKind::Plain => {
use ScalarType::*;
type K<T> = waveevents::WaveEvents<T>;
match scalar_type {
U8 => Box::new(K::<u8>::empty()),
F32 => Box::new(K::<f32>::empty()),
F64 => Box::new(K::<f64>::empty()),
BOOL => Box::new(K::<bool>::empty()),
_ => {
error!("TODO for {:?} {:?} {:?}", scalar_type, shape, agg_kind);
err::todoval()
}
}
}
_ => {
error!("TODO for {:?} {:?} {:?}", scalar_type, shape, agg_kind);
err::todoval()
}
},
Shape::Image(..) => err::todoval(),
Shape::Image(..) => {
error!("TODO for {:?} {:?} {:?}", scalar_type, shape, agg_kind);
err::todoval()
}
}
}

View File

@@ -1,7 +1,7 @@
use items_0::subfr::SubFrId;
use num_traits::{Bounded, Float, Zero};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use serde::Serialize;
use std::cmp::Ordering;
use std::fmt::Debug;
use std::ops::Add;
@@ -117,8 +117,8 @@ pub trait NumOps:
+ 'static
+ Unpin
+ Debug
+ Zero
+ Bounded
//+ Zero
//+ Bounded
+ PartialOrd
+ SubFrId
+ Serialize
@@ -128,10 +128,11 @@ pub trait NumOps:
fn min_or_nan() -> Self;
fn max_or_nan() -> Self;
fn is_nan(&self) -> bool;
fn zero() -> Self;
}
macro_rules! impl_num_ops {
($ty:ident, $min_or_nan:ident, $max_or_nan:ident, $is_nan:ident) => {
($ty:ident, $min_or_nan:ident, $max_or_nan:ident, $is_nan:ident, $zero:expr) => {
impl NumOps for $ty {
fn min_or_nan() -> Self {
$ty::$min_or_nan
@@ -142,16 +143,51 @@ macro_rules! impl_num_ops {
fn is_nan(&self) -> bool {
$is_nan(self)
}
fn zero() -> Self {
$zero
}
}
};
}
impl AsPrimF32 for bool {
fn as_prim_f32(&self) -> f32 {
if *self {
1.
} else {
0.
}
}
}
impl NumOps for bool {
fn min_or_nan() -> Self {
todo!()
}
fn max_or_nan() -> Self {
todo!()
}
fn is_nan(&self) -> bool {
todo!()
}
fn zero() -> Self {
false
}
}
fn is_nan_int<T>(_x: &T) -> bool {
false
}
fn is_nan_float<T: Float>(x: &T) -> bool {
x.is_nan()
}
fn is_nan_f32(x: &f32) -> bool {
f32::is_nan(*x)
}
fn is_nan_f64(x: &f64) -> bool {
f64::is_nan(*x)
}
pub trait AsPrimF32 {
@@ -192,18 +228,18 @@ impl AsPrimF32 for StringNum {
}
}
impl_num_ops!(u8, MIN, MAX, is_nan_int);
impl_num_ops!(u16, MIN, MAX, is_nan_int);
impl_num_ops!(u32, MIN, MAX, is_nan_int);
impl_num_ops!(u64, MIN, MAX, is_nan_int);
impl_num_ops!(i8, MIN, MAX, is_nan_int);
impl_num_ops!(i16, MIN, MAX, is_nan_int);
impl_num_ops!(i32, MIN, MAX, is_nan_int);
impl_num_ops!(i64, MIN, MAX, is_nan_int);
impl_num_ops!(f32, NAN, NAN, is_nan_float);
impl_num_ops!(f64, NAN, NAN, is_nan_float);
impl_num_ops!(BoolNum, MIN, MAX, is_nan_int);
impl_num_ops!(StringNum, MIN, MAX, is_nan_int);
impl_num_ops!(u8, MIN, MAX, is_nan_int, 0);
impl_num_ops!(u16, MIN, MAX, is_nan_int, 0);
impl_num_ops!(u32, MIN, MAX, is_nan_int, 0);
impl_num_ops!(u64, MIN, MAX, is_nan_int, 0);
impl_num_ops!(i8, MIN, MAX, is_nan_int, 0);
impl_num_ops!(i16, MIN, MAX, is_nan_int, 0);
impl_num_ops!(i32, MIN, MAX, is_nan_int, 0);
impl_num_ops!(i64, MIN, MAX, is_nan_int, 0);
impl_num_ops!(f32, NAN, NAN, is_nan_f32, 0.);
impl_num_ops!(f64, NAN, NAN, is_nan_f64, 0.);
impl_num_ops!(BoolNum, MIN, MAX, is_nan_int, BoolNum(0));
impl_num_ops!(StringNum, MIN, MAX, is_nan_int, StringNum(String::new()));
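The num_traits Zero and Bounded bounds are commented out above, presumably because bool implements neither; a trait-local zero() takes over, supplied through the macro's new fifth argument. The f32 invocation expands to the equivalent of this sketch (trait pared down to the methods involved):

trait NumOpsMini {
    fn min_or_nan() -> Self;
    fn max_or_nan() -> Self;
    fn is_nan(&self) -> bool;
    fn zero() -> Self;
}

// Equivalent of impl_num_ops!(f32, NAN, NAN, is_nan_f32, 0.)
impl NumOpsMini for f32 {
    fn min_or_nan() -> Self {
        f32::NAN
    }
    fn max_or_nan() -> Self {
        f32::NAN
    }
    fn is_nan(&self) -> bool {
        f32::is_nan(*self)
    }
    fn zero() -> Self {
        0.
    }
}

fn main() {
    assert_eq!(<f32 as NumOpsMini>::zero(), 0.0);
    assert!(NumOpsMini::is_nan(&<f32 as NumOpsMini>::min_or_nan()));
}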
impl SubFrId for StringNum {
const SUB: u32 = 0x0d;

View File

@@ -42,6 +42,16 @@ impl_as_prim_f32!(i64);
impl_as_prim_f32!(f32);
impl_as_prim_f32!(f64);
impl AsPrimF32 for bool {
fn as_prim_f32_b(&self) -> f32 {
if *self {
1.
} else {
0.
}
}
}
pub trait ScalarOps:
fmt::Debug + Clone + PartialOrd + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static
{
@@ -49,7 +59,7 @@ pub trait ScalarOps:
fn equal_slack(&self, rhs: &Self) -> bool;
}
macro_rules! impl_num_ops {
macro_rules! impl_scalar_ops {
($ty:ident, $zero:expr, $equal_slack:ident) => {
impl ScalarOps for $ty {
fn zero_b() -> Self {
@@ -75,13 +85,18 @@ fn equal_f64(a: f64, b: f64) -> bool {
(a - b).abs() < 1e-6 || (a / b > 0.99999 && a / b < 1.00001)
}
impl_num_ops!(u8, 0, equal_int);
impl_num_ops!(u16, 0, equal_int);
impl_num_ops!(u32, 0, equal_int);
impl_num_ops!(u64, 0, equal_int);
impl_num_ops!(i8, 0, equal_int);
impl_num_ops!(i16, 0, equal_int);
impl_num_ops!(i32, 0, equal_int);
impl_num_ops!(i64, 0, equal_int);
impl_num_ops!(f32, 0., equal_f32);
impl_num_ops!(f64, 0., equal_f64);
fn equal_bool(a: bool, b: bool) -> bool {
a == b
}
impl_scalar_ops!(u8, 0, equal_int);
impl_scalar_ops!(u16, 0, equal_int);
impl_scalar_ops!(u32, 0, equal_int);
impl_scalar_ops!(u64, 0, equal_int);
impl_scalar_ops!(i8, 0, equal_int);
impl_scalar_ops!(i16, 0, equal_int);
impl_scalar_ops!(i32, 0, equal_int);
impl_scalar_ops!(i64, 0, equal_int);
impl_scalar_ops!(f32, 0., equal_f32);
impl_scalar_ops!(f64, 0., equal_f64);
impl_scalar_ops!(bool, false, equal_bool);

View File

@@ -41,3 +41,7 @@ impl SubFrId for f32 {
impl SubFrId for f64 {
const SUB: u32 = 0x0c;
}
impl SubFrId for bool {
const SUB: u32 = 0x0d;
}

View File

@@ -19,6 +19,7 @@ chrono = { version = "0.4.19", features = ["serde"] }
crc32fast = "1.3.2"
futures-util = "0.3.24"
tokio = { version = "1.20", features = ["rt-multi-thread", "sync", "time"] }
humantime-serde = "1.1.1"
err = { path = "../err" }
items = { path = "../items" }
items_0 = { path = "../items_0" }

View File

@@ -10,9 +10,12 @@ use items_0::AsAnyRef;
use netpod::log::*;
use netpod::BinnedRange;
use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use serde::Serialize;
use std::any::Any;
use std::fmt;
use std::time::Duration;
use std::time::SystemTime;
// TODO maybe rename to ChannelStatus?
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -25,8 +28,6 @@ impl ConnStatus {
pub fn from_ca_ingest_status_kind(k: u32) -> Self {
match k {
1 => Self::Connect,
2 => Self::Disconnect,
3 => Self::Disconnect,
_ => Self::Disconnect,
}
}
@@ -35,12 +36,47 @@ impl ConnStatus {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ConnStatusEvent {
pub ts: u64,
#[serde(with = "humantime_serde")]
//pub datetime: chrono::DateTime<chrono::Utc>,
pub datetime: SystemTime,
pub status: ConnStatus,
}
impl ConnStatusEvent {
pub fn new(ts: u64, status: ConnStatus) -> Self {
Self { ts, status }
let datetime = SystemTime::UNIX_EPOCH + Duration::from_millis(ts / 1000000);
Self { ts, datetime, status }
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ChannelStatus {
Connect,
Disconnect,
}
impl ChannelStatus {
pub fn from_ca_ingest_status_kind(k: u32) -> Self {
match k {
1 => Self::Connect,
_ => Self::Disconnect,
}
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ChannelStatusEvent {
pub ts: u64,
#[serde(with = "humantime_serde")]
//pub datetime: chrono::DateTime<chrono::Utc>,
pub datetime: SystemTime,
pub status: ChannelStatus,
}
impl ChannelStatusEvent {
pub fn new(ts: u64, status: ChannelStatus) -> Self {
let datetime = SystemTime::UNIX_EPOCH + Duration::from_millis(ts / 1000000);
Self { ts, datetime, status }
}
}
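ts is in nanoseconds since the Unix epoch, so ts / 1000000 yields milliseconds for the derived SystemTime, and humantime_serde renders it as an RFC 3339 string (the human-readable time from the commit title). A minimal sketch, assuming serde, serde_json, and the humantime-serde dependency added to Cargo.toml above:

use serde::{Deserialize, Serialize};
use std::time::{Duration, SystemTime};

#[derive(Serialize, Deserialize)]
struct StatusEvent {
    ts: u64, // nanoseconds since the Unix epoch
    #[serde(with = "humantime_serde")]
    datetime: SystemTime,
}

fn main() {
    let ts: u64 = 1_674_830_420_000_000_000;
    let ev = StatusEvent {
        ts,
        datetime: SystemTime::UNIX_EPOCH + Duration::from_millis(ts / 1000000),
    };
    // Prints the datetime as e.g. "2023-01-27T14:40:20Z" instead of a raw integer.
    println!("{}", serde_json::to_string(&ev).unwrap());
}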
@@ -88,6 +124,7 @@ mod serde_channel_events {
use super::{ChannelEvents, Events};
use crate::channelevents::ConnStatusEvent;
use crate::eventsdim0::EventsDim0;
use crate::eventsdim1::EventsDim1;
use items_0::subfr::SubFrId;
use serde::de::{self, EnumAccess, VariantAccess, Visitor};
use serde::ser::SerializeSeq;
@@ -174,6 +211,22 @@ mod serde_channel_events {
let obj: EventsDim0<f64> = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
bool::SUB => {
let obj: EventsDim0<bool> = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
_ => Err(de::Error::custom(&format!("unknown nty {e1}"))),
}
} else if e0 == EventsDim1::<u8>::serde_id() {
match e1 {
f32::SUB => {
let obj: EventsDim1<f32> = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
bool::SUB => {
let obj: EventsDim1<bool> = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
_ => Err(de::Error::custom(&format!("unknown nty {e1}"))),
}
} else {
@@ -324,7 +377,9 @@ mod test_channel_events_serde {
use bincode::DefaultOptions;
use items_0::bincode;
use items_0::Empty;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use serde::Serialize;
use std::time::SystemTime;
#[test]
fn channel_events() {
@@ -382,6 +437,7 @@ mod test_channel_events_serde {
evs.push(12, 3, 3.2f32);
let status = ConnStatusEvent {
ts: 567,
datetime: SystemTime::UNIX_EPOCH,
status: crate::channelevents::ConnStatus::Connect,
};
let item = ChannelEvents::Status(status);

View File

@@ -0,0 +1,24 @@
use items::FrameType;
use items::FrameTypeInnerStatic;
use serde::Serialize;
pub struct DatabufferEventBlob {}
impl FrameTypeInnerStatic for DatabufferEventBlob {
const FRAME_TYPE_ID: u32 = items::DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID;
}
impl FrameType for DatabufferEventBlob {
fn frame_type_id(&self) -> u32 {
<Self as FrameTypeInnerStatic>::FRAME_TYPE_ID
}
}
impl Serialize for DatabufferEventBlob {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
todo!()
}
}

items_2/src/eventsdim1.rs (new file, 1059 lines)

File diff suppressed because it is too large.

View File

@@ -1,7 +1,9 @@
pub mod binsdim0;
pub mod binsxbindim0;
pub mod channelevents;
pub mod databuffereventblobs;
pub mod eventsdim0;
pub mod eventsdim1;
pub mod eventsxbindim0;
pub mod merger;
pub mod merger_cev;
@@ -222,23 +224,47 @@ pub fn empty_events_dyn_2(scalar_type: &ScalarType, shape: &Shape, agg_kind: &Ag
I64 => Box::new(K::<i64>::empty()),
F32 => Box::new(K::<f32>::empty()),
F64 => Box::new(K::<f64>::empty()),
BOOL => Box::new(K::<bool>::empty()),
_ => {
error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}");
error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}");
err::todoval()
}
}
}
_ => {
error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}");
error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}");
err::todoval()
}
},
Shape::Wave(..) => match agg_kind {
AggKind::Plain => {
use ScalarType::*;
type K<T> = eventsdim1::EventsDim1<T>;
match scalar_type {
U8 => Box::new(K::<u8>::empty()),
U16 => Box::new(K::<u16>::empty()),
U32 => Box::new(K::<u32>::empty()),
U64 => Box::new(K::<u64>::empty()),
I8 => Box::new(K::<i8>::empty()),
I16 => Box::new(K::<i16>::empty()),
I32 => Box::new(K::<i32>::empty()),
I64 => Box::new(K::<i64>::empty()),
F32 => Box::new(K::<f32>::empty()),
F64 => Box::new(K::<f64>::empty()),
BOOL => Box::new(K::<bool>::empty()),
_ => {
error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}");
err::todoval()
}
}
}
_ => {
error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}");
err::todoval()
}
},
Shape::Wave(..) => {
error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}");
err::todoval()
}
Shape::Image(..) => {
error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}");
error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}");
err::todoval()
}
}
@@ -273,10 +299,33 @@ pub fn empty_events_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggK
err::todoval()
}
},
Shape::Wave(..) => {
error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}");
err::todoval()
}
Shape::Wave(..) => match agg_kind {
AggKind::Plain => {
use ScalarType::*;
type K<T> = eventsdim1::EventsDim1<T>;
match scalar_type {
U8 => Box::new(K::<u8>::empty()),
U16 => Box::new(K::<u16>::empty()),
U32 => Box::new(K::<u32>::empty()),
U64 => Box::new(K::<u64>::empty()),
I8 => Box::new(K::<i8>::empty()),
I16 => Box::new(K::<i16>::empty()),
I32 => Box::new(K::<i32>::empty()),
I64 => Box::new(K::<i64>::empty()),
F32 => Box::new(K::<f32>::empty()),
F64 => Box::new(K::<f64>::empty()),
BOOL => Box::new(K::<bool>::empty()),
_ => {
error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}");
err::todoval()
}
}
}
_ => {
error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}");
err::todoval()
}
},
Shape::Image(..) => {
error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}");
err::todoval()

View File

@@ -237,6 +237,7 @@ fn merge03() {
let inp2_events_a = {
let ev = ConnStatusEvent {
ts: 1199,
datetime: std::time::SystemTime::UNIX_EPOCH,
status: ConnStatus::Disconnect,
};
let item: Sitemty<ChannelEvents> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(
@@ -248,6 +249,7 @@ fn merge03() {
let inp2_events_b = {
let ev = ConnStatusEvent {
ts: 1199,
datetime: std::time::SystemTime::UNIX_EPOCH,
status: ConnStatus::Disconnect,
};
let item: Sitemty<ChannelEvents> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(

View File

@@ -1,14 +1,22 @@
use err::Error;
use futures_util::{Stream, StreamExt};
use items::frame::{decode_frame, make_term_frame};
use items::{EventQueryJsonStringFrame, Framable, RangeCompletableItem, Sitemty, StreamItem};
use futures_util::Stream;
use futures_util::StreamExt;
use items::eventfull::EventFull;
use items::frame::decode_frame;
use items::frame::make_term_frame;
use items::EventQueryJsonStringFrame;
use items::Framable;
use items::RangeCompletableItem;
use items::Sitemty;
use items::StreamItem;
use items_0::Empty;
use items_2::channelevents::ChannelEvents;
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::query::PlainEventsQuery;
use netpod::AggKind;
use netpod::{NodeConfigCached, PerfOpts};
use netpod::NodeConfigCached;
use netpod::PerfOpts;
use std::net::SocketAddr;
use std::pin::Pin;
use streams::frames::inmem::InMemoryFrameAsyncReadStream;
@@ -48,6 +56,125 @@ impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
}
}
async fn make_channel_events_stream(
evq: PlainEventsQuery,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
if evq.channel().backend() == "test-inmem" {
warn!("TEST BACKEND DATA");
use netpod::timeunits::MS;
let node_count = node_config.node_config.cluster.nodes.len();
let node_ix = node_config.ix;
if evq.channel().name() == "inmem-d0-i32" {
let mut item = items_2::eventsdim0::EventsDim0::<i32>::empty();
let td = MS * 10;
for i in 0..20 {
let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i;
let pulse = 1 + node_ix as u64 + node_count as u64 * i;
item.push(ts, pulse, pulse as _);
}
let item = ChannelEvents::Events(Box::new(item) as _);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
let stream = futures_util::stream::iter([item]);
Ok(Box::pin(stream))
} else if evq.channel().name() == "inmem-d0-f32" {
let mut item = items_2::eventsdim0::EventsDim0::<f32>::empty();
let td = MS * 10;
for i in 0..20 {
let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i;
let pulse = 1 + node_ix as u64 + node_count as u64 * i;
item.push(ts, pulse, ts as _);
}
let item = ChannelEvents::Events(Box::new(item) as _);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
let stream = futures_util::stream::iter([item]);
Ok(Box::pin(stream))
} else {
let stream = futures_util::stream::empty();
Ok(Box::pin(stream))
}
} else if let Some(conf) = &node_config.node_config.cluster.scylla {
// TODO depends in general on the query
// TODO why both in PlainEventsQuery and as separate parameter? Check other usages.
let do_one_before_range = false;
// TODO use better builder pattern with shortcuts for production and dev defaults
let f = dbconn::channelconfig::chconf_from_database(evq.channel(), node_config)
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let scyco = conf;
let scy = scyllaconn::create_scy_session(scyco).await?;
let series = f.series;
let scalar_type = f.scalar_type;
let shape = f.shape;
let do_test_stream_error = false;
debug!("Make EventsStreamScylla for {series} {scalar_type:?} {shape:?}");
let stream = scyllaconn::events::EventsStreamScylla::new(
series,
evq.range().clone(),
do_one_before_range,
scalar_type,
shape,
scy,
do_test_stream_error,
);
let stream = stream
.map(move |item| match &item {
Ok(k) => match k {
ChannelEvents::Events(k) => {
let n = k.len();
let d = evq.event_delay();
(item, n, d.clone())
}
ChannelEvents::Status(_) => (item, 1, None),
},
Err(_) => (item, 1, None),
})
.then(|(item, n, d)| async move {
if let Some(d) = d {
warn!("sleep {} times {:?}", n, d);
tokio::time::sleep(d).await;
}
item
})
.map(|item| {
let item = match item {
Ok(item) => match item {
ChannelEvents::Events(item) => {
let item = ChannelEvents::Events(item);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
item
}
ChannelEvents::Status(item) => {
let item = ChannelEvents::Status(item);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
item
}
},
Err(e) => Err(e),
};
item
});
Ok(Box::pin(stream))
} else if let Some(_) = &node_config.node.channel_archiver {
let e = Error::with_msg_no_trace("archapp not built");
Err(e)
} else if let Some(_) = &node_config.node.archiver_appliance {
let e = Error::with_msg_no_trace("archapp not built");
Err(e)
} else {
Ok(disk::raw::conn::make_event_pipe(&evq, node_config).await?)
}
}
async fn make_event_blobs_stream(
evq: PlainEventsQuery,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
info!("make_event_blobs_stream");
let stream = disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await?;
Ok(stream)
}
async fn events_conn_handler_inner_try(
stream: TcpStream,
addr: SocketAddr,
@@ -117,138 +244,31 @@ async fn events_conn_handler_inner_try(
return Err((e, netout).into());
}
let p1: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>> = if evq.channel().backend() == "test-inmem" {
warn!("TEST BACKEND DATA");
use netpod::timeunits::MS;
let node_count = node_config.node_config.cluster.nodes.len();
let node_ix = node_config.ix;
if evq.channel().name() == "inmem-d0-i32" {
let mut item = items_2::eventsdim0::EventsDim0::<i32>::empty();
let td = MS * 10;
for i in 0..20 {
let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i;
let pulse = 1 + node_ix as u64 + node_count as u64 * i;
item.push(ts, pulse, pulse as _);
let mut stream: Pin<Box<dyn Stream<Item = Box<dyn Framable + Send>> + Send>> =
if let AggKind::EventBlobs = evq.agg_kind() {
match make_event_blobs_stream(evq, node_config).await {
Ok(stream) => {
let stream = stream.map(|x| Box::new(x) as _);
Box::pin(stream)
}
Err(e) => {
return Err((e, netout).into());
}
}
let item = ChannelEvents::Events(Box::new(item) as _);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
let stream = futures_util::stream::iter([item]);
Box::pin(stream)
} else if evq.channel().name() == "inmem-d0-f32" {
let mut item = items_2::eventsdim0::EventsDim0::<f32>::empty();
let td = MS * 10;
for i in 0..20 {
let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i;
let pulse = 1 + node_ix as u64 + node_count as u64 * i;
item.push(ts, pulse, ts as _);
}
let item = ChannelEvents::Events(Box::new(item) as _);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
let stream = futures_util::stream::iter([item]);
Box::pin(stream)
} else {
let stream = futures_util::stream::empty();
Box::pin(stream)
}
} else if let Some(conf) = &node_config.node_config.cluster.scylla {
// TODO depends in general on the query
// TODO why both in PlainEventsQuery and as separate parameter? Check other usages.
let do_one_before_range = false;
// TODO use better builder pattern with shortcuts for production and dev defaults
let f = dbconn::channelconfig::chconf_from_database(evq.channel(), node_config)
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")));
let f = match f {
Ok(k) => k,
Err(e) => return Err((e, netout))?,
};
let scyco = conf;
let scy = match scyllaconn::create_scy_session(scyco).await {
Ok(k) => k,
Err(e) => return Err((e, netout))?,
};
let series = f.series;
let scalar_type = f.scalar_type;
let shape = f.shape;
let do_test_stream_error = false;
debug!("Make EventsStreamScylla for {series} {scalar_type:?} {shape:?}");
let stream = scyllaconn::events::EventsStreamScylla::new(
series,
evq.range().clone(),
do_one_before_range,
scalar_type,
shape,
scy,
do_test_stream_error,
);
let stream = stream
.map(|item| match &item {
Ok(k) => match k {
ChannelEvents::Events(k) => {
let n = k.len();
let d = evq.event_delay();
(item, n, d.clone())
}
ChannelEvents::Status(_) => (item, 1, None),
},
Err(_) => (item, 1, None),
})
.then(|(item, n, d)| async move {
if let Some(d) = d {
warn!("sleep {} times {:?}", n, d);
tokio::time::sleep(d).await;
match make_channel_events_stream(evq, node_config).await {
Ok(stream) => {
let stream = stream.map(|x| Box::new(x) as _);
Box::pin(stream)
}
item
})
.map(|item| {
let item = match item {
Ok(item) => match item {
ChannelEvents::Events(item) => {
let item = ChannelEvents::Events(item);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
item
}
ChannelEvents::Status(item) => {
let item = ChannelEvents::Status(item);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
item
}
},
Err(e) => Err(e),
};
item
});
Box::pin(stream)
} else if let Some(_) = &node_config.node.channel_archiver {
let e = Error::with_msg_no_trace("archapp not built");
return Err((e, netout))?;
} else if let Some(_) = &node_config.node.archiver_appliance {
let e = Error::with_msg_no_trace("archapp not built");
return Err((e, netout))?;
} else {
let stream = match evq.agg_kind() {
AggKind::EventBlobs => match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await {
Ok(_stream) => {
let e = Error::with_msg_no_trace("TODO make_event_blobs_pipe");
return Err((e, netout))?;
Err(e) => {
return Err((e, netout).into());
}
Err(e) => return Err((e, netout))?,
},
_ => match disk::raw::conn::make_event_pipe(&evq, node_config).await {
Ok(j) => j,
Err(e) => return Err((e, netout))?,
},
}
};
stream
};
let p1 = p1.inspect(|x| {
items::on_sitemty_range_complete!(x, warn!("GOOD ----------- SEE RangeComplete in conn.rs"));
});
let mut p1 = p1;
let mut buf_len_histo = HistoLog2::new(5);
while let Some(item) = p1.next().await {
while let Some(item) = stream.next().await {
let item = item.make_frame();
match item {
Ok(buf) => {

View File

@@ -3,17 +3,13 @@ use err::Error;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::Empty;
use items_0::Events;
use items_0::WithLen;
use items_2::channelevents::ChannelEvents;
use items_2::channelevents::ConnStatus;
use items_2::channelevents::ConnStatusEvent;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
use netpod::log::*;
use netpod::query::ChannelStateEventsQuery;
use netpod::timeunits::*;
use netpod::NanoRange;
use netpod::ScalarType;
use netpod::Shape;
@@ -61,7 +57,7 @@ async fn find_ts_msp(
macro_rules! read_next_scalar_values {
($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => {
async fn $fname(
series: i64,
series: u64,
ts_msp: u64,
range: NanoRange,
fwd: bool,
@@ -97,7 +93,10 @@ macro_rules! read_next_scalar_values {
" where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?"
);
let res = scy
.query(cql, (series, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64))
.query(
cql,
(series as i64, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64),
)
.await
.err_conv()?;
let mut last_before = None;
@@ -135,7 +134,7 @@ macro_rules! read_next_scalar_values {
" where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1"
);
let res = scy
.query(cql, (series, ts_msp as i64, ts_lsp_max as i64))
.query(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64))
.await
.err_conv()?;
let mut seen_before = false;
@@ -166,37 +165,107 @@ macro_rules! read_next_scalar_values {
macro_rules! read_next_array_values {
($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => {
async fn $fname(
series: i64,
series: u64,
ts_msp: u64,
_range: NanoRange,
_fwd: bool,
range: NanoRange,
fwd: bool,
scy: Arc<ScySession>,
) -> Result<EventsDim0<$st>, Error> {
// TODO change return type: so far EventsDim1 does not exist.
error!("TODO read_next_array_values");
err::todo();
if true {
return Err(Error::with_msg_no_trace("redo based on scalar case"));
}
) -> Result<EventsDim1<$st>, Error> {
type ST = $st;
type _SCYTY = $scyty;
trace!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp);
let cql = concat!(
"select ts_lsp, pulse, value from ",
$table_name,
" where series = ? and ts_msp = ?"
);
let _res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?;
let ret = EventsDim0::<ST>::empty();
/*
for row in res.rows_typed_or_empty::<(i64, i64, Vec<SCYTY>)>() {
let row = row.err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = row.2.into_iter().map(|x| x as ST).collect();
ret.push(ts, pulse, value);
type SCYTY = $scyty;
if ts_msp >= range.end {
warn!(
"given ts_msp {} >= range.end {} not necessary to read this",
ts_msp, range.end
);
}
*/
if range.end > i64::MAX as u64 {
return Err(Error::with_msg_no_trace(format!("range.end overflows i64")));
}
let ret = if fwd {
let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 };
let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 };
trace!(
"FWD {} ts_msp {} ts_lsp_min {} ts_lsp_max {} beg {} end {} {}",
stringify!($fname),
ts_msp,
ts_lsp_min,
ts_lsp_max,
range.beg,
range.end,
stringify!($table_name)
);
// TODO use prepared!
let cql = concat!(
"select ts_lsp, pulse, value from ",
$table_name,
" where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?"
);
let res = scy
.query(
cql,
(series as i64, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64),
)
.await
.err_conv()?;
let mut last_before = None;
let mut ret = EventsDim1::<ST>::empty();
for row in res.rows_typed_or_empty::<(i64, i64, Vec<SCYTY>)>() {
let row = row.err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = row.2.into_iter().map(|x| x as ST).collect();
if ts >= range.end {
} else if ts >= range.beg {
ret.push(ts, pulse, value);
} else {
if last_before.is_none() {
warn!("encounter event before range in forward read {ts}");
}
last_before = Some((ts, pulse, value));
}
}
ret
} else {
let ts_lsp_max = if ts_msp < range.beg { range.beg - ts_msp } else { 0 };
trace!(
"BCK {} ts_msp {} ts_lsp_max {} beg {} end {} {}",
stringify!($fname),
ts_msp,
ts_lsp_max,
range.beg,
range.end,
stringify!($table_name)
);
// TODO use prepared!
let cql = concat!(
"select ts_lsp, pulse, value from ",
$table_name,
" where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1"
);
let res = scy
.query(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64))
.await
.err_conv()?;
let mut seen_before = false;
let mut ret = EventsDim1::<ST>::empty();
for row in res.rows_typed_or_empty::<(i64, i64, Vec<SCYTY>)>() {
let row = row.err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = row.2.into_iter().map(|x| x as ST).collect();
if ts >= range.end {
} else if ts < range.beg {
ret.push(ts, pulse, value);
} else {
if !seen_before {
warn!("encounter event before range in forward read {ts}");
}
seen_before = true;
}
}
ret
};
trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
Ok(ret)
}
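The array reader follows the same partitioning scheme as the scalar one: a 64-bit nanosecond timestamp is stored as a coarse partition key ts_msp plus an in-partition offset ts_lsp, with ts = ts_msp + ts_lsp, so the query range gets re-expressed relative to the partition start. A sketch of that bound arithmetic (illustrative, mirroring the code above rather than defining the schema):

// Sketch: clamp a query range [beg, end) into lsp offsets within one msp partition.
fn lsp_bounds(ts_msp: u64, beg: u64, end: u64) -> (u64, u64) {
    let ts_lsp_min = if ts_msp < beg { beg - ts_msp } else { 0 };
    let ts_lsp_max = if ts_msp < end { end - ts_msp } else { 0 };
    (ts_lsp_min, ts_lsp_max)
}

fn main() {
    // Partition starts before the range: lower bound is the distance to range.beg.
    assert_eq!(lsp_bounds(1000, 1200, 1500), (200, 500));
    // Partition starts inside the range: read from the partition start.
    assert_eq!(lsp_bounds(1300, 1200, 1500), (0, 200));
    // The full timestamp is reassembled as ts_msp + ts_lsp.
    assert_eq!(1000 + 200, 1200u64);
}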
@@ -214,7 +283,8 @@ read_next_scalar_values!(read_next_values_scalar_i64, i64, i64, "events_scalar_i
read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32");
read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64");
read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16");
read_next_array_values!(read_next_values_array_u16, u16, i16, "events_array_u16");
read_next_array_values!(read_next_values_array_bool, bool, bool, "events_array_bool");
macro_rules! read_values {
($fname:ident, $self:expr, $ts_msp:expr) => {{
@@ -234,19 +304,20 @@ macro_rules! read_values {
}
struct ReadValues {
series: i64,
series: u64,
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
ts_msps: VecDeque<u64>,
fwd: bool,
fut: Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>,
fut_done: bool,
scy: Arc<ScySession>,
}
impl ReadValues {
fn new(
series: i64,
series: u64,
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
@@ -264,6 +335,7 @@ impl ReadValues {
fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace(
"future not initialized",
)))),
fut_done: false,
scy,
};
ret.next();
@@ -273,6 +345,7 @@ impl ReadValues {
fn next(&mut self) -> bool {
if let Some(ts_msp) = self.ts_msps.pop_front() {
self.fut = self.make_fut(ts_msp);
self.fut_done = false;
true
} else {
false
@@ -321,6 +394,10 @@ impl ReadValues {
ScalarType::U16 => {
read_values!(read_next_values_array_u16, self, ts_msp)
}
ScalarType::BOOL => {
info!("attempt to read bool");
read_values!(read_next_values_array_bool, self, ts_msp)
}
_ => {
error!("TODO ReadValues add more types");
err::todoval()
@@ -401,7 +478,7 @@ impl EventsStreamScylla {
if let Some(msp) = self.ts_msp_b1.clone() {
trace!("Try ReadBack1");
let st = ReadValues::new(
self.series as i64,
self.series,
self.scalar_type.clone(),
self.shape.clone(),
self.range.clone(),
@@ -413,7 +490,7 @@ impl EventsStreamScylla {
} else if self.ts_msps.len() >= 1 {
trace!("Go straight for forward read");
let st = ReadValues::new(
self.series as i64,
self.series,
self.scalar_type.clone(),
self.shape.clone(),
self.range.clone(),
@@ -433,7 +510,7 @@ impl EventsStreamScylla {
self.outqueue.push_back(item);
if self.ts_msps.len() > 0 {
let st = ReadValues::new(
self.series as i64,
self.series,
self.scalar_type.clone(),
self.shape.clone(),
self.range.clone(),
@@ -449,7 +526,7 @@ impl EventsStreamScylla {
if let Some(msp) = self.ts_msp_b2.clone() {
trace!("Try ReadBack2");
let st = ReadValues::new(
self.series as i64,
self.series,
self.scalar_type.clone(),
self.shape.clone(),
self.range.clone(),
@@ -461,7 +538,7 @@ impl EventsStreamScylla {
} else if self.ts_msps.len() >= 1 {
trace!("No 2nd back MSP, go for forward read");
let st = ReadValues::new(
self.series as i64,
self.series,
self.scalar_type.clone(),
self.shape.clone(),
self.range.clone(),
@@ -483,7 +560,7 @@ impl EventsStreamScylla {
}
if self.ts_msps.len() >= 1 {
let st = ReadValues::new(
self.series as i64,
self.series,
self.scalar_type.clone(),
self.shape.clone(),
self.range.clone(),
@@ -534,10 +611,12 @@ impl Stream for EventsStreamScylla {
},
FrState::ReadBack1(ref mut st) => match st.fut.poll_unpin(cx) {
Ready(Ok(item)) => {
st.fut_done = true;
self.back_1_done(item);
continue;
}
Ready(Err(e)) => {
st.fut_done = true;
self.state = FrState::Done;
Ready(Some(Err(e)))
}
@@ -545,10 +624,12 @@ impl Stream for EventsStreamScylla {
},
FrState::ReadBack2(ref mut st) => match st.fut.poll_unpin(cx) {
Ready(Ok(item)) => {
st.fut_done = true;
self.back_2_done(item);
continue;
}
Ready(Err(e)) => {
st.fut_done = true;
self.state = FrState::Done;
Ready(Some(Err(e)))
}
@@ -556,6 +637,7 @@ impl Stream for EventsStreamScylla {
},
FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) {
Ready(Ok(item)) => {
st.fut_done = true;
if !st.next() {
trace!("ReadValues exhausted");
self.state = FrState::Done;
@@ -565,7 +647,10 @@ impl Stream for EventsStreamScylla {
}
continue;
}
Ready(Err(e)) => Ready(Some(Err(e))),
Ready(Err(e)) => {
st.fut_done = true;
Ready(Some(Err(e)))
}
Pending => Pending,
},
FrState::Done => Ready(None),
@@ -573,57 +658,3 @@ impl Stream for EventsStreamScylla {
}
}
}
async fn _channel_state_events(
evq: &ChannelStateEventsQuery,
scy: Arc<ScySession>,
) -> Result<Pin<Box<dyn Stream<Item = Result<ConnStatusEvent, Error>> + Send>>, Error> {
let (tx, rx) = async_channel::bounded(8);
let evq = evq.clone();
let fut = async move {
let div = DAY;
let mut ts_msp = evq.range().beg / div * div;
loop {
let series = (evq
.channel()
.series()
.ok_or(Error::with_msg_no_trace(format!("series id not given"))))?;
let params = (series as i64, ts_msp as i64);
let mut res = scy
.query_iter(
"select ts_lsp, kind from channel_status where series = ? and ts_msp = ?",
params,
)
.await
.err_conv()?;
while let Some(row) = res.next().await {
let row = row.err_conv()?;
let (ts_lsp, kind): (i64, i32) = row.into_typed().err_conv()?;
let ts = ts_msp + ts_lsp as u64;
let kind = kind as u32;
if ts >= evq.range().beg && ts < evq.range().end {
let status = match kind {
1 => ConnStatus::Connect,
2 => ConnStatus::Disconnect,
_ => {
let e = Error::with_msg_no_trace(format!("bad status kind {kind}"));
let e2 = Error::with_msg_no_trace(format!("bad status kind {kind}"));
let _ = tx.send(Err(e)).await;
return Err(e2);
}
};
let ev = ConnStatusEvent { ts, status };
tx.send(Ok(ev)).await.map_err(|e| format!("{e}"))?;
}
}
ts_msp += div;
if ts_msp >= evq.range().end {
break;
}
}
Ok(())
};
// TODO join the task (better: rewrite as proper stream)
tokio::spawn(fut);
Ok(Box::pin(rx))
}

View File

@@ -1,7 +1,10 @@
use crate::errconv::ErrConv;
use err::Error;
use futures_util::{Future, FutureExt, Stream};
use items_2::channelevents::{ConnStatus, ConnStatusEvent};
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use items_2::channelevents::ChannelStatus;
use items_2::channelevents::ChannelStatusEvent;
use netpod::log::*;
use netpod::NanoRange;
use netpod::CONNECTION_STATUS_DIV;
@@ -9,7 +12,10 @@ use scylla::Session as ScySession;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::SystemTime;
async fn read_next_status_events(
series: u64,
@@ -18,7 +24,7 @@ async fn read_next_status_events(
fwd: bool,
do_one_before: bool,
scy: Arc<ScySession>,
) -> Result<VecDeque<ConnStatusEvent>, Error> {
) -> Result<VecDeque<ChannelStatusEvent>, Error> {
if ts_msp >= range.end {
warn!(
"given ts_msp {} >= range.end {} not necessary to read this",
@@ -73,10 +79,10 @@ async fn read_next_status_events(
let row = row.err_conv()?;
let ts = ts_msp + row.0 as u64;
let kind = row.1 as u32;
// from netfetch::store::ChannelStatus
let ev = ConnStatusEvent {
let ev = ChannelStatusEvent {
ts,
status: ConnStatus::from_ca_ingest_status_kind(kind),
datetime: SystemTime::UNIX_EPOCH + Duration::from_millis(ts / 1000000),
status: ChannelStatus::from_ca_ingest_status_kind(kind),
};
if ts >= range.end {
} else if ts >= range.beg {
@@ -101,7 +107,7 @@ struct ReadValues {
ts_msps: VecDeque<u64>,
fwd: bool,
do_one_before_range: bool,
fut: Pin<Box<dyn Future<Output = Result<VecDeque<ConnStatusEvent>, Error>> + Send>>,
fut: Pin<Box<dyn Future<Output = Result<VecDeque<ChannelStatusEvent>, Error>> + Send>>,
scy: Arc<ScySession>,
}
@@ -142,7 +148,7 @@ impl ReadValues {
fn make_fut(
&mut self,
ts_msp: u64,
) -> Pin<Box<dyn Future<Output = Result<VecDeque<ConnStatusEvent>, Error>> + Send>> {
) -> Pin<Box<dyn Future<Output = Result<VecDeque<ChannelStatusEvent>, Error>> + Send>> {
info!("make fut for {ts_msp}");
let fut = read_next_status_events(
self.series,
@@ -168,7 +174,7 @@ pub struct StatusStreamScylla {
range: NanoRange,
do_one_before_range: bool,
scy: Arc<ScySession>,
outbuf: VecDeque<ConnStatusEvent>,
outbuf: VecDeque<ChannelStatusEvent>,
}
impl StatusStreamScylla {
@@ -185,7 +191,7 @@ impl StatusStreamScylla {
}
impl Stream for StatusStreamScylla {
type Item = Result<ConnStatusEvent, Error>;
type Item = Result<ChannelStatusEvent, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;

View File

@@ -4,6 +4,8 @@ use std::fs;
use std::io::{BufWriter, Read, Seek, SeekFrom, Stdin, Write};
use std::path::{Path, PathBuf};
const MAX_PER_FILE: u64 = 1024 * 1024 * 2;
pub struct Buffer {
buf: Vec<u8>,
wp: usize,
@@ -97,7 +99,7 @@ impl Buffer {
}
fn parse_lines(buf: &[u8]) -> Result<(Vec<Cow<str>>, usize), Error> {
let mut ret = vec![];
let mut ret = Vec::new();
let mut i1 = 0;
let mut i2 = 0;
while i1 < buf.len() {
@@ -134,9 +136,6 @@ fn parse_lines(buf: &[u8]) -> Result<(Vec<Cow<str>>, usize), Error> {
Ok((ret, i2))
}
const MAX_PER_FILE: u64 = 1024 * 1024 * 2;
const MAX_TOTAL_SIZE: u64 = 1024 * 1024 * 20;
struct Fileinfo {
path: PathBuf,
name: String,
@@ -144,7 +143,7 @@ struct Fileinfo {
}
fn file_list(dir: &Path) -> Result<Vec<Fileinfo>, Error> {
let mut ret = vec![];
let mut ret = Vec::new();
let rd = fs::read_dir(&dir)?;
for e in rd {
let e = e?;
@@ -191,7 +190,7 @@ fn next_file(dir: &Path) -> Result<BufWriter<fs::File>, Error> {
Ok(ret)
}
pub fn append_inner(dirname: &str, mut stdin: Stdin) -> Result<(), Error> {
pub fn append_inner(dirname: &str, total_size_max: u64, mut stdin: Stdin) -> Result<(), Error> {
let mut bytes_written = 0;
let dir = PathBuf::from(dirname);
let mut fout = open_latest_or_new(&dir)?;
@@ -265,7 +264,7 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin) -> Result<(), Error> {
let l1 = fout.seek(SeekFrom::End(0))?;
if l1 >= MAX_PER_FILE {
let rd = fs::read_dir(&dir)?;
let mut w = vec![];
let mut w = Vec::new();
for e in rd {
let e = e?;
let fnos = e.file_name();
@@ -282,7 +281,7 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin) -> Result<(), Error> {
let mut lentot = w.iter().map(|g| g.1).fold(0, |a, x| a + x);
write!(&mut fout, "[APPEND-LENTOT] {}\n", lentot)?;
for q in w {
if lentot <= MAX_TOTAL_SIZE as u64 {
if lentot <= total_size_max {
break;
}
write!(&mut fout, "[APPEND-REMOVE] {} {}\n", q.1, q.0.to_string_lossy())?;
@@ -303,8 +302,8 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin) -> Result<(), Error> {
}
}
pub fn append(dirname: &str, stdin: Stdin) -> Result<(), Error> {
match append_inner(dirname, stdin) {
pub fn append(dirname: &str, total_size_max: u64, stdin: Stdin) -> Result<(), Error> {
match append_inner(dirname, total_size_max, stdin) {
Ok(k) => {
eprintln!("append_inner has returned");
Ok(k)
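For reference, the retention pass that consumes total_size_max walks the log files oldest-first and drops entries until the running total fits the budget; a self-contained sketch of that decision (names illustrative):

// Sketch: given (name, len) pairs sorted oldest-first, decide which files to
// remove so the remaining total fits within total_size_max bytes.
fn files_to_remove(files: &[(String, u64)], total_size_max: u64) -> Vec<String> {
    let mut lentot: u64 = files.iter().map(|f| f.1).sum();
    let mut remove = Vec::new();
    for (name, len) in files {
        if lentot <= total_size_max {
            break;
        }
        remove.push(name.clone());
        lentot -= len;
    }
    remove
}

fn main() {
    let files = vec![
        ("a-0001.log".to_string(), 8),
        ("a-0002.log".to_string(), 8),
        ("a-0003.log".to_string(), 8),
    ];
    // With a budget of 16 bytes, only the oldest file gets dropped.
    assert_eq!(files_to_remove(&files, 16), vec!["a-0001.log".to_string()]);
}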