0.5.0-alpha.0
This commit is contained in:
@@ -795,7 +795,7 @@ impl Stream for DataApiPython3DataStream {
|
|||||||
self.completed = true;
|
self.completed = true;
|
||||||
let dt = self.ts_ctor.elapsed().as_secs_f32();
|
let dt = self.ts_ctor.elapsed().as_secs_f32();
|
||||||
info!(
|
info!(
|
||||||
"response body sent {} bytes {} items {:0} ms",
|
"response body sent {} bytes {} items {:.0} ms",
|
||||||
self.count_bytes,
|
self.count_bytes,
|
||||||
self.count_emits,
|
self.count_emits,
|
||||||
1e3 * dt
|
1e3 * dt
|
||||||
|
|||||||
@@ -62,6 +62,13 @@ impl UpdateDbWithChannelNamesHandler {
|
|||||||
}
|
}
|
||||||
Err(e) => Err(e),
|
Err(e) => Err(e),
|
||||||
});
|
});
|
||||||
|
let stream = streams::print_on_done::PrintOnDone::new(
|
||||||
|
stream,
|
||||||
|
Box::pin(|ts| {
|
||||||
|
let dt = ts.elapsed();
|
||||||
|
info!("{} stream done {:.0} ms", Self::self_name(), 1e3 * dt.as_secs_f32());
|
||||||
|
}),
|
||||||
|
);
|
||||||
let ret = response(StatusCode::OK)
|
let ret = response(StatusCode::OK)
|
||||||
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
|
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
|
||||||
.body(body_stream(stream))?;
|
.body(body_stream(stream))?;
|
||||||
@@ -122,6 +129,13 @@ impl UpdateDbWithAllChannelConfigsHandler {
|
|||||||
}
|
}
|
||||||
Err(e) => Err(e),
|
Err(e) => Err(e),
|
||||||
});
|
});
|
||||||
|
let stream = streams::print_on_done::PrintOnDone::new(
|
||||||
|
stream,
|
||||||
|
Box::pin(|ts| {
|
||||||
|
let dt = ts.elapsed();
|
||||||
|
info!("{} stream done {:.0} ms", Self::self_name(), 1e3 * dt.as_secs_f32());
|
||||||
|
}),
|
||||||
|
);
|
||||||
let ret = response(StatusCode::OK)
|
let ret = response(StatusCode::OK)
|
||||||
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
|
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
|
||||||
.body(body_stream(stream))?;
|
.body(body_stream(stream))?;
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ use netpod::FromUrl;
|
|||||||
use netpod::NodeConfigCached;
|
use netpod::NodeConfigCached;
|
||||||
use netpod::ACCEPT_ALL;
|
use netpod::ACCEPT_ALL;
|
||||||
use netpod::APP_JSON;
|
use netpod::APP_JSON;
|
||||||
use url::Url;
|
|
||||||
|
|
||||||
pub struct ConnectionStatusEvents {}
|
pub struct ConnectionStatusEvents {}
|
||||||
|
|
||||||
|
|||||||
@@ -377,6 +377,12 @@ async fn http_service_inner(
|
|||||||
Ok(h.handle(req, &node_config).await?)
|
Ok(h.handle(req, &node_config).await?)
|
||||||
} else if let Some(h) = api4::search::ChannelSearchHandler::handler(&req) {
|
} else if let Some(h) = api4::search::ChannelSearchHandler::handler(&req) {
|
||||||
Ok(h.handle(req, &node_config).await?)
|
Ok(h.handle(req, &node_config).await?)
|
||||||
|
} else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) {
|
||||||
|
Ok(h.handle(req, ctx, &node_config).await?)
|
||||||
|
} else if let Some(h) = channel_status::ChannelStatusEventsHandler::handler(&req) {
|
||||||
|
Ok(h.handle(req, ctx, &node_config).await?)
|
||||||
|
} else if let Some(h) = api4::events::EventsHandler::handler(&req) {
|
||||||
|
Ok(h.handle(req, ctx, &node_config).await?)
|
||||||
} else if let Some(h) = api4::binned::BinnedHandler::handler(&req) {
|
} else if let Some(h) = api4::binned::BinnedHandler::handler(&req) {
|
||||||
Ok(h.handle(req, ctx, &node_config).await?)
|
Ok(h.handle(req, ctx, &node_config).await?)
|
||||||
} else if let Some(h) = channelconfig::ChannelConfigQuorumHandler::handler(&req) {
|
} else if let Some(h) = channelconfig::ChannelConfigQuorumHandler::handler(&req) {
|
||||||
@@ -395,12 +401,6 @@ async fn http_service_inner(
|
|||||||
Ok(h.handle(req, &node_config).await?)
|
Ok(h.handle(req, &node_config).await?)
|
||||||
} else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) {
|
} else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) {
|
||||||
Ok(h.handle(req, &node_config).await?)
|
Ok(h.handle(req, &node_config).await?)
|
||||||
} else if let Some(h) = api4::events::EventsHandler::handler(&req) {
|
|
||||||
Ok(h.handle(req, ctx, &node_config).await?)
|
|
||||||
} else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) {
|
|
||||||
Ok(h.handle(req, ctx, &node_config).await?)
|
|
||||||
} else if let Some(h) = channel_status::ChannelStatusEventsHandler::handler(&req) {
|
|
||||||
Ok(h.handle(req, ctx, &node_config).await?)
|
|
||||||
} else if path == "/api/4/prebinned" {
|
} else if path == "/api/4/prebinned" {
|
||||||
if req.method() == Method::GET {
|
if req.method() == Method::GET {
|
||||||
Ok(prebinned(req, ctx, &node_config).await?)
|
Ok(prebinned(req, ctx, &node_config).await?)
|
||||||
@@ -433,8 +433,6 @@ async fn http_service_inner(
|
|||||||
Ok(h.handle(req, ctx, &node_config).await?)
|
Ok(h.handle(req, ctx, &node_config).await?)
|
||||||
} else if let Some(h) = pulsemap::MapPulseScyllaHandler::handler(&req) {
|
} else if let Some(h) = pulsemap::MapPulseScyllaHandler::handler(&req) {
|
||||||
Ok(h.handle(req, &node_config).await?)
|
Ok(h.handle(req, &node_config).await?)
|
||||||
} else if let Some(h) = pulsemap::IndexFullHttpFunction::handler(&req) {
|
|
||||||
Ok(h.handle(req, &node_config).await?)
|
|
||||||
} else if let Some(h) = pulsemap::IndexChannelHttpFunction::handler(&req) {
|
} else if let Some(h) = pulsemap::IndexChannelHttpFunction::handler(&req) {
|
||||||
Ok(h.handle(req, ctx, &node_config).await?)
|
Ok(h.handle(req, ctx, &node_config).await?)
|
||||||
} else if let Some(h) = pulsemap::MarkClosedHttpFunction::handler(&req) {
|
} else if let Some(h) = pulsemap::MarkClosedHttpFunction::handler(&req) {
|
||||||
|
|||||||
@@ -12,7 +12,6 @@ use crate::gather::SubRes;
|
|||||||
use crate::pulsemap::MapPulseQuery;
|
use crate::pulsemap::MapPulseQuery;
|
||||||
use crate::response;
|
use crate::response;
|
||||||
use crate::response_err;
|
use crate::response_err;
|
||||||
use crate::status_board;
|
|
||||||
use crate::status_board_init;
|
use crate::status_board_init;
|
||||||
use crate::Cont;
|
use crate::Cont;
|
||||||
use futures_util::pin_mut;
|
use futures_util::pin_mut;
|
||||||
|
|||||||
@@ -91,7 +91,7 @@ async fn make_tables(pgc: &dbconn::pg::Client) -> Result<(), Error> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn timer_channel_names() -> Vec<String> {
|
fn _timer_channel_names() -> Vec<String> {
|
||||||
let sections = vec!["SINEG01", "SINSB01", "SINSB02", "SINSB03", "SINSB04", "SINXB01"];
|
let sections = vec!["SINEG01", "SINSB01", "SINSB02", "SINSB03", "SINSB04", "SINXB01"];
|
||||||
let suffixes = vec!["MASTER"];
|
let suffixes = vec!["MASTER"];
|
||||||
let mut all: Vec<_> = sections
|
let mut all: Vec<_> = sections
|
||||||
@@ -388,57 +388,6 @@ async fn read_chunk_at(mut file: File, pos: u64, chunk_len: Option<u64>) -> Resu
|
|||||||
Ok((ret, file))
|
Ok((ret, file))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct IndexFullHttpFunction {}
|
|
||||||
|
|
||||||
impl IndexFullHttpFunction {
|
|
||||||
pub fn handler(req: &Requ) -> Option<Self> {
|
|
||||||
if req.uri().path().eq("/api/1/map/index/full") {
|
|
||||||
Some(Self {})
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
|
|
||||||
if req.method() != Method::GET {
|
|
||||||
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
|
|
||||||
}
|
|
||||||
let ret = match Self::index(false, node_config).await {
|
|
||||||
Ok(msg) => response(StatusCode::OK).body(body_string(msg))?,
|
|
||||||
Err(e) => {
|
|
||||||
error!("IndexFullHttpFunction {e}");
|
|
||||||
response(StatusCode::INTERNAL_SERVER_ERROR).body(body_string(format!("{:?}", e)))?
|
|
||||||
}
|
|
||||||
};
|
|
||||||
Ok(ret)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn index(do_print: bool, node_config: &NodeConfigCached) -> Result<String, Error> {
|
|
||||||
// TODO avoid double-insert on central storage.
|
|
||||||
let mut msg = format!("LOG");
|
|
||||||
let pgc = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
|
|
||||||
// TODO remove update of static columns when older clients are removed.
|
|
||||||
let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname, ks) values ($1, $2, $3, $4, $5, $6, $7) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6";
|
|
||||||
let insert_01 = pgc.prepare(sql).await?;
|
|
||||||
make_tables(&pgc).await?;
|
|
||||||
let chs = timer_channel_names();
|
|
||||||
for channel_name in chs {
|
|
||||||
match IndexChannelHttpFunction::index_channel(&channel_name, &pgc, do_print, &insert_01, node_config).await
|
|
||||||
{
|
|
||||||
Ok(m) => {
|
|
||||||
msg.push_str("\n");
|
|
||||||
msg.push_str(&m);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("error while indexing {} {:?}", channel_name, e);
|
|
||||||
//return Err(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct IndexChannelHttpFunction {}
|
pub struct IndexChannelHttpFunction {}
|
||||||
|
|
||||||
impl IndexChannelHttpFunction {
|
impl IndexChannelHttpFunction {
|
||||||
@@ -688,13 +637,20 @@ async fn update_task(do_abort: Arc<AtomicUsize>, node_config: NodeConfigCached)
|
|||||||
false
|
false
|
||||||
};
|
};
|
||||||
CACHE.housekeeping();
|
CACHE.housekeeping();
|
||||||
match IndexFullHttpFunction::index(do_print, &node_config).await {
|
|
||||||
|
// TODO do the actual work here
|
||||||
|
let _ = node_config;
|
||||||
|
match tokio::time::sleep(Duration::from_millis(100))
|
||||||
|
.map(|_| Ok::<_, Error>(123u32))
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("issue during last update task: {:?}", e);
|
error!("issue during last update task: {:?}", e);
|
||||||
tokio::time::sleep(Duration::from_millis(20000)).await;
|
tokio::time::sleep(Duration::from_millis(20000)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let ts2 = Instant::now();
|
let ts2 = Instant::now();
|
||||||
let dt = ts2.duration_since(ts1);
|
let dt = ts2.duration_since(ts1);
|
||||||
if do_print || dt >= Duration::from_millis(4000) {
|
if do_print || dt >= Duration::from_millis(4000) {
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ pub mod generators;
|
|||||||
pub mod itemclone;
|
pub mod itemclone;
|
||||||
pub mod needminbuffer;
|
pub mod needminbuffer;
|
||||||
pub mod plaineventsjson;
|
pub mod plaineventsjson;
|
||||||
|
pub mod print_on_done;
|
||||||
pub mod rangefilter2;
|
pub mod rangefilter2;
|
||||||
pub mod slidebuf;
|
pub mod slidebuf;
|
||||||
pub mod tcprawclient;
|
pub mod tcprawclient;
|
||||||
|
|||||||
41
crates/streams/src/print_on_done.rs
Normal file
41
crates/streams/src/print_on_done.rs
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
use futures_util::Stream;
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::Context;
|
||||||
|
use std::task::Poll;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
|
pub struct PrintOnDone<INP> {
|
||||||
|
ts_ctor: Instant,
|
||||||
|
inp: INP,
|
||||||
|
on_done: Pin<Box<dyn Fn(Instant) -> () + Send>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<INP> PrintOnDone<INP> {
|
||||||
|
pub fn new(inp: INP, on_done: Pin<Box<dyn Fn(Instant) -> () + Send>>) -> Self {
|
||||||
|
Self {
|
||||||
|
ts_ctor: Instant::now(),
|
||||||
|
inp,
|
||||||
|
on_done,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<INP> Stream for PrintOnDone<INP>
|
||||||
|
where
|
||||||
|
INP: Stream + Unpin,
|
||||||
|
{
|
||||||
|
type Item = <INP as Stream>::Item;
|
||||||
|
|
||||||
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
|
use Poll::*;
|
||||||
|
match self.inp.poll_next_unpin(cx) {
|
||||||
|
Ready(Some(x)) => Ready(Some(x)),
|
||||||
|
Ready(None) => {
|
||||||
|
(self.on_done)(self.ts_ctor);
|
||||||
|
Ready(None)
|
||||||
|
}
|
||||||
|
Pending => Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user