From ae78b8c7a7e260e0f7b453fd4f89ccb7cd17dba3 Mon Sep 17 00:00:00 2001 From: Simon Ebner Date: Wed, 15 Jan 2014 10:15:00 +0100 Subject: [PATCH] Removed unnecessary queue in AcquisitionEngine --- .../main/java/ch/psi/fda/aq/Acquisition.java | 49 ++---- .../ch/psi/fda/aq/VisualizationMapper.java | 10 +- .../ch/psi/fda/rest/AcquisitionEngine.java | 156 +++++------------- .../java/ch/psi/fda/rest/AcquisitionJob.java | 77 +++++++++ .../ch/psi/fda/rest/services/FDAService.java | 5 +- 5 files changed, 137 insertions(+), 160 deletions(-) create mode 100644 ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionJob.java diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java index a153413..3d731ff 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java @@ -119,8 +119,6 @@ import ch.psi.jcae.util.ComparatorREGEX; /** * Data acquisition engine for performing scans * Mapping is specific to scan model version 1.0 - * @author ebner - * */ public class Acquisition { @@ -269,7 +267,13 @@ public class Acquisition { * @throws InterruptedException */ public void execute() throws InterruptedException { - + String hostname; + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + hostname="unknown"; + } + try{ active = true; @@ -277,38 +281,16 @@ public class Acquisition { actionLoop.execute(); actionLoop.cleanup(); - // Send notifications out to all recipients that want to have success notifications - try { - String hostname = InetAddress.getLocalHost().getHostName(); - notificationAgent.sendNotification("Notification - FDA Execution Finished", "The execution of the FDA on '"+hostname+"' for file '"+datafile.getName()+"' finished successfully\n\nYou received this message because you are listed in the notification list for this data acquisition configuration.", false,true); - } catch (UnknownHostException e1) { - logger.log(Level.WARNING, "Unable to send notification", e1); - } + notificationAgent.sendNotification("Notification - FDA Execution Finished", "The execution of the FDA on '"+hostname+"' for file '"+datafile.getName()+"' finished successfully\n\nYou received this message because you are listed in the notification list for this data acquisition configuration.", false,true); } catch(RuntimeException e){ logger.log(Level.WARNING, "Execution failed: ", e); - - try { - String hostname = InetAddress.getLocalHost().getHostName(); - notificationAgent.sendNotification("Notification - FDA Execution Failed", "The execution of the FDA failed on '"+hostname+"' for file '"+datafile.getName()+"'\n\nYou received this message because you are listed in the notification list for this data acquisition configuration.", true,false); - } catch (UnknownHostException e1) { - logger.log(Level.WARNING, "Unable to send notification", e1); - } - + notificationAgent.sendNotification("Notification - FDA Execution Failed", "The execution of the FDA failed on '"+hostname+"' for file '"+datafile.getName()+"'\n\nYou received this message because you are listed in the notification list for this data acquisition configuration.", true,false); throw e; } catch(InterruptedException e){ logger.log(Level.WARNING, "Execution interrupted: ", e); - - // Execution got aborted. - try { - String hostname = InetAddress.getLocalHost().getHostName(); - notificationAgent.sendNotification("Notification - FDA Execution was aborted", "The execution of the FDA on '"+hostname+"' for file '"+datafile.getName()+"' was aborted\n\nYou received this message because you are listed in the notification list for this data acquisition configuration.", false, true); - } catch (UnknownHostException e1) { - logger.log(Level.WARNING, "Unable to send notification", e1); - } -// throw e; - + notificationAgent.sendNotification("Notification - FDA Execution was aborted", "The execution of the FDA on '"+hostname+"' for file '"+datafile.getName()+"' was aborted\n\nYou received this message because you are listed in the notification list for this data acquisition configuration.", false, true); } finally{ active = false; @@ -340,10 +322,6 @@ public class Acquisition { // Clear global variables Jython jVariableDictionary.clear(); -// // Destroy the CA context -// cservice.destroy(); -// logger.fine("ChannelService destroyed"); - // Remove log handler if(logHandler!=null){ logger.fine("Close log handler"); @@ -352,15 +330,8 @@ public class Acquisition { } } - /** - * Abort acquisition - */ public void abort(){ - actionLoop.abort(); -// if(acquisitionThread!=null){ -// acquisitionThread.interrupt(); -// } } public String getDatafileName(){ diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/VisualizationMapper.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/VisualizationMapper.java index 24e327b..79a6821 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/VisualizationMapper.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/VisualizationMapper.java @@ -36,10 +36,6 @@ import ch.psi.fda.visualizer.XYZSeriesDataFilter; import ch.psi.plot.xyz.MatrixPlot; import ch.psi.plot.xyz.MatrixPlotData; -/** - * @author ebner - * - */ public class VisualizationMapper { @@ -69,6 +65,12 @@ public class VisualizationMapper { return id; } + /** + * Converts a list of visualizations into a list of data filters which can be applied to the data stream + * + * @param vl + * @return + */ public static List mapVisualizations(List vl){ List filters = new ArrayList(); diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionEngine.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionEngine.java index f288f5b..210af7e 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionEngine.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionEngine.java @@ -18,17 +18,18 @@ */ package ch.psi.fda.rest; -import java.util.concurrent.BlockingQueue; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.logging.Logger; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.inject.Inject; -import com.google.common.eventbus.EventBus; - -import ch.psi.fda.aq.Acquisition; import ch.psi.fda.aq.AcquisitionConfiguration; import ch.psi.fda.model.v1.Configuration; import ch.psi.jcae.ChannelService; @@ -38,69 +39,18 @@ import ch.psi.jcae.ChannelService; */ public class AcquisitionEngine { - private static final Logger logger = Logger.getLogger(AcquisitionEngine.class.getName()); - - private AcquisitionConfiguration config; - private ChannelService cService; + private final AcquisitionConfiguration config; + private final ChannelService cService; private final ZMQDataService zmqService; - /** - * Variable holding the current acquisition - */ - private volatile Acquisition acquisition; - private volatile ExecutionRequest currentRequest; - - private ExecutorService eservice; - - - private BlockingQueue requests = new LinkedBlockingQueue<>(); + private final ExecutorService eservice = Executors.newSingleThreadExecutor(); + private final Map> erequests = new HashMap<>(); @Inject public AcquisitionEngine(ChannelService cService, ZMQDataService zmqService, AcquisitionConfiguration config) { this.zmqService = zmqService; this.cService = cService; this.config = config; - - // Start main execution loop - eservice = Executors.newSingleThreadExecutor(); - eservice.execute(new Runnable() { - @Override - public void run() { - try { - while (!Thread.currentThread().isInterrupted()) { - try { - ExecutionRequest r = requests.take(); - - logger.info("Execute - " + r.getTrackingId()); - - EventBus ebus = new EventBus(); - // Provide tracking id to data service and register the service to the event bus - AcquisitionEngine.this.zmqService.setTrackingId(r.getTrackingId()); - ebus.register(AcquisitionEngine.this.zmqService); - - synchronized (AcquisitionEngine.this) { // synchronize access to acquisition object via the AcquisitionEngine object - acquisition = new Acquisition(AcquisitionEngine.this.cService, AcquisitionEngine.this.config); - currentRequest = r; - } - acquisition.initalize(ebus, r.getConfiguration()); - acquisition.execute(); - logger.info("" + r.getTrackingId() + " done"); - - // Cleanup - ebus.unregister(AcquisitionEngine.this.zmqService); - } finally { - acquisition.destroy(); - - synchronized (AcquisitionEngine.this) { - acquisition = null; - currentRequest = null; - } - } - } - } catch (InterruptedException e) { - } - } - }); } /** @@ -111,38 +61,29 @@ public class AcquisitionEngine { * @return */ public void submit(String trackingId, Configuration configuration){ - - // Check whether trackingId already exists - for(ExecutionRequest req: requests){ - if(req.getTrackingId().equals(trackingId)){ - throw new RuntimeException("TrackingId "+trackingId+" is already submitted"); - } + + if(erequests.keySet().contains(trackingId) && !erequests.get(trackingId).isDone()){ // Allow finished tracking ids to be reused for new scans + throw new IllegalArgumentException("A request with tracking ID "+trackingId+" is already submitted"); } - ExecutionRequest r = new ExecutionRequest(trackingId, configuration); - try{ - requests.put(r); - } catch (InterruptedException e) { - } + AcquisitionJob job = new AcquisitionJob(cService, zmqService, config, trackingId, configuration); + Future future = eservice.submit(job); + erequests.put(trackingId, future); } - public void terminateAll(){ - requests.clear(); - terminate(); - } - - /** - * Terminate the currently executed request - */ - private void terminate(){ - synchronized(this){ - if(acquisition==null){ - return; + public void terminateAll() { + for(Future f: erequests.values()){ + f.cancel(true); + } + + for(Future f: erequests.values()){ + try{ + f.get(10, TimeUnit.SECONDS); + } + catch(CancellationException | InterruptedException | ExecutionException | TimeoutException e){ + // Nothing to be done here } - - logger.info("Stop current acquisition"); - acquisition.abort(); } } @@ -153,39 +94,24 @@ public class AcquisitionEngine { */ public void terminate(String trackingId){ - // If request is currently executed terminate it - if(currentRequest!=null && currentRequest.getTrackingId().equals(trackingId)){ - terminate(); - return; + final Future f = erequests.get(trackingId); + if(f==null){ + throw new IllegalArgumentException("There is no request running/pending with tracking id "+trackingId); } - - // Remove request from request queue - ExecutionRequest rremove = null; - for(ExecutionRequest r:requests){ - if(r.getTrackingId().equals(trackingId)){ - rremove = r; // We have to split the filtering and termination as we otherwise get a concurrent access exception. - break; - } + f.cancel(true); + try{ + f.get(10, TimeUnit.SECONDS); } - if(rremove!=null){ - boolean b = requests.remove(rremove); - // There is a chance that between the upper loop and here the request - // already got dequeued. This check ensures that the requests got removed - // if not it got dequeued and therefore we have to terminate the current - // execution. - if(!b){ - terminate(); - } + catch(CancellationException | InterruptedException | ExecutionException | TimeoutException e){ + // Nothing to be done here } } - /** - * @return - */ - public boolean isActive() { - if(acquisition==null){ - return false; + public boolean isActive(String trackingId) { + Future f = erequests.get(trackingId); + if(f==null){ + throw new IllegalArgumentException("There is no request for tracking id "+trackingId); } - return acquisition.isActive(); + return !f.isDone(); } } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionJob.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionJob.java new file mode 100644 index 0000000..e4fcde3 --- /dev/null +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionJob.java @@ -0,0 +1,77 @@ +/** + * + * Copyright 2014 Paul Scherrer Institute. All rights reserved. + * + * This code is free software: you can redistribute it and/or modify it under + * the terms of the GNU Lesser General Public License as published by the Free + * Software Foundation, either version 3 of the License, or (at your option) any + * later version. + * + * This code is distributed in the hope that it will be useful, but without any + * warranty; without even the implied warranty of merchantability or fitness for + * a particular purpose. See the GNU Lesser General Public License for more + * details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this code. If not, see . + * + */ +package ch.psi.fda.rest; + +import java.util.logging.Logger; + +import ch.psi.fda.aq.Acquisition; +import ch.psi.fda.aq.AcquisitionConfiguration; +import ch.psi.fda.model.v1.Configuration; +import ch.psi.jcae.ChannelService; + +import com.google.common.eventbus.EventBus; + +public class AcquisitionJob implements Runnable { + + private static final Logger logger = Logger.getLogger(AcquisitionJob.class.getName()); + + private final AcquisitionConfiguration config; + private final ChannelService cService; + private final ZMQDataService zmqService; + + private final String trackingId; + private final Configuration configuration; + + public AcquisitionJob(ChannelService cService, ZMQDataService zmqService, AcquisitionConfiguration config, String trackingId, Configuration configuration) { + this.zmqService = zmqService; + this.cService = cService; + this.config = config; + + this.trackingId = trackingId; + this.configuration = configuration; + } + + @Override + public void run() { + + Acquisition acquisition = null; + try { + + logger.info("Execute - " + trackingId); + + EventBus ebus = new EventBus(); + zmqService.setTrackingId(trackingId); + ebus.register(zmqService); + + acquisition = new Acquisition(cService, config); + acquisition.initalize(ebus, configuration); + acquisition.execute(); + logger.info("" + trackingId + " done"); + + // Cleanup + ebus.unregister(zmqService); + } catch (InterruptedException e) { + logger.info("Execution of "+trackingId+ " was interrupted"); + } finally { + if(acquisition!=null){ + acquisition.destroy(); + } + } + } +} diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/FDAService.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/FDAService.java index 6b4cd1b..c981aea 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/FDAService.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/FDAService.java @@ -56,8 +56,9 @@ public class FDAService { } @GET - public boolean isActive(){ - return aengine.isActive(); + @Path("{trackingId}/running") + public boolean isActive(String trackingId){ + return aengine.isActive(trackingId); } }