Dominik Werder
2023-02-24 13:32:19 +01:00
parent 0f29eac2b5
commit 2e286eb28e
23 changed files with 399 additions and 257 deletions

View File

@@ -3,7 +3,7 @@ pub mod scan;
pub mod search;
pub mod pg {
pub use tokio_postgres::{Client, Error};
pub use tokio_postgres::{Client, Error, NoTls};
}
use err::Error;
@@ -11,9 +11,9 @@ use netpod::log::*;
use netpod::TableSizes;
use netpod::{Channel, Database, NodeConfigCached};
use netpod::{ScalarType, Shape};
use pg::{Client as PgClient, NoTls};
use std::sync::Arc;
use std::time::Duration;
use tokio_postgres::{Client, Client as PgClient, NoTls};
trait ErrConv<T> {
fn err_conv(self) -> Result<T, Error>;
@@ -49,7 +49,7 @@ pub async fn delay_io_medium() {
delay_us(2000).await;
}
pub async fn create_connection(db_config: &Database) -> Result<Client, Error> {
pub async fn create_connection(db_config: &Database) -> Result<PgClient, Error> {
// TODO use a common already running worker pool for these queries:
let d = db_config;
let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name);
@@ -135,7 +135,7 @@ pub async fn random_channel(node_config: &NodeConfigCached) -> Result<String, Er
Ok(rows[0].get(0))
}
pub async fn insert_channel(name: String, facility: i64, dbc: &Client) -> Result<(), Error> {
pub async fn insert_channel(name: String, facility: i64, dbc: &PgClient) -> Result<(), Error> {
let rows = dbc
.query(
"select count(rowid) from channels where facility = $1 and name = $2",

View File

@@ -1,23 +1,37 @@
use crate::{create_connection, delay_io_medium, delay_io_short, ErrConv};
use async_channel::{bounded, Receiver};
use chrono::{DateTime, Utc};
use crate::create_connection;
use crate::delay_io_medium;
use crate::delay_io_short;
use crate::ErrConv;
use async_channel::bounded;
use async_channel::Receiver;
use chrono::DateTime;
use chrono::Utc;
use err::Error;
use futures_util::{FutureExt, Stream};
use futures_util::FutureExt;
use futures_util::Stream;
use netpod::log::*;
use netpod::{Database, NodeConfigCached};
use netpod::Database;
use netpod::NodeConfigCached;
use parse::channelconfig::NErr;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use serde::Serialize;
use std::future::Future;
use std::io::ErrorKind;
use std::os::unix::ffi::OsStringExt;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use tokio::fs::{DirEntry, ReadDir};
use std::sync::Arc;
use std::sync::RwLock;
use std::task::Context;
use std::task::Poll;
use tokio::fs::DirEntry;
use tokio::fs::ReadDir;
use tokio_postgres::Client;
mod updatechannelnames;
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeDiskIdent {
pub rowid: i64,
@@ -89,9 +103,10 @@ pub async fn get_node_disk_ident_2(
pub struct FindChannelNamesFromConfigReadDir {
#[pin]
read_dir_fut: Option<Pin<Box<dyn Future<Output = std::io::Result<ReadDir>> + Send>>>,
read_dir: Option<ReadDir>,
#[pin]
dir_entry_fut: Option<Pin<Box<dyn Future<Output = std::io::Result<Option<DirEntry>>> + Send>>>,
read_dir: Option<Pin<Box<ReadDir>>>,
#[pin]
done: bool,
}
impl FindChannelNamesFromConfigReadDir {
@@ -99,7 +114,7 @@ impl FindChannelNamesFromConfigReadDir {
Self {
read_dir_fut: Some(Box::pin(tokio::fs::read_dir(base_dir.as_ref().join("config")))),
read_dir: None,
dir_entry_fut: None,
done: false,
}
}
}
@@ -109,35 +124,36 @@ impl Stream for FindChannelNamesFromConfigReadDir {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let span = span!(Level::INFO, "FindChNameCfgDir");
let _spg = span.enter();
let mut pself = self.project();
loop {
break if let Some(fut) = pself.dir_entry_fut.as_mut().as_pin_mut() {
match fut.poll(cx) {
Ready(Ok(Some(item))) => {
let g = unsafe { &mut *(pself.read_dir.as_mut().unwrap() as *mut ReadDir) };
let fut = g.next_entry();
*pself.dir_entry_fut = Some(Box::pin(fut));
Ready(Some(Ok(item)))
}
break if *pself.done {
Ready(None)
} else if let Some(mut fut) = pself.read_dir.as_mut().as_pin_mut() {
match fut.poll_next_entry(cx) {
Ready(Ok(Some(item))) => Ready(Some(Ok(item))),
Ready(Ok(None)) => {
*pself.dir_entry_fut = None;
*pself.done = true;
Ready(None)
}
Ready(Err(e)) => Ready(Some(Err(e.into()))),
Ready(Err(e)) => {
*pself.done = true;
Ready(Some(Err(e.into())))
}
Pending => Pending,
}
} else if let Some(fut) = pself.read_dir_fut.as_mut().as_pin_mut() {
match fut.poll(cx) {
Ready(Ok(item)) => {
*pself.read_dir_fut = None;
*pself.read_dir = Some(item);
//let fut = pself.read_dir.as_mut().unwrap().next_entry();
let g = unsafe { &mut *(pself.read_dir.as_mut().unwrap() as *mut ReadDir) };
let fut = g.next_entry();
*pself.dir_entry_fut = Some(Box::pin(fut));
*pself.read_dir = Some(Box::pin(item));
continue;
}
Ready(Err(e)) => Ready(Some(Err(e.into()))),
Ready(Err(e)) => {
*pself.done = true;
Ready(Some(Err(e.into())))
}
Pending => Pending,
}
} else {
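This hunk replaces the old approach, where a next_entry() future borrowing from read_dir had to be stored alongside it via an unsafe pointer cast, with ReadDir::poll_next_entry, which polls through &mut self and needs no stored future. A condensed, stand-alone sketch of the resulting safe pattern (illustrative types, not this crate's):

use futures_util::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::ReadDir;

struct DirEntries {
    read_dir: ReadDir,
    done: bool,
}

impl Stream for DirEntries {
    type Item = std::io::Result<tokio::fs::DirEntry>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        if self.done {
            return Poll::Ready(None);
        }
        // poll_next_entry takes &mut self, so no borrowed future is stored.
        match self.read_dir.poll_next_entry(cx) {
            Poll::Ready(Ok(Some(entry))) => Poll::Ready(Some(Ok(entry))),
            Poll::Ready(Ok(None)) => {
                self.done = true;
                Poll::Ready(None)
            }
            Poll::Ready(Err(e)) => {
                self.done = true;
                Poll::Ready(Some(Err(e)))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}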
@@ -161,166 +177,7 @@ where
Ok(())
}
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdatedDbWithChannelNames {
msg: String,
count: u32,
}
#[pin_project]
pub struct UpdatedDbWithChannelNamesStream {
errored: bool,
data_complete: bool,
#[allow(dead_code)]
node_config: Pin<Box<NodeConfigCached>>,
// TODO can we pass a Pin to the async fn instead of creating static ref?
node_config_ref: &'static NodeConfigCached,
#[pin]
client_fut: Option<Pin<Box<dyn Future<Output = Result<Client, Error>> + Send>>>,
#[pin]
client: Option<Client>,
client_ref: Option<&'static Client>,
#[pin]
ident_fut: Option<Pin<Box<dyn Future<Output = Result<NodeDiskIdent, Error>> + Send>>>,
ident: Option<NodeDiskIdent>,
#[pin]
find: Option<FindChannelNamesFromConfigReadDir>,
#[pin]
update_batch: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
channel_inp_done: bool,
clist: Vec<String>,
}
impl UpdatedDbWithChannelNamesStream {
pub fn new(node_config: NodeConfigCached) -> Result<Self, Error> {
let node_config = Box::pin(node_config.clone());
let node_config_ref = unsafe { &*(&node_config as &NodeConfigCached as *const _) };
let mut ret = Self {
errored: false,
data_complete: false,
node_config,
node_config_ref,
client_fut: None,
client: None,
client_ref: None,
ident_fut: None,
ident: None,
find: None,
update_batch: None,
channel_inp_done: false,
clist: Vec::new(),
};
ret.client_fut = Some(Box::pin(create_connection(
&ret.node_config_ref.node_config.cluster.database,
)));
Ok(ret)
}
}
impl Stream for UpdatedDbWithChannelNamesStream {
type Item = Result<UpdatedDbWithChannelNames, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let mut pself = self.project();
loop {
break if *pself.errored {
Ready(None)
} else if *pself.data_complete {
Ready(None)
} else if let Some(fut) = pself.find.as_mut().as_pin_mut() {
match fut.poll_next(cx) {
Ready(Some(Ok(item))) => {
pself
.clist
.push(String::from_utf8(item.file_name().into_vec()).unwrap());
continue;
}
Ready(Some(Err(e))) => {
*pself.errored = true;
Ready(Some(Err(e)))
}
Ready(None) => {
*pself.channel_inp_done = true;
// Work through the collected items
let l = std::mem::replace(pself.clist, Vec::new());
let fut = update_db_with_channel_name_list(
l,
pself.ident.as_ref().unwrap().facility,
pself.client.as_ref().get_ref().as_ref().unwrap(),
);
// TODO
//pself.update_batch.replace(Box::pin(fut));
let _ = fut;
continue;
}
Pending => Pending,
}
} else if let Some(fut) = pself.ident_fut.as_mut().as_pin_mut() {
match fut.poll(cx) {
Ready(Ok(item)) => {
*pself.ident_fut = None;
*pself.ident = Some(item);
let ret = UpdatedDbWithChannelNames {
msg: format!("Got ident {:?}", pself.ident),
count: 43,
};
let base_path = &pself
.node_config
.node
.sf_databuffer
.as_ref()
.ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))?
.data_base_path;
let s = FindChannelNamesFromConfigReadDir::new(base_path);
*pself.find = Some(s);
Ready(Some(Ok(ret)))
}
Ready(Err(e)) => {
*pself.errored = true;
Ready(Some(Err(e)))
}
Pending => Pending,
}
} else if let Some(fut) = pself.client_fut.as_mut().as_pin_mut() {
match fut.poll(cx) {
Ready(Ok(item)) => {
*pself.client_fut = None;
//*pself.client = Some(Box::pin(item));
//*pself.client_ref = Some(unsafe { &*(&pself.client.as_ref().unwrap() as &Client as *const _) });
*pself.client = Some(item);
let c2: &Client = pself.client.as_ref().get_ref().as_ref().unwrap();
*pself.client_ref = Some(unsafe { &*(c2 as *const _) });
//() == pself.node_config.as_ref();
//() == pself.client.as_ref().as_pin_ref().unwrap();
/* *pself.ident_fut = Some(Box::pin(get_node_disk_ident_2(
pself.node_config.as_ref(),
pself.client.as_ref().as_pin_ref().unwrap(),
)));*/
*pself.ident_fut = Some(Box::pin(get_node_disk_ident(
pself.node_config_ref,
pself.client_ref.as_ref().unwrap(),
)));
let ret = UpdatedDbWithChannelNames {
msg: format!("Client opened connection"),
count: 42,
};
Ready(Some(Ok(ret)))
}
Ready(Err(e)) => {
*pself.errored = true;
Ready(Some(Err(e)))
}
Pending => Pending,
}
} else {
Ready(None)
};
}
}
}
#[allow(unused)]
async fn update_db_with_channel_name_list(list: Vec<String>, backend: i64, dbc: &Client) -> Result<(), Error> {
delay_io_short().await;
dbc.query("begin", &[]).await.err_conv()?;
@@ -336,6 +193,12 @@ async fn update_db_with_channel_name_list(list: Vec<String>, backend: i64, dbc:
Ok(())
}
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdatedDbWithChannelNames {
msg: String,
count: u32,
}
pub async fn update_db_with_channel_names(
node_config: NodeConfigCached,
db_config: &Database,
@@ -413,8 +276,8 @@ pub async fn update_db_with_channel_names(
Ok(rx)
}
pub fn update_db_with_channel_names_3<'a>(
node_config: &'a NodeConfigCached,
pub fn update_db_with_channel_names_3(
node_config: &NodeConfigCached,
) -> impl Stream<Item = Result<UpdatedDbWithChannelNames, Error>> + 'static {
let base_path = &node_config
.node
@@ -559,7 +422,7 @@ pub enum UpdateChannelConfigResult {
/**
Parse the config of the given channel and update database.
*/
pub async fn update_db_with_channel_config(
async fn update_db_with_channel_config(
node_config: &NodeConfigCached,
node_disk_ident: &NodeDiskIdent,
channel_id: i64,

View File

@@ -0,0 +1,171 @@
use super::get_node_disk_ident;
use super::update_db_with_channel_name_list;
use super::FindChannelNamesFromConfigReadDir;
use super::NodeDiskIdent;
use super::UpdatedDbWithChannelNames;
use crate::create_connection;
use crate::pg::Client as PgClient;
use err::Error;
use futures_util::Future;
use futures_util::Stream;
use netpod::NodeConfigCached;
use pin_project::pin_project;
use std::os::unix::prelude::OsStringExt;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
#[pin_project]
struct UpdatedDbWithChannelNamesStream {
errored: bool,
data_complete: bool,
#[allow(dead_code)]
node_config: Pin<Box<NodeConfigCached>>,
// TODO can we pass a Pin to the async fn instead of creating static ref?
node_config_ref: &'static NodeConfigCached,
#[pin]
client_fut: Option<Pin<Box<dyn Future<Output = Result<PgClient, Error>> + Send>>>,
#[pin]
client: Option<PgClient>,
client_ref: Option<&'static PgClient>,
#[pin]
ident_fut: Option<Pin<Box<dyn Future<Output = Result<NodeDiskIdent, Error>> + Send>>>,
ident: Option<NodeDiskIdent>,
#[pin]
find: Option<FindChannelNamesFromConfigReadDir>,
#[pin]
update_batch: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
channel_inp_done: bool,
clist: Vec<String>,
}
impl UpdatedDbWithChannelNamesStream {
#[allow(unused)]
fn new(node_config: NodeConfigCached) -> Result<Self, Error> {
let node_config = Box::pin(node_config.clone());
let node_config_ref = unsafe { &*(&node_config as &NodeConfigCached as *const _) };
let mut ret = Self {
errored: false,
data_complete: false,
node_config,
node_config_ref,
client_fut: None,
client: None,
client_ref: None,
ident_fut: None,
ident: None,
find: None,
update_batch: None,
channel_inp_done: false,
clist: Vec::new(),
};
ret.client_fut = Some(Box::pin(create_connection(
&ret.node_config_ref.node_config.cluster.database,
)));
Ok(ret)
}
}
impl Stream for UpdatedDbWithChannelNamesStream {
type Item = Result<UpdatedDbWithChannelNames, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let mut pself = self.project();
loop {
break if *pself.errored {
Ready(None)
} else if *pself.data_complete {
Ready(None)
} else if let Some(fut) = pself.find.as_mut().as_pin_mut() {
match fut.poll_next(cx) {
Ready(Some(Ok(item))) => {
pself
.clist
.push(String::from_utf8(item.file_name().into_vec()).unwrap());
continue;
}
Ready(Some(Err(e))) => {
*pself.errored = true;
Ready(Some(Err(e)))
}
Ready(None) => {
*pself.channel_inp_done = true;
// Work through the collected items
let l = std::mem::replace(pself.clist, Vec::new());
let fut = update_db_with_channel_name_list(
l,
pself.ident.as_ref().unwrap().facility,
pself.client.as_ref().get_ref().as_ref().unwrap(),
);
// TODO
//pself.update_batch.replace(Box::pin(fut));
let _ = fut;
continue;
}
Pending => Pending,
}
} else if let Some(fut) = pself.ident_fut.as_mut().as_pin_mut() {
match fut.poll(cx) {
Ready(Ok(item)) => {
*pself.ident_fut = None;
*pself.ident = Some(item);
let ret = UpdatedDbWithChannelNames {
msg: format!("Got ident {:?}", pself.ident),
count: 43,
};
let base_path = &pself
.node_config
.node
.sf_databuffer
.as_ref()
.ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))?
.data_base_path;
let s = FindChannelNamesFromConfigReadDir::new(base_path);
*pself.find = Some(s);
Ready(Some(Ok(ret)))
}
Ready(Err(e)) => {
*pself.errored = true;
Ready(Some(Err(e)))
}
Pending => Pending,
}
} else if let Some(fut) = pself.client_fut.as_mut().as_pin_mut() {
match fut.poll(cx) {
Ready(Ok(item)) => {
*pself.client_fut = None;
//*pself.client = Some(Box::pin(item));
//*pself.client_ref = Some(unsafe { &*(&pself.client.as_ref().unwrap() as &Client as *const _) });
*pself.client = Some(item);
let c2: &PgClient = pself.client.as_ref().get_ref().as_ref().unwrap();
*pself.client_ref = Some(unsafe { &*(c2 as *const _) });
//() == pself.node_config.as_ref();
//() == pself.client.as_ref().as_pin_ref().unwrap();
/* *pself.ident_fut = Some(Box::pin(get_node_disk_ident_2(
pself.node_config.as_ref(),
pself.client.as_ref().as_pin_ref().unwrap(),
)));*/
*pself.ident_fut = Some(Box::pin(get_node_disk_ident(
pself.node_config_ref,
pself.client_ref.as_ref().unwrap(),
)));
let ret = UpdatedDbWithChannelNames {
msg: format!("Client opened connection"),
count: 42,
};
Ready(Some(Ok(ret)))
}
Ready(Err(e)) => {
*pself.errored = true;
Ready(Some(Err(e)))
}
Pending => Pending,
}
} else {
Ready(None)
};
}
}
}
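The unsafe &'static casts for node_config and client carried over into this new module work only because the Box::pin'd values outlive the borrowing futures; an owned-state generator would avoid the issue entirely. A sketch under the assumption that the async-stream crate were added as a dependency; Config, Client, connect, and step are placeholders, not this crate's API:

use futures_util::Stream;

struct Config;
struct Client;

async fn connect(_cfg: &Config) -> Result<Client, String> { Ok(Client) }
async fn step(_client: &Client) -> Result<u32, String> { Ok(42) }

// The generator owns cfg and client, so borrows across awaits are safe
// without any 'static references or raw pointer casts.
fn updated_stream(cfg: Config) -> impl Stream<Item = Result<u32, String>> {
    async_stream::stream! {
        let client = match connect(&cfg).await {
            Ok(c) => c,
            Err(e) => { yield Err(e); return; }
        };
        yield step(&client).await;
    }
}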

View File

@@ -100,6 +100,7 @@ pub async fn search_channel_scylla(
" series, facility, channel, scalar_type, shape_dims",
" from series_by_channel",
" where channel ~* $1",
" and scalar_type != -2147483647",
" limit 400000",
));
let pgclient = crate::create_connection(pgconf).await?;

View File

@@ -134,8 +134,6 @@ impl Stream for EventChunkerMultifile {
}
}
if max >= self.range.end {
info!("REACHED RANGE END, TRUNCATE");
info!("{:20} ... {:20}", self.range.beg, self.range.end);
self.range_final = true;
h.truncate_ts(self.range.end);
self.evs = None;
@@ -187,7 +185,6 @@ impl Stream for EventChunkerMultifile {
path.clone(),
self.expand,
self.do_decompress,
format!("{:?}", path),
);
let filtered = RangeFilter::new(chunker, self.range.clone(), self.expand);
self.evs = Some(Box::pin(filtered));
@@ -223,7 +220,6 @@ impl Stream for EventChunkerMultifile {
of.path.clone(),
self.expand,
self.do_decompress,
format!("{:?}", of.path),
);
chunkers.push(chunker);
}

View File

@@ -154,6 +154,8 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let span = netpod::log::span!(Level::INFO, "disk::merge");
let _spg = span.enter();
'outer: loop {
break if self.completed {
panic!("poll_next on completed");
@@ -390,7 +392,6 @@ mod test {
dbg_path.clone(),
expand,
do_decompress,
format!("{:?}", dbg_path),
);
chunker
})

View File

@@ -109,7 +109,6 @@ pub fn main() -> Result<(), Error> {
path.clone(),
false,
true,
format!("{:?}", path),
);
err::todo();
Ok(())

View File

@@ -1,6 +1,6 @@
[package]
name = "httpret"
version = "0.3.6"
version = "0.4.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -477,7 +477,7 @@ async fn prebinned_inner(
todo!()
}
pub async fn random_channel(
async fn random_channel(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
@@ -488,7 +488,7 @@ pub async fn random_channel(
Ok(ret)
}
pub async fn clear_cache_all(
async fn clear_cache_all(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
@@ -505,7 +505,7 @@ pub async fn clear_cache_all(
Ok(ret)
}
pub async fn update_db_with_channel_names(
async fn update_db_with_channel_names(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
@@ -541,7 +541,7 @@ pub async fn update_db_with_channel_names(
}
}
pub async fn update_db_with_channel_names_3(
async fn update_db_with_channel_names_3(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
@@ -564,7 +564,7 @@ pub async fn update_db_with_channel_names_3(
Ok(ret)
}
pub async fn update_db_with_all_channel_configs(
async fn update_db_with_all_channel_configs(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
@@ -587,7 +587,7 @@ pub async fn update_db_with_all_channel_configs(
Ok(ret)
}
pub async fn update_search_cache(
async fn update_search_cache(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,

View File

@@ -107,10 +107,6 @@ impl EventFull {
let mut nkeep = usize::MAX;
for (i, &ts) in self.tss.iter().enumerate() {
if ts >= end {
for (i, &ts) in self.tss.iter().enumerate() {
eprintln!("{i:5} {ts:20}");
}
eprintln!("truncate to i {i} ts {ts}");
nkeep = i;
break;
}

View File

@@ -207,7 +207,6 @@ pub struct EventsDim0CollectorOutput<NTY> {
timed_out: bool,
#[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")]
continue_at: Option<IsoDateTime>,
dummy_marker: u32,
}
impl<NTY: ScalarOps> EventsDim0CollectorOutput<NTY> {
@@ -377,7 +376,6 @@ impl<NTY: ScalarOps> items_0::collect_s::CollectorType for EventsDim0Collector<N
range_final: self.range_final,
timed_out: self.timed_out,
continue_at,
dummy_marker: 4242,
};
if !ret.is_valid() {
error!("invalid:\n{}", ret.info_str());

View File

@@ -0,0 +1,19 @@
use items_0::Events;
pub trait EventTransform {
fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events>;
}
pub struct IdentityTransform {}
impl IdentityTransform {
pub fn default() -> Self {
Self {}
}
}
impl EventTransform for IdentityTransform {
fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
src
}
}
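IdentityTransform passes event batches through untouched; other implementors would inspect or rewrite them. A hypothetical second impl (illustrative only, not part of this commit) that counts batches while passing them through:

// Assumed name and field; shows how stateful transforms fit the trait.
pub struct CountingTransform {
    pub batches: usize,
}

impl EventTransform for CountingTransform {
    fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
        self.batches += 1;
        src
    }
}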

View File

@@ -5,6 +5,7 @@ pub mod databuffereventblobs;
pub mod eventsdim0;
pub mod eventsdim1;
pub mod eventsxbindim0;
pub mod eventtransform;
pub mod merger;
pub mod streams;
#[cfg(test)]
@@ -251,7 +252,7 @@ pub fn empty_events_dyn_ev(
}
},
Shape::Wave(..) => match agg_kind {
AggKind::Plain => {
AggKind::Plain | AggKind::TimeWeightedScalar => {
use ScalarType::*;
type K<T> = eventsdim1::EventsDim1<T>;
match scalar_type {

View File

@@ -367,7 +367,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
self.poll_count += 1;
let span1 = span!(Level::INFO, "Merger", pc = self.poll_count);
let span1 = span!(Level::TRACE, "Merger", pc = self.poll_count);
let _spg = span1.enter();
loop {
trace3!("poll");

View File

@@ -3,6 +3,7 @@ pub mod histo;
pub mod query;
pub mod status;
pub mod streamext;
pub mod transform;
use crate::log::*;
use bytes::Bytes;

View File

@@ -5,6 +5,7 @@ pub mod prebinned;
use crate::get_url_query_pairs;
use crate::is_false;
use crate::log::*;
use crate::transform::Transform;
use crate::AggKind;
use crate::AppendToUrl;
use crate::ByteSize;
@@ -86,6 +87,8 @@ pub struct PlainEventsQuery {
range: NanoRange,
#[serde(default, skip_serializing_if = "Option::is_none")]
agg_kind: Option<AggKind>,
#[serde(default, skip_serializing_if = "Option::is_none")]
transform: Option<Transform>,
#[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")]
timeout: Option<Duration>,
#[serde(default, skip_serializing_if = "Option::is_none")]
@@ -114,6 +117,7 @@ impl PlainEventsQuery {
channel,
range,
agg_kind,
transform: None,
timeout,
events_max,
event_delay: None,
@@ -156,7 +160,7 @@ impl PlainEventsQuery {
}
pub fn events_max(&self) -> u64 {
self.events_max.unwrap_or(1024 * 1024)
self.events_max.unwrap_or(1024 * 512)
}
pub fn event_delay(&self) -> &Option<Duration> {
@@ -206,12 +210,13 @@ impl FromUrl for PlainEventsQuery {
let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?;
let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?;
let ret = Self {
channel: Channel::from_pairs(&pairs)?,
channel: Channel::from_pairs(pairs)?,
range: NanoRange {
beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
},
agg_kind: agg_kind_from_binning_scheme(&pairs)?,
agg_kind: agg_kind_from_binning_scheme(pairs)?,
transform: Some(Transform::from_pairs(pairs)?),
timeout: pairs
.get("timeout")
.map(|x| x.parse::<u64>().map(Duration::from_millis).ok())
@@ -247,6 +252,9 @@ impl AppendToUrl for PlainEventsQuery {
fn append_to_url(&self, url: &mut Url) {
let date_fmt = "%Y-%m-%dT%H:%M:%S.%6fZ";
self.channel.append_to_url(url);
if let Some(x) = &self.transform {
x.append_to_url(url);
}
if let Some(x) = &self.agg_kind {
binning_scheme_append_to_url(x, url);
}
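With both Transform and the binning scheme serialized into the query string, a request URL carrying the new field might look like this (host, channel, and all parameter spellings other than transformArrayPick are illustrative):

http://host:8080/api/4/events?channelName=MY-CHANNEL&begDate=2023-02-24T00:00:00.000000Z&endDate=2023-02-24T01:00:00.000000Z&transformArrayPick=0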

netpod/src/transform.rs (new file, 47 lines)
View File

@@ -0,0 +1,47 @@
use crate::get_url_query_pairs;
use crate::AppendToUrl;
use crate::FromUrl;
use serde::Deserialize;
use serde::Serialize;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Transform {
array_pick: Option<usize>,
}
impl Transform {
fn url_prefix() -> &'static str {
"transform"
}
}
impl FromUrl for Transform {
fn from_url(url: &url::Url) -> Result<Self, err::Error> {
let pairs = get_url_query_pairs(url);
Self::from_pairs(&pairs)
}
fn from_pairs(pairs: &std::collections::BTreeMap<String, String>) -> Result<Self, err::Error> {
let upre = Self::url_prefix();
let ret = Self {
array_pick: pairs
.get(&format!("{}ArrayPick", upre))
.map(|x| match x.parse::<usize>() {
Ok(n) => Some(n),
Err(_) => None,
})
.unwrap_or(None),
};
Ok(ret)
}
}
impl AppendToUrl for Transform {
fn append_to_url(&self, url: &mut url::Url) {
let upre = Self::url_prefix();
let mut g = url.query_pairs_mut();
if let Some(x) = &self.array_pick {
g.append_pair(&format!("{}ArrayPick", upre), &format!("{}", x));
}
}
}
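A hedged round-trip sketch for the type above; it would have to live in this module since array_pick is private, and the base URL is made up:

use url::Url;

fn transform_roundtrip_demo() -> Result<(), err::Error> {
    let mut url = Url::parse("http://localhost:8080/api/4/events").unwrap();
    let tr = Transform { array_pick: Some(3) };
    tr.append_to_url(&mut url);
    // url now carries ?transformArrayPick=3
    let tr2 = Transform::from_url(&url)?;
    assert_eq!(tr2.array_pick, Some(3));
    Ok(())
}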

View File

@@ -249,7 +249,30 @@ async fn events_conn_handler_inner_try(
} else {
match make_channel_events_stream(evq, node_config).await {
Ok(stream) => {
let stream = stream.map(|x| Box::new(x) as _);
let stream = stream
.map({
use items_2::eventtransform::EventTransform;
let mut tf = items_2::eventtransform::IdentityTransform::default();
move |item| match item {
Ok(item2) => match item2 {
StreamItem::DataItem(item3) => match item3 {
RangeCompletableItem::Data(item4) => match item4 {
ChannelEvents::Events(item5) => {
let a = tf.transform(item5);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(
ChannelEvents::Events(a),
)))
}
x => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))),
},
x => Ok(StreamItem::DataItem(x)),
},
x => Ok(x),
},
_ => item,
}
})
.map(|x| Box::new(x) as _);
Box::pin(stream)
}
Err(e) => {
@@ -266,7 +289,7 @@ async fn events_conn_handler_inner_try(
if buf.len() > 1024 * 64 {
warn!("emit buf len {}", buf.len());
} else {
trace!("emit buf len {}", buf.len());
info!("emit buf len {}", buf.len());
}
buf_len_histo.ingest(buf.len() as u32);
match netout.write_all(&buf).await {
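The nested match added above touches only the innermost ChannelEvents::Events payload and forwards every other variant unchanged. A sketch of a helper expressing the same shape (assuming Sitemty<T> is the usual Result<StreamItem<RangeCompletableItem<T>>, Error> alias; map_events itself is hypothetical, not in this commit):

fn map_events<F>(item: Sitemty<ChannelEvents>, f: F) -> Sitemty<ChannelEvents>
where
    F: FnOnce(Box<dyn items_0::Events>) -> Box<dyn items_0::Events>,
{
    match item {
        // Only the Data(Events(..)) leaf is rewritten:
        Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events(evs)))) => Ok(
            StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events(f(evs)))),
        ),
        // Logs, stats, range markers, and errors pass through untouched.
        other => other,
    }
}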

View File

@@ -40,7 +40,6 @@ pub struct EventChunker {
seen_after_range_count: usize,
unordered_warn_count: usize,
repeated_ts_warn_count: usize,
dbgdesc: String,
}
impl Drop for EventChunker {
@@ -84,7 +83,6 @@ impl EventChunker {
dbg_path: PathBuf,
expand: bool,
do_decompress: bool,
dbgdesc: String,
) -> Self {
trace!("EventChunker::from_start");
let mut inp = NeedMinBuffer::new(inp);
@@ -113,7 +111,6 @@ impl EventChunker {
seen_after_range_count: 0,
unordered_warn_count: 0,
repeated_ts_warn_count: 0,
dbgdesc,
}
}
@@ -126,18 +123,8 @@ impl EventChunker {
dbg_path: PathBuf,
expand: bool,
do_decompress: bool,
dbgdesc: String,
) -> Self {
let mut ret = Self::from_start(
inp,
channel_config,
range,
stats_conf,
dbg_path,
expand,
do_decompress,
dbgdesc,
);
let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress);
ret.state = DataFileState::Event;
ret.need_min = 4;
ret.inp.set_need_min(4);

View File

@@ -1,7 +1,8 @@
use super::inmem::InMemoryFrameAsyncReadStream;
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items::frame::decode_frame;
use items::inmem::InMemoryFrame;
use items::FrameTypeInnerStatic;
use items::Sitemty;
use items::StreamItem;
@@ -11,23 +12,16 @@ use std::marker::PhantomData;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
pub struct EventsFromFrames<T, I>
where
T: AsyncRead + Unpin,
{
inp: InMemoryFrameAsyncReadStream<T>,
pub struct EventsFromFrames<O> {
inp: Pin<Box<dyn Stream<Item = Result<StreamItem<InMemoryFrame>, Error>> + Send>>,
errored: bool,
completed: bool,
_m1: PhantomData<I>,
_m1: PhantomData<O>,
}
impl<T, I> EventsFromFrames<T, I>
where
T: AsyncRead + Unpin,
{
pub fn new(inp: InMemoryFrameAsyncReadStream<T>) -> Self {
impl<O> EventsFromFrames<O> {
pub fn new(inp: Pin<Box<dyn Stream<Item = Result<StreamItem<InMemoryFrame>, Error>> + Send>>) -> Self {
Self {
inp,
errored: false,
@@ -37,15 +31,16 @@ where
}
}
impl<T, I> Stream for EventsFromFrames<T, I>
impl<O> Stream for EventsFromFrames<O>
where
T: AsyncRead + Unpin,
I: FrameTypeInnerStatic + DeserializeOwned + Unpin,
O: FrameTypeInnerStatic + DeserializeOwned + Unpin,
{
type Item = Sitemty<I>;
type Item = Sitemty<O>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let span = netpod::log::span!(netpod::log::Level::INFO, "EvFrFr");
let _spg = span.enter();
loop {
break if self.completed {
panic!("poll_next on completed");
@@ -57,7 +52,7 @@ where
Ready(Some(Ok(item))) => match item {
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
StreamItem::DataItem(frame) => match decode_frame::<Sitemty<I>>(&frame) {
StreamItem::DataItem(frame) => match decode_frame::<Sitemty<O>>(&frame) {
Ok(item) => match item {
Ok(item) => Ready(Some(Ok(item))),
Err(e) => {

View File

@@ -9,6 +9,7 @@ use netpod::log::*;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
use tracing::Instrument;
#[allow(unused)]
macro_rules! trace2 {
@@ -152,7 +153,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let span = span!(Level::TRACE, "inmem");
let span = span!(Level::INFO, "InMemRd");
let _spanguard = span.enter();
loop {
break if self.complete {

View File

@@ -93,6 +93,9 @@ impl SlideBuf {
pub fn wadv(&mut self, x: usize) -> Result<(), Error> {
check_invariants!(self);
if self.wcap() < x {
self.rewind();
}
if self.wcap() < x {
return Err(Error::NotEnoughSpace);
} else {
@@ -253,6 +256,9 @@ impl SlideBuf {
pub fn ___write_buf___(&mut self, n: usize) -> Result<&mut [u8], Error> {
check_invariants!(self);
self.rewind_if_needed(n);
if self.wcap() < n {
self.rewind();
}
if self.wcap() < n {
Err(Error::NotEnoughSpace)
} else {
@@ -283,6 +289,9 @@ impl SlideBuf {
pub fn available_writable_area(&mut self, need_min: usize) -> Result<&mut [u8], Error> {
check_invariants!(self);
self.rewind_if_needed(need_min);
if self.wcap() < need_min {
self.rewind();
}
if self.wcap() < need_min {
Err(Error::NotEnoughSpace)
} else {
@@ -294,6 +303,9 @@ impl SlideBuf {
pub fn put_slice(&mut self, buf: &[u8]) -> Result<(), Error> {
check_invariants!(self);
self.rewind_if_needed(buf.len());
if self.wcap() < buf.len() {
self.rewind();
}
if self.wcap() < buf.len() {
return Err(Error::NotEnoughSpace);
} else {
@@ -308,6 +320,9 @@ impl SlideBuf {
type T = u8;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed(TS);
if self.wcap() < TS {
self.rewind();
}
if self.wcap() < TS {
return Err(Error::NotEnoughSpace);
} else {
@@ -322,6 +337,9 @@ impl SlideBuf {
type T = u16;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed(TS);
if self.wcap() < TS {
self.rewind();
}
if self.wcap() < TS {
return Err(Error::NotEnoughSpace);
} else {
@@ -336,6 +354,9 @@ impl SlideBuf {
type T = u32;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed(TS);
if self.wcap() < TS {
self.rewind();
}
if self.wcap() < TS {
return Err(Error::NotEnoughSpace);
} else {
@@ -350,6 +371,9 @@ impl SlideBuf {
type T = u64;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed(TS);
if self.wcap() < TS {
self.rewind();
}
if self.wcap() < TS {
return Err(Error::NotEnoughSpace);
} else {
@@ -364,6 +388,9 @@ impl SlideBuf {
type T = f32;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed(TS);
if self.wcap() < TS {
self.rewind();
}
if self.wcap() < TS {
return Err(Error::NotEnoughSpace);
} else {
@@ -378,6 +405,9 @@ impl SlideBuf {
type T = f64;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed(TS);
if self.wcap() < TS {
self.rewind();
}
if self.wcap() < TS {
return Err(Error::NotEnoughSpace);
} else {
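Each hunk in this file inserts the same guard: when tail capacity is short, rewind first (shift the unread region to the front of the buffer), then re-check before returning NotEnoughSpace. A stand-alone miniature of that pattern (names and layout illustrative, not the actual SlideBuf):

struct MiniSlideBuf {
    buf: Vec<u8>,
    rp: usize, // read position
    wp: usize, // write position
}

impl MiniSlideBuf {
    // Free space at the tail of the buffer.
    fn wcap(&self) -> usize {
        self.buf.len() - self.wp
    }

    // Reclaim tail space by moving the unread region [rp, wp) to the front.
    fn rewind(&mut self) {
        self.buf.copy_within(self.rp..self.wp, 0);
        self.wp -= self.rp;
        self.rp = 0;
    }

    fn put_slice(&mut self, src: &[u8]) -> Result<(), ()> {
        if self.wcap() < src.len() {
            self.rewind();
        }
        if self.wcap() < src.len() {
            return Err(());
        }
        self.buf[self.wp..self.wp + src.len()].copy_from_slice(src);
        self.wp += src.len();
        Ok(())
    }
}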

View File

@@ -20,6 +20,7 @@ use netpod::{Node, PerfOpts};
use std::pin::Pin;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tracing::Instrument;
pub async fn x_processed_event_blobs_stream_from_node(
query: PlainEventsQuery,
@@ -43,6 +44,7 @@ pub async fn x_processed_event_blobs_stream_from_node(
netout.flush().await?;
netout.forget();
let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
let frames = Box::pin(frames) as _;
let items = EventsFromFrames::new(frames);
Ok(Box::pin(items))
}
@@ -71,8 +73,11 @@ where
netout.flush().await?;
netout.forget();
// TODO for images, we need larger buffer capacity
let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 1024 * 2);
let stream = EventsFromFrames::<_, T>::new(frames);
let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 1024 * 2)
//.instrument(netpod::log::span!(netpod::log::Level::TRACE, "InMemRd"))
;
let frames = Box::pin(frames) as _;
let stream = EventsFromFrames::<T>::new(frames);
streams.push(Box::pin(stream) as _);
}
Ok(streams)