Removed unnecessary queue in AcquisitionEngine

This commit is contained in:
2014-01-15 10:15:00 +01:00
parent c9ff7ccf2c
commit ae78b8c7a7
5 changed files with 137 additions and 160 deletions

View File

@@ -119,8 +119,6 @@ import ch.psi.jcae.util.ComparatorREGEX;
/**
* Data acquisition engine for performing scans
* Mapping is specific to scan model version 1.0
* @author ebner
*
*/
public class Acquisition {
@@ -269,7 +267,13 @@ public class Acquisition {
* @throws InterruptedException
*/
public void execute() throws InterruptedException {
String hostname;
try {
hostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
hostname="unknown";
}
try{
active = true;
@@ -277,38 +281,16 @@ public class Acquisition {
actionLoop.execute();
actionLoop.cleanup();
// Send notifications out to all recipients that want to have success notifications
try {
String hostname = InetAddress.getLocalHost().getHostName();
notificationAgent.sendNotification("Notification - FDA Execution Finished", "The execution of the FDA on '"+hostname+"' for file '"+datafile.getName()+"' finished successfully\n\nYou received this message because you are listed in the notification list for this data acquisition configuration.", false,true);
} catch (UnknownHostException e1) {
logger.log(Level.WARNING, "Unable to send notification", e1);
}
notificationAgent.sendNotification("Notification - FDA Execution Finished", "The execution of the FDA on '"+hostname+"' for file '"+datafile.getName()+"' finished successfully\n\nYou received this message because you are listed in the notification list for this data acquisition configuration.", false,true);
}
catch(RuntimeException e){
logger.log(Level.WARNING, "Execution failed: ", e);
try {
String hostname = InetAddress.getLocalHost().getHostName();
notificationAgent.sendNotification("Notification - FDA Execution Failed", "The execution of the FDA failed on '"+hostname+"' for file '"+datafile.getName()+"'\n\nYou received this message because you are listed in the notification list for this data acquisition configuration.", true,false);
} catch (UnknownHostException e1) {
logger.log(Level.WARNING, "Unable to send notification", e1);
}
notificationAgent.sendNotification("Notification - FDA Execution Failed", "The execution of the FDA failed on '"+hostname+"' for file '"+datafile.getName()+"'\n\nYou received this message because you are listed in the notification list for this data acquisition configuration.", true,false);
throw e;
}
catch(InterruptedException e){
logger.log(Level.WARNING, "Execution interrupted: ", e);
// Execution got aborted.
try {
String hostname = InetAddress.getLocalHost().getHostName();
notificationAgent.sendNotification("Notification - FDA Execution was aborted", "The execution of the FDA on '"+hostname+"' for file '"+datafile.getName()+"' was aborted\n\nYou received this message because you are listed in the notification list for this data acquisition configuration.", false, true);
} catch (UnknownHostException e1) {
logger.log(Level.WARNING, "Unable to send notification", e1);
}
// throw e;
notificationAgent.sendNotification("Notification - FDA Execution was aborted", "The execution of the FDA on '"+hostname+"' for file '"+datafile.getName()+"' was aborted\n\nYou received this message because you are listed in the notification list for this data acquisition configuration.", false, true);
}
finally{
active = false;
@@ -340,10 +322,6 @@ public class Acquisition {
// Clear global variables Jython
jVariableDictionary.clear();
// // Destroy the CA context
// cservice.destroy();
// logger.fine("ChannelService destroyed");
// Remove log handler
if(logHandler!=null){
logger.fine("Close log handler");
@@ -352,15 +330,8 @@ public class Acquisition {
}
}
/**
* Abort acquisition
*/
public void abort(){
actionLoop.abort();
// if(acquisitionThread!=null){
// acquisitionThread.interrupt();
// }
}
public String getDatafileName(){

View File

@@ -36,10 +36,6 @@ import ch.psi.fda.visualizer.XYZSeriesDataFilter;
import ch.psi.plot.xyz.MatrixPlot;
import ch.psi.plot.xyz.MatrixPlotData;
/**
* @author ebner
*
*/
public class VisualizationMapper {
@@ -69,6 +65,12 @@ public class VisualizationMapper {
return id;
}
/**
* Converts a list of visualizations into a list of data filters which can be applied to the data stream
*
* @param vl
* @return
*/
public static List<SeriesDataFilter> mapVisualizations(List<Visualization> vl){
List<SeriesDataFilter> filters = new ArrayList<SeriesDataFilter>();

View File

@@ -18,17 +18,18 @@
*/
package ch.psi.fda.rest;
import java.util.concurrent.BlockingQueue;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.aq.Acquisition;
import ch.psi.fda.aq.AcquisitionConfiguration;
import ch.psi.fda.model.v1.Configuration;
import ch.psi.jcae.ChannelService;
@@ -38,69 +39,18 @@ import ch.psi.jcae.ChannelService;
*/
public class AcquisitionEngine {
private static final Logger logger = Logger.getLogger(AcquisitionEngine.class.getName());
private AcquisitionConfiguration config;
private ChannelService cService;
private final AcquisitionConfiguration config;
private final ChannelService cService;
private final ZMQDataService zmqService;
/**
* Variable holding the current acquisition
*/
private volatile Acquisition acquisition;
private volatile ExecutionRequest currentRequest;
private ExecutorService eservice;
private BlockingQueue<ExecutionRequest> requests = new LinkedBlockingQueue<>();
private final ExecutorService eservice = Executors.newSingleThreadExecutor();
private final Map<String, Future<?>> erequests = new HashMap<>();
@Inject
public AcquisitionEngine(ChannelService cService, ZMQDataService zmqService, AcquisitionConfiguration config) {
this.zmqService = zmqService;
this.cService = cService;
this.config = config;
// Start main execution loop
eservice = Executors.newSingleThreadExecutor();
eservice.execute(new Runnable() {
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
try {
ExecutionRequest r = requests.take();
logger.info("Execute - " + r.getTrackingId());
EventBus ebus = new EventBus();
// Provide tracking id to data service and register the service to the event bus
AcquisitionEngine.this.zmqService.setTrackingId(r.getTrackingId());
ebus.register(AcquisitionEngine.this.zmqService);
synchronized (AcquisitionEngine.this) { // synchronize access to acquisition object via the AcquisitionEngine object
acquisition = new Acquisition(AcquisitionEngine.this.cService, AcquisitionEngine.this.config);
currentRequest = r;
}
acquisition.initalize(ebus, r.getConfiguration());
acquisition.execute();
logger.info("" + r.getTrackingId() + " done");
// Cleanup
ebus.unregister(AcquisitionEngine.this.zmqService);
} finally {
acquisition.destroy();
synchronized (AcquisitionEngine.this) {
acquisition = null;
currentRequest = null;
}
}
}
} catch (InterruptedException e) {
}
}
});
}
/**
@@ -111,38 +61,29 @@ public class AcquisitionEngine {
* @return
*/
public void submit(String trackingId, Configuration configuration){
// Check whether trackingId already exists
for(ExecutionRequest req: requests){
if(req.getTrackingId().equals(trackingId)){
throw new RuntimeException("TrackingId "+trackingId+" is already submitted");
}
if(erequests.keySet().contains(trackingId) && !erequests.get(trackingId).isDone()){ // Allow finished tracking ids to be reused for new scans
throw new IllegalArgumentException("A request with tracking ID "+trackingId+" is already submitted");
}
ExecutionRequest r = new ExecutionRequest(trackingId, configuration);
try{
requests.put(r);
} catch (InterruptedException e) {
}
AcquisitionJob job = new AcquisitionJob(cService, zmqService, config, trackingId, configuration);
Future<?> future = eservice.submit(job);
erequests.put(trackingId, future);
}
public void terminateAll(){
requests.clear();
terminate();
}
/**
* Terminate the currently executed request
*/
private void terminate(){
synchronized(this){
if(acquisition==null){
return;
public void terminateAll() {
for(Future<?> f: erequests.values()){
f.cancel(true);
}
for(Future<?> f: erequests.values()){
try{
f.get(10, TimeUnit.SECONDS);
}
catch(CancellationException | InterruptedException | ExecutionException | TimeoutException e){
// Nothing to be done here
}
logger.info("Stop current acquisition");
acquisition.abort();
}
}
@@ -153,39 +94,24 @@ public class AcquisitionEngine {
*/
public void terminate(String trackingId){
// If request is currently executed terminate it
if(currentRequest!=null && currentRequest.getTrackingId().equals(trackingId)){
terminate();
return;
final Future<?> f = erequests.get(trackingId);
if(f==null){
throw new IllegalArgumentException("There is no request running/pending with tracking id "+trackingId);
}
// Remove request from request queue
ExecutionRequest rremove = null;
for(ExecutionRequest r:requests){
if(r.getTrackingId().equals(trackingId)){
rremove = r; // We have to split the filtering and termination as we otherwise get a concurrent access exception.
break;
}
f.cancel(true);
try{
f.get(10, TimeUnit.SECONDS);
}
if(rremove!=null){
boolean b = requests.remove(rremove);
// There is a chance that between the upper loop and here the request
// already got dequeued. This check ensures that the requests got removed
// if not it got dequeued and therefore we have to terminate the current
// execution.
if(!b){
terminate();
}
catch(CancellationException | InterruptedException | ExecutionException | TimeoutException e){
// Nothing to be done here
}
}
/**
* @return
*/
public boolean isActive() {
if(acquisition==null){
return false;
public boolean isActive(String trackingId) {
Future<?> f = erequests.get(trackingId);
if(f==null){
throw new IllegalArgumentException("There is no request for tracking id "+trackingId);
}
return acquisition.isActive();
return !f.isDone();
}
}

View File

@@ -0,0 +1,77 @@
/**
*
* Copyright 2014 Paul Scherrer Institute. All rights reserved.
*
* This code is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* This code is distributed in the hope that it will be useful, but without any
* warranty; without even the implied warranty of merchantability or fitness for
* a particular purpose. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda.rest;
import java.util.logging.Logger;
import ch.psi.fda.aq.Acquisition;
import ch.psi.fda.aq.AcquisitionConfiguration;
import ch.psi.fda.model.v1.Configuration;
import ch.psi.jcae.ChannelService;
import com.google.common.eventbus.EventBus;
public class AcquisitionJob implements Runnable {
private static final Logger logger = Logger.getLogger(AcquisitionJob.class.getName());
private final AcquisitionConfiguration config;
private final ChannelService cService;
private final ZMQDataService zmqService;
private final String trackingId;
private final Configuration configuration;
public AcquisitionJob(ChannelService cService, ZMQDataService zmqService, AcquisitionConfiguration config, String trackingId, Configuration configuration) {
this.zmqService = zmqService;
this.cService = cService;
this.config = config;
this.trackingId = trackingId;
this.configuration = configuration;
}
@Override
public void run() {
Acquisition acquisition = null;
try {
logger.info("Execute - " + trackingId);
EventBus ebus = new EventBus();
zmqService.setTrackingId(trackingId);
ebus.register(zmqService);
acquisition = new Acquisition(cService, config);
acquisition.initalize(ebus, configuration);
acquisition.execute();
logger.info("" + trackingId + " done");
// Cleanup
ebus.unregister(zmqService);
} catch (InterruptedException e) {
logger.info("Execution of "+trackingId+ " was interrupted");
} finally {
if(acquisition!=null){
acquisition.destroy();
}
}
}
}

View File

@@ -56,8 +56,9 @@ public class FDAService {
}
@GET
public boolean isActive(){
return aengine.isActive();
@Path("{trackingId}/running")
public boolean isActive(String trackingId){
return aengine.isActive(trackingId);
}
}