From 542379545be0cee190b3e6419167dfb71b4654e0 Mon Sep 17 00:00:00 2001 From: Simon Ebner Date: Wed, 8 Jan 2014 16:30:59 +0100 Subject: [PATCH] Added (singleton) fdaq engine class to be able to unregister the zmq streamer and to avoid having more than one thing driving the fdaq box ... --- .../main/java/ch/psi/fda/rest/FdaqEngine.java | 73 +++++++++++++++++++ .../java/ch/psi/fda/rest/ResourceBinder.java | 1 + .../ch/psi/fda/rest/services/FDAQService.java | 18 +---- 3 files changed, 78 insertions(+), 14 deletions(-) create mode 100644 ch.psi.fda/src/main/java/ch/psi/fda/rest/FdaqEngine.java diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/FdaqEngine.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/FdaqEngine.java new file mode 100644 index 0000000..a97bbeb --- /dev/null +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/FdaqEngine.java @@ -0,0 +1,73 @@ +/** + * + * Copyright 2014 Paul Scherrer Institute. All rights reserved. + * + * This code is free software: you can redistribute it and/or modify it under + * the terms of the GNU Lesser General Public License as published by the Free + * Software Foundation, either version 3 of the License, or (at your option) any + * later version. + * + * This code is distributed in the hope that it will be useful, but without any + * warranty; without even the implied warranty of merchantability or fitness for + * a particular purpose. See the GNU Lesser General Public License for more + * details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this code. If not, see . + * + */ +package ch.psi.fda.rest; + +import java.io.File; +import java.util.concurrent.Executors; + +import javax.inject.Inject; + +import ch.psi.fda.fdaq.FdaqService; +import ch.psi.fda.rest.model.FdaqRequest; +import ch.psi.fda.serializer.SerializerTXT; + +import com.google.common.eventbus.AsyncEventBus; +import com.google.common.eventbus.EventBus; + +/** + * + */ +public class FdaqEngine { + + private FdaqService fdaq; + private EventBus bus; + private ZMQDataService zmqService; + private boolean stream = false; + + @Inject + public FdaqEngine(ZMQDataService zmqService){ + this.zmqService = zmqService; + } + + public void acquire(String trackingId, FdaqRequest request){ + bus = new AsyncEventBus(Executors.newSingleThreadExecutor()); + fdaq = new FdaqService(bus); + + SerializerTXT serializer = new SerializerTXT(new File(request.getFilename())); + serializer.setShowDimensionHeader(false); + bus.register(serializer); + + stream = (request.getStream()!=null && request.getStream().getIds().length>0) ; + if(stream){ + // Stream data via ZMQ + zmqService.setTrackingId(trackingId); + bus.register(zmqService); + } + + + fdaq.acquire(); + } + + public void stop(){ + fdaq.stop(); + bus.unregister(zmqService); + + // TODO check whether serializer also needs to be unregistered ... + } +} diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/ResourceBinder.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/ResourceBinder.java index e52a223..095f0ce 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/rest/ResourceBinder.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/ResourceBinder.java @@ -17,6 +17,7 @@ public class ResourceBinder extends AbstractBinder { bind(DefaultChannelService.class).to(ChannelService.class).in(Singleton.class); bind(AcquisitionConfiguration.class).to(AcquisitionConfiguration.class).in(Singleton.class); bind(AcquisitionEngine.class).to(AcquisitionEngine.class).in(Singleton.class); + bind(FdaqEngine.class).to(FdaqEngine.class).in(Singleton.class); bind(ZMQDataService.class).to(ZMQDataService.class).in(Singleton.class); } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/FDAQService.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/FDAQService.java index 58058d3..11e44b3 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/FDAQService.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/FDAQService.java @@ -19,9 +19,8 @@ package ch.psi.fda.rest.services; -import java.io.File; -import java.util.concurrent.Executors; +import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.PUT; @@ -29,28 +28,20 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.core.MediaType; -import com.google.common.eventbus.AsyncEventBus; -import com.google.common.eventbus.EventBus; import ch.psi.fda.fdaq.FdaqService; import ch.psi.fda.rest.model.FdaqRequest; -import ch.psi.fda.serializer.SerializerTXT; @Path("fdaq") public class FDAQService { + @Inject + private FdaqService fdaq; + @PUT @Path("{trackingId}") @Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) public void execute(@PathParam("trackingId") String trackingId, FdaqRequest request) throws InterruptedException { - - EventBus bus = new AsyncEventBus(Executors.newSingleThreadExecutor()); - FdaqService fdaq = new FdaqService(bus); - - SerializerTXT serializer = new SerializerTXT(new File(request.getFilename())); - serializer.setShowDimensionHeader(false); - bus.register(serializer); - fdaq.acquire(); } @@ -62,7 +53,6 @@ public class FDAQService { @DELETE public void terminateAll(){ - FdaqService fdaq = new FdaqService(null); fdaq.stop(); } }