diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 36a0bf1..59bd744 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -21,4 +21,5 @@ netpod = { path = "../netpod" } dbconn = { path = "../dbconn" } disk = { path = "../disk" } parse = { path = "../parse" } +netfetch = { path = "../netfetch" } taskrun = { path = "../taskrun" } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 1dcf674..f241bde 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -198,6 +198,12 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } + } else if path == "/api/4/ca_connect_1" { + if req.method() == Method::GET { + Ok(ca_connect_1(req, &node_config).await?) + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } } else if path == "/api/4/channel/config" { if req.method() == Method::GET { Ok(channel_config(req, &node_config).await?) @@ -532,3 +538,20 @@ pub async fn channel_config(req: Request, node_config: &NodeConfigCached) .body(Body::from(serde_json::to_string(&res)?))?; Ok(ret) } + +pub async fn ca_connect_1(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + let (head, _body) = req.into_parts(); + let params = netpod::query_params(head.uri.query()); + let addr = params.get("addr").unwrap().into(); + let res = netfetch::ca_connect_1(addr, node_config).await?; + let ret = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/jsonlines") + .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) { + Ok(mut item) => { + item.push('\n'); + Ok(item) + } + Err(e) => Err(e), + })))?; + Ok(ret) +} diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml new file mode 100644 index 0000000..12728fb --- /dev/null +++ b/netfetch/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "netfetch" +version = "0.0.1-a.0" +authors = ["Dominik Werder "] +edition = "2018" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +serde_cbor = "0.11.1" +tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio-stream = {version = "0.1.5", features = ["fs"]} +async-channel = "1.6" +bytes = "1.0.1" +arrayref = "0.3.6" +byteorder = "1.4.3" +futures-core = "0.3.14" +futures-util = "0.3.14" +err = { path = "../err" } +netpod = { path = "../netpod" } diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs new file mode 100644 index 0000000..41a905b --- /dev/null +++ b/netfetch/src/lib.rs @@ -0,0 +1,62 @@ +use async_channel::{bounded, Receiver}; +use bytes::{BufMut, BytesMut}; +use err::Error; +use futures_util::FutureExt; +use netpod::NodeConfigCached; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Message { + cmd: u16, + payload_len: u16, + type_type: u16, + data_len: u16, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum FetchItem { + Log(String), + Message(Message), +} + +pub async fn ca_connect_1( + addr: String, + _node_config: &NodeConfigCached, +) -> Result>, Error> { + let (tx, rx) = bounded(16); + let tx2 = tx.clone(); + tokio::task::spawn( + async move { + let mut conn = tokio::net::TcpStream::connect(addr).await?; + let (mut inp, mut out) = conn.split(); + tx.send(Ok(FetchItem::Log(format!("connected")))).await?; + let mut b2 = BytesMut::with_capacity(128); + b2.put_u16(0); + b2.put_u16(0); + b2.put_u16(0); + b2.put_u16(0xb); + b2.put_u32(0); + b2.put_u32(0); + out.write_all(&b2).await?; + tx.send(Ok(FetchItem::Log(format!("written")))).await?; + let mut buf = [0; 64]; + let n1 = inp.read(&mut buf).await?; + tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf)))) + .await?; + Ok::<_, Error>(()) + } + .then({ + move |item| async move { + match item { + Ok(_) => {} + Err(e) => { + tx2.send(Ok(FetchItem::Log(format!("Seeing error: {:?}", e)))).await?; + } + } + Ok::<_, Error>(()) + } + }), + ); + Ok(rx) +}