Files
daqingest/scywr/src/futinsert.rs
T
2025-04-29 15:16:38 +02:00

74 lines
2.3 KiB
Rust

use crate::access::Error;
use crate::session::ScySession;
use futures_util::Future;
use futures_util::FutureExt;
use netpod::log::error;
use scylla::errors::ExecutionError;
use scylla::response::query_result::QueryResult;
use scylla::serialize::row::SerializeRow;
use scylla::statement::prepared::PreparedStatement;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Instant;
/// Boxed, pinned future that resolves to the raw result of a statement execution.
type BoxedExecFut<'a> = Pin<Box<dyn Future<Output = Result<QueryResult, ExecutionError>> + Send + 'a>>;

/// Future wrapper around a single prepared-statement execution.
///
/// Besides driving the inner future it keeps a little bookkeeping (poll count
/// and two timestamps) which is reported in the log if the execution fails.
pub struct ScyInsertFut<'a> {
    /// The in-flight execution; may borrow data for `'a`.
    fut: BoxedExecFut<'a>,
    /// How many times `poll` has been invoked on this future.
    polled: usize,
    /// Instant at which this wrapper was constructed.
    ts_create: Instant,
    /// Instant of the first `poll` call; equals `ts_create` until polled.
    ts_poll_first: Instant,
}
impl<'a> ScyInsertFut<'a> {
const NAME: &'static str = "ScyInsertFut";
pub fn new<V>(scy: &'a ScySession, query: &'a PreparedStatement, values: V) -> Self
where
V: SerializeRow + Send + 'static,
{
let fut = scy.execute_unpaged(query, values);
let fut = Box::pin(fut) as _;
let tsnow = Instant::now();
Self {
fut,
polled: 0,
ts_create: tsnow,
ts_poll_first: tsnow,
}
}
}
impl<'a> Future for ScyInsertFut<'a> {
    /// The query result itself is discarded; only success or failure is reported.
    type Output = Result<(), Error>;

    /// Polls the wrapped execution future once.
    ///
    /// Records the instant of the first poll and counts every poll. On
    /// success the query result is dropped and `Ok(())` is returned. On
    /// failure two error lines are logged (poll count plus the elapsed time
    /// in milliseconds since creation and since the first poll, then the
    /// error itself) before the error is converted into [`Error`].
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polled == 0 {
            self.ts_poll_first = Instant::now();
        }
        self.polled += 1;
        // A single poll of the inner future is all that is needed here; the
        // former `loop { break ... }` wrapper could never iterate.
        match self.fut.poll_unpin(cx) {
            Poll::Ready(Ok(_res)) => Poll::Ready(Ok(())),
            Poll::Ready(Err(e)) => {
                let tsnow = Instant::now();
                // Seconds scaled to milliseconds for the log output.
                let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3;
                let dt_poll_first = tsnow.duration_since(self.ts_poll_first).as_secs_f32() * 1e3;
                error!(
                    "{} polled {} dt_created {:6.2} ms dt_poll_first {:6.2} ms",
                    Self::NAME,
                    self.polled,
                    dt_created,
                    dt_poll_first
                );
                error!("{} done Err {:?}", Self::NAME, e);
                Poll::Ready(Err(e.into()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}