From dc89de73cef21c45134cbd9dfc408757b1eec1f2 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 21 Jul 2022 15:49:36 +0200 Subject: [PATCH] Find handled channels by name --- netfetch/Cargo.toml | 2 + netfetch/src/ca.rs | 25 ++++++++++- netfetch/src/ca/conn.rs | 64 +++++++++++++++++++++++++++ netfetch/src/metrics.rs | 97 ++++++++++++++++++++++++++++++++++++++--- 4 files changed, 182 insertions(+), 6 deletions(-) diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 4caa3d8..66e3199 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -28,6 +28,8 @@ hex = "0.4" libc = "0.2" regex = "1.5.5" axum = "0.5" +http = "0.2" +hyper = "0.14" log = { path = "../log" } stats = { path = "../stats" } err = { path = "../../daqbuffer/err" } diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 4daa108..5e4f6ec 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -4,7 +4,9 @@ pub mod search; pub mod store; use self::store::DataStore; +use crate::ca::conn::ConnCommand; use crate::store::{CommonInsertItemQueue, QueryItem}; +use async_channel::Sender; use conn::CaConn; use err::Error; use futures_util::StreamExt; @@ -13,7 +15,7 @@ use netpod::{Database, ScyllaConfig}; use scylla::batch::Consistency; use serde::{Deserialize, Serialize}; use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; use std::net::{Ipv4Addr, SocketAddrV4}; use std::path::PathBuf; use std::sync::atomic::{AtomicU64, Ordering}; @@ -229,16 +231,35 @@ async fn spawn_scylla_insert_workers( Ok(()) } +pub struct CommandQueueSet { + queues: tokio::sync::Mutex>>, +} + +impl CommandQueueSet { + pub fn new() -> Self { + Self { + queues: tokio::sync::Mutex::new(VecDeque::>::new()), + } + } + + pub fn queues(&self) -> &tokio::sync::Mutex>> { + &self.queues + } +} + pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let facility = "scylla"; let insert_frac = Arc::new(AtomicU64::new(1000)); let insert_ivl_min = Arc::new(AtomicU64::new(8800)); let opts = parse_config(opts.config).await?; let scyconf = opts.scyconf.clone(); + + let command_queue_set = Arc::new(CommandQueueSet::new()); tokio::spawn(crate::metrics::start_metrics_service( opts.api_bind.clone(), insert_frac.clone(), insert_ivl_min.clone(), + command_queue_set.clone(), )); let d = Database { name: opts.pgconf.name.clone(), @@ -363,6 +384,8 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { conn.channel_add(c); } let stats2 = conn.stats(); + let conn_command_tx = conn.conn_command_tx(); + command_queue_set.queues().lock().await.push_back(conn_command_tx); let conn_block = async move { while let Some(item) = conn.next().await { match item { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index cfaec83..ac0a0b7 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -5,6 +5,7 @@ use crate::ca::proto::{CreateChan, EventAdd, HeadInfo}; use crate::ca::store::ChannelRegistry; use crate::series::{Existence, SeriesId}; use crate::store::{CommonInsertItemQueueSender, InsertItem, IvlItem, MuteItem, QueryItem}; +use async_channel::Sender; use err::Error; use futures_util::stream::FuturesOrdered; use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt}; @@ -108,6 +109,26 @@ impl IdStore { } } +#[derive(Debug)] +pub enum ConnCommandKind { + FindChannel(String, Sender<(SocketAddrV4, Vec)>), +} + +#[derive(Debug)] +pub struct ConnCommand { + kind: ConnCommandKind, +} + +impl ConnCommand { + pub fn find_channel(pattern: String) -> (ConnCommand, async_channel::Receiver<(SocketAddrV4, Vec)>) { + let (tx, rx) = async_channel::bounded(1); + let cmd = Self { + kind: ConnCommandKind::FindChannel(pattern, tx), + }; + (cmd, rx) + } +} + #[allow(unused)] pub struct CaConn { state: CaConnState, @@ -134,6 +155,8 @@ pub struct CaConn { stats: Arc, insert_queue_max: usize, insert_ivl_min: Arc, + conn_command_tx: async_channel::Sender, + conn_command_rx: async_channel::Receiver, } impl CaConn { @@ -146,6 +169,7 @@ impl CaConn { insert_queue_max: usize, insert_ivl_min: Arc, ) -> Self { + let (cq_tx, cq_rx) = async_channel::bounded(32); Self { state: CaConnState::Unconnected, proto: None, @@ -170,6 +194,45 @@ impl CaConn { stats: Arc::new(CaConnStats::new()), insert_queue_max, insert_ivl_min, + conn_command_tx: cq_tx, + conn_command_rx: cq_rx, + } + } + + pub fn conn_command_tx(&self) -> async_channel::Sender { + self.conn_command_tx.clone() + } + + fn handle_conn_command(&mut self, cx: &mut Context) { + // TODO if this loops for too long time, interrupt and make sure we get called waken up again. + use Poll::*; + loop { + match self.conn_command_rx.poll_next_unpin(cx) { + Ready(Some(a)) => match a.kind { + ConnCommandKind::FindChannel(pattern, tx) => { + info!("Search for {pattern:?}"); + let mut res = Vec::new(); + for name in self.name_by_cid.values() { + if !pattern.is_empty() && name.contains(&pattern) { + res.push(name.clone()); + } + } + let msg = (self.remote_addr_dbg.clone(), res); + match tx.try_send(msg) { + Ok(_) => {} + Err(_) => { + error!("response channel full or closed"); + } + } + } + }, + Ready(None) => { + error!("Command queue closed"); + } + Pending => { + break; + } + } } } @@ -723,6 +786,7 @@ impl Stream for CaConn { self.poll_count += 1; // TODO factor out the inner loop: let ret = 'outer: loop { + self.handle_conn_command(cx); let q = self.handle_insert_futs(cx); let ts2 = Instant::now(); self.stats diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 4e5a5bb..68b41c4 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -1,12 +1,28 @@ +use crate::ca::conn::ConnCommand; +use crate::ca::CommandQueueSet; use log::*; +use std::collections::HashMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -pub async fn start_metrics_service(bind_to: String, insert_frac: Arc, insert_ivl_min: Arc) { - let app = axum::Router::new() +async fn get_empty() -> String { + format!("") +} + +pub async fn start_metrics_service( + bind_to: String, + insert_frac: Arc, + insert_ivl_min: Arc, + command_queue_set: Arc, +) { + use axum::extract::Query; + use axum::routing::{get, post, put}; + use axum::{extract, Router}; + use http::request::Parts; + let app = Router::new() .route( "/metrics", - axum::routing::get(|| async { + get(|| async { let stats = crate::ca::get_metrics(); match stats { Some(s) => { @@ -20,17 +36,88 @@ pub async fn start_metrics_service(bind_to: String, insert_frac: Arc, } }), ) + .route( + "/daqingest/find/channel", + get(|Query(params): Query>| async move { + let pattern = params.get("pattern").map_or(String::new(), |x| x.clone()).to_string(); + let g = command_queue_set.queues().lock().await; + let mut rxs = Vec::new(); + for tx in g.iter() { + let (cmd, rx) = ConnCommand::find_channel(pattern.clone()); + rxs.push(rx); + if let Err(_) = tx.send(cmd).await { + error!("can not send command"); + } + } + let mut res = Vec::new(); + for rx in rxs { + let item = rx.recv().await.unwrap(); + let item = (item.0.to_string(), item.1); + if item.1.len() > 0 { + res.push(item); + } + } + serde_json::to_string(&res).unwrap() + }), + ) .route( "/insert_frac", - axum::routing::put(|v: axum::extract::Json| async move { + get(get_empty).put(|v: extract::Json| async move { insert_frac.store(v.0, Ordering::Release); }), ) .route( "/insert_ivl_min", - axum::routing::put(|v: axum::extract::Json| async move { + put(|v: extract::Json| async move { insert_ivl_min.store(v.0, Ordering::Release); }), + ) + .route( + "/api/v1/status/buildinfo", + get(|| async { + let res = serde_json::json!({ + "status": "success", + "data": { + "version": "2.37", + "revision": "aaaaaaaaaaaaaaaaaaaaaaaaa", + "branch": "dev", + "buildUser": "empty", + "buildDate": "2022-07-14", + "goVersion": "go1" + } + }); + serde_json::to_string(&res).unwrap() + }), + ) + .route( + "/api/v1/query", + post( + |Query(params): Query>, parts: Parts| async move { + info!("/api/v1/query params {params:?} {parts:?}"); + }, + ), + ) + .fallback( + get(|parts: Parts, body: extract::RawBody| async move { + let bytes = hyper::body::to_bytes(body.0).await.unwrap(); + let s = String::from_utf8_lossy(&bytes); + info!("GET {parts:?} body: {s:?}"); + }) + .post(|parts: Parts, body: extract::RawBody| async move { + let bytes = hyper::body::to_bytes(body.0).await.unwrap(); + let s = String::from_utf8_lossy(&bytes); + info!("POST {parts:?} body: {s:?}"); + }) + .put(|parts: Parts, body: extract::RawBody| async move { + let bytes = hyper::body::to_bytes(body.0).await.unwrap(); + let s = String::from_utf8_lossy(&bytes); + info!("PUT {parts:?} body: {s:?}"); + }) + .delete(|parts: Parts, body: extract::RawBody| async move { + let bytes = hyper::body::to_bytes(body.0).await.unwrap(); + let s = String::from_utf8_lossy(&bytes); + info!("DELETE {parts:?} body: {s:?}"); + }), ); axum::Server::bind(&bind_to.parse().unwrap()) .serve(app.into_make_service())