diff --git a/.github/workflows/build-rhel7.yml b/.github/workflows/build-rhel7.yml index baadaf4..0383f6a 100644 --- a/.github/workflows/build-rhel7.yml +++ b/.github/workflows/build-rhel7.yml @@ -83,22 +83,20 @@ jobs: - run: find $GITHUB_WORKSPACE -type f -and \( -name \*.rs -or -name \*.toml \) - run: find ${{steps.wdset.outputs.gh}} -type f -and \( -name \*.rs -or -name \*.toml \) working-directory: ${{steps.wdset.outputs.gh}}/build - # - run: cargo build --release - # working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest - - run: mkdir -p target/release - working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/ - - run: cp /usr/bin/cat target/release/daqingest - working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/ + - run: cargo build --release + working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest + # - run: mkdir -p target/release && cp /usr/bin/cat target/release/daqingest + # working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/ - run: ls -l working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release - run: ./daqingest --version working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release - # - run: echo "daqingest_version=$(./daqingest --version)" >> "$GITHUB_OUTPUT" - # id: daqingest_version_set - # working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release - - run: echo "daqingest_version=0.0.0-dummy.0" >> "$GITHUB_OUTPUT" + - run: echo "daqingest_version=$(./daqingest --version)" >> "$GITHUB_OUTPUT" id: daqingest_version_set working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release + # - run: echo "daqingest_version=0.0.0-dummy.0" >> "$GITHUB_OUTPUT" + # id: daqingest_version_set + # working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest/target/release - uses: actions/upload-artifact@v3 with: name: daqingest-${{steps.daqingest_version_set.outputs.daqingest_version}} diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 98df7e3..234ed63 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -200,8 +200,6 @@ pub struct Daemon { chan_check_next: Option, search_tx: Sender, ioc_finder_jh: JoinHandle>, - datastore: Arc, - common_insert_item_queue: Arc, insert_queue_counter: Arc, count_unknown_address: usize, count_search_pending: usize, @@ -236,14 +234,16 @@ impl Daemon { .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let common_insert_item_queue = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap)); - // let common_insert_item_queue_2 = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap)); let insert_queue_counter = Arc::new(AtomicUsize::new(0)); // Insert queue hook let rx = inserthook::active_channel_insert_hook(common_insert_item_queue.receiver().unwrap()); let common_insert_item_queue_2 = rx; - let conn_set_ctrl = CaConnSet::start(channel_info_query_tx.clone()); + let conn_set_ctrl = CaConnSet::start( + common_insert_item_queue.sender().unwrap().inner().clone(), + channel_info_query_tx.clone(), + ); let ingest_commons = IngestCommons { pgconf: Arc::new(opts.pgconf.clone()), @@ -319,8 +319,6 @@ impl Daemon { chan_check_next: None, search_tx, ioc_finder_jh, - datastore, - common_insert_item_queue, insert_queue_counter, count_unknown_address: 0, count_search_pending: 0, diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 25aedd7..f4c4d44 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -13,6 +13,7 @@ use err::Error; use futures_util::FutureExt; use futures_util::StreamExt; use netpod::log::*; +use scywr::iteminsertqueue::QueryItem; use series::ChannelStatusSeriesId; use stats::CaConnStats; use std::collections::BTreeMap; @@ -30,6 +31,7 @@ pub struct CmdId(SocketAddrV4, usize); pub struct CaConnRes { sender: Sender, stats: Arc, + // TODO await on jh jh: JoinHandle>, } @@ -96,7 +98,10 @@ pub struct CaConnSet { } impl CaConnSet { - pub fn start(channel_info_query_tx: Sender) -> CaConnSetCtrl { + pub fn start( + storage_insert_tx: Sender, + channel_info_query_tx: Sender, + ) -> CaConnSetCtrl { let (connset_tx, connset_rx) = async_channel::bounded(10000); let connset = Self { ca_conn_ress: BTreeMap::new(), @@ -105,7 +110,7 @@ impl CaConnSet { channel_info_query_tx, shutdown: false, }; - // TODO use jh + // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); CaConnSetCtrl { tx: connset_tx } }