diff --git a/ch.psi.fda/pom.xml b/ch.psi.fda/pom.xml index d3ce916..34b20ad 100644 --- a/ch.psi.fda/pom.xml +++ b/ch.psi.fda/pom.xml @@ -25,6 +25,14 @@ 2.3.1 + + + org.jeromq + jeromq + 0.2.0 + + + com.google.guava guava diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/ViewerMain.java b/ch.psi.fda/src/main/java/ch/psi/fda/ViewerMain.java new file mode 100644 index 0000000..79a51e8 --- /dev/null +++ b/ch.psi.fda/src/main/java/ch/psi/fda/ViewerMain.java @@ -0,0 +1,162 @@ +/** + * + * Copyright 2010 Paul Scherrer Institute. All rights reserved. + * + * This code is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This code is distributed in the hope that it will be useful, + * but without any warranty; without even the implied warranty of + * merchantability or fitness for a particular purpose. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this code. If not, see . + * + */ + +package ch.psi.fda; + +import java.awt.FlowLayout; +import java.awt.event.WindowAdapter; +import java.awt.event.WindowEvent; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.swing.JFrame; +import javax.swing.JPanel; +import javax.swing.JScrollPane; +import javax.swing.JTabbedPane; +import javax.swing.ScrollPaneLayout; + +import org.jeromq.ZMQ; + +import com.google.common.eventbus.EventBus; + +import ch.psi.fda.gui.ScrollableFlowPanel; +import ch.psi.fda.visualizer.SeriesDataFilter; +import ch.psi.fda.visualizer.Visualizer; +import ch.psi.fda.visualizer.XYSeriesDataFilter; +import ch.psi.plot.xy.LinePlot; + +/** + * Visualize data according to the scan description + */ +public class ViewerMain { + + private static Logger logger = Logger.getLogger(ViewerMain.class.getName()); + + /** + * Visualize data + * @param configuration + * @param data + * @throws InterruptedException + */ + public void visualize() throws InterruptedException{ + + List filters = new ArrayList<>(); + filters.add(new XYSeriesDataFilter("id0", "timestamp", new LinePlot("One"))); + + Visualizer visualizer = new Visualizer(filters); + +// visualizer.setTerminateAtEOS(true); + // Adapt default visualizer behavior to optimize performance for visualization + visualizer.setUpdateAtStreamElement(true); + visualizer.setUpdateAtStreamDelimiter(true); + visualizer.setUpdateAtEndOfStream(true); + + JPanel opanel = new ScrollableFlowPanel(); + opanel.setLayout(new FlowLayout()); + + JScrollPane spane = new JScrollPane(opanel, ScrollPaneLayout.VERTICAL_SCROLLBAR_AS_NEEDED, ScrollPaneLayout.HORIZONTAL_SCROLLBAR_NEVER); + JTabbedPane tpane = new JTabbedPane(); + tpane.addTab("Overview", spane); + + for (JPanel p : visualizer.getPlotPanels()) { + opanel.add(p); + } + + final JFrame frame = new JFrame(); + frame.setSize(1200,800); + frame.add(tpane); +// frame.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE); + frame.addWindowListener(new WindowAdapter(){ + @Override + public void windowClosing(WindowEvent we){ + System.exit(0); + } + }); + frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);//.DO_NOTHING_ON_CLOSE); + + java.awt.EventQueue.invokeLater(new Runnable() { + public void run() { + frame.setVisible(true); + } + }); + + // Start receiving messages + EventBus bus = new EventBus(); + bus.register(visualizer); + visualizer.configure(); + + ZMQ.Context context = ZMQ.context(); + zmq.ZError.clear(); // Clear error code + ZMQ.Socket socket = context.socket(ZMQ.SUB); + socket.connect("tcp://emac:10000"); + socket.subscribe(""); // SUBSCRIBE ! + + while(true){ + byte[] content = null; +// String header = socket.recvStr(); // header + socket.recvStr(); // header + while(socket.hasReceiveMore()){ + content = socket.recv(); + } + if(content==null){ // we lost something discard read messages + logger.warning("Lost some message - discard and continue"); + continue; + } + + try ( + ByteArrayInputStream bis = new ByteArrayInputStream(content); + ObjectInput in = new ObjectInputStream(bis); + ) { + Object o = in.readObject(); + bus.post(o); + } catch (IOException | ClassNotFoundException e) { + logger.log(Level.WARNING,"Unable to deserialize message",e); + } + + } + } + + /** + * This method accepts a data file and an option (-file) specifying the scan configuration file + * used for creating the data file. If no option is specified the configuration file is derived by the + * data file name. + * + * @param args Command line arguments + */ + public static void main(String[] args) { + + + try{ + ViewerMain e = new ViewerMain(); + e.visualize(); + } + catch(Exception ee){ + System.out.println("Visualization failed due to: "+ee.getMessage()); + logger.log(Level.WARNING, "Visualization failed due to Exception:", ee); + System.exit(-1); + } + } + +} diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionEngine.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionEngine.java index cc0f820..a3d580f 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionEngine.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionEngine.java @@ -41,22 +41,24 @@ public class AcquisitionEngine { private static final Logger logger = Logger.getLogger(AcquisitionEngine.class.getName()); - @Inject private AcquisitionConfiguration config; - @Inject private ChannelService cservice; + private final ZMQDataService dservice; /** * Variable holding the current acquisition */ private volatile Acquisition acquisition; + private volatile ExecutionRequest currentRequest; private ExecutorService eservice; + private BlockingQueue requests = new LinkedBlockingQueue<>(); @Inject - public AcquisitionEngine(ChannelService cservice, AcquisitionConfiguration config) { + public AcquisitionEngine(ChannelService cservice, ZMQDataService dservice, AcquisitionConfiguration config) { + this.dservice = dservice; this.cservice = cservice; this.config = config; @@ -73,16 +75,27 @@ public class AcquisitionEngine { logger.info("Execute - " + r.getTrackingId()); EventBus ebus = new EventBus(); + // Provide tracking id to data service and register the service to the event bus + AcquisitionEngine.this.dservice.setTrackingId(r.getTrackingId()); + ebus.register(AcquisitionEngine.this.dservice); synchronized (AcquisitionEngine.this) { // synchronize access to acquisition object via the AcquisitionEngine object acquisition = new Acquisition(AcquisitionEngine.this.cservice, AcquisitionEngine.this.config); + currentRequest = r; } acquisition.initalize(ebus, r.getConfiguration()); acquisition.execute(); logger.info("" + r.getTrackingId() + " done"); + + // Cleanup + ebus.unregister(AcquisitionEngine.this.dservice); } finally { acquisition.destroy(); - acquisition = null; + + synchronized (AcquisitionEngine.this) { + acquisition = null; + currentRequest = null; + } } } } catch (InterruptedException e) { @@ -91,7 +104,13 @@ public class AcquisitionEngine { }); } - + /** + * Submit a scan to be executed. This will generate an execution request which is + * enqueued in the execution queue. + * + * @param configuration + * @return + */ public String submit(Configuration configuration){ ExecutionRequest r = new ExecutionRequest(UUID.randomUUID().toString(), configuration); try{ @@ -102,7 +121,10 @@ public class AcquisitionEngine { } - public void stop(){ + /** + * Terminate the currently executed request + */ + public void terminate(){ synchronized(this){ if(acquisition==null){ return; @@ -112,4 +134,37 @@ public class AcquisitionEngine { acquisition.abort(); } } + + /** + * Terminate the request which is identified by its tracking id. If the request + * is still in the execution queue it gets removed there. + * @param trackingId + */ + public void terminate(String trackingId){ + + // If request is currently executed terminate it + if(currentRequest.getTrackingId().equals(trackingId)){ + terminate(); + return; + } + + // Remove request from request queue + ExecutionRequest rremove = null; + for(ExecutionRequest r:requests){ + if(r.getTrackingId().equals(trackingId)){ + rremove = r; // We have to split the filtering and termination as we otherwise get a concurrent access exception. + break; + } + } + if(rremove!=null){ + boolean b = requests.remove(rremove); + // There is a chance that between the upper loop and here the request + // already got dequeued. This check ensures that the requests got removed + // if not it got dequeued and therefore we have to terminate the current + // execution. + if(!b){ + terminate(); + } + } + } } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/ResourceBinder.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/ResourceBinder.java index 2156118..34e6b51 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/rest/ResourceBinder.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/ResourceBinder.java @@ -16,6 +16,7 @@ public class ResourceBinder extends AbstractBinder { bind(DefaultChannelService.class).to(ChannelService.class).in(Singleton.class); bind(AcquisitionConfiguration.class).to(AcquisitionConfiguration.class).in(Singleton.class); bind(AcquisitionEngine.class).to(AcquisitionEngine.class).in(Singleton.class); + bind(ZMQDataService.class).to(ZMQDataService.class).in(Singleton.class); } } \ No newline at end of file diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/ZMQDataService.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/ZMQDataService.java new file mode 100644 index 0000000..7c935e5 --- /dev/null +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/ZMQDataService.java @@ -0,0 +1,85 @@ +/** + * + * Copyright 2013 Paul Scherrer Institute. All rights reserved. + * + * This code is free software: you can redistribute it and/or modify it under + * the terms of the GNU Lesser General Public License as published by the Free + * Software Foundation, either version 3 of the License, or (at your option) any + * later version. + * + * This code is distributed in the hope that it will be useful, but without any + * warranty; without even the implied warranty of merchantability or fitness for + * a particular purpose. See the GNU Lesser General Public License for more + * details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this code. If not, see . + * + */ +package ch.psi.fda.rest; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.jeromq.ZMQ; + +import ch.psi.fda.core.messages.Message; + +import com.google.common.eventbus.Subscribe; + +/** + * @author ebner + * + */ +public class ZMQDataService { + + private static final Logger logger = Logger.getLogger(ZMQDataService.class.getName()); + + private final int bufferSize = 5; + + private ZMQ.Context context; + private ZMQ.Socket socket; + + private String trackingId; + + public ZMQDataService(){ + initialize(); + } + + public void initialize(){ + context = ZMQ.context(); + zmq.ZError.clear(); // Clear error code + socket = context.socket(ZMQ.PUB); + socket.setHWM(bufferSize); + socket.bind("tcp://*:10000"); + } + + public void terminate(){ + socket.close(); + context.term(); + zmq.ZError.clear(); // Clear error code + } + + @Subscribe + public void onMessage(Message m) { + logger.info(m.toString()); + socket.sendMore("{\"trackingId\":\"" + trackingId + "\"}"); + try ( + ByteArrayOutputStream b = new ByteArrayOutputStream(); + ObjectOutputStream o = new ObjectOutputStream(b); + ) { + o.writeObject(m); + socket.send(b.toByteArray()); + } catch (IOException e) { + logger.log(Level.WARNING, "Unable to serialize message", e); + } + } + + + public void setTrackingId(String id){ + trackingId = id; + } +} diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/ScanService.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/ScanService.java index 5c6ea1f..98b04d4 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/ScanService.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/ScanService.java @@ -44,6 +44,6 @@ public class ScanService { @DELETE public void stop(){ - aengine.stop(); + aengine.terminate(); } }