Moved on to google guava EventBus for visualizer ...

This commit is contained in:
2013-10-03 10:30:29 +02:00
parent 3e38e4002d
commit 4045873135
9 changed files with 251 additions and 430 deletions

View File

@@ -7,9 +7,9 @@
<dependencies>
<dependency>
<groupId>org.jeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.2.0</version>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
<dependency>

View File

@@ -37,6 +37,8 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.core.ActionLoop;
import ch.psi.fda.core.Actor;
import ch.psi.fda.core.EngineConfiguration;
@@ -180,7 +182,7 @@ public class Acquisition {
* @param getQueue Flag whether to return a queue or not. If false the return value of the function will be null.
* @throws InterruptedException
*/
public DataQueue initalize(Configuration smodel, boolean getQueue) {
public DataQueue initalize(EventBus bus, Configuration smodel, boolean getQueue) {
// Create notification agent with globally configured recipients
notificationAgent = new NotificationAgent(configuration.getSmptServer(), "fda.notification@psi.ch");
@@ -252,8 +254,10 @@ public class Acquisition {
// Add manipulator into processing chain
this.manipulator = new Manipulator(collector.getOutQueue(), this.manipulations);
// // Insert dispatcher into processing chain
this.dispatcher = new DataDispatcher(manipulator.getOutQueue());
this.dispatcher = new DataDispatcher(bus, manipulator.getOutQueue());
DataQueue dq = new DataQueue(new LinkedBlockingQueue<Message>(1000), manipulator.getOutQueue().getDataMessageMetadata()); // Create bounded queue to
// prevent running out of

View File

@@ -27,6 +27,7 @@ import java.awt.event.WindowEvent;
import java.io.File;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -47,9 +48,11 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import sun.misc.Signal;
import sun.misc.SignalHandler;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.gui.ProgressPanel;
import ch.psi.fda.gui.ScrollableFlowPanel;
@@ -258,12 +261,17 @@ public class AcquisitionMain {
vis=true;
}
DataQueue vdq = acquisition.initalize(c, vis);
EventBus b = new AsyncEventBus(Executors.newSingleThreadExecutor());
DataQueue vdq = acquisition.initalize(b, c, vis);
Visualizer visualizer = null;
// Only register data visualization task/processor if there are visualizations
if(vis){
visualizer = new Visualizer(vdq, c.getVisualization());
visualizer = new Visualizer(vdq.getDataMessageMetadata(), c.getVisualization());
b.register(visualizer);
// If there is a continous dimension only update plot at the end of a line
if(c.getScan() != null && c.getScan().getCdimension()!=null){
visualizer.setUpdateAtStreamElement(false);
@@ -397,15 +405,15 @@ public class AcquisitionMain {
try {
if(visualizer != null){
// Start visualization
visualizer.startVisualization();
visualizer.configure();
}
acquisition.execute();
if(visualizer != null){
// Stop visualization
visualizer.stopVisualization();
}
// if(visualizer != null){
// // Stop visualization
// visualizer.stopVisualization();
// }
} catch (InterruptedException e1) {
throw new RuntimeException("Acquisition was interrupted",e1);

View File

@@ -22,8 +22,11 @@ package ch.psi.fda.core.collector;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.SerializationUtils;
import org.jeromq.ZMQ;
//import org.apache.commons.lang.SerializationUtils;
//import org.jeromq.ZMQ;
//import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.EndOfStreamMessage;
@@ -39,7 +42,10 @@ public class DataDispatcher implements Runnable{
private DataQueue queue;
private List<DataQueue> outQueues;
public DataDispatcher(DataQueue queue){
private EventBus bus;
public DataDispatcher(EventBus b, DataQueue queue){
this.bus = b;
this.queue = queue;
this.outQueues = new ArrayList<DataQueue>();
}
@@ -52,9 +58,9 @@ public class DataDispatcher implements Runnable{
try{
ZMQ.Context context = ZMQ.context();
ZMQ.Socket socket = context.socket(ZMQ.PUB);
socket.bind("tcp://*:9090");
// ZMQ.Context context = ZMQ.context();
// ZMQ.Socket socket = context.socket(ZMQ.PUB);
// socket.bind("tcp://*:9090");
// socket.bind("inproc://visualize");
// TODO Need to synchronize message metadata
@@ -68,7 +74,8 @@ public class DataDispatcher implements Runnable{
for(DataQueue q: outQueues){
q.getQueue().put(message);
}
socket.send(SerializationUtils.serialize(message));
bus.post(message);
// socket.send(SerializationUtils.serialize(message));
// Read next message
message = queue.getQueue().take();
@@ -78,10 +85,11 @@ public class DataDispatcher implements Runnable{
for(DataQueue q: outQueues){
q.getQueue().put(message);
}
socket.send(SerializationUtils.serialize(message));
// socket.send(SerializationUtils.serialize(message));
bus.post(message);
socket.close();
context.term();
// socket.close();
// context.term();
} catch (InterruptedException e) {
// TODO Stop loop and exit logic instead of throwing an Exception

View File

@@ -116,9 +116,9 @@ public class VisualizationEngine {
DataDeserializer deserializer = new DataDeserializerTXT(data);
// Create Visualizer
Visualizer visualizer = new Visualizer(deserializer.getQueue(), configuration.getVisualization());
Visualizer visualizer = new Visualizer(deserializer.getQueue().getDataMessageMetadata(), configuration.getVisualization());
visualizer.setTerminateAtEOS(true);
// visualizer.setTerminateAtEOS(true);
// Adapt default visualizer behavior to optimize performance for visualization
visualizer.setUpdateAtStreamElement(false);
visualizer.setUpdateAtStreamDelimiter(false);
@@ -158,7 +158,7 @@ public class VisualizationEngine {
Thread td = new Thread(deserializer);
td.start();
visualizer.startVisualization();
visualizer.configure();
td.join();
logger.info("Deserializer finished");

View File

@@ -1,74 +0,0 @@
/**
*
* Copyright 2010 Paul Scherrer Institute. All rights reserved.
*
* This code is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This code is distributed in the hope that it will be useful,
* but without any warranty; without even the implied warranty of
* merchantability or fitness for a particular purpose. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda.visualizer;
import java.util.ArrayList;
import java.util.List;
import ch.psi.fda.core.messages.DataQueue;
/**
* @author ebner
*
*/
public class FilterSet {
private DataQueue queue;
private List<SeriesDataFilter> filters = new ArrayList<SeriesDataFilter>();
private String name;
public FilterSet(DataQueue queue){
this("", queue);
}
public FilterSet(String name, DataQueue queue){
this.queue = queue;
}
/**
* @return the queue
*/
public DataQueue getQueue() {
return queue;
}
/**
* @return the filters
*/
public List<SeriesDataFilter> getFilters() {
return filters;
}
/**
* @return the name
*/
public String getName() {
return name;
}
/**
* @param name the name to set
*/
public void setName(String name) {
this.name = name;
}
}

View File

@@ -27,17 +27,12 @@ import java.util.logging.Logger;
import javax.swing.JPanel;
import javax.swing.SwingUtilities;
//import org.jfree.data.xy.XYSeries;
import org.apache.commons.lang.SerializationUtils;
import org.jeromq.ZMQ;
import com.google.common.eventbus.Subscribe;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.StreamDelimiterMessage;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
import ch.psi.fda.model.v1.ArrayDetector;
import ch.psi.fda.model.v1.ContinuousPositioner;
import ch.psi.fda.model.v1.Detector;
@@ -49,309 +44,204 @@ import ch.psi.plot.Plot;
import ch.psi.plot.xy.LinePlot;
import ch.psi.plot.xy.XYSeriesCollectionP;
import ch.psi.plot.xy.XYSeriesP;
//import ch.psi.plot.xyz.JFreeMatrixPlot;
//import ch.psi.plot.xyz.JFreeMatrixPlotData;
//import ch.psi.plot.xyz.JFreeMatrixPlotMetadata;
import ch.psi.plot.xyz.MatrixPlot;
import ch.psi.plot.xyz.MatrixPlotData;
/**
* Serialize data received by a DataQueue
* Visualizer for visualizing data
* @author ebner
*
*/
public class Visualizer {
// Logger
private static Logger logger = Logger.getLogger(Visualizer.class.getName());
private FilterSet filterSet = null;
private List<Plot> plots = new ArrayList<Plot>();
private Thread visualizationThread = null;
/**
* Terminate at end of stream
*/
private boolean terminateAtEOS = false;
private volatile boolean terminate = false;
private boolean updateAtStreamElement = true;
private boolean updateAtStreamDelimiter = true;
private boolean updateAtEndOfStream = false;
public Visualizer(DataQueue queue, List<Visualization> vl){
filterSet = mapVisualizations(queue, vl);
private int ecount;
private boolean clearPlot;
private List<SeriesDataFilter> filters;
public Visualizer(DataMessageMetadata meta, List<Visualization> vl){
filters = mapVisualizations(meta, vl);
}
/**
* Visualize data
* Method blocks until visualization is done
*/
public void visualize() {
if(filterSet != null ){
ZMQ.Context context = ZMQ.context();
ZMQ.Socket socket = context.socket(ZMQ.SUB);
socket.subscribe("");
socket.connect("tcp://emac:9090");
// socket.connect("inproc://visualize");
// while(true){
// Object message = SerializationUtils.deserialize(socket.recv());
//// Object message = socket.recv();
// logger.info(""+message);
// }
// DataQueue queue = filterSet.getQueue();
List<SeriesDataFilter> filters = filterSet.getFilters();
int ecount = 0;
boolean clearPlot = false;
terminate = false;
// Read Messages
Message message = null;
// try {
// message = queue.getQueue().take();
message = (Message) SerializationUtils.deserialize(socket.recv());
// } catch (InterruptedException e) {
// terminate = true;
// // Reset interrupted status
// Thread.currentThread().interrupt();
//
// }
while ( (!Thread.currentThread().isInterrupted()) && (!terminate) ) {
if (message instanceof DataMessage) {
final DataMessage m = (DataMessage) message;
// Clear is here as the plot should not be cleared after the last point is plotted
// but just before the first point of the next plot (cycle)
if (clearPlot) {
for (Plot plot: plots) {
if(plot instanceof MatrixPlot){
((MatrixPlotData) ((MatrixPlot)plot).getData()).clear();
}
}
clearPlot = false;
}
for(SeriesDataFilter filter: filters){
if(filter instanceof XYSeriesDataFilter){
XYSeriesDataFilter xyfilter = (XYSeriesDataFilter) filter;
if(xyfilter.getActualSeries()==null || xyfilter.isNewseries()){
// First series that is filled by this filter!
XYSeriesP s = new XYSeriesP(xyfilter.getSeriesName() + " " + ecount + "-" + xyfilter.getCount());
((LinePlot)xyfilter.getPlot()).getData().addSeries(s);
xyfilter.setActualSeries(s);
xyfilter.setNewseries(false);
}
// XYSeriesP series = ((LinePlot) xyfilter.getPlot()).getData().getSeries(xyfilter.getCount()); // TODO Does not work with multiple series filter per plot !!!!
XYSeriesP series = xyfilter.getActualSeries(); // TODO Does not work with multiple series filter per plot !!!!
// series.add((Double) m.getData().get(xyfilter.getIndexX()), (Double) m.getData().get(xyfilter.getIndexY()));
// There might be other values than double in the data, therefore we have to check for it
Object dX = m.getData().get(xyfilter.getIndexX());
Object dY = m.getData().get(xyfilter.getIndexY());
Double dataX = Double.NaN;
Double dataY = Double.NaN;
if(dX instanceof Double){
dataX = (Double) dX;
}
if(dY instanceof Double){
dataY = (Double) dY;
}
// Add Data to the series
series.add(dataX , dataY, updateAtStreamElement);
}
if(filter instanceof XYSeriesArrayDataFilter){
final XYSeriesArrayDataFilter xyfilter = (XYSeriesArrayDataFilter) filter;
// Ensure that there is no concurrent modification exception or synchronization problems with the
// Swing update task
SwingUtilities.invokeLater(new Runnable(){
@Override
public void run() {
XYSeriesP series = new XYSeriesP(xyfilter.getSeriesName() + "-" + xyfilter.getCount()); // Series name must be unique
xyfilter.incrementCount();
// ((LinePlot)xyfilter.getPlot()).getData().removeAllSeries(); // Remove all series from the data
// If we can agree only to display one series at a time also a clear() on the actual series is better
XYSeriesCollectionP sc = ((LinePlot)xyfilter.getPlot()).getData();
sc.addSeries(series);
// Remove outdated series
if(sc.getSeriesCount()>xyfilter.getMaxSeries()){
// Remove oldest series
sc.removeSeries(0);
}
double[] data = (double[]) m.getData().get(xyfilter.getIndexY());
// Copy data starting from offset to size
int size = data.length;
int offset = xyfilter.getOffset();
if(xyfilter.getSize()>0 && offset+xyfilter.getSize()<data.length){
size = offset + xyfilter.getSize();
}
for(int i=offset;i<size;i++){
series.add(i,data[i], false); // Do not fire change event - this would degrade performance drastically
}
series.fireSeriesChanged();
}
});
}
else if(filter instanceof XYZSeriesDataFilter){
XYZSeriesDataFilter xyzfilter = (XYZSeriesDataFilter) filter;
try{
((MatrixPlot)xyzfilter.getPlot()).getData().addData((Double) m.getData().get(xyzfilter.getIndexX()),(Double) m.getData().get(xyzfilter.getIndexY()), (Double) m.getData().get(xyzfilter.getIndexZ()));
}
catch (Exception e) {
// Ignore if something goes wrong while adding a datapoint
logger.log(Level.WARNING, "Unable to plot datapoint in matrix plot", e);
}
}
else if(filter instanceof XYZSeriesArrayDataFilter){
XYZSeriesArrayDataFilter xyzfilter = (XYZSeriesArrayDataFilter) filter;
try{
double[] data = (double[]) m.getData().get(xyzfilter.getIndexZ());
double y = (Double) m.getData().get(xyzfilter.getIndexY());
int offset = xyzfilter.getOffset();
int size = xyzfilter.getSize();
for(int i=offset;i<offset+size; i++){
((MatrixPlot)xyzfilter.getPlot()).getData().addData(i, y, data[i]);
}
// Update data if update by point is enabled
if(updateAtStreamElement){
xyzfilter.getPlot().update();
}
}
catch (Exception e) {
// Ignore if something goes wrong while adding a datapoint
logger.log(Level.WARNING, "Unable to plot datapoint in matrix plot", e);
}
}
}
} else if (message instanceof StreamDelimiterMessage) {
StreamDelimiterMessage ddm = (StreamDelimiterMessage) message;
for(SeriesDataFilter filter: filters){
if(filter instanceof XYSeriesDataFilter){
// Create new series
XYSeriesDataFilter xyfilter = (XYSeriesDataFilter) filter;
if (ddm.getNumber() == xyfilter.getDimensionX()) {
// Indicate to create new series at the next message
xyfilter.setCount(xyfilter.getCount()+1); // Increment count of the filter
xyfilter.setNewseries(true);
}
}
}
// Update matrix plot at the end of each line
if(updateAtStreamDelimiter){
for (Plot plot: plots) {
if(plot instanceof MatrixPlot){
((MatrixPlot)plot).update();
}
else if(plot instanceof LinePlot){
((LinePlot) plot).update();
}
}
}
// Clear matrix plot if iflag is encountered
// TODO: One need to check whether the iflag belongs to a delimiter
// of a higher dimension than the highest dimension (axis) that
// is involved in the plot
if (ddm.isIflag()) {
clearPlot = true;
}
} else if (message instanceof EndOfStreamMessage) {
System.out.println("END");
ecount++;
if(terminateAtEOS){
terminate = true;
}
// Update plots if updateAtEndOfStream flag is set
if(updateAtEndOfStream){
// Update matrix plots
for (Plot plot: plots) {
if(plot instanceof MatrixPlot){
((MatrixPlot)plot).update();
((MatrixPlot)plot).adaptColorMapScale(); // Update color scale at the end
}
else if(plot instanceof LinePlot){
((LinePlot) plot).update();
}
}
@Subscribe
public void onDataMessage(final DataMessage message){
// Clear is here as the plot should not be cleared after the last point is plotted
// but just before the first point of the next plot (cycle)
if (clearPlot) {
for (Plot plot: plots) {
if(plot instanceof MatrixPlot){
((MatrixPlotData) ((MatrixPlot)plot).getData()).clear();
}
}
// End loop if terminate flag is true
if(terminate){
break;
clearPlot = false;
}
for(SeriesDataFilter filter: filters){
if(filter instanceof XYSeriesDataFilter){
XYSeriesDataFilter xyfilter = (XYSeriesDataFilter) filter;
if(xyfilter.getActualSeries()==null || xyfilter.isNewseries()){
// First series that is filled by this filter!
XYSeriesP s = new XYSeriesP(xyfilter.getSeriesName() + " " + ecount + "-" + xyfilter.getCount());
((LinePlot)xyfilter.getPlot()).getData().addSeries(s);
xyfilter.setActualSeries(s);
xyfilter.setNewseries(false);
}
XYSeriesP series = xyfilter.getActualSeries(); // TODO Does not work with multiple series filter per plot !!!!
// There might be other values than double in the data, therefore we have to check for it
Object dX = message.getData().get(xyfilter.getIndexX());
Object dY = message.getData().get(xyfilter.getIndexY());
Double dataX = Double.NaN;
Double dataY = Double.NaN;
if(dX instanceof Double){
dataX = (Double) dX;
}
if(dY instanceof Double){
dataY = (Double) dY;
}
// Add Data to the series
series.add(dataX , dataY, updateAtStreamElement);
}
if(filter instanceof XYSeriesArrayDataFilter){
final XYSeriesArrayDataFilter xyfilter = (XYSeriesArrayDataFilter) filter;
// Ensure that there is no concurrent modification exception or synchronization problems with the
// Swing update task
SwingUtilities.invokeLater(new Runnable(){
@Override
public void run() {
XYSeriesP series = new XYSeriesP(xyfilter.getSeriesName() + "-" + xyfilter.getCount()); // Series name must be unique
xyfilter.incrementCount();
// ((LinePlot)xyfilter.getPlot()).getData().removeAllSeries(); // Remove all series from the data
// If we can agree only to display one series at a time also a clear() on the actual series is better
XYSeriesCollectionP sc = ((LinePlot)xyfilter.getPlot()).getData();
sc.addSeries(series);
// Remove outdated series
if(sc.getSeriesCount()>xyfilter.getMaxSeries()){
// Remove oldest series
sc.removeSeries(0);
}
double[] data = (double[]) message.getData().get(xyfilter.getIndexY());
// Copy data starting from offset to size
int size = data.length;
int offset = xyfilter.getOffset();
if(xyfilter.getSize()>0 && offset+xyfilter.getSize()<data.length){
size = offset + xyfilter.getSize();
}
for(int i=offset;i<size;i++){
series.add(i,data[i], false); // Do not fire change event - this would degrade performance drastically
}
series.fireSeriesChanged();
}
});
}
else if(filter instanceof XYZSeriesDataFilter){
XYZSeriesDataFilter xyzfilter = (XYZSeriesDataFilter) filter;
try{
((MatrixPlot)xyzfilter.getPlot()).getData().addData((Double) message.getData().get(xyzfilter.getIndexX()),(Double) message.getData().get(xyzfilter.getIndexY()), (Double) message.getData().get(xyzfilter.getIndexZ()));
}
catch (Exception e) {
// Ignore if something goes wrong while adding a datapoint
logger.log(Level.WARNING, "Unable to plot datapoint in matrix plot", e);
}
}
else if(filter instanceof XYZSeriesArrayDataFilter){
XYZSeriesArrayDataFilter xyzfilter = (XYZSeriesArrayDataFilter) filter;
try{
double[] data = (double[]) message.getData().get(xyzfilter.getIndexZ());
double y = (Double) message.getData().get(xyzfilter.getIndexY());
int offset = xyzfilter.getOffset();
int size = xyzfilter.getSize();
for(int i=offset;i<offset+size; i++){
((MatrixPlot)xyzfilter.getPlot()).getData().addData(i, y, data[i]);
}
// Update data if update by point is enabled
if(updateAtStreamElement){
xyzfilter.getPlot().update();
}
}
catch (Exception e) {
// Ignore if something goes wrong while adding a datapoint
logger.log(Level.WARNING, "Unable to plot datapoint in matrix plot", e);
}
}
// Read next message
// try {
// message = queue.getQueue().take();
message = (Message) SerializationUtils.deserialize(socket.recv());
// } catch (InterruptedException e) {
// terminate = true;
// // Reset interrupted status
// Thread.currentThread().interrupt();
// }
}
System.out.println("VIS DONE");
socket.close();
context.term();
}
logger.info("End visualization");
}
/**
* Starts a visualization thread for this visualizer
*/
public void startVisualization(){
// Throw an exception if visualization is already running
if(visualizationThread!=null){
throw new IllegalStateException("Visualization already running");
}
visualizationThread = new Thread(new Runnable() {
@Override
public void run() {
visualize();
@Subscribe
public void onStreamDelimiterMessage(StreamDelimiterMessage message){
for(SeriesDataFilter filter: filters){
if(filter instanceof XYSeriesDataFilter){
// Create new series
XYSeriesDataFilter xyfilter = (XYSeriesDataFilter) filter;
if (message.getNumber() == xyfilter.getDimensionX()) {
// Indicate to create new series at the next message
xyfilter.setCount(xyfilter.getCount()+1); // Increment count of the filter
xyfilter.setNewseries(true);
}
}
}
// Update matrix plot at the end of each line
if(updateAtStreamDelimiter){
for (Plot plot: plots) {
if(plot instanceof MatrixPlot){
((MatrixPlot)plot).update();
}
else if(plot instanceof LinePlot){
((LinePlot) plot).update();
}
}
}
// Clear matrix plot if iflag is encountered
// TODO: One need to check whether the iflag belongs to a delimiter
// of a higher dimension than the highest dimension (axis) that
// is involved in the plot
if (message.isIflag()) {
clearPlot = true;
}
});
// Start thread
visualizationThread.start();
}
/**
* Stop visualization thread of this visualizer
* Waits until the visualiztion thread is stopped
* @throws InterruptedException
*/
public void stopVisualization() throws InterruptedException{
// Wait until visualiztion thread is stopped
visualizationThread.interrupt();
visualizationThread.join(10000); // Wait 10 seconds to have the visualization Thread to come to an end
@Subscribe
public void onEndOfStreamMessage(EndOfStreamMessage message){
ecount++;
visualizationThread = null;
// Update plots if updateAtEndOfStream flag is set
if(updateAtEndOfStream){
// Update matrix plots
for (Plot plot: plots) {
if(plot instanceof MatrixPlot){
((MatrixPlot)plot).update();
((MatrixPlot)plot).adaptColorMapScale(); // Update color scale at the end
}
else if(plot instanceof LinePlot){
((LinePlot) plot).update();
}
}
}
}
public void configure(){
ecount = 0;
clearPlot = false;
}
@@ -387,9 +277,8 @@ public class Visualizer {
* @param vl List of configured visualizations
* @return List of visualization configuration
*/
private FilterSet mapVisualizations(DataQueue queue, List<Visualization> vl){
FilterSet fset = new FilterSet(queue);
List<SeriesDataFilter> filters = fset.getFilters();
private List<SeriesDataFilter> mapVisualizations(DataMessageMetadata meta, List<Visualization> vl){
List<SeriesDataFilter> filters = new ArrayList<SeriesDataFilter>();
for(Visualization v: vl){
if(v instanceof ch.psi.fda.model.v1.LinePlot){
@@ -401,14 +290,14 @@ public class Visualizer {
// Create data filter for visualization
String idX = getId(lp.getX());
int indexX = queue.getDataMessageMetadata().getIndex(idX);
int dimX = queue.getDataMessageMetadata().getComponents().get(indexX).getDimension();
int indexX = meta.getIndex(idX);
int dimX = meta.getComponents().get(indexX).getDimension();
List<Object> l = lp.getY();
for(Object o: l){
String idY = getId(o);
int indexY = queue.getDataMessageMetadata().getIndex(idY);
int dimY = queue.getDataMessageMetadata().getComponents().get(indexY).getDimension();
int indexY = meta.getIndex(idY);
int dimY = meta.getComponents().get(indexY).getDimension();
XYSeriesDataFilter filter = new XYSeriesDataFilter(idX, idY, indexX, indexY, plot);
filter.setDimensionX(dimX);
@@ -429,8 +318,8 @@ public class Visualizer {
List<Object> l = lp.getY();
for(Object o: l){
String idY = getId(o);
int indexY = queue.getDataMessageMetadata().getIndex(idY);
int dimY = queue.getDataMessageMetadata().getComponents().get(indexY).getDimension();
int indexY = meta.getIndex(idY);
int dimY = meta.getComponents().get(indexY).getDimension();
XYSeriesArrayDataFilter filter = new XYSeriesArrayDataFilter(idY, indexY, plot);
filter.setDimensionY(dimY);
@@ -515,13 +404,13 @@ public class Visualizer {
plots.add(plot);
// Create data filter for visualization
int indexX = queue.getDataMessageMetadata().getIndex(idX);
int indexY = queue.getDataMessageMetadata().getIndex(idY);
int indexZ = queue.getDataMessageMetadata().getIndex(idZ);
int indexX = meta.getIndex(idX);
int indexY = meta.getIndex(idY);
int indexZ = meta.getIndex(idZ);
int dimX = queue.getDataMessageMetadata().getComponents().get(indexX).getDimension();
int dimY = queue.getDataMessageMetadata().getComponents().get(indexY).getDimension();
int dimZ = queue.getDataMessageMetadata().getComponents().get(indexZ).getDimension();
int dimX = meta.getComponents().get(indexX).getDimension();
int dimY = meta.getComponents().get(indexY).getDimension();
int dimZ = meta.getComponents().get(indexZ).getDimension();
XYZSeriesDataFilter filter = new XYZSeriesDataFilter(idX, idY, idZ, indexX, indexY, indexZ, plot);
filter.setDimensionX(dimX);
@@ -604,11 +493,11 @@ public class Visualizer {
plots.add(plot);
// Create data filter for visualization
int indexY = queue.getDataMessageMetadata().getIndex(idY);
int indexZ = queue.getDataMessageMetadata().getIndex(idZ);
int indexY = meta.getIndex(idY);
int indexZ = meta.getIndex(idZ);
int dimY = queue.getDataMessageMetadata().getComponents().get(indexY).getDimension();
int dimZ = queue.getDataMessageMetadata().getComponents().get(indexZ).getDimension();
int dimY = meta.getComponents().get(indexY).getDimension();
int dimZ = meta.getComponents().get(indexZ).getDimension();
XYZSeriesArrayDataFilter filter = new XYZSeriesArrayDataFilter(idY, idZ, indexY, indexZ, offset, size, plot);
filter.setDimensionY(dimY);
@@ -620,7 +509,7 @@ public class Visualizer {
throw new RuntimeException(v.getClass().getName()+" is not supported as visualization type");
}
}
return fset;
return filters;
}
/**
@@ -636,20 +525,6 @@ public class Visualizer {
return panels;
}
/**
* @param terminateAtEOS the terminateAtEOS to set
*/
public void setTerminateAtEOS(boolean terminateAtEOS) {
this.terminateAtEOS = terminateAtEOS;
}
/**
* @return the terminateAtEOS
*/
public boolean isTerminateAtEOS() {
return terminateAtEOS;
}
/**
* @return the updateAtStreamElement
*/

View File

@@ -22,22 +22,22 @@ package ch.psi.fda;
import java.util.logging.Logger;
import org.apache.commons.lang.SerializationUtils;
import org.jeromq.ZMQ;
//import org.jeromq.ZMQ;
public class Receiver {
private static final Logger logger = Logger.getLogger(Receiver.class.getName());
public static void main(String[] args){
ZMQ.Context context = ZMQ.context();
ZMQ.Socket socket = context.socket(ZMQ.SUB);
socket.subscribe("");
socket.connect("tcp://emac:9090");
while(true){
Object message = SerializationUtils.deserialize(socket.recv());
// Object message = socket.recv();
logger.info(""+message);
}
// ZMQ.Context context = ZMQ.context();
// ZMQ.Socket socket = context.socket(ZMQ.SUB);
// socket.subscribe("");
// socket.connect("tcp://emac:9090");
// while(true){
// Object message = SerializationUtils.deserialize(socket.recv());
//// Object message = socket.recv();
// logger.info(""+message);
// }
}
}

View File

@@ -93,7 +93,7 @@ public class DataVisualizerTest {
vlist.add(p);
// Create visualizer
Visualizer visualizer = new Visualizer(queue, vlist);
Visualizer visualizer = new Visualizer(queue.getDataMessageMetadata(), vlist);
JFrame f = new JFrame();
f.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
@@ -136,7 +136,7 @@ public class DataVisualizerTest {
t.start();
visualizer.visualize();
// visualizer.visualize();
t.join();
@@ -159,7 +159,7 @@ public class DataVisualizerTest {
vlist.add(p);
// Create visualizer
Visualizer visualizer = new Visualizer(queue, vlist);
Visualizer visualizer = new Visualizer(queue.getDataMessageMetadata(), vlist);
JFrame f = new JFrame();
f.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
@@ -205,7 +205,7 @@ public class DataVisualizerTest {
t.start();
visualizer.visualize();
// visualizer.visualize();
t.join();
@@ -249,7 +249,7 @@ public class DataVisualizerTest {
vlist.add(mp);
// Create visualizer
Visualizer visualizer = new Visualizer(queue, vlist);
Visualizer visualizer = new Visualizer(queue.getDataMessageMetadata(), vlist);
JFrame f = new JFrame();
f.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
@@ -293,7 +293,7 @@ public class DataVisualizerTest {
t.start();
visualizer.visualize();
// visualizer.visualize();
t.join();