Fixed all compile problems and migrated all serializers and

deserializers to Guava EventBus.
This commit is contained in:
2013-10-03 13:44:08 +02:00
parent c3238e4d22
commit 2a6018c4f5
17 changed files with 756 additions and 1055 deletions
@@ -33,6 +33,8 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.deserializer.DataDeserializer;
import ch.psi.fda.deserializer.DataDeserializerMDA;
import ch.psi.fda.deserializer.DataDeserializerTXT;
@@ -82,14 +84,17 @@ public class ConversionEngine {
else if(output.exists()){
throw new IllegalArgumentException("Output file ["+output.getAbsolutePath()+"] already exists");
}
EventBus bus = new EventBus();
// Create deserializer
DataDeserializer deserializer;
if(reader.equals(Reader.TXT)){
deserializer = new DataDeserializerTXT(input);
deserializer = new DataDeserializerTXT(bus, input);
}
else if(reader.equals(Reader.MDA)){
deserializer = new DataDeserializerMDA(input);
deserializer = new DataDeserializerMDA(bus, input);
}
else{
throw new IllegalArgumentException("Reader of type "+reader+" not supported.");
@@ -97,39 +102,33 @@ public class ConversionEngine {
DataSerializer serializer;
if(writer.equals(Writer.MAT)){
serializer = new DataSerializerMAT(deserializer.getQueue(), output);
serializer = new DataSerializerMAT(deserializer.getMetadata(), output);
}
else if(writer.equals(Writer.MAT_2D)){
serializer = new DataSerializerMAT2D(deserializer.getQueue(), output);
serializer = new DataSerializerMAT2D(deserializer.getMetadata(), output);
}
else if(writer.equals(Writer.TXT)){
serializer = new DataSerializerTXT(deserializer.getQueue(), output, false);
serializer = new DataSerializerTXT(deserializer.getMetadata(), output, false);
}
else if(writer.equals(Writer.TXT_2D)){
serializer = new DataSerializerTXT2D(deserializer.getQueue(), output);
serializer = new DataSerializerTXT2D(deserializer.getMetadata(), output);
}
else if(writer.equals(Writer.TXT_SPLIT)){
serializer = new DataSerializerTXTSplit(deserializer.getQueue(), output);
serializer = new DataSerializerTXTSplit(deserializer.getMetadata(), output);
}
else if(writer.equals(Writer.MDA)){
serializer = new DataSerializerMDA(deserializer.getQueue(), output);
serializer = new DataSerializerMDA(deserializer.getMetadata(), output);
}
else if(writer.equals(Writer.MAT_2D_Z)){
serializer = new DataSerializerMAT2DZigZag(deserializer.getQueue(), output);
serializer = new DataSerializerMAT2DZigZag(deserializer.getMetadata(), output);
}
else{
throw new IllegalArgumentException("Writer of type "+writer+" not supported.");
}
// Start deserializer and serializer
Thread td = new Thread(deserializer);
Thread ts = new Thread(serializer);
td.start();
ts.start();
td.join();
ts.join();
// Start conversion
bus.register(serializer);
deserializer.read();
}
/**
@@ -19,18 +19,20 @@
package ch.psi.fda.deserializer;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.DataMessageMetadata;
/**
* Data deserializer
* @author ebner
*
*/
public interface DataDeserializer extends Runnable {
public interface DataDeserializer {
/**
* Get data queue of deserializer
* @return data queue of deserializer
* Get message metadata
* @return metadata/information of the message format
*/
public DataQueue getQueue();
public DataMessageMetadata getMetadata();
public void read();
}
@@ -9,45 +9,45 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
import ch.psi.fda.core.messages.StreamDelimiterMessage;
/**
* TODO Need to be optimized as currently the while file is read into memory when creating this object.
* @author ebner
*
*/
public class DataDeserializerMDA implements DataDeserializer {
private static Logger logger = Logger.getLogger(DataDeserializerMDA.class.getName());
private DataQueue queue;
private EventBus bus;
private DataMessageMetadata metadata;
private RecursiveReturnContainer c;
public DataDeserializerMDA(File file){
public DataDeserializerMDA(EventBus b, File file){
this.bus = b;
try {
RecursiveReturnContainer c = read(new FileInputStream(file));
this.queue = new DataQueue(new LinkedBlockingQueue<Message>(), c.getMetadata());
// Add data to queue
for(Message m: c.getMessage()){
queue.getQueue().put(m);
}
queue.getQueue().put(new EndOfStreamMessage());
try{
c = read(new FileInputStream(file));
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.metadata = c.getMetadata();
}
public RecursiveReturnContainer read(InputStream in) throws IOException {
private RecursiveReturnContainer read(InputStream in) throws IOException {
logger.fine("Read MDA input stream");
XDRInputStream x = new XDRInputStream(in);
@@ -443,21 +443,18 @@ public class DataDeserializerMDA implements DataDeserializer {
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
// TODO Auto-generated method stub
public void read() {
// Add data to queue
for(Message m: c.getMessage()){
bus.post(m);
}
bus.post(new EndOfStreamMessage());
}
/* (non-Javadoc)
* @see ch.psi.fda.deserializer.DataDeserializer#getQueue()
*/
@Override
public DataQueue getQueue() {
return queue;
public DataMessageMetadata getMetadata() {
return metadata;
}
}
@@ -25,16 +25,15 @@ import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.StreamDelimiterMessage;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
/**
* Deserialize file data and put it into the DataQueue
@@ -43,10 +42,10 @@ import ch.psi.fda.core.messages.Message;
*/
public class DataDeserializerTXT implements DataDeserializer {
// Get Logger
private static Logger logger = Logger.getLogger(DataDeserializerTXT.class.getName());
private DataQueue queue;
private EventBus bus;
private DataMessageMetadata metadata;
private File file;
private List<Integer> dindex;
@@ -56,12 +55,12 @@ public class DataDeserializerTXT implements DataDeserializer {
* Default Constructor
* @param file
*/
public DataDeserializerTXT(File file){
public DataDeserializerTXT(EventBus b, File file){
this.bus = b;
this.file = file;
this.dindex = new ArrayList<Integer>();
this.iindex = new ArrayList<Integer>();
DataMessageMetadata metadata;
try{
// Read metadata
// Open file
@@ -103,23 +102,15 @@ public class DataDeserializerTXT implements DataDeserializer {
catch(Exception e){
throw new RuntimeException("Unable to read file metadata and initialize data queue",e);
}
this.queue = new DataQueue(new LinkedBlockingQueue<Message>(10000000), metadata);
}
/* (non-Javadoc)
* @see ch.psi.fda.deserializer.DataDeserializer#getQueue()
*/
@Override
public DataQueue getQueue(){
return(queue);
public DataMessageMetadata getMetadata(){
return(metadata);
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
public void read() {
try{
List<Double> checklist = new ArrayList<Double>(dindex.size());
@@ -195,14 +186,14 @@ public class DataDeserializerTXT implements DataDeserializer {
// }
if(checklist.get(i)!=null &&!checklist.get(i).equals(d)){
// If value changes issue a dimension delimiter message
queue.getQueue().put(new StreamDelimiterMessage(dindex.get(t)-1));
bus.post(new StreamDelimiterMessage(dindex.get(t)-1));
}
checklist.set(i, d);
}
}
// Put message to queue
queue.getQueue().put(message);
bus.post(message);
// TODO Need to detect dimension boundaries
@@ -210,20 +201,17 @@ public class DataDeserializerTXT implements DataDeserializer {
// Add delimiter for all the dimensions
for(int i=dindex.size()-1;i>=0;i--){
queue.getQueue().put(new StreamDelimiterMessage(dindex.get(i)));
bus.post(new StreamDelimiterMessage(dindex.get(i)));
}
// queue.getQueue().put(new DimensionDelimiterMessage(dindex.get(0)-1));
// queue.getQueue().put(new DimensionDelimiterMessage(dindex.get(0)));
// Place end of stream message
queue.getQueue().put(new EndOfStreamMessage());
bus.post(new EndOfStreamMessage());
// Close file
reader.close();
} catch (InterruptedException e) {
throw new RuntimeException("Data deserializer was interrupted while reading the datafile",e);
} catch (IOException e) {
throw new RuntimeException("Data deserializer had a problem reading the specified datafile",e);
}
@@ -20,9 +20,7 @@
package ch.psi.fda.serializer;
/**
* Data Serializer
* @author ebner
*
* Data Serializer marker interface
*/
public interface DataSerializer extends Runnable {
public interface DataSerializer {
}
@@ -24,13 +24,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.google.common.eventbus.Subscribe;
import com.jmatio.io.MatFileWriter;
import com.jmatio.types.MLArray;
import com.jmatio.types.MLDouble;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
@@ -41,120 +42,109 @@ import ch.psi.fda.core.messages.Message;
*/
public class DataSerializerMAT implements DataSerializer{
private DataQueue queue;
private DataMessageMetadata metadata;
private File file;
private boolean appendSuffix = false;
private boolean first = true;
/**
* Construtor
* @param queue Data queue holding the data to serialize
* @param file Name of the Matlab file to serialize the data to
*/
public DataSerializerMAT(DataQueue queue, File file){
this.queue = queue;
public DataSerializerMAT(DataMessageMetadata metadata, File file){
this.metadata = metadata;
this.file = file;
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try{
// WORKAROUND BEGIN
File outfile;
if(appendSuffix){
// Append a count suffix to the file. If there is already a file with
// this suffix increase the counter for the suffix
int cnt = 0;
String fname = this.file.getAbsolutePath(); // Determine file name
String extension = fname.replaceAll("^.*\\.", ""); // Determine extension
fname = fname.replaceAll("\\."+extension+"$", "");
outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
while(outfile.exists()){
cnt++;
outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
}
}
else{
outfile = this.file;
}
// WORKAROUND END
File outfile;
List<List<Object>> dlist;
List<Class<?>> clist;
boolean firstF;
@Subscribe
public void onMessage(Message message) {
try {
// Transposed data list
List<List<Object>> dlist = new ArrayList<List<Object>>();
List<Class<?>> clist = new ArrayList<Class<?>>();
boolean firstF = true;
// Write data
// Read Message
Message message = queue.getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
if(message instanceof DataMessage){
DataMessage m = (DataMessage) message;
// Initialize list
if(firstF){
for(Object o: m.getData()){
dlist.add(new ArrayList<Object>());
clist.add(o.getClass());
}
firstF=false;
if (first) {
first = false;
// WORKAROUND BEGIN
if (appendSuffix) {
// Append a count suffix to the file. If there is already a
// file with
// this suffix increase the counter for the suffix
int cnt = 0;
String fname = this.file.getAbsolutePath(); // Determine
// file name
String extension = fname.replaceAll("^.*\\.", ""); // Determine
// extension
fname = fname.replaceAll("\\." + extension + "$", "");
outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
while (outfile.exists()) {
cnt++;
outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
}
// Put data into data list
for(int i=0;i< m.getData().size();i++){
Object object = m.getData().get(i);
dlist.get(i).add(object);
} else {
outfile = this.file;
}
// WORKAROUND END
// Transposed data list
dlist = new ArrayList<List<Object>>();
clist = new ArrayList<Class<?>>();
firstF = true;
}
if (message instanceof DataMessage) {
DataMessage m = (DataMessage) message;
// Initialize list
if (firstF) {
for (Object o : m.getData()) {
dlist.add(new ArrayList<Object>());
clist.add(o.getClass());
}
firstF = false;
}
// Read next message
message = queue.getQueue().take();
// Put data into data list
for (int i = 0; i < m.getData().size(); i++) {
Object object = m.getData().get(i);
dlist.get(i).add(object);
}
} else if (message instanceof EndOfStreamMessage) {
// Create Matlab vectors
ArrayList<MLArray> matlablist = new ArrayList<MLArray>();
for (int t = 0; t < dlist.size(); t++) {
// Get component metadata
ComponentMetadata c = metadata.getComponents().get(t);
c.getId();
List<Object> list = dlist.get(t);
if (clist.get(t).isArray()) {
// Array Handling
} else if (clist.get(t).equals(Double.class)) {
// Data is of type Double
MLDouble darray = new MLDouble(escapeString(c.getId()), (Double[]) list.toArray(new Double[list.size()]), 1);
matlablist.add(darray);
}
}
// Write Matlab file
MatFileWriter writerr = new MatFileWriter();
writerr.write(outfile, matlablist);
}
// Create Matlab vectors
ArrayList<MLArray> matlablist = new ArrayList<MLArray>();
for(int t=0; t<dlist.size(); t++ ){
// Get component metadata
ComponentMetadata c = queue.getDataMessageMetadata().getComponents().get(t);
c.getId();
List<Object> list = dlist.get(t);
if(clist.get(t).isArray()){
// Array Handling
}
else if(clist.get(t).equals(Double.class)){
// Data is of type Double
MLDouble darray = new MLDouble(escapeString(c.getId()),(Double[])list.toArray(new Double[list.size()]),1);
matlablist.add(darray);
}
}
// Write Matlab file
MatFileWriter writerr = new MatFileWriter();
writerr.write(outfile, matlablist);
} catch (InterruptedException e) {
// TODO Stop loop and exit logic instead of throwing an Exception
throw new RuntimeException("Data serializer was interrupted while writing data to file",e);
} catch (IOException e) {
throw new RuntimeException("Data serializer had a problem writing to the specified file",e);
throw new RuntimeException("Data serializer had a problem writing to the specified file", e);
}
}
/**
@@ -25,13 +25,14 @@ import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
import com.google.common.eventbus.Subscribe;
import com.jmatio.io.MatFileWriter;
import com.jmatio.types.MLArray;
import com.jmatio.types.MLDouble;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.StreamDelimiterMessage;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
@@ -43,25 +44,34 @@ import ch.psi.fda.core.messages.Message;
*/
public class DataSerializerMAT2D implements DataSerializer{
// Get Logger
private static final Logger logger = Logger.getLogger(DataSerializerMAT2D.class.getName());
private DataQueue queue;
private DataMessageMetadata metadata;
private File file;
private boolean appendSuffix = false;
private boolean first = true;
private List<List<List<Object>>> dlist;
private List<Class<?>> clist;
private int dsize;
private int dcount;
private Integer mindsize;
private boolean firstF;
private File outfile;
/**
* Construtor
* @param queue Data queue holding the data to serialize
* @param file Name of the Matlab file to serialize the data to
*/
public DataSerializerMAT2D(DataQueue queue, File file){
this.queue = queue;
public DataSerializerMAT2D(DataMessageMetadata metadata, File file){
this.metadata = metadata;
this.file = file;
// Check if input queue does only hold 2D data
int maxdim=0;
for(ComponentMetadata m: queue.getDataMessageMetadata().getComponents()){
for(ComponentMetadata m: metadata.getComponents()){
if(m.getDimension()>maxdim){
maxdim=m.getDimension();
}
@@ -76,49 +86,42 @@ public class DataSerializerMAT2D implements DataSerializer{
}
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
@Subscribe
public void onMessage(Message message) {
try{
// WORKAROUND BEGIN
File outfile;
if(appendSuffix){
// Append a count suffix to the file. If there is already a file with
// this suffix increase the counter for the suffix
int cnt = 0;
String fname = this.file.getAbsolutePath(); // Determine file name
String extension = fname.replaceAll("^.*\\.", ""); // Determine extension
fname = fname.replaceAll("\\."+extension+"$", "");
outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
while(outfile.exists()){
cnt++;
if(first){
first=false;
// WORKAROUND BEGIN
if(appendSuffix){
// Append a count suffix to the file. If there is already a file with
// this suffix increase the counter for the suffix
int cnt = 0;
String fname = this.file.getAbsolutePath(); // Determine file name
String extension = fname.replaceAll("^.*\\.", ""); // Determine extension
fname = fname.replaceAll("\\."+extension+"$", "");
outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
while(outfile.exists()){
cnt++;
outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
}
}
else{
outfile = this.file;
}
// WORKAROUND END
// Transposed data list
dlist = new ArrayList<List<List<Object>>>();
clist = new ArrayList<Class<?>>();
dsize = 0; // Size of the dimension
dcount = 0;
mindsize = null;
firstF = true;
}
else{
outfile = this.file;
}
// WORKAROUND END
// Transposed data list
List<List<List<Object>>> dlist = new ArrayList<List<List<Object>>>();
List<Class<?>> clist = new ArrayList<Class<?>>();
int dsize = 0; // Size of the dimension
int dcount = 0;
Integer mindsize = null;
boolean firstF = true;
// Write data
// Read Message
Message message = queue.getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
if(message instanceof DataMessage){
DataMessage m = (DataMessage) message;
@@ -170,62 +173,55 @@ public class DataSerializerMAT2D implements DataSerializer{
}
}
// Read next message
message = queue.getQueue().take();
}
logger.info("dsize: "+dsize + " mindsize:"+mindsize);
// Create Matlab vectors
ArrayList<MLArray> matlablist = new ArrayList<MLArray>();
logger.info("dlist size: "+dlist.size());
for(int t=0; t<dlist.size(); t++ ){
else if(message instanceof EndOfStreamMessage){
logger.info("dsize: "+dsize + " mindsize:"+mindsize);
// Get component metadata
ComponentMetadata c = queue.getDataMessageMetadata().getComponents().get(t);
// Combine all lists to one big list (pad if there are data points missing)
List<Object> list = new ArrayList<Object>();
List<List<Object>> ol = dlist.get(t);
// Remove last array list as it is empty
ol.remove(ol.size()-1);
for(List<Object> li: ol){
list.addAll(li);
// Pad list if there are missing data points for some lines
for(int i=li.size();i<dsize;i++){
logger.info("Pad data point: "+i);
list.add(Double.NaN);
// Create Matlab vectors
ArrayList<MLArray> matlablist = new ArrayList<MLArray>();
logger.info("dlist size: "+dlist.size());
for(int t=0; t<dlist.size(); t++ ){
// Get component metadata
ComponentMetadata c = metadata.getComponents().get(t);
// Combine all lists to one big list (pad if there are data points missing)
List<Object> list = new ArrayList<Object>();
List<List<Object>> ol = dlist.get(t);
// Remove last array list as it is empty
ol.remove(ol.size()-1);
for(List<Object> li: ol){
list.addAll(li);
// Pad list if there are missing data points for some lines
for(int i=li.size();i<dsize;i++){
logger.info("Pad data point: "+i);
list.add(Double.NaN);
}
}
// List<Object> list = dlist.get(t);
logger.info("List: "+list.size());
if(clist.get(t).isArray()){
// Array Handling
}
else if(clist.get(t).equals(Double.class)){
// Data is of type Double
MLDouble darray = new MLDouble(escapeString(c.getId()),(Double[])list.toArray(new Double[list.size()]), dsize);
matlablist.add(darray);
}
}
// List<Object> list = dlist.get(t);
logger.info("List: "+list.size());
if(clist.get(t).isArray()){
// Array Handling
}
else if(clist.get(t).equals(Double.class)){
// Data is of type Double
MLDouble darray = new MLDouble(escapeString(c.getId()),(Double[])list.toArray(new Double[list.size()]), dsize);
matlablist.add(darray);
}
// Write Matlab file
MatFileWriter writerr = new MatFileWriter();
writerr.write(outfile, matlablist);
}
// Write Matlab file
MatFileWriter writerr = new MatFileWriter();
writerr.write(outfile, matlablist);
} catch (InterruptedException e) {
// TODO Stop loop and exit logic instead of throwing an Exception
throw new RuntimeException("Data serializer was interrupted while writing data to file",e);
} catch (IOException e) {
throw new RuntimeException("Data serializer had a problem writing to the specified file",e);
}
@@ -26,13 +26,14 @@ import java.util.Collections;
import java.util.List;
import java.util.logging.Logger;
import com.google.common.eventbus.Subscribe;
import com.jmatio.io.MatFileWriter;
import com.jmatio.types.MLArray;
import com.jmatio.types.MLDouble;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.StreamDelimiterMessage;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
@@ -47,22 +48,35 @@ public class DataSerializerMAT2DZigZag implements DataSerializer{
// Get Logger
private static final Logger logger = Logger.getLogger(DataSerializerMAT2DZigZag.class.getName());
private DataQueue queue;
private DataMessageMetadata metadata;
private File file;
private boolean appendSuffix = false;
private boolean first = true;
private File outfile;
private List<List<Object>> dlist;
private List<List<Object>> dlistTmp;
private List<Class<?>> clist;
private int dsize; // Size of the dimension
private int dcount;
private int delimiterCount;
private boolean firstF;
private boolean firstC;
/**
* Construtor
* @param queue Data queue holding the data to serialize
* @param file Name of the Matlab file to serialize the data to
*/
public DataSerializerMAT2DZigZag(DataQueue queue, File file){
this.queue = queue;
public DataSerializerMAT2DZigZag(DataMessageMetadata metadata, File file){
this.metadata = metadata;
this.file = file;
// Check if input queue does only hold 2D data
int maxdim=0;
for(ComponentMetadata m: queue.getDataMessageMetadata().getComponents()){
for(ComponentMetadata m: metadata.getComponents()){
if(m.getDimension()>maxdim){
maxdim=m.getDimension();
}
@@ -77,52 +91,49 @@ public class DataSerializerMAT2DZigZag implements DataSerializer{
}
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
@Subscribe
public void onMessage(Message message) {
try{
// WORKAROUND BEGIN
File outfile;
if(appendSuffix){
// Append a count suffix to the file. If there is already a file with
// this suffix increase the counter for the suffix
int cnt = 0;
String fname = this.file.getAbsolutePath(); // Determine file name
String extension = fname.replaceAll("^.*\\.", ""); // Determine extension
fname = fname.replaceAll("\\."+extension+"$", "");
if(first){
first=false;
outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
while(outfile.exists()){
cnt++;
// WORKAROUND BEGIN
if(appendSuffix){
// Append a count suffix to the file. If there is already a file with
// this suffix increase the counter for the suffix
int cnt = 0;
String fname = this.file.getAbsolutePath(); // Determine file name
String extension = fname.replaceAll("^.*\\.", ""); // Determine extension
fname = fname.replaceAll("\\."+extension+"$", "");
outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
while(outfile.exists()){
cnt++;
outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
}
}
else{
outfile = this.file;
}
// WORKAROUND END
// Transposed data list
dlist = new ArrayList<List<Object>>();
dlistTmp = new ArrayList<List<Object>>();
clist = new ArrayList<Class<?>>();
dsize = 0; // Size of the dimension
dcount = 0;
delimiterCount = 0;
firstF = true;
firstC = true;
}
else{
outfile = this.file;
}
// WORKAROUND END
// Transposed data list
List<List<Object>> dlist = new ArrayList<List<Object>>();
List<List<Object>> dlistTmp = new ArrayList<List<Object>>();
List<Class<?>> clist = new ArrayList<Class<?>>();
int dsize = 0; // Size of the dimension
int dcount = 0;
int delimiterCount = 0;
boolean firstF = true;
boolean firstC = true;
// Write data
// Read Message
Message message = queue.getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
if(message instanceof DataMessage){
DataMessage m = (DataMessage) message;
@@ -172,41 +183,35 @@ public class DataSerializerMAT2DZigZag implements DataSerializer{
delimiterCount++;
}
}
// Read next message
message = queue.getQueue().take();
}
// Create Matlab vectors
ArrayList<MLArray> matlablist = new ArrayList<MLArray>();
logger.info("dlist size: "+dlist.size());
for(int t=0; t<dlist.size(); t++ ){
// Get component metadata
ComponentMetadata c = queue.getDataMessageMetadata().getComponents().get(t);
List<Object> list = dlist.get(t);
if(clist.get(t).isArray()){
// Array Handling
}
else if(clist.get(t).equals(Double.class)){
// Data is of type Double
MLDouble darray = new MLDouble(escapeString(c.getId()),(Double[])list.toArray(new Double[list.size()]), dsize);
matlablist.add(darray);
else if(message instanceof EndOfStreamMessage){
// Create Matlab vectors
ArrayList<MLArray> matlablist = new ArrayList<MLArray>();
logger.info("dlist size: "+dlist.size());
for(int t=0; t<dlist.size(); t++ ){
// Get component metadata
ComponentMetadata c = metadata.getComponents().get(t);
List<Object> list = dlist.get(t);
if(clist.get(t).isArray()){
// Array Handling
}
else if(clist.get(t).equals(Double.class)){
// Data is of type Double
MLDouble darray = new MLDouble(escapeString(c.getId()),(Double[])list.toArray(new Double[list.size()]), dsize);
matlablist.add(darray);
}
}
// Write Matlab file
MatFileWriter writerr = new MatFileWriter();
writerr.write(outfile, matlablist);
}
// Write Matlab file
MatFileWriter writerr = new MatFileWriter();
writerr.write(outfile, matlablist);
} catch (InterruptedException e) {
// TODO Stop loop and exit logic instead of throwing an Exception
throw new RuntimeException("Data serializer was interrupted while writing data to file",e);
} catch (IOException e) {
throw new RuntimeException("Data serializer had a problem writing to the specified file",e);
}
@@ -30,9 +30,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.logging.Logger;
import com.google.common.eventbus.Subscribe;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
import ch.psi.fda.core.messages.StreamDelimiterMessage;
@@ -47,81 +49,75 @@ import ch.psi.fda.core.messages.StreamDelimiterMessage;
*/
public class DataSerializerMDA implements DataSerializer{
// Get Logger
private static final Logger logger = Logger.getLogger(DataSerializerMDA.class.getName());
private DataQueue queue;
private DataMessageMetadata metadata;
private File file;
public DataSerializerMDA(DataQueue queue, File file){
this.queue = queue;
private boolean first = true;
private List<Boolean> firstL;
private List<Boolean> takeData;
private List<Integer> dcountL;
private List<List<List<List<Double>>>> dimensionList;
private HashMap<Integer,List<Integer>> dMap;
private HashMap<Integer,List<String>> idMap;
private int numberOfDimensions;
public DataSerializerMDA(DataMessageMetadata metadata, File file){
this.metadata = metadata;
this.file = file;
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try{
// Analyze header
// Map holding all indexes for a given dimension
HashMap<Integer,List<Integer>> dMap = new HashMap<Integer, List<Integer>>();
// Map holding all ids for a given dimension
HashMap<Integer,List<String>> idMap = new HashMap<Integer, List<String>>();
List<ComponentMetadata> mlist = queue.getDataMessageMetadata().getComponents();
for(int index=0;index<mlist.size();index++){
ComponentMetadata m = mlist.get(index);
if(!dMap.containsKey(m.getDimension())){
dMap.put(m.getDimension(), new ArrayList<Integer>());
@Subscribe
public void onMessage(Message message) {
if(first){
first = false;
// Analyze header
// Map holding all indexes for a given dimension
dMap = new HashMap<Integer, List<Integer>>();
// Map holding all ids for a given dimension
idMap = new HashMap<Integer, List<String>>();
List<ComponentMetadata> mlist = metadata.getComponents();
for(int index=0;index<mlist.size();index++){
ComponentMetadata m = mlist.get(index);
if(!dMap.containsKey(m.getDimension())){
dMap.put(m.getDimension(), new ArrayList<Integer>());
}
if(!idMap.containsKey(m.getDimension())){
idMap.put(m.getDimension(), new ArrayList<String>());
}
dMap.get(m.getDimension()).add(index);
idMap.get(m.getDimension()).add(m.getId());
}
if(!idMap.containsKey(m.getDimension())){
idMap.put(m.getDimension(), new ArrayList<String>());
//dimensions/dimension/dimensioncomponents/component/componentvalue
dimensionList = new ArrayList<List<List<List<Double>>>>();
numberOfDimensions = dMap.size();
logger.info("Number of dimensions: "+numberOfDimensions);
for(int i=0;i<numberOfDimensions; i++){
// For each dimension add an list
dimensionList.add(new ArrayList<List<List<Double>>>());
}
firstL = new ArrayList<Boolean>();
takeData = new ArrayList<Boolean>(); // Flag whether to take data for this dimension
dcountL = new ArrayList<Integer>(); // How many times this dimension is there
for(int i=0;i<numberOfDimensions;i++){
firstL.add(true);
takeData.add(true);
dcountL.add(0);
}
dMap.get(m.getDimension()).add(index);
idMap.get(m.getDimension()).add(m.getId());
}
//dimensions/dimension/dimensioncomponents/component/componentvalue
List<List<List<List<Double>>>> dimensionList = new ArrayList<List<List<List<Double>>>>();
int numberOfDimensions = dMap.size();
logger.info("Number of dimensions: "+numberOfDimensions);
for(int i=0;i<numberOfDimensions; i++){
// For each dimension add an list
dimensionList.add(new ArrayList<List<List<Double>>>());
}
// // Transposed data list
// List<List<Double>> dlist = new ArrayList<List<Double>>();
//// List<Class<?>> clist = new ArrayList<Class<?>>();
// int dsize = 0; // Size of the dimension
// int dcount = 0;
// boolean firstF = true;
List<Boolean> firstL = new ArrayList<Boolean>();
List<Boolean> takeData = new ArrayList<Boolean>(); // Flag whether to take data for this dimension
List<Integer> dcountL = new ArrayList<Integer>(); // How many times this dimension is there
for(int i=0;i<numberOfDimensions;i++){
firstL.add(true);
takeData.add(true);
dcountL.add(0);
}
// Write data
// Read Message
Message message = queue.getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
if(message instanceof DataMessage){
DataMessage m = (DataMessage) message;
@@ -172,63 +168,53 @@ public class DataSerializerMDA implements DataSerializer{
}
}
// Read next message
message = queue.getQueue().take();
}
// Write MDA file
logger.info("Write MDA data file");
try {
if(file.exists()){
file.delete();
}
// XDROutputStream x = new XDROutputStream(new FileOutputStream(file));
XDRRandomAccessFile x = new XDRRandomAccessFile(file, "rw");
else if(message instanceof EndOfStreamMessage){
// Write MDA file
logger.info("Write MDA data file");
/**
* Read file header
*/
x.writeFloat(0f); // Version
x.writeInt(0); // Scan number
x.writeInt(numberOfDimensions); // Rank/Number of dimensions
for(int i=numberOfDimensions-1;i>=0; i--){
int s = dimensionList.get(i).get(0).get(0).size();
x.writeInt(s); // Dimension size
logger.info("Size: "+i+" - "+s+" ");
}
x.writeInt(1); // Is Regular (true=1, false=0)
x.writeInt(0); // Number of extra pvs
// Write data
HashMap<Integer,Integer> indexCount = new HashMap<Integer,Integer>();
for(int i=0;i<dimensionList.size();i++){
indexCount.put(i, 0);
}
// The highest dimension only consist of one data list
int dnum = dimensionList.size()-1;
writeDimension(x, dimensionList, indexCount, idMap, dnum);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (InterruptedException e) {
// TODO Stop loop and exit logic instead of throwing an Exception
throw new RuntimeException("Data serializer was interrupted while writing data to file",e);
// } catch (IOException e) {
// throw new RuntimeException("Data serializer had a problem writing to the specified file",e);
}
try {
if(file.exists()){
file.delete();
}
// XDROutputStream x = new XDROutputStream(new FileOutputStream(file));
XDRRandomAccessFile x = new XDRRandomAccessFile(file, "rw");
/**
* Read file header
*/
x.writeFloat(0f); // Version
x.writeInt(0); // Scan number
x.writeInt(numberOfDimensions); // Rank/Number of dimensions
for(int i=numberOfDimensions-1;i>=0; i--){
int s = dimensionList.get(i).get(0).get(0).size();
x.writeInt(s); // Dimension size
logger.info("Size: "+i+" - "+s+" ");
}
x.writeInt(1); // Is Regular (true=1, false=0)
x.writeInt(0); // Number of extra pvs
// Write data
HashMap<Integer,Integer> indexCount = new HashMap<Integer,Integer>();
for(int i=0;i<dimensionList.size();i++){
indexCount.put(i, 0);
}
// The highest dimension only consist of one data list
int dnum = dimensionList.size()-1;
writeDimension(x, dimensionList, indexCount, idMap, dnum);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private long writeDimension(XDRRandomAccessFile x, List<List<List<List<Double>>>> dimensionList, HashMap<Integer,Integer> icount, HashMap<Integer,List<String>> idMap, int dnum) throws IOException{
@@ -30,7 +30,6 @@ import com.google.common.eventbus.Subscribe;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
import ch.psi.fda.core.messages.StreamDelimiterMessage;
@@ -42,11 +41,8 @@ import ch.psi.fda.core.messages.StreamDelimiterMessage;
*/
public class DataSerializerTXT implements DataSerializer{
// Get Logger
private static final Logger logger = Logger.getLogger(DataSerializerTXT.class.getName());
// private DataQueue queue;
private File file;
private boolean appendSuffix = true;
@@ -57,9 +53,18 @@ public class DataSerializerTXT implements DataSerializer{
private DataMessageMetadata meta;
private int icount;
private String basename;
private String extension;
private boolean newfile;
private boolean dataInBetween;
private BufferedWriter writer;
private StringBuffer b;
private StringBuffer b1;
/**
*
* @param queue
* @param metadata
* @param file
* @param appendSuffix Flag whether to append a _0000 suffix after the original file name
*/
@@ -69,179 +74,6 @@ public class DataSerializerTXT implements DataSerializer{
this.appendSuffix = appendSuffix;
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
// try{
//
// // WORKAROUND BEGIN
//
//// if(appendSuffix){
//// // Append a count suffix to the file. If there is already a file with
//// // this suffix increase the counter for the suffix
//// int cnt = 0;
//// String fname = this.file.getAbsolutePath(); // Determine file name
//// String extension = fname.replaceAll("^.*\\.", ""); // Determine extension
//// fname = fname.replaceAll("\\."+extension+"$", "");
////
//// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
////
//// while(outfile.exists()){
//// cnt++;
//// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
//// }
//// }
//// else{
//// outfile = this.file;
//// }
// // WORKAROUND END
//
//
//
//
//
// // Write header
// StringBuffer b = new StringBuffer();
// StringBuffer b1 = new StringBuffer();
// b.append("#");
// b1.append("#");
// for(ComponentMetadata c: queue.getDataMessageMetadata().getComponents()){
//
// b.append(c.getId());
// b.append("\t");
//
// b1.append(c.getDimension());
// b1.append("\t");
// }
// b.setCharAt(b.length()-1, '\n');
// b1.setCharAt(b1.length()-1, '\n');
//
//
// int icount = 0;
// boolean newfile = true;
// boolean dataInBetween = false;
// BufferedWriter writer = null;
//
// // Get basename of the file
// String basename = this.file.getAbsolutePath(); // Determine file name
// String extension = basename.replaceAll("^.*\\.", ""); // Determine extension
// basename = basename.replaceAll("\\."+extension+"$", "");
//
// // Write data
// // Read Message
// Message message = queue.getQueue().take();
// while(!(message instanceof EndOfStreamMessage)){
// if(message instanceof DataMessage){
// dataInBetween = true;
// if(newfile){
// // Open new file and write header
// // Construct file name
// if(appendSuffix){
// outfile = new File(String.format("%s_%04d.%s", basename, icount, extension));
// }
// else{
// outfile = new File(String.format("%s.%s", basename, extension));
// }
//
// // Open file
// logger.fine("Open new data file: "+outfile.getAbsolutePath());
// writer = new BufferedWriter(new FileWriter(outfile));
//
// // Write header
// writer.write(b.toString());
// writer.write(b1.toString());
//
// newfile=false;
// }
//
// // Write message to file - each message will result in one line
// DataMessage m = (DataMessage) message;
// StringBuffer buffer = new StringBuffer();
// for(Object o: m.getData()){
// if(o.getClass().isArray()){
// // If the array object is of type double[] display its content
// if(o instanceof double[]){
// double[] oa = (double[]) o;
// for(double o1 : oa){
// buffer.append(o1);
// buffer.append(" "); // Use space instead of tab
// }
// buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab
// }
// else if(o instanceof Object[]){
// // TODO need to be recursive ...
// Object[] oa = (Object[])o;
// for(Object o1 : oa){
// buffer.append(o1);
// buffer.append(" "); // Use space instead of tab
// }
// buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab
// }
// else{
// buffer.append("-"); // Not supported
// }
// }
// else{
// buffer.append(o);
// buffer.append("\t");
// }
// }
//
// if(buffer.length()>0){
// buffer.deleteCharAt(buffer.length()-1); // Remove last character (i.e. \t)
// buffer.append("\n"); // Append newline
// }
// writer.write(buffer.toString());
// }
// else if(message instanceof StreamDelimiterMessage){
// StreamDelimiterMessage m = (StreamDelimiterMessage) message;
// logger.info("Delimiter - number: "+m.getNumber()+" iflag: "+m.isIflag());
// if(m.isIflag() && appendSuffix){
// // Only increase iflag counter if there was data in between
// // subsequent StreamDelimiterMessages.
// if(dataInBetween){
// icount++;
// }
// dataInBetween = false;
//
// // Set flag to open new file
// newfile = true;
//
// // Close file
// writer.close();
// }
// }
//
// // Read next message
// message = queue.getQueue().take();
// }
//
// if(writer!=null){
// // Close file
// writer.close(); //If the stream was closed previously this has no effect
// }
// // Writer can be null if a scan is defined without a dimension
//
// } catch (InterruptedException e) {
// // TODO Stop loop and exit logic instead of throwing an Exception
// throw new RuntimeException("Data serializer was interrupted while writing data to file",e);
// } catch (IOException e) {
// throw new RuntimeException("Data serializer had a problem writing to the specified file",e);
// }
//
}
int icount;
String basename;
String extension;
boolean newfile;
boolean dataInBetween;
BufferedWriter writer;
StringBuffer b;
StringBuffer b1;
@Subscribe
public void onMessage(Message message){
try{
@@ -26,9 +26,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.google.common.eventbus.Subscribe;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.StreamDelimiterMessage;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
@@ -40,22 +42,31 @@ import ch.psi.fda.core.messages.Message;
*/
public class DataSerializerTXT2D implements DataSerializer{
private DataQueue queue;
private DataMessageMetadata metadata;
private File file;
private boolean appendSuffix = false;
private boolean first = true;
File outfile;
List<List<Object>> dlist;
List<Class<?>> clist;
int dsize;
int dcount;
boolean firstF;
/**
* Construtor
* @param queue Data queue holding the data to serialize
* @param file Name of the Matlab file to serialize the data to
*/
public DataSerializerTXT2D(DataQueue queue, File file){
this.queue = queue;
public DataSerializerTXT2D(DataMessageMetadata metadata, File file){
this.metadata = metadata;
this.file = file;
// Check if input queue does only hold 2D data
int maxdim=0;
for(ComponentMetadata m: queue.getDataMessageMetadata().getComponents()){
for(ComponentMetadata m: metadata.getComponents()){
if(m.getDimension()>maxdim){
maxdim=m.getDimension();
}
@@ -70,49 +81,46 @@ public class DataSerializerTXT2D implements DataSerializer{
}
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
@Subscribe
public void onMessage(Message message) {
try{
// WORKAROUND BEGIN
File outfile;
if(appendSuffix){
// Append a count suffix to the file. If there is already a file with
// this suffix increase the counter for the suffix
int cnt = 0;
String fname = this.file.getAbsolutePath(); // Determine file name
String extension = fname.replaceAll("^.*\\.", ""); // Determine extension
// fname = fname.replaceAll("(_[0-9]+)?\\."+extension+"$", "");
fname = fname.replaceAll("\\."+extension+"$", "");
if(first){
first = false;
// WORKAROUND BEGIN
outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
while(outfile.exists()){
cnt++;
if(appendSuffix){
// Append a count suffix to the file. If there is already a file with
// this suffix increase the counter for the suffix
int cnt = 0;
String fname = this.file.getAbsolutePath(); // Determine file name
String extension = fname.replaceAll("^.*\\.", ""); // Determine extension
// fname = fname.replaceAll("(_[0-9]+)?\\."+extension+"$", "");
fname = fname.replaceAll("\\."+extension+"$", "");
outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
while(outfile.exists()){
cnt++;
outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
}
}
else{
outfile = this.file;
}
// WORKAROUND END
// Transposed data list
dlist = new ArrayList<List<Object>>();
clist = new ArrayList<Class<?>>();
dsize = 0; // Size of the dimension
dcount = 0;
firstF = true;
}
else{
outfile = this.file;
}
// WORKAROUND END
// Transposed data list
List<List<Object>> dlist = new ArrayList<List<Object>>();
List<Class<?>> clist = new ArrayList<Class<?>>();
int dsize = 0; // Size of the dimension
int dcount = 0;
boolean firstF = true;
// Write data
// Read Message
Message message = queue.getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
if(message instanceof DataMessage){
DataMessage m = (DataMessage) message;
@@ -143,59 +151,53 @@ public class DataSerializerTXT2D implements DataSerializer{
}
}
// Read next message
message = queue.getQueue().take();
}
// Open file
BufferedWriter writer = new BufferedWriter(new FileWriter(outfile));
// Create text images
for(int t=0; t<dlist.size(); t++ ){
// Get component metadata
ComponentMetadata c = queue.getDataMessageMetadata().getComponents().get(t);
else if(message instanceof EndOfStreamMessage){
// Open file
BufferedWriter writer = new BufferedWriter(new FileWriter(outfile));
writer.write(c.getId()+"\n");
List<Object> list = dlist.get(t);
if(clist.get(t).isArray()){
// Array Handling
}
else if(clist.get(t).equals(Double.class)){
// Data is of type Double
// Create text images
for(int t=0; t<dlist.size(); t++ ){
// Get component metadata
ComponentMetadata c = metadata.getComponents().get(t);
StringBuffer b = new StringBuffer();
int counter = 0;
for(Object o: list){
b.append(o);
counter++;
if(counter==dsize){
b.append("\n");
counter=0;
}
else{
b.append(" ");
writer.write(c.getId()+"\n");
List<Object> list = dlist.get(t);
if(clist.get(t).isArray()){
// Array Handling
}
else if(clist.get(t).equals(Double.class)){
// Data is of type Double
StringBuffer b = new StringBuffer();
int counter = 0;
for(Object o: list){
b.append(o);
counter++;
if(counter==dsize){
b.append("\n");
counter=0;
}
else{
b.append(" ");
}
}
writer.write(b.toString());
}
writer.write(b.toString());
writer.write("\n");
}
writer.write("\n");
// Close file
writer.close();
}
// Close file
writer.close();
} catch (InterruptedException e) {
// TODO Stop loop and exit logic instead of throwing an Exception
throw new RuntimeException("Data serializer was interrupted while writing data to file",e);
} catch (IOException e) {
throw new RuntimeException("Data serializer had a problem writing to the specified file",e);
}
@@ -26,11 +26,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.google.common.eventbus.Subscribe;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.StreamDelimiterMessage;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
/**
@@ -40,20 +41,23 @@ import ch.psi.fda.core.messages.Message;
*/
public class DataSerializerTXTSplit implements DataSerializer{
private DataQueue queue;
private DataMessageMetadata metadata;
private File file;
private int maxdim = 0;
// private boolean appendSuffix = false;
private boolean first = true;
private List<String> header;
private List<String> data;
public DataSerializerTXTSplit(DataQueue queue, File file){
this.queue = queue;
public DataSerializerTXTSplit(DataMessageMetadata metadata, File file){
this.metadata = metadata;
this.file = file;
// Determine maximum dimension
for(ComponentMetadata m: queue.getDataMessageMetadata().getComponents()){
for(ComponentMetadata m: metadata.getComponents()){
if(m.getDimension()>maxdim){
maxdim=m.getDimension();
}
@@ -64,62 +68,42 @@ public class DataSerializerTXTSplit implements DataSerializer{
}
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
@Subscribe
public void onMessage(Message message) {
try{
// WORKAROUND BEGIN
// File outfile;
// if(appendSuffix){
// // Append a count suffix to the file. If there is already a file with
// // this suffix increase the counter for the suffix
// int cnt = 0;
// String fname = this.file.getAbsolutePath(); // Determine file name
// String extension = fname.replaceAll("^.*\\.", ""); // Determine extension
// fname = fname.replaceAll("\\."+extension+"$", "");
//
// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
//
// while(outfile.exists()){
// cnt++;
// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension));
// }
// }
// else{
// outfile = this.file;
// }
// WORKAROUND END
List<String> header = new ArrayList<String>();
// Write header
StringBuffer b = new StringBuffer();
StringBuffer b1 = new StringBuffer();
b.append("#");
b1.append("#");
for(ComponentMetadata c: queue.getDataMessageMetadata().getComponents()){
b.append(c.getId());
b.append("\t");
if(first){
first=false;
header = new ArrayList<String>();
data = new ArrayList<String>();
// Write header
StringBuffer b = new StringBuffer();
StringBuffer b1 = new StringBuffer();
b.append("#");
b1.append("#");
for(ComponentMetadata c: metadata.getComponents()){
b.append(c.getId());
b.append("\t");
b1.append(c.getDimension());
b1.append("\t");
}
b.setCharAt(b.length()-1, '\n');
b1.setCharAt(b1.length()-1, '\n');
header.add(b.toString());
header.add(b1.toString());
b1.append(c.getDimension());
b1.append("\t");
}
b.setCharAt(b.length()-1, '\n');
b1.setCharAt(b1.length()-1, '\n');
header.add(b.toString());
header.add(b1.toString());
List<String> data = new ArrayList<String>();
// Write data
// Read Message
Message message = queue.getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
if(message instanceof DataMessage){
// Write message to file - each message will result in one line
@@ -168,19 +152,6 @@ public class DataSerializerTXTSplit implements DataSerializer{
}
}
// Read next message
message = queue.getQueue().take();
}
// // Open file
// BufferedWriter writer = new BufferedWriter(new FileWriter(outfile));
//
// // Close file
// writer.close();
} catch (InterruptedException e) {
// TODO Stop loop and exit logic instead of throwing an Exception
throw new RuntimeException("Data serializer was interrupted while writing data to file",e);
} catch (IOException e) {
throw new RuntimeException("Data serializer had a problem writing to the specified file",e);
}
@@ -24,6 +24,7 @@ import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent;
import java.io.File;
import java.io.PrintWriter;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -44,6 +45,9 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.xml.sax.SAXException;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.deserializer.DataDeserializer;
import ch.psi.fda.deserializer.DataDeserializerTXT;
import ch.psi.fda.gui.ScrollableFlowPanel;
@@ -112,11 +116,12 @@ public class VisualizationEngine {
throw new IllegalArgumentException("Data file ["+data.getAbsolutePath()+"] does not exist");
}
EventBus bus = new AsyncEventBus(Executors.newCachedThreadPool());
// Create deserializer
DataDeserializer deserializer = new DataDeserializerTXT(data);
DataDeserializer deserializer = new DataDeserializerTXT(bus, data);
// Create Visualizer
Visualizer visualizer = new Visualizer(deserializer.getQueue().getDataMessageMetadata(), configuration.getVisualization());
Visualizer visualizer = new Visualizer(deserializer.getMetadata(), configuration.getVisualization());
// visualizer.setTerminateAtEOS(true);
// Adapt default visualizer behavior to optimize performance for visualization
@@ -155,12 +160,10 @@ public class VisualizationEngine {
// Start deserializer and visualizer
Thread td = new Thread(deserializer);
td.start();
bus.register(visualizer);
visualizer.configure();
deserializer.read();
td.join();
logger.info("Deserializer finished");
// visualizer.stopVisualization();
}
@@ -20,7 +20,6 @@
package ch.psi.fda.core.manipulator;
import static org.junit.Assert.*;
import gov.aps.jca.CAException;
import java.util.ArrayList;
@@ -33,6 +32,9 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import ch.psi.fda.TestChannels;
import ch.psi.fda.core.manipulator.JythonManipulation;
import ch.psi.fda.core.manipulator.Manipulation;
@@ -58,12 +60,14 @@ public class ManipulatorTest {
// Get Logger
private static Logger logger = Logger.getLogger(ManipulatorTest.class.getName());
private EventBus bus;
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
bus = new EventBus();
}
/**
@@ -91,7 +95,7 @@ public class ManipulatorTest {
// id "myid" which is expected in the mapping
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
new Manipulator(inQueue, manipulations);
new Manipulator(bus, inQueue, manipulations);
}
@@ -114,7 +118,7 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
new Manipulator(inQueue, manipulations);
new Manipulator(bus, inQueue, manipulations);
// Expect IllegalArgument Exception as there is no mapping for the parameter c
}
@@ -139,7 +143,7 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
new Manipulator(inQueue, manipulations);
new Manipulator(bus, inQueue, manipulations);
// Expect IllegalArgument Exception as there is no mapping for the parameter c
}
@@ -167,7 +171,7 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
new Manipulator(inQueue, manipulations);
new Manipulator(bus, inQueue, manipulations);
// Expect IllegalArgument Exception as there is no mapping for the parameter c
}
@@ -194,10 +198,10 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
Manipulator manipulator = new Manipulator(inQueue, manipulations);
Manipulator manipulator = new Manipulator(bus, inQueue, manipulations);
// Check whether output queue message structur complies to expected one
DataMessageMetadata outMeta = manipulator.getOutQueue().getDataMessageMetadata();
DataMessageMetadata outMeta = manipulator.getMetadata();
// Test whether only the expected components are within the outgoing queue
if(outMeta.getComponents().size()!=2){
@@ -214,25 +218,30 @@ public class ManipulatorTest {
fail("Id of the second component does not match the expected id 'cid'");
}
manipulator.run();
Message message = manipulator.getOutQueue().getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
logger.info(message.toString());
if(message instanceof DataMessage){
DataMessage dm = (DataMessage) message;
dm.getData().get(0);
double res = ((Double)dm.getData().get(1)) - (Math.cos(10.0)+Math.sin(((Double)dm.getData().get(0))));
if( Math.abs(res) > 0.000000001){
fail("Calculation failed");
bus.register(new Object(){
@Subscribe
public void onMessage(Message message){
logger.info(message.toString());
if(message instanceof DataMessage){
DataMessage dm = (DataMessage) message;
dm.getData().get(0);
double res = ((Double)dm.getData().get(1)) - (Math.cos(10.0)+Math.sin(((Double)dm.getData().get(0))));
if( Math.abs(res) > 0.000000001){
fail("Calculation failed");
}
}
}
message = manipulator.getOutQueue().getQueue().take();
}
});
manipulator.run();
// Message message = manipulator.getOutQueue().getQueue().take();
// while(!(message instanceof EndOfStreamMessage)){
//
//
//
// message = manipulator.getOutQueue().getQueue().take();
// }
logger.info(""+(Math.cos(10.0)+Math.sin(10)));
}
@@ -260,10 +269,10 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
Manipulator manipulator = new Manipulator(inQueue, manipulations);
Manipulator manipulator = new Manipulator(bus, inQueue, manipulations);
// Check whether output queue message structur complies to expected one
DataMessageMetadata outMeta = manipulator.getOutQueue().getDataMessageMetadata();
DataMessageMetadata outMeta = manipulator.getMetadata();
// Test whether only the expected components are within the outgoing queue
if(outMeta.getComponents().size()!=2){
@@ -280,21 +289,18 @@ public class ManipulatorTest {
fail("Id of the second component does not match the expected id 'cid'");
}
manipulator.run();
Message message = manipulator.getOutQueue().getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
logger.info(message.toString());
if(message instanceof DataMessage){
DataMessage dm = (DataMessage) message;
dm.getData().get(0);
bus.register(new Object(){
@Subscribe
public void onMessage(Message message){
logger.info(message.toString());
if(message instanceof DataMessage){
DataMessage dm = (DataMessage) message;
dm.getData().get(0);
}
}
message = manipulator.getOutQueue().getQueue().take();
}
});
manipulator.run();
}
/**
@@ -339,44 +345,31 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
Manipulator manipulator = new Manipulator(inQueue, manipulations);
Manipulator manipulator = new Manipulator(bus, inQueue, manipulations);
Thread t = new Thread(manipulator);
final DataQueue outQueue = manipulator.getOutQueue();
Thread tp = new Thread(new Runnable() {
@Override
public void run() {
try{
int count=0;
Message message;
while((message = outQueue.getQueue().take()) != null){
if(!(message instanceof EndOfStreamMessage)){
logger.info(count+" - "+message.toString());
}
else{
break;
}
count++;
}
}
catch (Exception e) {
e.printStackTrace();
bus.register(new Object(){
int count=0;
@Subscribe
public void onMessage(Message message){
if(!(message instanceof EndOfStreamMessage)){
logger.info(count+" - "+message.toString());
count++;
}
}
});
tf.start();
t.start();
tp.start();
tf.join();
t.join();
tp.join();
}
/**
@@ -405,10 +398,10 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
Manipulator manipulator = new Manipulator(inQueue, manipulations);
Manipulator manipulator = new Manipulator(bus, inQueue, manipulations);
// Check whether output queue message structur complies to expected one
DataMessageMetadata outMeta = manipulator.getOutQueue().getDataMessageMetadata();
DataMessageMetadata outMeta = manipulator.getMetadata();
// Test whether only the expected components are within the outgoing queue
if(outMeta.getComponents().size()!=3){
@@ -429,26 +422,22 @@ public class ManipulatorTest {
fail("Id of the second component does not match the expected id 'cid'");
}
manipulator.run();
Message message = manipulator.getOutQueue().getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
logger.info(message.toString());
if(message instanceof DataMessage){
DataMessage dm = (DataMessage) message;
dm.getData().get(0);
double res = ((Double)dm.getData().get(2)) - (Math.cos(((Double)dm.getData().get(1)))+Math.sin(((Double)dm.getData().get(0))));
if( Math.abs(res) > 0.000000001){
fail("Calculation failed");
bus.register(new Object(){
@Subscribe
public void onMessage(Message message){
logger.info(message.toString());
if(message instanceof DataMessage){
DataMessage dm = (DataMessage) message;
dm.getData().get(0);
double res = ((Double)dm.getData().get(2)) - (Math.cos(((Double)dm.getData().get(1)))+Math.sin(((Double)dm.getData().get(0))));
if( Math.abs(res) > 0.000000001){
fail("Calculation failed");
}
}
}
message = manipulator.getOutQueue().getQueue().take();
}
});
manipulator.run();
logger.info(""+(Math.cos(0.2)+Math.sin(10)));
@@ -488,10 +477,10 @@ public class ManipulatorTest {
List<Manipulation> manipulations = new ArrayList<Manipulation>();
manipulations.add(manipulation);
Manipulator manipulator = new Manipulator(inQueue, manipulations);
Manipulator manipulator = new Manipulator(bus, inQueue, manipulations);
// Check whether output queue message structur complies to expected one
DataMessageMetadata outMeta = manipulator.getOutQueue().getDataMessageMetadata();
DataMessageMetadata outMeta = manipulator.getMetadata();
// Test whether only the expected components are within the outgoing queue
if(outMeta.getComponents().size()!=3){
@@ -515,25 +504,23 @@ public class ManipulatorTest {
// Change something different on the channel than the value that will be set in the manipulator script
cbean.setValue(setValue+1);
manipulator.run();
Message message = manipulator.getOutQueue().getQueue().take();
while(!(message instanceof EndOfStreamMessage)){
logger.info(message.toString());
if(message instanceof DataMessage){
DataMessage dm = (DataMessage) message;
dm.getData().get(0);
double res = ((Double)dm.getData().get(2)) - (Math.cos(((Double)dm.getData().get(1)))+Math.sin(((Double)dm.getData().get(0))));
if( Math.abs(res) > 0.000000001){
fail("Calculation failed");
bus.register(new Object(){
@Subscribe
public void onMessage(Message message){
logger.info(message.toString());
if(message instanceof DataMessage){
DataMessage dm = (DataMessage) message;
dm.getData().get(0);
double res = ((Double)dm.getData().get(2)) - (Math.cos(((Double)dm.getData().get(1)))+Math.sin(((Double)dm.getData().get(0))));
if( Math.abs(res) > 0.000000001){
fail("Calculation failed");
}
}
}
message = manipulator.getOutQueue().getQueue().take();
}
});
manipulator.run();
logger.info(""+(Math.cos(0.2)+Math.sin(10)));
@@ -22,18 +22,18 @@ package ch.psi.fda.deserializer;
import java.io.File;
import java.net.URI;
import java.net.URL;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.ControlMessage;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
/**
@@ -44,84 +44,59 @@ import ch.psi.fda.core.messages.Message;
*/
public class DataDeserializerMDATest {
// Get Logger
private static Logger logger = Logger.getLogger(DataDeserializerMDATest.class.getName());
private EventBus bus;
private DataDeserializerMDA deserializer;
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
bus = new EventBus();
URL url = this.getClass().getClassLoader().getResource("testdata/mda/mdadata7.mda");
deserializer = new DataDeserializerMDA(new File(new URI(url.toString())));
deserializer = new DataDeserializerMDA(bus, new File(new URI(url.toString())));
}
/**
* @throws java.lang.Exception
*/
@After
public void tearDown() throws Exception {
}
/**
* Test method for {@link ch.psi.fda.deserializer.DataDeserializerTXT#run()}.
* @throws InterruptedException
*/
@Test
public void testRun() throws InterruptedException {
public void testRead() throws InterruptedException {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
BlockingQueue<Message> q = deserializer.getQueue().getQueue();
while(true){
Message m = q.take();
if(m instanceof DataMessage){
DataMessage x = (DataMessage) m;
logger.info( x.toString() );
}
else if(m instanceof ControlMessage){
if(m instanceof EndOfStreamMessage){
break;
}
logger.info("---- "+m.toString()+" ----");
}
}
StringBuilder b = new StringBuilder();
b.append("[");
StringBuilder b1 = new StringBuilder();
b1.append("[");
for(ComponentMetadata cm : deserializer.getQueue().getDataMessageMetadata().getComponents()){
b.append(" ");
b.append(cm.getId());
b1.append(" ");
b1.append(cm.getDimension());
}
b.append(" ]");
b1.append(" ]");
logger.info("Metadata "+b.toString());
logger.info("Metadata "+b1.toString());
} catch (InterruptedException e) {
logger.log(Level.SEVERE, "An Exception occured while reading data from the data queue", e);
// Visualize metadata
StringBuilder b = new StringBuilder();
b.append("[");
StringBuilder b1 = new StringBuilder();
b1.append("[");
for(ComponentMetadata cm : deserializer.getMetadata().getComponents()){
b.append(" ");
b.append(cm.getId());
b1.append(" ");
b1.append(cm.getDimension());
}
b.append(" ]");
b1.append(" ]");
logger.info("Metadata "+b.toString());
logger.info("Metadata "+b1.toString());
// Do "read" data
bus.register(new Object(){
@Subscribe
public void onMessage(Message m){
if(m instanceof DataMessage){
DataMessage x = (DataMessage) m;
logger.info( x.toString() );
}
else if(m instanceof ControlMessage){
logger.info("---- "+m.toString()+" ----");
}
}
});
deserializer.read();
Thread tt = new Thread(deserializer);
tt.start();
t.start();
tt.join();
t.join();
}
}
@@ -22,38 +22,36 @@ package ch.psi.fda.deserializer;
import java.io.File;
import java.net.URI;
import java.net.URL;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import ch.psi.fda.core.messages.ControlMessage;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
import ch.psi.fda.deserializer.DataDeserializer;
import ch.psi.fda.deserializer.DataDeserializerTXT;
/**
* @author ebner
*
*/
public class DataDeserializerTest {
// Get Logger
private static Logger logger = Logger.getLogger(DataDeserializerTest.class.getName());
private DataDeserializer deserializer;
private EventBus bus;
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
bus = new EventBus();
URL url = this.getClass().getClassLoader().getResource("testdata/text/textdata2.txt");
deserializer = new DataDeserializerTXT(new File(new URI(url.toString())));
deserializer = new DataDeserializerTXT(bus, new File(new URI(url.toString())));
}
/**
@@ -70,38 +68,18 @@ public class DataDeserializerTest {
@Test
public void testRun() throws InterruptedException {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
while(true){
Message m = deserializer.getQueue().getQueue().take();
if(m instanceof DataMessage){
DataMessage x = (DataMessage) m;
logger.info( x.toString() );
}
else if(m instanceof ControlMessage){
if(m instanceof EndOfStreamMessage){
break;
}
logger.info("---- "+m.toString()+" ----");
}
}
} catch (InterruptedException e) {
logger.log(Level.SEVERE, "An Exception occured while reading data from the data queue", e);
bus.register(new Object(){
@Subscribe
public void onMessage(Message m){
if(m instanceof DataMessage){
DataMessage x = (DataMessage) m;
logger.info( x.toString() );
}
else if(m instanceof ControlMessage){
logger.info("---- "+m.toString()+" ----");
}
}
});
Thread tt = new Thread(deserializer);
tt.start();
t.start();
tt.join();
t.join();
deserializer.read();
}
}
@@ -20,20 +20,18 @@
package ch.psi.fda.serializer;
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.DataQueue;
import ch.psi.fda.core.messages.StreamDelimiterMessage;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
import ch.psi.fda.serializer.DataSerializer;
import ch.psi.fda.serializer.DataSerializerMAT;
import ch.psi.fda.serializer.DataSerializerMAT2D;
@@ -41,16 +39,13 @@ import ch.psi.fda.serializer.DataSerializerTXT;
import ch.psi.fda.serializer.DataSerializerTXT2D;
import ch.psi.fda.serializer.DataSerializerTXTSplit;
/**
* @author ebner
*
*/
public class DataSerializerTest {
private static final String tmpDirectory = "target/tmp";
private DataQueue queue;
private DataMessageMetadata metadata;
private EventBus bus;
/**
* @throws java.lang.Exception
@@ -58,11 +53,8 @@ public class DataSerializerTest {
@Before
public void setUp() throws Exception {
new File(tmpDirectory).mkdirs();
BlockingQueue<Message> q3 = new LinkedBlockingQueue<Message>();
DataMessageMetadata m3 = new DataMessageMetadata();
this.queue = new DataQueue(q3, m3);
metadata = new DataMessageMetadata();
bus = new EventBus();
}
/**
@@ -71,23 +63,23 @@ public class DataSerializerTest {
*/
private void generate1DData() throws InterruptedException{
queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id0", 0));
queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id1", 0));
queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id2", 0));
metadata.getComponents().add(new ComponentMetadata("id0", 0));
metadata.getComponents().add(new ComponentMetadata("id1", 0));
metadata.getComponents().add(new ComponentMetadata("id2", 0));
// Dimension
DataMessage m = new DataMessage();
m.getData().add(0.000000000000000001);
m.getData().add(0.1);
m.getData().add(1d); // have this value as double
queue.getQueue().put(m);
bus.post(m);
m = new DataMessage();
m.getData().add(0.02);
m.getData().add(0.2);
m.getData().add(2d); // have this value as double
queue.getQueue().put(m);
queue.getQueue().put(new EndOfStreamMessage());
bus.post(m);
bus.post(new EndOfStreamMessage());
}
/**
@@ -96,9 +88,9 @@ public class DataSerializerTest {
*/
private void generate2DData() throws InterruptedException{
queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id0", 1));
queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id1", 0));
queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id2", 0));
metadata.getComponents().add(new ComponentMetadata("id0", 1));
metadata.getComponents().add(new ComponentMetadata("id1", 0));
metadata.getComponents().add(new ComponentMetadata("id2", 0));
for(double i=0;i<5;i++){
for(double t=0.1; t<10; t=t+0.1){
@@ -107,14 +99,14 @@ public class DataSerializerTest {
m.getData().add(i);
m.getData().add(t);
m.getData().add(Math.log(t)); // have this value as double
queue.getQueue().put(m);
bus.post(m);
}
queue.getQueue().put(new StreamDelimiterMessage(0));
bus.post(new StreamDelimiterMessage(0));
}
queue.getQueue().put(new StreamDelimiterMessage(1));
bus.post(new StreamDelimiterMessage(1));
queue.getQueue().put(new EndOfStreamMessage());
bus.post(new EndOfStreamMessage());
}
/**
@@ -123,10 +115,10 @@ public class DataSerializerTest {
*/
private void generate3DData() throws InterruptedException{
queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id0", 2));
queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id1", 1));
queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id2", 0));
queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id3", 0));
metadata.getComponents().add(new ComponentMetadata("id0", 2));
metadata.getComponents().add(new ComponentMetadata("id1", 1));
metadata.getComponents().add(new ComponentMetadata("id2", 0));
metadata.getComponents().add(new ComponentMetadata("id3", 0));
for(double z=30;z<36;z++){
for(double i=0;i<6;i++){
@@ -137,16 +129,15 @@ public class DataSerializerTest {
m.getData().add(i);
m.getData().add(t);
m.getData().add(Math.log(t)); // have this value as double
queue.getQueue().put(m);
bus.post(m);
}
queue.getQueue().put(new StreamDelimiterMessage(0));
bus.post(new StreamDelimiterMessage(0));
}
queue.getQueue().put(new StreamDelimiterMessage(1));
bus.post(new StreamDelimiterMessage(1));
}
queue.getQueue().put(new StreamDelimiterMessage(2));
bus.post(new StreamDelimiterMessage(2));
queue.getQueue().put(new EndOfStreamMessage());
bus.post(new EndOfStreamMessage());
}
/**
@@ -162,9 +153,9 @@ public class DataSerializerTest {
*/
@Test
public void testRunTXT() throws InterruptedException {
DataSerializer serializer = new DataSerializerTXT(metadata, new File(tmpDirectory+"/test.txt"), true);
bus.register(serializer);
generate1DData();
DataSerializer serializer = new DataSerializerTXT(queue, new File(tmpDirectory+"/test.txt"), true);
serializer.run();
}
/**
@@ -173,9 +164,10 @@ public class DataSerializerTest {
*/
@Test
public void testRunMAT() throws InterruptedException {
DataSerializer serializer = new DataSerializerMAT(metadata, new File(tmpDirectory+"/test.mat"));
bus.register(serializer);
generate1DData();
DataSerializer serializer = new DataSerializerMAT(queue, new File(tmpDirectory+"/test.mat"));
serializer.run();
}
/**
@@ -184,9 +176,9 @@ public class DataSerializerTest {
*/
@Test
public void testRunMAT2D() throws InterruptedException {
DataSerializer serializer = new DataSerializerMAT2D(metadata, new File(tmpDirectory+"/test-2d.mat"));
bus.register(serializer);
generate2DData();
DataSerializer serializer = new DataSerializerMAT2D(queue, new File(tmpDirectory+"/test-2d.mat"));
serializer.run();
}
/**
@@ -195,9 +187,9 @@ public class DataSerializerTest {
*/
@Test
public void testRunTXT2D() throws InterruptedException {
DataSerializer serializer = new DataSerializerTXT2D(metadata, new File(tmpDirectory+"/test-2d.txt"));
bus.register(serializer);
generate2DData();
DataSerializer serializer = new DataSerializerTXT2D(queue, new File(tmpDirectory+"/test-2d.txt"));
serializer.run();
}
/**
@@ -206,9 +198,9 @@ public class DataSerializerTest {
*/
@Test
public void testRunSplitTXT() throws InterruptedException {
DataSerializer serializer = new DataSerializerTXTSplit(metadata, new File(tmpDirectory+"/test-2d-split.txt"));
bus.register(serializer);
generate2DData();
DataSerializer serializer = new DataSerializerTXTSplit(queue, new File(tmpDirectory+"/test-2d-split.txt"));
serializer.run();
}
/**
@@ -217,9 +209,9 @@ public class DataSerializerTest {
*/
@Test
public void testRun2D() throws InterruptedException {
DataSerializer serializer = new DataSerializerMDA(metadata, new File(tmpDirectory+"/test-2d.mda"));
bus.register(serializer);
generate2DData();
DataSerializer serializer = new DataSerializerMDA(queue, new File(tmpDirectory+"/test-2d.mda"));
serializer.run();
}
/**
@@ -228,8 +220,8 @@ public class DataSerializerTest {
*/
@Test
public void testRun3D() throws InterruptedException {
DataSerializer serializer = new DataSerializerMDA(metadata, new File(tmpDirectory+"/test-3d.mda"));
bus.register(serializer);
generate3DData();
DataSerializer serializer = new DataSerializerMDA(queue, new File(tmpDirectory+"/test-3d.mda"));
serializer.run();
}
}