Unfinished removal of the dispatcher class. tested the migration of the

serializer to EventBus.
Test is/was successfull but things now need to be cleaned up properly!
There are various compilation errors in the code
This commit is contained in:
2013-10-03 10:57:20 +02:00
parent 4045873135
commit c3238e4d22
5 changed files with 304 additions and 319 deletions

View File

@@ -57,7 +57,6 @@ import ch.psi.fda.core.actors.JythonFunction;
import ch.psi.fda.core.actors.OTFActuator;
import ch.psi.fda.core.actors.PseudoActuatorSensor;
import ch.psi.fda.core.collector.Collector;
import ch.psi.fda.core.collector.DataDispatcher;
import ch.psi.fda.core.guard.ChannelAccessGuard;
import ch.psi.fda.core.guard.ChannelAccessGuardCondition;
import ch.psi.fda.core.loops.ActorSensorLoop;
@@ -68,6 +67,7 @@ import ch.psi.fda.core.loops.cr.ScrlogicLoop;
import ch.psi.fda.core.manipulator.JythonManipulation;
import ch.psi.fda.core.manipulator.Manipulation;
import ch.psi.fda.core.manipulator.Manipulator;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.Message;
import ch.psi.fda.core.scripting.JythonGlobalVariable;
@@ -137,7 +137,6 @@ public class Acquisition {
private ActionLoop actionLoop;
private Collector collector;
private DataDispatcher dispatcher;
private Manipulator manipulator;
private DataSerializerTXT serializer;
@@ -182,7 +181,7 @@ public class Acquisition {
* @param getQueue Flag whether to return a queue or not. If false the return value of the function will be null.
* @throws InterruptedException
*/
public DataQueue initalize(EventBus bus, Configuration smodel, boolean getQueue) {
public DataMessageMetadata initalize(EventBus bus, Configuration smodel, boolean getQueue) {
// Create notification agent with globally configured recipients
notificationAgent = new NotificationAgent(configuration.getSmptServer(), "fda.notification@psi.ch");
@@ -252,29 +251,15 @@ public class Acquisition {
Collections.reverse(collector.getQueues());
// Add manipulator into processing chain
this.manipulator = new Manipulator(collector.getOutQueue(), this.manipulations);
this.manipulator = new Manipulator(bus, collector.getOutQueue(), this.manipulations);
// // Insert dispatcher into processing chain
this.dispatcher = new DataDispatcher(bus, manipulator.getOutQueue());
DataQueue dq = new DataQueue(new LinkedBlockingQueue<Message>(1000), manipulator.getOutQueue().getDataMessageMetadata()); // Create bounded queue to
// prevent running out of
// memory ...
this.serializer = new DataSerializerTXT(dq, datafile, true);
DataQueue vdq = null;
if (getQueue) {
vdq = new DataQueue(new LinkedBlockingQueue<Message>(1000), manipulator.getOutQueue().getDataMessageMetadata()); // Create bounded queue to prevent
// running out of memory ...
// dispatcher.getOutQueues().add(vdq);
}
// Add queue for serializer to dispatcher
dispatcher.getOutQueues().add(dq);
return (vdq);
DataMessageMetadata metadata = manipulator.getMetadata();
this.serializer = new DataSerializerTXT(metadata, datafile, true);
bus.register(serializer);
return (metadata);
}
/**
@@ -294,12 +279,6 @@ public class Acquisition {
Thread tm = new Thread(manipulator);
tm.start();
Thread td = new Thread(dispatcher);
td.start();
Thread t = new Thread(serializer);
t.start();
actionLoop.prepare();
actionLoop.execute();
actionLoop.cleanup();
@@ -310,8 +289,6 @@ public class Acquisition {
// Give the threads 1 minute to catch up
tc.join(60000);
tm.join(60000);
td.join(60000);
t.join(60000);
// Send notifications out to all recipients that want to have success notifications
try {

View File

@@ -53,6 +53,7 @@ import com.google.common.eventbus.EventBus;
import sun.misc.Signal;
import sun.misc.SignalHandler;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.gui.ProgressPanel;
import ch.psi.fda.gui.ScrollableFlowPanel;
@@ -264,13 +265,13 @@ public class AcquisitionMain {
EventBus b = new AsyncEventBus(Executors.newSingleThreadExecutor());
DataQueue vdq = acquisition.initalize(b, c, vis);
DataMessageMetadata dmeta = acquisition.initalize(b, c, vis);
Visualizer visualizer = null;
// Only register data visualization task/processor if there are visualizations
if(vis){
visualizer = new Visualizer(vdq.getDataMessageMetadata(), c.getVisualization());
visualizer = new Visualizer(dmeta, c.getVisualization());
b.register(visualizer);
// If there is a continous dimension only update plot at the end of a line
if(c.getScan() != null && c.getScan().getCdimension()!=null){

View File

@@ -1,109 +0,0 @@
/**
*
* Copyright 2010 Paul Scherrer Institute. All rights reserved.
*
* This code is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This code is distributed in the hope that it will be useful,
* but without any warranty; without even the implied warranty of
* merchantability or fitness for a particular purpose. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda.core.collector;
import java.util.ArrayList;
import java.util.List;
//import org.apache.commons.lang.SerializationUtils;
//import org.jeromq.ZMQ;
//import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
/**
* Serialize data received by a DataQueue
* @author ebner
*
*/
public class DataDispatcher implements Runnable{
private DataQueue queue;
private List<DataQueue> outQueues;
private EventBus bus;
public DataDispatcher(EventBus b, DataQueue queue){
this.bus = b;
this.queue = queue;
this.outQueues = new ArrayList<DataQueue>();
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try{
// ZMQ.Context context = ZMQ.context();
// ZMQ.Socket socket = context.socket(ZMQ.PUB);
// socket.bind("tcp://*:9090");
// socket.bind("inproc://visualize");
// TODO Need to synchronize message metadata
// for(DataQueue q: outQueues){
// }
// Dispatch Messages
Message message = queue.getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
// Clone message ...
for(DataQueue q: outQueues){
q.getQueue().put(message);
}
bus.post(message);
// socket.send(SerializationUtils.serialize(message));
// Read next message
message = queue.getQueue().take();
}
// Write end of stream message
for(DataQueue q: outQueues){
q.getQueue().put(message);
}
// socket.send(SerializationUtils.serialize(message));
bus.post(message);
// socket.close();
// context.term();
} catch (InterruptedException e) {
// TODO Stop loop and exit logic instead of throwing an Exception
throw new RuntimeException("Data serializer was interrupted while writing data to file",e);
}
}
/**
* @return the outQueues
*/
public List<DataQueue> getOutQueues() {
return outQueues;
}
}

View File

@@ -20,7 +20,7 @@
package ch.psi.fda.core.manipulator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
@@ -35,19 +35,10 @@ import ch.psi.fda.core.messages.Message;
*/
public class Manipulator implements Runnable{
/**
* Outgoing data queue
*/
private final DataQueue outQueue;
private EventBus bus;
private DataMessageMetadata metadata;
/**
* Incomming data queue
*/
private final DataQueue queue;
/**
* List of manipulations
*/
private final List<Manipulation> manipulations;
/**
@@ -56,32 +47,25 @@ public class Manipulator implements Runnable{
* @param manipulations
*/
// TODO need to support multiple (a list of) manipulation(s)
public Manipulator(DataQueue queue, List<Manipulation> manipulations){
public Manipulator(EventBus b, DataQueue queue, List<Manipulation> manipulations){
this.bus = b;
this.manipulations = manipulations;
this.queue = queue;
// Create outgoing data metadata
DataMessageMetadata dmetadata = queue.getDataMessageMetadata().clone();
metadata = queue.getDataMessageMetadata().clone();
// Initialize manipulations and create outgoing metadata
for(Manipulation manipulation: this.manipulations){
// Initialize manipulation
// manipulation.initialize(queue.getDataMessageMetadata());
manipulation.initialize(dmetadata);
manipulation.initialize(metadata);
// Add manipulation id to metadata
dmetadata.getComponents().add(new ComponentMetadata(manipulation.getId(),0)); // Calculated component always belongs to lowes dimension
metadata.getComponents().add(new ComponentMetadata(manipulation.getId(),0)); // Calculated component always belongs to lowes dimension
}
this.queue = queue;
this.outQueue = new DataQueue(new LinkedBlockingQueue<Message>(1000) , dmetadata ); // Create bounded queue to prevent running out of memory ...
}
/**
* @return the outQueue
*/
public DataQueue getOutQueue() {
return outQueue;
public DataMessageMetadata getMetadata() {
return metadata;
}
/* (non-Javadoc)
@@ -102,15 +86,14 @@ public class Manipulator implements Runnable{
}
}
// Put message to outgoing queue ...
outQueue.getQueue().put(message);
bus.post(message);
// Read next message
message = queue.getQueue().take();
}
// Write end of stream message
outQueue.getQueue().put(message);
bus.post(message);
} catch (InterruptedException e) {

View File

@@ -25,8 +25,11 @@ import java.io.FileWriter;
import java.io.IOException;
import java.util.logging.Logger;
import com.google.common.eventbus.Subscribe;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
@@ -43,19 +46,25 @@ public class DataSerializerTXT implements DataSerializer{
// Get Logger
private static final Logger logger = Logger.getLogger(DataSerializerTXT.class.getName());
private DataQueue queue;
// private DataQueue queue;
private File file;
private boolean appendSuffix = true;
private boolean first = true;
private File outfile;
private DataMessageMetadata meta;
/**
*
* @param queue
* @param file
* @param appendSuffix Flag whether to append a _0000 suffix after the original file name
*/
public DataSerializerTXT(DataQueue queue, File file, boolean appendSuffix){
this.queue = queue;
public DataSerializerTXT(DataMessageMetadata meta, File file, boolean appendSuffix){
this.meta = meta;
this.file = file;
this.appendSuffix = appendSuffix;
}
@@ -65,171 +74,295 @@ public class DataSerializerTXT implements DataSerializer{
*/
@Override
public void run() {
try{
// WORKAROUND BEGIN
File outfile;
// if(appendSuffix){
// // Append a count suffix to the file. If there is already a file with
// // this suffix increase the counter for the suffix
// int cnt = 0;
// String fname = this.file.getAbsolutePath(); // Determine file name
// String extension = fname.replaceAll("^.*\\.", ""); // Determine extension
// fname = fname.replaceAll("\\."+extension+"$", "");
// try{
//
// // WORKAROUND BEGIN
//
//// if(appendSuffix){
//// // Append a count suffix to the file. If there is already a file with
//// // this suffix increase the counter for the suffix
//// int cnt = 0;
//// String fname = this.file.getAbsolutePath(); // Determine file name
//// String extension = fname.replaceAll("^.*\\.", ""); // Determine extension
//// fname = fname.replaceAll("\\."+extension+"$", "");
////
//// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
////
//// while(outfile.exists()){
//// cnt++;
//// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
//// }
//// }
//// else{
//// outfile = this.file;
//// }
// // WORKAROUND END
//
//
//
//
//
// // Write header
// StringBuffer b = new StringBuffer();
// StringBuffer b1 = new StringBuffer();
// b.append("#");
// b1.append("#");
// for(ComponentMetadata c: queue.getDataMessageMetadata().getComponents()){
//
// b.append(c.getId());
// b.append("\t");
//
// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
//
// while(outfile.exists()){
// cnt++;
// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
// b1.append(c.getDimension());
// b1.append("\t");
// }
// b.setCharAt(b.length()-1, '\n');
// b1.setCharAt(b1.length()-1, '\n');
//
//
// int icount = 0;
// boolean newfile = true;
// boolean dataInBetween = false;
// BufferedWriter writer = null;
//
// // Get basename of the file
// String basename = this.file.getAbsolutePath(); // Determine file name
// String extension = basename.replaceAll("^.*\\.", ""); // Determine extension
// basename = basename.replaceAll("\\."+extension+"$", "");
//
// // Write data
// // Read Message
// Message message = queue.getQueue().take();
// while(!(message instanceof EndOfStreamMessage)){
// if(message instanceof DataMessage){
// dataInBetween = true;
// if(newfile){
// // Open new file and write header
// // Construct file name
// if(appendSuffix){
// outfile = new File(String.format("%s_%04d.%s", basename, icount, extension));
// }
// else{
// outfile = new File(String.format("%s.%s", basename, extension));
// }
//
// // Open file
// logger.fine("Open new data file: "+outfile.getAbsolutePath());
// writer = new BufferedWriter(new FileWriter(outfile));
//
// // Write header
// writer.write(b.toString());
// writer.write(b1.toString());
//
// newfile=false;
// }
//
// // Write message to file - each message will result in one line
// DataMessage m = (DataMessage) message;
// StringBuffer buffer = new StringBuffer();
// for(Object o: m.getData()){
// if(o.getClass().isArray()){
// // If the array object is of type double[] display its content
// if(o instanceof double[]){
// double[] oa = (double[]) o;
// for(double o1 : oa){
// buffer.append(o1);
// buffer.append(" "); // Use space instead of tab
// }
// buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab
// }
// else if(o instanceof Object[]){
// // TODO need to be recursive ...
// Object[] oa = (Object[])o;
// for(Object o1 : oa){
// buffer.append(o1);
// buffer.append(" "); // Use space instead of tab
// }
// buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab
// }
// else{
// buffer.append("-"); // Not supported
// }
// }
// else{
// buffer.append(o);
// buffer.append("\t");
// }
// }
//
// if(buffer.length()>0){
// buffer.deleteCharAt(buffer.length()-1); // Remove last character (i.e. \t)
// buffer.append("\n"); // Append newline
// }
// writer.write(buffer.toString());
// }
// else if(message instanceof StreamDelimiterMessage){
// StreamDelimiterMessage m = (StreamDelimiterMessage) message;
// logger.info("Delimiter - number: "+m.getNumber()+" iflag: "+m.isIflag());
// if(m.isIflag() && appendSuffix){
// // Only increase iflag counter if there was data in between
// // subsequent StreamDelimiterMessages.
// if(dataInBetween){
// icount++;
// }
// dataInBetween = false;
//
// // Set flag to open new file
// newfile = true;
//
// // Close file
// writer.close();
// }
// }
//
// // Read next message
// message = queue.getQueue().take();
// }
// else{
// outfile = this.file;
//
// if(writer!=null){
// // Close file
// writer.close(); //If the stream was closed previously this has no effect
// }
// WORKAROUND END
// // Writer can be null if a scan is defined without a dimension
//
// } catch (InterruptedException e) {
// // TODO Stop loop and exit logic instead of throwing an Exception
// throw new RuntimeException("Data serializer was interrupted while writing data to file",e);
// } catch (IOException e) {
// throw new RuntimeException("Data serializer had a problem writing to the specified file",e);
// }
//
}
int icount;
String basename;
String extension;
boolean newfile;
boolean dataInBetween;
BufferedWriter writer;
StringBuffer b;
StringBuffer b1;
@Subscribe
public void onMessage(Message message){
try{
if(first){
first=false;
// Write header
StringBuffer b = new StringBuffer();
StringBuffer b1 = new StringBuffer();
b.append("#");
b1.append("#");
for(ComponentMetadata c: queue.getDataMessageMetadata().getComponents()){
b = new StringBuffer();
b1 = new StringBuffer();
b.append("#");
b1.append("#");
for(ComponentMetadata c: meta.getComponents()){
b.append(c.getId());
b.append("\t");
b1.append(c.getDimension());
b1.append("\t");
}
b.setCharAt(b.length()-1, '\n');
b1.setCharAt(b1.length()-1, '\n');
int icount = 0;
boolean newfile = true;
boolean dataInBetween = false;
BufferedWriter writer = null;
// Get basename of the file
String basename = this.file.getAbsolutePath(); // Determine file name
String extension = basename.replaceAll("^.*\\.", ""); // Determine extension
basename = basename.replaceAll("\\."+extension+"$", "");
// Write data
// Read Message
Message message = queue.getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
if(message instanceof DataMessage){
dataInBetween = true;
if(newfile){
// Open new file and write header
// Construct file name
if(appendSuffix){
outfile = new File(String.format("%s_%04d.%s", basename, icount, extension));
}
else{
outfile = new File(String.format("%s.%s", basename, extension));
b.append(c.getId());
b.append("\t");
b1.append(c.getDimension());
b1.append("\t");
}
b.setCharAt(b.length()-1, '\n');
b1.setCharAt(b1.length()-1, '\n');
// Open file
logger.fine("Open new data file: "+outfile.getAbsolutePath());
writer = new BufferedWriter(new FileWriter(outfile));
// Write header
writer.write(b.toString());
writer.write(b1.toString());
newfile=false;
}
// Write message to file - each message will result in one line
DataMessage m = (DataMessage) message;
StringBuffer buffer = new StringBuffer();
for(Object o: m.getData()){
if(o.getClass().isArray()){
// If the array object is of type double[] display its content
if(o instanceof double[]){
double[] oa = (double[]) o;
for(double o1 : oa){
buffer.append(o1);
buffer.append(" "); // Use space instead of tab
}
buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab
}
else if(o instanceof Object[]){
// TODO need to be recursive ...
Object[] oa = (Object[])o;
for(Object o1 : oa){
buffer.append(o1);
buffer.append(" "); // Use space instead of tab
}
buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab
}
else{
buffer.append("-"); // Not supported
}
}
else{
buffer.append(o);
buffer.append("\t");
}
}
if(buffer.length()>0){
buffer.deleteCharAt(buffer.length()-1); // Remove last character (i.e. \t)
buffer.append("\n"); // Append newline
}
writer.write(buffer.toString());
}
else if(message instanceof StreamDelimiterMessage){
StreamDelimiterMessage m = (StreamDelimiterMessage) message;
logger.info("Delimiter - number: "+m.getNumber()+" iflag: "+m.isIflag());
if(m.isIflag() && appendSuffix){
// Only increase iflag counter if there was data in between
// subsequent StreamDelimiterMessages.
if(dataInBetween){
icount++;
}
dataInBetween = false;
// Set flag to open new file
icount = 0;
newfile = true;
dataInBetween = false;
writer = null;
// Close file
writer.close();
}
// Get basename of the file
basename = this.file.getAbsolutePath(); // Determine file name
extension = basename.replaceAll("^.*\\.", ""); // Determine extension
basename = basename.replaceAll("\\."+extension+"$", "");
}
if(message instanceof DataMessage){
dataInBetween = true;
if(newfile){
// Open new file and write header
// Construct file name
if(appendSuffix){
outfile = new File(String.format("%s_%04d.%s", basename, icount, extension));
}
else{
outfile = new File(String.format("%s.%s", basename, extension));
}
// Read next message
message = queue.getQueue().take();
// Open file
logger.fine("Open new data file: "+outfile.getAbsolutePath());
writer = new BufferedWriter(new FileWriter(outfile));
// Write header
writer.write(b.toString());
writer.write(b1.toString());
newfile=false;
}
// Write message to file - each message will result in one line
DataMessage m = (DataMessage) message;
StringBuffer buffer = new StringBuffer();
for(Object o: m.getData()){
if(o.getClass().isArray()){
// If the array object is of type double[] display its content
if(o instanceof double[]){
double[] oa = (double[]) o;
for(double o1 : oa){
buffer.append(o1);
buffer.append(" "); // Use space instead of tab
}
buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab
}
else if(o instanceof Object[]){
// TODO need to be recursive ...
Object[] oa = (Object[])o;
for(Object o1 : oa){
buffer.append(o1);
buffer.append(" "); // Use space instead of tab
}
buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab
}
else{
buffer.append("-"); // Not supported
}
}
else{
buffer.append(o);
buffer.append("\t");
}
}
if(buffer.length()>0){
buffer.deleteCharAt(buffer.length()-1); // Remove last character (i.e. \t)
buffer.append("\n"); // Append newline
}
writer.write(buffer.toString());
}
else if(message instanceof StreamDelimiterMessage){
StreamDelimiterMessage m = (StreamDelimiterMessage) message;
logger.info("Delimiter - number: "+m.getNumber()+" iflag: "+m.isIflag());
if(m.isIflag() && appendSuffix){
// Only increase iflag counter if there was data in between
// subsequent StreamDelimiterMessages.
if(dataInBetween){
icount++;
}
dataInBetween = false;
// Set flag to open new file
newfile = true;
// Close file
writer.close();
}
}
else if (message instanceof EndOfStreamMessage){
if(writer!=null){
// Close file
writer.close(); //If the stream was closed previously this has no effect
}
// Writer can be null if a scan is defined without a dimension
} catch (InterruptedException e) {
// TODO Stop loop and exit logic instead of throwing an Exception
throw new RuntimeException("Data serializer was interrupted while writing data to file",e);
}
} catch (IOException e) {
throw new RuntimeException("Data serializer had a problem writing to the specified file",e);
}
}
// /**
// * Enable/disable the generation of the _0000 suffix in front of the extension of the out file
// * @param appendSuffix the appendSuffix to set
// */
// public void setAppendSuffix(boolean appendSuffix) {
// this.appendSuffix = appendSuffix;
// }
}