Added (singleton) fdaq engine class to be able to unregister the zmq
streamer and to avoid having more than one thing driving the fdaq box ...
This commit is contained in:
@@ -0,0 +1,73 @@
|
||||
/**
|
||||
*
|
||||
* Copyright 2014 Paul Scherrer Institute. All rights reserved.
|
||||
*
|
||||
* This code is free software: you can redistribute it and/or modify it under
|
||||
* the terms of the GNU Lesser General Public License as published by the Free
|
||||
* Software Foundation, either version 3 of the License, or (at your option) any
|
||||
* later version.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but without any
|
||||
* warranty; without even the implied warranty of merchantability or fitness for
|
||||
* a particular purpose. See the GNU Lesser General Public License for more
|
||||
* details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with this code. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
package ch.psi.fda.rest;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import javax.inject.Inject;
|
||||
|
||||
import ch.psi.fda.fdaq.FdaqService;
|
||||
import ch.psi.fda.rest.model.FdaqRequest;
|
||||
import ch.psi.fda.serializer.SerializerTXT;
|
||||
|
||||
import com.google.common.eventbus.AsyncEventBus;
|
||||
import com.google.common.eventbus.EventBus;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class FdaqEngine {
|
||||
|
||||
private FdaqService fdaq;
|
||||
private EventBus bus;
|
||||
private ZMQDataService zmqService;
|
||||
private boolean stream = false;
|
||||
|
||||
@Inject
|
||||
public FdaqEngine(ZMQDataService zmqService){
|
||||
this.zmqService = zmqService;
|
||||
}
|
||||
|
||||
public void acquire(String trackingId, FdaqRequest request){
|
||||
bus = new AsyncEventBus(Executors.newSingleThreadExecutor());
|
||||
fdaq = new FdaqService(bus);
|
||||
|
||||
SerializerTXT serializer = new SerializerTXT(new File(request.getFilename()));
|
||||
serializer.setShowDimensionHeader(false);
|
||||
bus.register(serializer);
|
||||
|
||||
stream = (request.getStream()!=null && request.getStream().getIds().length>0) ;
|
||||
if(stream){
|
||||
// Stream data via ZMQ
|
||||
zmqService.setTrackingId(trackingId);
|
||||
bus.register(zmqService);
|
||||
}
|
||||
|
||||
|
||||
fdaq.acquire();
|
||||
}
|
||||
|
||||
public void stop(){
|
||||
fdaq.stop();
|
||||
bus.unregister(zmqService);
|
||||
|
||||
// TODO check whether serializer also needs to be unregistered ...
|
||||
}
|
||||
}
|
||||
@@ -17,6 +17,7 @@ public class ResourceBinder extends AbstractBinder {
|
||||
bind(DefaultChannelService.class).to(ChannelService.class).in(Singleton.class);
|
||||
bind(AcquisitionConfiguration.class).to(AcquisitionConfiguration.class).in(Singleton.class);
|
||||
bind(AcquisitionEngine.class).to(AcquisitionEngine.class).in(Singleton.class);
|
||||
bind(FdaqEngine.class).to(FdaqEngine.class).in(Singleton.class);
|
||||
bind(ZMQDataService.class).to(ZMQDataService.class).in(Singleton.class);
|
||||
}
|
||||
|
||||
|
||||
@@ -19,9 +19,8 @@
|
||||
|
||||
package ch.psi.fda.rest.services;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DELETE;
|
||||
import javax.ws.rs.PUT;
|
||||
@@ -29,28 +28,20 @@ import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
||||
import com.google.common.eventbus.AsyncEventBus;
|
||||
import com.google.common.eventbus.EventBus;
|
||||
|
||||
import ch.psi.fda.fdaq.FdaqService;
|
||||
import ch.psi.fda.rest.model.FdaqRequest;
|
||||
import ch.psi.fda.serializer.SerializerTXT;
|
||||
|
||||
@Path("fdaq")
|
||||
public class FDAQService {
|
||||
|
||||
@Inject
|
||||
private FdaqService fdaq;
|
||||
|
||||
@PUT
|
||||
@Path("{trackingId}")
|
||||
@Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
|
||||
public void execute(@PathParam("trackingId") String trackingId, FdaqRequest request) throws InterruptedException {
|
||||
|
||||
EventBus bus = new AsyncEventBus(Executors.newSingleThreadExecutor());
|
||||
FdaqService fdaq = new FdaqService(bus);
|
||||
|
||||
SerializerTXT serializer = new SerializerTXT(new File(request.getFilename()));
|
||||
serializer.setShowDimensionHeader(false);
|
||||
bus.register(serializer);
|
||||
|
||||
fdaq.acquire();
|
||||
}
|
||||
|
||||
@@ -62,7 +53,6 @@ public class FDAQService {
|
||||
|
||||
@DELETE
|
||||
public void terminateAll(){
|
||||
FdaqService fdaq = new FdaqService(null);
|
||||
fdaq.stop();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user