Cleaned up injects, added ZMQ streaming and viewer

This commit is contained in:
2013-11-27 09:55:14 +01:00
parent 70089581c4
commit a40573d320
6 changed files with 318 additions and 7 deletions
+8
View File
@@ -25,6 +25,14 @@
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.jeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -0,0 +1,162 @@
/**
*
* Copyright 2010 Paul Scherrer Institute. All rights reserved.
*
* This code is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This code is distributed in the hope that it will be useful,
* but without any warranty; without even the implied warranty of
* merchantability or fitness for a particular purpose. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda;
import java.awt.FlowLayout;
import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTabbedPane;
import javax.swing.ScrollPaneLayout;
import org.jeromq.ZMQ;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.gui.ScrollableFlowPanel;
import ch.psi.fda.visualizer.SeriesDataFilter;
import ch.psi.fda.visualizer.Visualizer;
import ch.psi.fda.visualizer.XYSeriesDataFilter;
import ch.psi.plot.xy.LinePlot;
/**
* Visualize data according to the scan description
*/
public class ViewerMain {
private static Logger logger = Logger.getLogger(ViewerMain.class.getName());
/**
* Visualize data
* @param configuration
* @param data
* @throws InterruptedException
*/
public void visualize() throws InterruptedException{
List<SeriesDataFilter> filters = new ArrayList<>();
filters.add(new XYSeriesDataFilter("id0", "timestamp", new LinePlot("One")));
Visualizer visualizer = new Visualizer(filters);
// visualizer.setTerminateAtEOS(true);
// Adapt default visualizer behavior to optimize performance for visualization
visualizer.setUpdateAtStreamElement(true);
visualizer.setUpdateAtStreamDelimiter(true);
visualizer.setUpdateAtEndOfStream(true);
JPanel opanel = new ScrollableFlowPanel();
opanel.setLayout(new FlowLayout());
JScrollPane spane = new JScrollPane(opanel, ScrollPaneLayout.VERTICAL_SCROLLBAR_AS_NEEDED, ScrollPaneLayout.HORIZONTAL_SCROLLBAR_NEVER);
JTabbedPane tpane = new JTabbedPane();
tpane.addTab("Overview", spane);
for (JPanel p : visualizer.getPlotPanels()) {
opanel.add(p);
}
final JFrame frame = new JFrame();
frame.setSize(1200,800);
frame.add(tpane);
// frame.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);
frame.addWindowListener(new WindowAdapter(){
@Override
public void windowClosing(WindowEvent we){
System.exit(0);
}
});
frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);//.DO_NOTHING_ON_CLOSE);
java.awt.EventQueue.invokeLater(new Runnable() {
public void run() {
frame.setVisible(true);
}
});
// Start receiving messages
EventBus bus = new EventBus();
bus.register(visualizer);
visualizer.configure();
ZMQ.Context context = ZMQ.context();
zmq.ZError.clear(); // Clear error code
ZMQ.Socket socket = context.socket(ZMQ.SUB);
socket.connect("tcp://emac:10000");
socket.subscribe(""); // SUBSCRIBE !
while(true){
byte[] content = null;
// String header = socket.recvStr(); // header
socket.recvStr(); // header
while(socket.hasReceiveMore()){
content = socket.recv();
}
if(content==null){ // we lost something discard read messages
logger.warning("Lost some message - discard and continue");
continue;
}
try (
ByteArrayInputStream bis = new ByteArrayInputStream(content);
ObjectInput in = new ObjectInputStream(bis);
) {
Object o = in.readObject();
bus.post(o);
} catch (IOException | ClassNotFoundException e) {
logger.log(Level.WARNING,"Unable to deserialize message",e);
}
}
}
/**
* This method accepts a data file and an option (-file) specifying the scan configuration file
* used for creating the data file. If no option is specified the configuration file is derived by the
* data file name.
*
* @param args Command line arguments
*/
public static void main(String[] args) {
try{
ViewerMain e = new ViewerMain();
e.visualize();
}
catch(Exception ee){
System.out.println("Visualization failed due to: "+ee.getMessage());
logger.log(Level.WARNING, "Visualization failed due to Exception:", ee);
System.exit(-1);
}
}
}
@@ -41,22 +41,24 @@ public class AcquisitionEngine {
private static final Logger logger = Logger.getLogger(AcquisitionEngine.class.getName());
@Inject
private AcquisitionConfiguration config;
@Inject
private ChannelService cservice;
private final ZMQDataService dservice;
/**
* Variable holding the current acquisition
*/
private volatile Acquisition acquisition;
private volatile ExecutionRequest currentRequest;
private ExecutorService eservice;
private BlockingQueue<ExecutionRequest> requests = new LinkedBlockingQueue<>();
@Inject
public AcquisitionEngine(ChannelService cservice, AcquisitionConfiguration config) {
public AcquisitionEngine(ChannelService cservice, ZMQDataService dservice, AcquisitionConfiguration config) {
this.dservice = dservice;
this.cservice = cservice;
this.config = config;
@@ -73,16 +75,27 @@ public class AcquisitionEngine {
logger.info("Execute - " + r.getTrackingId());
EventBus ebus = new EventBus();
// Provide tracking id to data service and register the service to the event bus
AcquisitionEngine.this.dservice.setTrackingId(r.getTrackingId());
ebus.register(AcquisitionEngine.this.dservice);
synchronized (AcquisitionEngine.this) { // synchronize access to acquisition object via the AcquisitionEngine object
acquisition = new Acquisition(AcquisitionEngine.this.cservice, AcquisitionEngine.this.config);
currentRequest = r;
}
acquisition.initalize(ebus, r.getConfiguration());
acquisition.execute();
logger.info("" + r.getTrackingId() + " done");
// Cleanup
ebus.unregister(AcquisitionEngine.this.dservice);
} finally {
acquisition.destroy();
acquisition = null;
synchronized (AcquisitionEngine.this) {
acquisition = null;
currentRequest = null;
}
}
}
} catch (InterruptedException e) {
@@ -91,7 +104,13 @@ public class AcquisitionEngine {
});
}
/**
* Submit a scan to be executed. This will generate an execution request which is
* enqueued in the execution queue.
*
* @param configuration
* @return
*/
public String submit(Configuration configuration){
ExecutionRequest r = new ExecutionRequest(UUID.randomUUID().toString(), configuration);
try{
@@ -102,7 +121,10 @@ public class AcquisitionEngine {
}
public void stop(){
/**
* Terminate the currently executed request
*/
public void terminate(){
synchronized(this){
if(acquisition==null){
return;
@@ -112,4 +134,37 @@ public class AcquisitionEngine {
acquisition.abort();
}
}
/**
* Terminate the request which is identified by its tracking id. If the request
* is still in the execution queue it gets removed there.
* @param trackingId
*/
public void terminate(String trackingId){
// If request is currently executed terminate it
if(currentRequest.getTrackingId().equals(trackingId)){
terminate();
return;
}
// Remove request from request queue
ExecutionRequest rremove = null;
for(ExecutionRequest r:requests){
if(r.getTrackingId().equals(trackingId)){
rremove = r; // We have to split the filtering and termination as we otherwise get a concurrent access exception.
break;
}
}
if(rremove!=null){
boolean b = requests.remove(rremove);
// There is a chance that between the upper loop and here the request
// already got dequeued. This check ensures that the requests got removed
// if not it got dequeued and therefore we have to terminate the current
// execution.
if(!b){
terminate();
}
}
}
}
@@ -16,6 +16,7 @@ public class ResourceBinder extends AbstractBinder {
bind(DefaultChannelService.class).to(ChannelService.class).in(Singleton.class);
bind(AcquisitionConfiguration.class).to(AcquisitionConfiguration.class).in(Singleton.class);
bind(AcquisitionEngine.class).to(AcquisitionEngine.class).in(Singleton.class);
bind(ZMQDataService.class).to(ZMQDataService.class).in(Singleton.class);
}
}
@@ -0,0 +1,85 @@
/**
*
* Copyright 2013 Paul Scherrer Institute. All rights reserved.
*
* This code is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* This code is distributed in the hope that it will be useful, but without any
* warranty; without even the implied warranty of merchantability or fitness for
* a particular purpose. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda.rest;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jeromq.ZMQ;
import ch.psi.fda.core.messages.Message;
import com.google.common.eventbus.Subscribe;
/**
* @author ebner
*
*/
public class ZMQDataService {
private static final Logger logger = Logger.getLogger(ZMQDataService.class.getName());
private final int bufferSize = 5;
private ZMQ.Context context;
private ZMQ.Socket socket;
private String trackingId;
public ZMQDataService(){
initialize();
}
public void initialize(){
context = ZMQ.context();
zmq.ZError.clear(); // Clear error code
socket = context.socket(ZMQ.PUB);
socket.setHWM(bufferSize);
socket.bind("tcp://*:10000");
}
public void terminate(){
socket.close();
context.term();
zmq.ZError.clear(); // Clear error code
}
@Subscribe
public void onMessage(Message m) {
logger.info(m.toString());
socket.sendMore("{\"trackingId\":\"" + trackingId + "\"}");
try (
ByteArrayOutputStream b = new ByteArrayOutputStream();
ObjectOutputStream o = new ObjectOutputStream(b);
) {
o.writeObject(m);
socket.send(b.toByteArray());
} catch (IOException e) {
logger.log(Level.WARNING, "Unable to serialize message", e);
}
}
public void setTrackingId(String id){
trackingId = id;
}
}
@@ -44,6 +44,6 @@ public class ScanService {
@DELETE
public void stop(){
aengine.stop();
aengine.terminate();
}
}