diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index dc943d8..98ab63d 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.5-aa.6" +version = "0.5.5-aa.7" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/dbconn/src/dbconn.rs b/crates/dbconn/src/dbconn.rs index 9c465e6..4a3d2cf 100644 --- a/crates/dbconn/src/dbconn.rs +++ b/crates/dbconn/src/dbconn.rs @@ -66,7 +66,7 @@ pub async fn delay_io_medium() { } pub async fn create_connection(db_config: &Database) -> Result<(PgClient, JoinHandle>), Error> { - warn!("create_connection creating postgres connection"); + info!("create_connection creating postgres connection"); // TODO use a common already running worker pool for these queries: let d = db_config; let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name); diff --git a/crates/httpret/Cargo.toml b/crates/httpret/Cargo.toml index e7d43ad..affe900 100644 --- a/crates/httpret/Cargo.toml +++ b/crates/httpret/Cargo.toml @@ -15,9 +15,10 @@ http = "1.2.0" http-body-util = { version = "0.1.2" } hyper = { version = "1.6.0", features = ["http1", "http2", "client", "server"] } hyper-util = { version = "0.1.10", features = ["http1", "http2", "client", "server"] } +h3 = { version = "0.0.6", optional = true } h3-quinn = { version = "0.0.7", optional = true } quinn = { version = "0.11.6", optional = true, default-features = false, features = ["log", "platform-verifier", "runtime-tokio", "rustls-ring"] } -rustls = { version = "0.23.22", optional = true, default-features = false, features = ["logging", "std", "tls12", "ring"] } +rustls = { version = "0.23.22", optional = true, default-features = false, features = ["logging", "std", "ring"] } rustls-pki-types = { version = "1.11.0", optional = true } pin-project = "1.1.9" bytes = "1.10.0" @@ -53,7 +54,7 @@ daqbuf-redis = { path = "../daqbuf-redis" } httpclient = { path = "../httpclient" } [features] -default = [] -#default = ["http3"] +#default = [] +default = ["http3"] prometheus_endpoint = [] -http3 = ["h3-quinn", "quinn", "rustls", "rustls-pki-types"] +http3 = ["h3", "h3-quinn", "quinn", "rustls", "rustls-pki-types"] diff --git a/crates/httpret/src/http3.rs b/crates/httpret/src/http3.rs index ab07252..ebbdff2 100644 --- a/crates/httpret/src/http3.rs +++ b/crates/httpret/src/http3.rs @@ -1,3 +1,5 @@ +use bytes::Bytes; +use http::StatusCode; use quinn; use quinn::crypto::rustls::QuicServerConfig; use quinn::Endpoint; @@ -12,6 +14,8 @@ use std::task::Context; use std::task::Poll; use taskrun::tokio; +const EARLY_DATA_MAX: u32 = u32::MAX; + macro_rules! info { ($($arg:expr),*) => ( if true { netpod::log::info!($($arg),*); } ); } autoerr::create_error_v1!( @@ -19,6 +23,12 @@ autoerr::create_error_v1!( enum variants { NoRuntime, IO(#[from] std::io::Error), + H3(#[from] h3::Error), + Http(#[from] http::Error), + Pem(#[from] rustls::pki_types::pem::Error), + Rustls(#[from] rustls::Error), + NoInitialCipherSuite(#[from] quinn::crypto::rustls::NoInitialCipherSuite), + QuinnConnection(#[from] quinn::ConnectionError), }, ); @@ -27,60 +37,63 @@ pub struct Http3Support { } impl Http3Support { - pub async fn new(bind_addr: SocketAddr) -> Result { - let key = match PemObject::from_pem_file("key.pem") { - Ok(x) => x, - Err(e) => { - info!("key error {}", e); - return Ok(Self::dummy()); - } - }; - let cert = match PemObject::from_pem_file("cert.pem") { - Ok(x) => x, - Err(e) => { - info!("cert error {}", e); - return Ok(Self::dummy()); - } - }; - let conf = EndpointConfig::default(); - let mut tls_conf = match rustls::ServerConfig::builder() + pub async fn new_or_dummy(bind_addr: SocketAddr) -> Result { + Ok(Self::new(bind_addr).await.unwrap_or_else(|e| { + info!("error {}", e); + Self::dummy() + })) + } + + fn dummy() -> Self { + Self { ep: None } + } + + async fn new(bind_addr: SocketAddr) -> Result { + let key = PemObject::from_pem_file("key.pem")?; + let cert = PemObject::from_pem_file("cert.pem")?; + let mut tls_conf = rustls::ServerConfig::builder() .with_no_client_auth() - .with_single_cert(vec![cert], key) - { - Ok(x) => x, - Err(e) => { - info!("tls config error {}", e); - return Ok(Self::dummy()); - } - }; - tls_conf.alpn_protocols = vec![b"h3".to_vec()]; - tls_conf.max_early_data_size = u32::MAX; + .with_single_cert(vec![cert], key)?; + tls_conf.alpn_protocols = vec![b"HTTP/3".to_vec(), b"h3".to_vec()]; + tls_conf.max_early_data_size = EARLY_DATA_MAX; let tls_conf = tls_conf; - let v = match QuicServerConfig::try_from(tls_conf) { - Ok(x) => x, - Err(e) => { - info!("config error {}", e); - return Ok(Self::dummy()); - } - }; + let v = QuicServerConfig::try_from(tls_conf)?; + let quic_conf = Arc::new(v); + let conf_srv = quinn::ServerConfig::with_crypto(quic_conf); + let ep2 = Endpoint::server(conf_srv, bind_addr)?; + { + let ep = ep2.clone(); + tokio::task::spawn(Self::accept(ep)); + } + let ret = Self { ep: Some(ep2) }; + Ok(ret) + } + + async fn new_plain_quic(bind_addr: SocketAddr) -> Result { + let key = PemObject::from_pem_file("key.pem")?; + let cert = PemObject::from_pem_file("cert.pem")?; + let conf = EndpointConfig::default(); + let mut tls_conf = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(vec![cert], key)?; + tls_conf.alpn_protocols = vec![b"h3".to_vec()]; + tls_conf.max_early_data_size = EARLY_DATA_MAX; + let tls_conf = tls_conf; + let v = QuicServerConfig::try_from(tls_conf)?; let quic_conf = Arc::new(v); let conf_srv = quinn::ServerConfig::with_crypto(quic_conf); let sock = std::net::UdpSocket::bind(bind_addr)?; info!("h3 sock {:?}", sock); let rt = quinn::default_runtime().ok_or_else(|| Error::NoRuntime)?; - let ep = Endpoint::new(conf, Some(conf_srv), sock, rt)?; + let ep1 = Endpoint::new(conf, Some(conf_srv.clone()), sock, rt)?; { - let ep = ep.clone(); + let ep = ep1.clone(); tokio::task::spawn(Self::accept(ep)); } - let ret = Self { ep: Some(ep) }; + let ret = Self { ep: Some(ep1) }; Ok(ret) } - fn dummy() -> Self { - Self { ep: None } - } - pub async fn wait_idle(self) -> () { if let Some(ep) = self.ep.as_ref() { ep.close(quinn::VarInt::from_u32(1), b"shutdown"); @@ -95,37 +108,39 @@ impl Http3Support { } } - async fn handle_incoming(inc: Incoming) { - info!("new incoming {:?}", inc.remote_address()); - let conn = match inc.await { - Ok(x) => x, + async fn handle_incoming(inc: Incoming) -> Result<(), Error> { + match Self::handle_incoming_inner(inc).await { + Ok(x) => Ok(x), Err(e) => { - info!("connection error {}", e); - return; + info!("error handle_connection {}", e); + Err(e) } - }; - let fut1 = { - let conn = conn.clone(); - async move { - let bi = conn.accept_bi().await; - info!("got bi {:?}", bi); - match bi { - Ok(mut v) => { - v.0.write(b"some-data").await; - } - Err(e) => {} - } - } - }; - let fut2 = { - let conn = conn.clone(); - async move { - let uni = conn.accept_uni().await; - info!("got uni {:?}", uni); - } - }; - tokio::spawn(fut1); - tokio::spawn(fut2); + } + } + + async fn handle_incoming_inner(inc: Incoming) -> Result<(), Error> { + let addr_remote = inc.remote_address(); + info!("new incoming {:?}", addr_remote); + let conn1 = inc.accept()?.await?; + let conn2 = h3_quinn::Connection::new(conn1); + let mut conn3 = h3::server::builder().build::<_, Bytes>(conn2).await?; + while let Some((req, mut stream)) = conn3.accept().await? { + let (head, _body) = req.into_parts(); + info!( + "see request {} {:?} {:?} {:?}", + addr_remote, head.method, head.uri, head.headers + ); + let res = http::Response::builder() + .version(http::Version::HTTP_3) + .status(StatusCode::OK) + .header("x-daqbuf-tmp", "8e4b217") + .body(())?; + stream.send_response(res).await?; + stream.send_data(Bytes::from_static(b"2025-02-05T16:37:12Z")).await?; + stream.finish().await?; + info!("response sent {}", addr_remote); + } + Ok(()) } } diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 652d946..11d258e 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -160,7 +160,7 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res let shared_res = Arc::new(shared_res); use std::str::FromStr; let bind_addr = SocketAddr::from_str(&format!("{}:{}", ncc.node.listen(), ncc.node.port))?; - let http3 = http3::Http3Support::new(bind_addr.clone()).await?; + let http3 = http3::Http3Support::new_or_dummy(bind_addr.clone()).await?; // tokio::net::TcpSocket::new_v4()?.listen(200)? let listener = TcpListener::bind(bind_addr).await?; loop { diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index bcf4faa..a98a7c2 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -80,7 +80,7 @@ pub async fn proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) - status_board_init(); use std::str::FromStr; let bind_addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?; - let http3 = http3::Http3Support::new(bind_addr.clone()).await?; + let http3 = http3::Http3Support::new_or_dummy(bind_addr.clone()).await?; let listener = TcpListener::bind(bind_addr).await?; loop { let (stream, addr) = match listener.accept().await { diff --git a/crates/scyllaconn/src/conn.rs b/crates/scyllaconn/src/conn.rs index 3057757..888270c 100644 --- a/crates/scyllaconn/src/conn.rs +++ b/crates/scyllaconn/src/conn.rs @@ -18,7 +18,7 @@ pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result Result { - warn!("creating scylla connection"); + info!("creating scylla connection"); let scy = scylla::SessionBuilder::new() .known_nodes(&scyconf.hosts) .default_execution_profile_handle(