From 2a6018c4f597faf068628327d8dabdd417f3b12e Mon Sep 17 00:00:00 2001 From: Simon Ebner Date: Thu, 3 Oct 2013 13:44:08 +0200 Subject: [PATCH] Fixed all compile problems and migrated all serializers and deserializers to Guava EventBus. --- .../java/ch/psi/fda/co/ConversionEngine.java | 37 ++- .../fda/deserializer/DataDeserializer.java | 12 +- .../fda/deserializer/DataDeserializerMDA.java | 53 ++--- .../fda/deserializer/DataDeserializerTXT.java | 40 ++-- .../ch/psi/fda/serializer/DataSerializer.java | 6 +- .../psi/fda/serializer/DataSerializerMAT.java | 182 +++++++------- .../fda/serializer/DataSerializerMAT2D.java | 184 +++++++-------- .../serializer/DataSerializerMAT2DZigZag.java | 149 ++++++------ .../psi/fda/serializer/DataSerializerMDA.java | 222 ++++++++---------- .../psi/fda/serializer/DataSerializerTXT.java | 190 +-------------- .../fda/serializer/DataSerializerTXT2D.java | 168 ++++++------- .../serializer/DataSerializerTXTSplit.java | 109 ++++----- .../ch/psi/fda/vis/VisualizationEngine.java | 15 +- .../fda/core/manipulator/ManipulatorTest.java | 195 +++++++-------- .../deserializer/DataDeserializerMDATest.java | 99 +++----- .../deserializer/DataDeserializerTest.java | 54 ++--- .../fda/serializer/DataSerializerTest.java | 96 ++++---- 17 files changed, 756 insertions(+), 1055 deletions(-) diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/co/ConversionEngine.java b/ch.psi.fda/src/main/java/ch/psi/fda/co/ConversionEngine.java index 7918619..bff5fbd 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/co/ConversionEngine.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/co/ConversionEngine.java @@ -33,6 +33,8 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import com.google.common.eventbus.EventBus; + import ch.psi.fda.deserializer.DataDeserializer; import ch.psi.fda.deserializer.DataDeserializerMDA; import ch.psi.fda.deserializer.DataDeserializerTXT; @@ -82,14 +84,17 @@ public class ConversionEngine { else if(output.exists()){ throw new IllegalArgumentException("Output file ["+output.getAbsolutePath()+"] already exists"); } + + + EventBus bus = new EventBus(); // Create deserializer DataDeserializer deserializer; if(reader.equals(Reader.TXT)){ - deserializer = new DataDeserializerTXT(input); + deserializer = new DataDeserializerTXT(bus, input); } else if(reader.equals(Reader.MDA)){ - deserializer = new DataDeserializerMDA(input); + deserializer = new DataDeserializerMDA(bus, input); } else{ throw new IllegalArgumentException("Reader of type "+reader+" not supported."); @@ -97,39 +102,33 @@ public class ConversionEngine { DataSerializer serializer; if(writer.equals(Writer.MAT)){ - serializer = new DataSerializerMAT(deserializer.getQueue(), output); + serializer = new DataSerializerMAT(deserializer.getMetadata(), output); } else if(writer.equals(Writer.MAT_2D)){ - serializer = new DataSerializerMAT2D(deserializer.getQueue(), output); + serializer = new DataSerializerMAT2D(deserializer.getMetadata(), output); } else if(writer.equals(Writer.TXT)){ - serializer = new DataSerializerTXT(deserializer.getQueue(), output, false); + serializer = new DataSerializerTXT(deserializer.getMetadata(), output, false); } else if(writer.equals(Writer.TXT_2D)){ - serializer = new DataSerializerTXT2D(deserializer.getQueue(), output); + serializer = new DataSerializerTXT2D(deserializer.getMetadata(), output); } else if(writer.equals(Writer.TXT_SPLIT)){ - serializer = new DataSerializerTXTSplit(deserializer.getQueue(), output); + serializer = new DataSerializerTXTSplit(deserializer.getMetadata(), output); } else if(writer.equals(Writer.MDA)){ - serializer = new DataSerializerMDA(deserializer.getQueue(), output); + serializer = new DataSerializerMDA(deserializer.getMetadata(), output); } else if(writer.equals(Writer.MAT_2D_Z)){ - serializer = new DataSerializerMAT2DZigZag(deserializer.getQueue(), output); + serializer = new DataSerializerMAT2DZigZag(deserializer.getMetadata(), output); } else{ throw new IllegalArgumentException("Writer of type "+writer+" not supported."); } - - // Start deserializer and serializer - Thread td = new Thread(deserializer); - Thread ts = new Thread(serializer); - - td.start(); - ts.start(); - - td.join(); - ts.join(); + + // Start conversion + bus.register(serializer); + deserializer.read(); } /** diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/deserializer/DataDeserializer.java b/ch.psi.fda/src/main/java/ch/psi/fda/deserializer/DataDeserializer.java index a7e56c5..987585d 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/deserializer/DataDeserializer.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/deserializer/DataDeserializer.java @@ -19,18 +19,20 @@ package ch.psi.fda.deserializer; -import ch.psi.fda.core.messages.DataQueue; +import ch.psi.fda.core.messages.DataMessageMetadata; /** * Data deserializer * @author ebner * */ -public interface DataDeserializer extends Runnable { +public interface DataDeserializer { /** - * Get data queue of deserializer - * @return data queue of deserializer + * Get message metadata + * @return metadata/information of the message format */ - public DataQueue getQueue(); + public DataMessageMetadata getMetadata(); + + public void read(); } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/deserializer/DataDeserializerMDA.java b/ch.psi.fda/src/main/java/ch/psi/fda/deserializer/DataDeserializerMDA.java index 0e9deec..191ba51 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/deserializer/DataDeserializerMDA.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/deserializer/DataDeserializerMDA.java @@ -9,45 +9,45 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Logger; +import com.google.common.eventbus.EventBus; + import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; import ch.psi.fda.core.messages.DataMessageMetadata; -import ch.psi.fda.core.messages.DataQueue; import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; import ch.psi.fda.core.messages.StreamDelimiterMessage; +/** + * TODO Need to be optimized as currently the while file is read into memory when creating this object. + * @author ebner + * + */ public class DataDeserializerMDA implements DataDeserializer { private static Logger logger = Logger.getLogger(DataDeserializerMDA.class.getName()); - private DataQueue queue; + private EventBus bus; + private DataMessageMetadata metadata; + private RecursiveReturnContainer c; - public DataDeserializerMDA(File file){ + public DataDeserializerMDA(EventBus b, File file){ + this.bus = b; - try { - RecursiveReturnContainer c = read(new FileInputStream(file)); - - this.queue = new DataQueue(new LinkedBlockingQueue(), c.getMetadata()); - - // Add data to queue - for(Message m: c.getMessage()){ - queue.getQueue().put(m); - } - queue.getQueue().put(new EndOfStreamMessage()); + try{ + c = read(new FileInputStream(file)); } catch (FileNotFoundException e) { throw new RuntimeException(e); } catch (IOException e) { throw new RuntimeException(e); - } catch (InterruptedException e) { - throw new RuntimeException(e); } + + this.metadata = c.getMetadata(); } - public RecursiveReturnContainer read(InputStream in) throws IOException { + private RecursiveReturnContainer read(InputStream in) throws IOException { logger.fine("Read MDA input stream"); XDRInputStream x = new XDRInputStream(in); @@ -443,21 +443,18 @@ public class DataDeserializerMDA implements DataDeserializer { } - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ @Override - public void run() { - // TODO Auto-generated method stub - + public void read() { + // Add data to queue + for(Message m: c.getMessage()){ + bus.post(m); + } + bus.post(new EndOfStreamMessage()); } - /* (non-Javadoc) - * @see ch.psi.fda.deserializer.DataDeserializer#getQueue() - */ @Override - public DataQueue getQueue() { - return queue; + public DataMessageMetadata getMetadata() { + return metadata; } } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/deserializer/DataDeserializerTXT.java b/ch.psi.fda/src/main/java/ch/psi/fda/deserializer/DataDeserializerTXT.java index 04195ba..e37757a 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/deserializer/DataDeserializerTXT.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/deserializer/DataDeserializerTXT.java @@ -25,16 +25,15 @@ import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Logger; +import com.google.common.eventbus.EventBus; + import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; import ch.psi.fda.core.messages.DataMessageMetadata; -import ch.psi.fda.core.messages.DataQueue; import ch.psi.fda.core.messages.StreamDelimiterMessage; import ch.psi.fda.core.messages.EndOfStreamMessage; -import ch.psi.fda.core.messages.Message; /** * Deserialize file data and put it into the DataQueue @@ -43,10 +42,10 @@ import ch.psi.fda.core.messages.Message; */ public class DataDeserializerTXT implements DataDeserializer { - // Get Logger private static Logger logger = Logger.getLogger(DataDeserializerTXT.class.getName()); - - private DataQueue queue; + + private EventBus bus; + private DataMessageMetadata metadata; private File file; private List dindex; @@ -56,12 +55,12 @@ public class DataDeserializerTXT implements DataDeserializer { * Default Constructor * @param file */ - public DataDeserializerTXT(File file){ + public DataDeserializerTXT(EventBus b, File file){ + this.bus = b; this.file = file; this.dindex = new ArrayList(); this.iindex = new ArrayList(); - DataMessageMetadata metadata; try{ // Read metadata // Open file @@ -103,23 +102,15 @@ public class DataDeserializerTXT implements DataDeserializer { catch(Exception e){ throw new RuntimeException("Unable to read file metadata and initialize data queue",e); } - - this.queue = new DataQueue(new LinkedBlockingQueue(10000000), metadata); } - /* (non-Javadoc) - * @see ch.psi.fda.deserializer.DataDeserializer#getQueue() - */ @Override - public DataQueue getQueue(){ - return(queue); + public DataMessageMetadata getMetadata(){ + return(metadata); } - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ @Override - public void run() { + public void read() { try{ List checklist = new ArrayList(dindex.size()); @@ -195,14 +186,14 @@ public class DataDeserializerTXT implements DataDeserializer { // } if(checklist.get(i)!=null &&!checklist.get(i).equals(d)){ // If value changes issue a dimension delimiter message - queue.getQueue().put(new StreamDelimiterMessage(dindex.get(t)-1)); + bus.post(new StreamDelimiterMessage(dindex.get(t)-1)); } checklist.set(i, d); } } // Put message to queue - queue.getQueue().put(message); + bus.post(message); // TODO Need to detect dimension boundaries @@ -210,20 +201,17 @@ public class DataDeserializerTXT implements DataDeserializer { // Add delimiter for all the dimensions for(int i=dindex.size()-1;i>=0;i--){ - queue.getQueue().put(new StreamDelimiterMessage(dindex.get(i))); + bus.post(new StreamDelimiterMessage(dindex.get(i))); } // queue.getQueue().put(new DimensionDelimiterMessage(dindex.get(0)-1)); // queue.getQueue().put(new DimensionDelimiterMessage(dindex.get(0))); // Place end of stream message - queue.getQueue().put(new EndOfStreamMessage()); + bus.post(new EndOfStreamMessage()); // Close file reader.close(); - - } catch (InterruptedException e) { - throw new RuntimeException("Data deserializer was interrupted while reading the datafile",e); } catch (IOException e) { throw new RuntimeException("Data deserializer had a problem reading the specified datafile",e); } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializer.java b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializer.java index cfd01d1..e34bc3e 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializer.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializer.java @@ -20,9 +20,7 @@ package ch.psi.fda.serializer; /** - * Data Serializer - * @author ebner - * + * Data Serializer marker interface */ -public interface DataSerializer extends Runnable { +public interface DataSerializer { } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMAT.java b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMAT.java index 262a1b0..bd0f1c5 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMAT.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMAT.java @@ -24,13 +24,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import com.google.common.eventbus.Subscribe; import com.jmatio.io.MatFileWriter; import com.jmatio.types.MLArray; import com.jmatio.types.MLDouble; import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; -import ch.psi.fda.core.messages.DataQueue; +import ch.psi.fda.core.messages.DataMessageMetadata; import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; @@ -41,120 +42,109 @@ import ch.psi.fda.core.messages.Message; */ public class DataSerializerMAT implements DataSerializer{ - private DataQueue queue; + private DataMessageMetadata metadata; private File file; private boolean appendSuffix = false; + private boolean first = true; /** * Construtor * @param queue Data queue holding the data to serialize * @param file Name of the Matlab file to serialize the data to */ - public DataSerializerMAT(DataQueue queue, File file){ - this.queue = queue; + public DataSerializerMAT(DataMessageMetadata metadata, File file){ + this.metadata = metadata; this.file = file; } - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ - @Override - public void run() { - try{ - - // WORKAROUND BEGIN - File outfile; - if(appendSuffix){ - // Append a count suffix to the file. If there is already a file with - // this suffix increase the counter for the suffix - int cnt = 0; - String fname = this.file.getAbsolutePath(); // Determine file name - String extension = fname.replaceAll("^.*\\.", ""); // Determine extension - fname = fname.replaceAll("\\."+extension+"$", ""); - - outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); - - while(outfile.exists()){ - cnt++; - outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); - } - } - else{ - outfile = this.file; - } - // WORKAROUND END + File outfile; + List> dlist; + List> clist; + boolean firstF; + + @Subscribe + public void onMessage(Message message) { + try { - - // Transposed data list - List> dlist = new ArrayList>(); - List> clist = new ArrayList>(); - - boolean firstF = true; - - // Write data - // Read Message - Message message = queue.getQueue().take(); - while(!(message instanceof EndOfStreamMessage)){ - if(message instanceof DataMessage){ - DataMessage m = (DataMessage) message; - - // Initialize list - if(firstF){ - for(Object o: m.getData()){ - dlist.add(new ArrayList()); - clist.add(o.getClass()); - } - firstF=false; + if (first) { + first = false; + // WORKAROUND BEGIN + if (appendSuffix) { + // Append a count suffix to the file. If there is already a + // file with + // this suffix increase the counter for the suffix + int cnt = 0; + String fname = this.file.getAbsolutePath(); // Determine + // file name + String extension = fname.replaceAll("^.*\\.", ""); // Determine + // extension + fname = fname.replaceAll("\\." + extension + "$", ""); + + outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); + + while (outfile.exists()) { + cnt++; + outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); } - - // Put data into data list - for(int i=0;i< m.getData().size();i++){ - Object object = m.getData().get(i); - dlist.get(i).add(object); + } else { + outfile = this.file; + } + // WORKAROUND END + + // Transposed data list + dlist = new ArrayList>(); + clist = new ArrayList>(); + + firstF = true; + } + + if (message instanceof DataMessage) { + DataMessage m = (DataMessage) message; + + // Initialize list + if (firstF) { + for (Object o : m.getData()) { + dlist.add(new ArrayList()); + clist.add(o.getClass()); } + firstF = false; } - - // Read next message - message = queue.getQueue().take(); + + // Put data into data list + for (int i = 0; i < m.getData().size(); i++) { + Object object = m.getData().get(i); + dlist.get(i).add(object); + } + } else if (message instanceof EndOfStreamMessage) { + + // Create Matlab vectors + ArrayList matlablist = new ArrayList(); + for (int t = 0; t < dlist.size(); t++) { + // Get component metadata + ComponentMetadata c = metadata.getComponents().get(t); + c.getId(); + + List list = dlist.get(t); + + if (clist.get(t).isArray()) { + // Array Handling + } else if (clist.get(t).equals(Double.class)) { + // Data is of type Double + MLDouble darray = new MLDouble(escapeString(c.getId()), (Double[]) list.toArray(new Double[list.size()]), 1); + matlablist.add(darray); + } + + } + + // Write Matlab file + MatFileWriter writerr = new MatFileWriter(); + writerr.write(outfile, matlablist); } - - // Create Matlab vectors - ArrayList matlablist = new ArrayList(); - for(int t=0; t list = dlist.get(t); - - - - if(clist.get(t).isArray()){ - // Array Handling - } - else if(clist.get(t).equals(Double.class)){ - // Data is of type Double - MLDouble darray = new MLDouble(escapeString(c.getId()),(Double[])list.toArray(new Double[list.size()]),1); - matlablist.add(darray); - } - - - } - - // Write Matlab file - MatFileWriter writerr = new MatFileWriter(); - writerr.write(outfile, matlablist); - - - - } catch (InterruptedException e) { - // TODO Stop loop and exit logic instead of throwing an Exception - throw new RuntimeException("Data serializer was interrupted while writing data to file",e); + } catch (IOException e) { - throw new RuntimeException("Data serializer had a problem writing to the specified file",e); + throw new RuntimeException("Data serializer had a problem writing to the specified file", e); } - } /** diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMAT2D.java b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMAT2D.java index 70f15c1..e30e1cd 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMAT2D.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMAT2D.java @@ -25,13 +25,14 @@ import java.util.ArrayList; import java.util.List; import java.util.logging.Logger; +import com.google.common.eventbus.Subscribe; import com.jmatio.io.MatFileWriter; import com.jmatio.types.MLArray; import com.jmatio.types.MLDouble; import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; -import ch.psi.fda.core.messages.DataQueue; +import ch.psi.fda.core.messages.DataMessageMetadata; import ch.psi.fda.core.messages.StreamDelimiterMessage; import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; @@ -43,25 +44,34 @@ import ch.psi.fda.core.messages.Message; */ public class DataSerializerMAT2D implements DataSerializer{ - // Get Logger private static final Logger logger = Logger.getLogger(DataSerializerMAT2D.class.getName()); - private DataQueue queue; + private DataMessageMetadata metadata; private File file; private boolean appendSuffix = false; + private boolean first = true; + + private List>> dlist; + private List> clist; + private int dsize; + private int dcount; + private Integer mindsize; + private boolean firstF; + private File outfile; + /** * Construtor * @param queue Data queue holding the data to serialize * @param file Name of the Matlab file to serialize the data to */ - public DataSerializerMAT2D(DataQueue queue, File file){ - this.queue = queue; + public DataSerializerMAT2D(DataMessageMetadata metadata, File file){ + this.metadata = metadata; this.file = file; // Check if input queue does only hold 2D data int maxdim=0; - for(ComponentMetadata m: queue.getDataMessageMetadata().getComponents()){ + for(ComponentMetadata m: metadata.getComponents()){ if(m.getDimension()>maxdim){ maxdim=m.getDimension(); } @@ -76,49 +86,42 @@ public class DataSerializerMAT2D implements DataSerializer{ } } - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ - @Override - public void run() { + @Subscribe + public void onMessage(Message message) { try{ - - // WORKAROUND BEGIN - File outfile; - if(appendSuffix){ - // Append a count suffix to the file. If there is already a file with - // this suffix increase the counter for the suffix - int cnt = 0; - String fname = this.file.getAbsolutePath(); // Determine file name - String extension = fname.replaceAll("^.*\\.", ""); // Determine extension - fname = fname.replaceAll("\\."+extension+"$", ""); - - outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); - - while(outfile.exists()){ - cnt++; + if(first){ + first=false; + // WORKAROUND BEGIN + if(appendSuffix){ + // Append a count suffix to the file. If there is already a file with + // this suffix increase the counter for the suffix + int cnt = 0; + String fname = this.file.getAbsolutePath(); // Determine file name + String extension = fname.replaceAll("^.*\\.", ""); // Determine extension + fname = fname.replaceAll("\\."+extension+"$", ""); + outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); + + while(outfile.exists()){ + cnt++; + outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); + } } + else{ + outfile = this.file; + } + // WORKAROUND END + + + // Transposed data list + dlist = new ArrayList>>(); + clist = new ArrayList>(); + dsize = 0; // Size of the dimension + dcount = 0; + mindsize = null; + firstF = true; } - else{ - outfile = this.file; - } - // WORKAROUND END - - // Transposed data list - List>> dlist = new ArrayList>>(); - List> clist = new ArrayList>(); - int dsize = 0; // Size of the dimension - int dcount = 0; - Integer mindsize = null; - - boolean firstF = true; - - // Write data - // Read Message - Message message = queue.getQueue().take(); - while(!(message instanceof EndOfStreamMessage)){ if(message instanceof DataMessage){ DataMessage m = (DataMessage) message; @@ -170,62 +173,55 @@ public class DataSerializerMAT2D implements DataSerializer{ } } - // Read next message - message = queue.getQueue().take(); - } - - logger.info("dsize: "+dsize + " mindsize:"+mindsize); - - // Create Matlab vectors - ArrayList matlablist = new ArrayList(); - logger.info("dlist size: "+dlist.size()); - for(int t=0; t list = new ArrayList(); - List> ol = dlist.get(t); - - // Remove last array list as it is empty - ol.remove(ol.size()-1); - - for(List li: ol){ - list.addAll(li); - // Pad list if there are missing data points for some lines - for(int i=li.size();i matlablist = new ArrayList(); + logger.info("dlist size: "+dlist.size()); + for(int t=0; t list = new ArrayList(); + List> ol = dlist.get(t); + + // Remove last array list as it is empty + ol.remove(ol.size()-1); + + for(List li: ol){ + list.addAll(li); + // Pad list if there are missing data points for some lines + for(int i=li.size();i list = dlist.get(t); + logger.info("List: "+list.size()); + + + if(clist.get(t).isArray()){ + // Array Handling + } + else if(clist.get(t).equals(Double.class)){ + // Data is of type Double + MLDouble darray = new MLDouble(escapeString(c.getId()),(Double[])list.toArray(new Double[list.size()]), dsize); + matlablist.add(darray); + } + + } - -// List list = dlist.get(t); - logger.info("List: "+list.size()); - - - if(clist.get(t).isArray()){ - // Array Handling - } - else if(clist.get(t).equals(Double.class)){ - // Data is of type Double - MLDouble darray = new MLDouble(escapeString(c.getId()),(Double[])list.toArray(new Double[list.size()]), dsize); - matlablist.add(darray); - } - - + // Write Matlab file + MatFileWriter writerr = new MatFileWriter(); + writerr.write(outfile, matlablist); } - // Write Matlab file - MatFileWriter writerr = new MatFileWriter(); - writerr.write(outfile, matlablist); - - - - } catch (InterruptedException e) { - // TODO Stop loop and exit logic instead of throwing an Exception - throw new RuntimeException("Data serializer was interrupted while writing data to file",e); } catch (IOException e) { throw new RuntimeException("Data serializer had a problem writing to the specified file",e); } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMAT2DZigZag.java b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMAT2DZigZag.java index 58147fd..5725a7e 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMAT2DZigZag.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMAT2DZigZag.java @@ -26,13 +26,14 @@ import java.util.Collections; import java.util.List; import java.util.logging.Logger; +import com.google.common.eventbus.Subscribe; import com.jmatio.io.MatFileWriter; import com.jmatio.types.MLArray; import com.jmatio.types.MLDouble; import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; -import ch.psi.fda.core.messages.DataQueue; +import ch.psi.fda.core.messages.DataMessageMetadata; import ch.psi.fda.core.messages.StreamDelimiterMessage; import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; @@ -47,22 +48,35 @@ public class DataSerializerMAT2DZigZag implements DataSerializer{ // Get Logger private static final Logger logger = Logger.getLogger(DataSerializerMAT2DZigZag.class.getName()); - private DataQueue queue; + private DataMessageMetadata metadata; private File file; private boolean appendSuffix = false; + private boolean first = true; + + private File outfile; + private List> dlist; + private List> dlistTmp; + private List> clist; + private int dsize; // Size of the dimension + private int dcount; + + private int delimiterCount; + + private boolean firstF; + private boolean firstC; /** * Construtor * @param queue Data queue holding the data to serialize * @param file Name of the Matlab file to serialize the data to */ - public DataSerializerMAT2DZigZag(DataQueue queue, File file){ - this.queue = queue; + public DataSerializerMAT2DZigZag(DataMessageMetadata metadata, File file){ + this.metadata = metadata; this.file = file; // Check if input queue does only hold 2D data int maxdim=0; - for(ComponentMetadata m: queue.getDataMessageMetadata().getComponents()){ + for(ComponentMetadata m: metadata.getComponents()){ if(m.getDimension()>maxdim){ maxdim=m.getDimension(); } @@ -77,52 +91,49 @@ public class DataSerializerMAT2DZigZag implements DataSerializer{ } } - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ - @Override - public void run() { + @Subscribe + public void onMessage(Message message) { try{ - // WORKAROUND BEGIN - File outfile; - if(appendSuffix){ - // Append a count suffix to the file. If there is already a file with - // this suffix increase the counter for the suffix - int cnt = 0; - String fname = this.file.getAbsolutePath(); // Determine file name - String extension = fname.replaceAll("^.*\\.", ""); // Determine extension - fname = fname.replaceAll("\\."+extension+"$", ""); + if(first){ + first=false; - outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); - - while(outfile.exists()){ - cnt++; + // WORKAROUND BEGIN + if(appendSuffix){ + // Append a count suffix to the file. If there is already a file with + // this suffix increase the counter for the suffix + int cnt = 0; + String fname = this.file.getAbsolutePath(); // Determine file name + String extension = fname.replaceAll("^.*\\.", ""); // Determine extension + fname = fname.replaceAll("\\."+extension+"$", ""); + outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); + + while(outfile.exists()){ + cnt++; + outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); + } } + else{ + outfile = this.file; + } + // WORKAROUND END + + + // Transposed data list + dlist = new ArrayList>(); + dlistTmp = new ArrayList>(); + clist = new ArrayList>(); + dsize = 0; // Size of the dimension + dcount = 0; + + delimiterCount = 0; + + firstF = true; + firstC = true; } - else{ - outfile = this.file; - } - // WORKAROUND END - - // Transposed data list - List> dlist = new ArrayList>(); - List> dlistTmp = new ArrayList>(); - List> clist = new ArrayList>(); - int dsize = 0; // Size of the dimension - int dcount = 0; - int delimiterCount = 0; - - boolean firstF = true; - boolean firstC = true; - - // Write data - // Read Message - Message message = queue.getQueue().take(); - while(!(message instanceof EndOfStreamMessage)){ if(message instanceof DataMessage){ DataMessage m = (DataMessage) message; @@ -172,41 +183,35 @@ public class DataSerializerMAT2DZigZag implements DataSerializer{ delimiterCount++; } } - - // Read next message - message = queue.getQueue().take(); - } - // Create Matlab vectors - ArrayList matlablist = new ArrayList(); - logger.info("dlist size: "+dlist.size()); - for(int t=0; t list = dlist.get(t); - - if(clist.get(t).isArray()){ - // Array Handling - } - else if(clist.get(t).equals(Double.class)){ - // Data is of type Double - MLDouble darray = new MLDouble(escapeString(c.getId()),(Double[])list.toArray(new Double[list.size()]), dsize); - matlablist.add(darray); + else if(message instanceof EndOfStreamMessage){ + // Create Matlab vectors + ArrayList matlablist = new ArrayList(); + logger.info("dlist size: "+dlist.size()); + for(int t=0; t list = dlist.get(t); + + if(clist.get(t).isArray()){ + // Array Handling + } + else if(clist.get(t).equals(Double.class)){ + // Data is of type Double + MLDouble darray = new MLDouble(escapeString(c.getId()),(Double[])list.toArray(new Double[list.size()]), dsize); + matlablist.add(darray); + } + + } - + // Write Matlab file + MatFileWriter writerr = new MatFileWriter(); + writerr.write(outfile, matlablist); } - // Write Matlab file - MatFileWriter writerr = new MatFileWriter(); - writerr.write(outfile, matlablist); - - - } catch (InterruptedException e) { - // TODO Stop loop and exit logic instead of throwing an Exception - throw new RuntimeException("Data serializer was interrupted while writing data to file",e); } catch (IOException e) { throw new RuntimeException("Data serializer had a problem writing to the specified file",e); } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMDA.java b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMDA.java index af2a3de..a427d15 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMDA.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerMDA.java @@ -30,9 +30,11 @@ import java.util.HashMap; import java.util.List; import java.util.logging.Logger; +import com.google.common.eventbus.Subscribe; + import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; -import ch.psi.fda.core.messages.DataQueue; +import ch.psi.fda.core.messages.DataMessageMetadata; import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; import ch.psi.fda.core.messages.StreamDelimiterMessage; @@ -47,81 +49,75 @@ import ch.psi.fda.core.messages.StreamDelimiterMessage; */ public class DataSerializerMDA implements DataSerializer{ - - // Get Logger private static final Logger logger = Logger.getLogger(DataSerializerMDA.class.getName()); - private DataQueue queue; + private DataMessageMetadata metadata; private File file; - public DataSerializerMDA(DataQueue queue, File file){ - this.queue = queue; + private boolean first = true; + + private List firstL; + private List takeData; + private List dcountL; + private List>>> dimensionList; + private HashMap> dMap; + private HashMap> idMap; + private int numberOfDimensions; + + public DataSerializerMDA(DataMessageMetadata metadata, File file){ + this.metadata = metadata; this.file = file; } - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ - @Override - public void run() { - try{ - - - - // Analyze header - - // Map holding all indexes for a given dimension - HashMap> dMap = new HashMap>(); - // Map holding all ids for a given dimension - HashMap> idMap = new HashMap>(); - - List mlist = queue.getDataMessageMetadata().getComponents(); - for(int index=0;index()); + + + @Subscribe + public void onMessage(Message message) { + if(first){ + first = false; + // Analyze header + + // Map holding all indexes for a given dimension + dMap = new HashMap>(); + // Map holding all ids for a given dimension + idMap = new HashMap>(); + + List mlist = metadata.getComponents(); + for(int index=0;index()); + } + if(!idMap.containsKey(m.getDimension())){ + idMap.put(m.getDimension(), new ArrayList()); + } + dMap.get(m.getDimension()).add(index); + idMap.get(m.getDimension()).add(m.getId()); } - if(!idMap.containsKey(m.getDimension())){ - idMap.put(m.getDimension(), new ArrayList()); + + //dimensions/dimension/dimensioncomponents/component/componentvalue + dimensionList = new ArrayList>>>(); + + numberOfDimensions = dMap.size(); + logger.info("Number of dimensions: "+numberOfDimensions); + for(int i=0;i>>()); + } + + firstL = new ArrayList(); + takeData = new ArrayList(); // Flag whether to take data for this dimension + dcountL = new ArrayList(); // How many times this dimension is there + + for(int i=0;i>>> dimensionList = new ArrayList>>>(); - - int numberOfDimensions = dMap.size(); - logger.info("Number of dimensions: "+numberOfDimensions); - for(int i=0;i>>()); - } - - -// // Transposed data list -// List> dlist = new ArrayList>(); -//// List> clist = new ArrayList>(); -// int dsize = 0; // Size of the dimension -// int dcount = 0; -// boolean firstF = true; - - List firstL = new ArrayList(); - List takeData = new ArrayList(); // Flag whether to take data for this dimension - List dcountL = new ArrayList(); // How many times this dimension is there - - for(int i=0;i=0; i--){ - int s = dimensionList.get(i).get(0).get(0).size(); - x.writeInt(s); // Dimension size - logger.info("Size: "+i+" - "+s+" "); - } - - x.writeInt(1); // Is Regular (true=1, false=0) - x.writeInt(0); // Number of extra pvs - - // Write data - HashMap indexCount = new HashMap(); - for(int i=0;i=0; i--){ + int s = dimensionList.get(i).get(0).get(0).size(); + x.writeInt(s); // Dimension size + logger.info("Size: "+i+" - "+s+" "); + } + + x.writeInt(1); // Is Regular (true=1, false=0) + x.writeInt(0); // Number of extra pvs + // Write data + HashMap indexCount = new HashMap(); + for(int i=0;i>>> dimensionList, HashMap icount, HashMap> idMap, int dnum) throws IOException{ diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXT.java b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXT.java index cc6db33..f9d300e 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXT.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXT.java @@ -30,7 +30,6 @@ import com.google.common.eventbus.Subscribe; import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; import ch.psi.fda.core.messages.DataMessageMetadata; -import ch.psi.fda.core.messages.DataQueue; import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; import ch.psi.fda.core.messages.StreamDelimiterMessage; @@ -42,11 +41,8 @@ import ch.psi.fda.core.messages.StreamDelimiterMessage; */ public class DataSerializerTXT implements DataSerializer{ - - // Get Logger private static final Logger logger = Logger.getLogger(DataSerializerTXT.class.getName()); -// private DataQueue queue; private File file; private boolean appendSuffix = true; @@ -57,9 +53,18 @@ public class DataSerializerTXT implements DataSerializer{ private DataMessageMetadata meta; + private int icount; + private String basename; + private String extension; + private boolean newfile; + private boolean dataInBetween; + private BufferedWriter writer; + private StringBuffer b; + private StringBuffer b1; + + /** - * - * @param queue + * @param metadata * @param file * @param appendSuffix Flag whether to append a _0000 suffix after the original file name */ @@ -69,179 +74,6 @@ public class DataSerializerTXT implements DataSerializer{ this.appendSuffix = appendSuffix; } - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ - @Override - public void run() { -// try{ -// -// // WORKAROUND BEGIN -// -//// if(appendSuffix){ -//// // Append a count suffix to the file. If there is already a file with -//// // this suffix increase the counter for the suffix -//// int cnt = 0; -//// String fname = this.file.getAbsolutePath(); // Determine file name -//// String extension = fname.replaceAll("^.*\\.", ""); // Determine extension -//// fname = fname.replaceAll("\\."+extension+"$", ""); -//// -//// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); -//// -//// while(outfile.exists()){ -//// cnt++; -//// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); -//// } -//// } -//// else{ -//// outfile = this.file; -//// } -// // WORKAROUND END -// -// -// -// -// -// // Write header -// StringBuffer b = new StringBuffer(); -// StringBuffer b1 = new StringBuffer(); -// b.append("#"); -// b1.append("#"); -// for(ComponentMetadata c: queue.getDataMessageMetadata().getComponents()){ -// -// b.append(c.getId()); -// b.append("\t"); -// -// b1.append(c.getDimension()); -// b1.append("\t"); -// } -// b.setCharAt(b.length()-1, '\n'); -// b1.setCharAt(b1.length()-1, '\n'); -// -// -// int icount = 0; -// boolean newfile = true; -// boolean dataInBetween = false; -// BufferedWriter writer = null; -// -// // Get basename of the file -// String basename = this.file.getAbsolutePath(); // Determine file name -// String extension = basename.replaceAll("^.*\\.", ""); // Determine extension -// basename = basename.replaceAll("\\."+extension+"$", ""); -// -// // Write data -// // Read Message -// Message message = queue.getQueue().take(); -// while(!(message instanceof EndOfStreamMessage)){ -// if(message instanceof DataMessage){ -// dataInBetween = true; -// if(newfile){ -// // Open new file and write header -// // Construct file name -// if(appendSuffix){ -// outfile = new File(String.format("%s_%04d.%s", basename, icount, extension)); -// } -// else{ -// outfile = new File(String.format("%s.%s", basename, extension)); -// } -// -// // Open file -// logger.fine("Open new data file: "+outfile.getAbsolutePath()); -// writer = new BufferedWriter(new FileWriter(outfile)); -// -// // Write header -// writer.write(b.toString()); -// writer.write(b1.toString()); -// -// newfile=false; -// } -// -// // Write message to file - each message will result in one line -// DataMessage m = (DataMessage) message; -// StringBuffer buffer = new StringBuffer(); -// for(Object o: m.getData()){ -// if(o.getClass().isArray()){ -// // If the array object is of type double[] display its content -// if(o instanceof double[]){ -// double[] oa = (double[]) o; -// for(double o1 : oa){ -// buffer.append(o1); -// buffer.append(" "); // Use space instead of tab -// } -// buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab -// } -// else if(o instanceof Object[]){ -// // TODO need to be recursive ... -// Object[] oa = (Object[])o; -// for(Object o1 : oa){ -// buffer.append(o1); -// buffer.append(" "); // Use space instead of tab -// } -// buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab -// } -// else{ -// buffer.append("-"); // Not supported -// } -// } -// else{ -// buffer.append(o); -// buffer.append("\t"); -// } -// } -// -// if(buffer.length()>0){ -// buffer.deleteCharAt(buffer.length()-1); // Remove last character (i.e. \t) -// buffer.append("\n"); // Append newline -// } -// writer.write(buffer.toString()); -// } -// else if(message instanceof StreamDelimiterMessage){ -// StreamDelimiterMessage m = (StreamDelimiterMessage) message; -// logger.info("Delimiter - number: "+m.getNumber()+" iflag: "+m.isIflag()); -// if(m.isIflag() && appendSuffix){ -// // Only increase iflag counter if there was data in between -// // subsequent StreamDelimiterMessages. -// if(dataInBetween){ -// icount++; -// } -// dataInBetween = false; -// -// // Set flag to open new file -// newfile = true; -// -// // Close file -// writer.close(); -// } -// } -// -// // Read next message -// message = queue.getQueue().take(); -// } -// -// if(writer!=null){ -// // Close file -// writer.close(); //If the stream was closed previously this has no effect -// } -// // Writer can be null if a scan is defined without a dimension -// -// } catch (InterruptedException e) { -// // TODO Stop loop and exit logic instead of throwing an Exception -// throw new RuntimeException("Data serializer was interrupted while writing data to file",e); -// } catch (IOException e) { -// throw new RuntimeException("Data serializer had a problem writing to the specified file",e); -// } -// - } - - int icount; - String basename; - String extension; - boolean newfile; - boolean dataInBetween; - BufferedWriter writer; - StringBuffer b; - StringBuffer b1; - @Subscribe public void onMessage(Message message){ try{ diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXT2D.java b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXT2D.java index bf24aae..87ce9f0 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXT2D.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXT2D.java @@ -26,9 +26,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import com.google.common.eventbus.Subscribe; + import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; -import ch.psi.fda.core.messages.DataQueue; +import ch.psi.fda.core.messages.DataMessageMetadata; import ch.psi.fda.core.messages.StreamDelimiterMessage; import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; @@ -40,22 +42,31 @@ import ch.psi.fda.core.messages.Message; */ public class DataSerializerTXT2D implements DataSerializer{ - private DataQueue queue; + private DataMessageMetadata metadata; private File file; private boolean appendSuffix = false; + private boolean first = true; + + File outfile; + List> dlist; + List> clist; + int dsize; + int dcount; + boolean firstF; + /** * Construtor * @param queue Data queue holding the data to serialize * @param file Name of the Matlab file to serialize the data to */ - public DataSerializerTXT2D(DataQueue queue, File file){ - this.queue = queue; + public DataSerializerTXT2D(DataMessageMetadata metadata, File file){ + this.metadata = metadata; this.file = file; // Check if input queue does only hold 2D data int maxdim=0; - for(ComponentMetadata m: queue.getDataMessageMetadata().getComponents()){ + for(ComponentMetadata m: metadata.getComponents()){ if(m.getDimension()>maxdim){ maxdim=m.getDimension(); } @@ -70,49 +81,46 @@ public class DataSerializerTXT2D implements DataSerializer{ } } - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ - @Override - public void run() { + @Subscribe + public void onMessage(Message message) { try{ - // WORKAROUND BEGIN - File outfile; - if(appendSuffix){ - // Append a count suffix to the file. If there is already a file with - // this suffix increase the counter for the suffix - int cnt = 0; - String fname = this.file.getAbsolutePath(); // Determine file name - String extension = fname.replaceAll("^.*\\.", ""); // Determine extension - // fname = fname.replaceAll("(_[0-9]+)?\\."+extension+"$", ""); - fname = fname.replaceAll("\\."+extension+"$", ""); + if(first){ + first = false; + // WORKAROUND BEGIN - outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); - - while(outfile.exists()){ - cnt++; + if(appendSuffix){ + // Append a count suffix to the file. If there is already a file with + // this suffix increase the counter for the suffix + int cnt = 0; + String fname = this.file.getAbsolutePath(); // Determine file name + String extension = fname.replaceAll("^.*\\.", ""); // Determine extension + // fname = fname.replaceAll("(_[0-9]+)?\\."+extension+"$", ""); + fname = fname.replaceAll("\\."+extension+"$", ""); + outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); + + while(outfile.exists()){ + cnt++; + outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); + } } + else{ + outfile = this.file; + } + // WORKAROUND END + + + // Transposed data list + dlist = new ArrayList>(); + clist = new ArrayList>(); + dsize = 0; // Size of the dimension + dcount = 0; + + firstF = true; } - else{ - outfile = this.file; - } - // WORKAROUND END - - // Transposed data list - List> dlist = new ArrayList>(); - List> clist = new ArrayList>(); - int dsize = 0; // Size of the dimension - int dcount = 0; - boolean firstF = true; - - // Write data - // Read Message - Message message = queue.getQueue().take(); - while(!(message instanceof EndOfStreamMessage)){ if(message instanceof DataMessage){ DataMessage m = (DataMessage) message; @@ -143,59 +151,53 @@ public class DataSerializerTXT2D implements DataSerializer{ } } - // Read next message - message = queue.getQueue().take(); - } - // Open file - BufferedWriter writer = new BufferedWriter(new FileWriter(outfile)); - - // Create text images - for(int t=0; t list = dlist.get(t); - - - - if(clist.get(t).isArray()){ - // Array Handling - } - else if(clist.get(t).equals(Double.class)){ - // Data is of type Double + // Create text images + for(int t=0; t list = dlist.get(t); + + + + if(clist.get(t).isArray()){ + // Array Handling + } + else if(clist.get(t).equals(Double.class)){ + // Data is of type Double + + StringBuffer b = new StringBuffer(); + int counter = 0; + for(Object o: list){ + b.append(o); + counter++; + if(counter==dsize){ + b.append("\n"); + counter=0; + } + else{ + b.append(" "); + } } + + writer.write(b.toString()); } - writer.write(b.toString()); + writer.write("\n"); + } - writer.write("\n"); - + // Close file + writer.close(); } - // Close file - writer.close(); - - - - } catch (InterruptedException e) { - // TODO Stop loop and exit logic instead of throwing an Exception - throw new RuntimeException("Data serializer was interrupted while writing data to file",e); } catch (IOException e) { throw new RuntimeException("Data serializer had a problem writing to the specified file",e); } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXTSplit.java b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXTSplit.java index 444696f..0639413 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXTSplit.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXTSplit.java @@ -26,11 +26,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import com.google.common.eventbus.Subscribe; + import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; -import ch.psi.fda.core.messages.DataQueue; +import ch.psi.fda.core.messages.DataMessageMetadata; import ch.psi.fda.core.messages.StreamDelimiterMessage; -import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; /** @@ -40,20 +41,23 @@ import ch.psi.fda.core.messages.Message; */ public class DataSerializerTXTSplit implements DataSerializer{ - private DataQueue queue; + private DataMessageMetadata metadata; private File file; private int maxdim = 0; -// private boolean appendSuffix = false; + private boolean first = true; + + private List header; + private List data; - public DataSerializerTXTSplit(DataQueue queue, File file){ - this.queue = queue; + public DataSerializerTXTSplit(DataMessageMetadata metadata, File file){ + this.metadata = metadata; this.file = file; // Determine maximum dimension - for(ComponentMetadata m: queue.getDataMessageMetadata().getComponents()){ + for(ComponentMetadata m: metadata.getComponents()){ if(m.getDimension()>maxdim){ maxdim=m.getDimension(); } @@ -64,62 +68,42 @@ public class DataSerializerTXTSplit implements DataSerializer{ } } - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ - @Override - public void run() { + + + @Subscribe + public void onMessage(Message message) { try{ - // WORKAROUND BEGIN -// File outfile; -// if(appendSuffix){ -// // Append a count suffix to the file. If there is already a file with -// // this suffix increase the counter for the suffix -// int cnt = 0; -// String fname = this.file.getAbsolutePath(); // Determine file name -// String extension = fname.replaceAll("^.*\\.", ""); // Determine extension -// fname = fname.replaceAll("\\."+extension+"$", ""); -// -// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); -// -// while(outfile.exists()){ -// cnt++; -// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); -// } -// } -// else{ -// outfile = this.file; -// } - // WORKAROUND END - - - - List header = new ArrayList(); - - // Write header - StringBuffer b = new StringBuffer(); - StringBuffer b1 = new StringBuffer(); - b.append("#"); - b1.append("#"); - for(ComponentMetadata c: queue.getDataMessageMetadata().getComponents()){ - - b.append(c.getId()); - b.append("\t"); + if(first){ + first=false; + + header = new ArrayList(); + data = new ArrayList(); + + // Write header + StringBuffer b = new StringBuffer(); + StringBuffer b1 = new StringBuffer(); + b.append("#"); + b1.append("#"); + for(ComponentMetadata c: metadata.getComponents()){ + + b.append(c.getId()); + b.append("\t"); + + b1.append(c.getDimension()); + b1.append("\t"); + } + b.setCharAt(b.length()-1, '\n'); + b1.setCharAt(b1.length()-1, '\n'); + header.add(b.toString()); + header.add(b1.toString()); + - b1.append(c.getDimension()); - b1.append("\t"); } - b.setCharAt(b.length()-1, '\n'); - b1.setCharAt(b1.length()-1, '\n'); - header.add(b.toString()); - header.add(b1.toString()); - List data = new ArrayList(); // Write data // Read Message - Message message = queue.getQueue().take(); - while(!(message instanceof EndOfStreamMessage)){ + if(message instanceof DataMessage){ // Write message to file - each message will result in one line @@ -168,19 +152,6 @@ public class DataSerializerTXTSplit implements DataSerializer{ } } - // Read next message - message = queue.getQueue().take(); - } - -// // Open file -// BufferedWriter writer = new BufferedWriter(new FileWriter(outfile)); -// -// // Close file -// writer.close(); - - } catch (InterruptedException e) { - // TODO Stop loop and exit logic instead of throwing an Exception - throw new RuntimeException("Data serializer was interrupted while writing data to file",e); } catch (IOException e) { throw new RuntimeException("Data serializer had a problem writing to the specified file",e); } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/vis/VisualizationEngine.java b/ch.psi.fda/src/main/java/ch/psi/fda/vis/VisualizationEngine.java index 69ff7e2..764fa6a 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/vis/VisualizationEngine.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/vis/VisualizationEngine.java @@ -24,6 +24,7 @@ import java.awt.event.WindowAdapter; import java.awt.event.WindowEvent; import java.io.File; import java.io.PrintWriter; +import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; @@ -44,6 +45,9 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.xml.sax.SAXException; +import com.google.common.eventbus.AsyncEventBus; +import com.google.common.eventbus.EventBus; + import ch.psi.fda.deserializer.DataDeserializer; import ch.psi.fda.deserializer.DataDeserializerTXT; import ch.psi.fda.gui.ScrollableFlowPanel; @@ -112,11 +116,12 @@ public class VisualizationEngine { throw new IllegalArgumentException("Data file ["+data.getAbsolutePath()+"] does not exist"); } + EventBus bus = new AsyncEventBus(Executors.newCachedThreadPool()); // Create deserializer - DataDeserializer deserializer = new DataDeserializerTXT(data); + DataDeserializer deserializer = new DataDeserializerTXT(bus, data); // Create Visualizer - Visualizer visualizer = new Visualizer(deserializer.getQueue().getDataMessageMetadata(), configuration.getVisualization()); + Visualizer visualizer = new Visualizer(deserializer.getMetadata(), configuration.getVisualization()); // visualizer.setTerminateAtEOS(true); // Adapt default visualizer behavior to optimize performance for visualization @@ -155,12 +160,10 @@ public class VisualizationEngine { // Start deserializer and visualizer - Thread td = new Thread(deserializer); - - td.start(); + bus.register(visualizer); visualizer.configure(); + deserializer.read(); - td.join(); logger.info("Deserializer finished"); // visualizer.stopVisualization(); } diff --git a/ch.psi.fda/src/test/java/ch/psi/fda/core/manipulator/ManipulatorTest.java b/ch.psi.fda/src/test/java/ch/psi/fda/core/manipulator/ManipulatorTest.java index 6ab9024..fcfc26a 100644 --- a/ch.psi.fda/src/test/java/ch/psi/fda/core/manipulator/ManipulatorTest.java +++ b/ch.psi.fda/src/test/java/ch/psi/fda/core/manipulator/ManipulatorTest.java @@ -20,7 +20,6 @@ package ch.psi.fda.core.manipulator; import static org.junit.Assert.*; - import gov.aps.jca.CAException; import java.util.ArrayList; @@ -33,6 +32,9 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; + import ch.psi.fda.TestChannels; import ch.psi.fda.core.manipulator.JythonManipulation; import ch.psi.fda.core.manipulator.Manipulation; @@ -58,12 +60,14 @@ public class ManipulatorTest { // Get Logger private static Logger logger = Logger.getLogger(ManipulatorTest.class.getName()); + private EventBus bus; + /** * @throws java.lang.Exception */ @Before public void setUp() throws Exception { - + bus = new EventBus(); } /** @@ -91,7 +95,7 @@ public class ManipulatorTest { // id "myid" which is expected in the mapping List manipulations = new ArrayList(); manipulations.add(manipulation); - new Manipulator(inQueue, manipulations); + new Manipulator(bus, inQueue, manipulations); } @@ -114,7 +118,7 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - new Manipulator(inQueue, manipulations); + new Manipulator(bus, inQueue, manipulations); // Expect IllegalArgument Exception as there is no mapping for the parameter c } @@ -139,7 +143,7 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - new Manipulator(inQueue, manipulations); + new Manipulator(bus, inQueue, manipulations); // Expect IllegalArgument Exception as there is no mapping for the parameter c } @@ -167,7 +171,7 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - new Manipulator(inQueue, manipulations); + new Manipulator(bus, inQueue, manipulations); // Expect IllegalArgument Exception as there is no mapping for the parameter c } @@ -194,10 +198,10 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - Manipulator manipulator = new Manipulator(inQueue, manipulations); + Manipulator manipulator = new Manipulator(bus, inQueue, manipulations); // Check whether output queue message structur complies to expected one - DataMessageMetadata outMeta = manipulator.getOutQueue().getDataMessageMetadata(); + DataMessageMetadata outMeta = manipulator.getMetadata(); // Test whether only the expected components are within the outgoing queue if(outMeta.getComponents().size()!=2){ @@ -214,25 +218,30 @@ public class ManipulatorTest { fail("Id of the second component does not match the expected id 'cid'"); } - - manipulator.run(); - - Message message = manipulator.getOutQueue().getQueue().take(); - while(!(message instanceof EndOfStreamMessage)){ - - logger.info(message.toString()); - - if(message instanceof DataMessage){ - DataMessage dm = (DataMessage) message; - dm.getData().get(0); - double res = ((Double)dm.getData().get(1)) - (Math.cos(10.0)+Math.sin(((Double)dm.getData().get(0)))); - if( Math.abs(res) > 0.000000001){ - fail("Calculation failed"); + bus.register(new Object(){ + @Subscribe + public void onMessage(Message message){ + logger.info(message.toString()); + + if(message instanceof DataMessage){ + DataMessage dm = (DataMessage) message; + dm.getData().get(0); + double res = ((Double)dm.getData().get(1)) - (Math.cos(10.0)+Math.sin(((Double)dm.getData().get(0)))); + if( Math.abs(res) > 0.000000001){ + fail("Calculation failed"); + } } } - - message = manipulator.getOutQueue().getQueue().take(); - } + }); + manipulator.run(); + +// Message message = manipulator.getOutQueue().getQueue().take(); +// while(!(message instanceof EndOfStreamMessage)){ +// +// +// +// message = manipulator.getOutQueue().getQueue().take(); +// } logger.info(""+(Math.cos(10.0)+Math.sin(10))); } @@ -260,10 +269,10 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - Manipulator manipulator = new Manipulator(inQueue, manipulations); + Manipulator manipulator = new Manipulator(bus, inQueue, manipulations); // Check whether output queue message structur complies to expected one - DataMessageMetadata outMeta = manipulator.getOutQueue().getDataMessageMetadata(); + DataMessageMetadata outMeta = manipulator.getMetadata(); // Test whether only the expected components are within the outgoing queue if(outMeta.getComponents().size()!=2){ @@ -280,21 +289,18 @@ public class ManipulatorTest { fail("Id of the second component does not match the expected id 'cid'"); } - - manipulator.run(); - - Message message = manipulator.getOutQueue().getQueue().take(); - while(!(message instanceof EndOfStreamMessage)){ - - logger.info(message.toString()); - - if(message instanceof DataMessage){ - DataMessage dm = (DataMessage) message; - dm.getData().get(0); + bus.register(new Object(){ + @Subscribe + public void onMessage(Message message){ + logger.info(message.toString()); + + if(message instanceof DataMessage){ + DataMessage dm = (DataMessage) message; + dm.getData().get(0); + } } - - message = manipulator.getOutQueue().getQueue().take(); - } + }); + manipulator.run(); } /** @@ -339,44 +345,31 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - Manipulator manipulator = new Manipulator(inQueue, manipulations); + Manipulator manipulator = new Manipulator(bus, inQueue, manipulations); Thread t = new Thread(manipulator); - final DataQueue outQueue = manipulator.getOutQueue(); - - Thread tp = new Thread(new Runnable() { - - @Override - public void run() { - try{ - int count=0; - Message message; - while((message = outQueue.getQueue().take()) != null){ - if(!(message instanceof EndOfStreamMessage)){ - logger.info(count+" - "+message.toString()); - } - else{ - break; - } - count++; - } - } - catch (Exception e) { - e.printStackTrace(); + bus.register(new Object(){ + int count=0; + @Subscribe + public void onMessage(Message message){ + if(!(message instanceof EndOfStreamMessage)){ + logger.info(count+" - "+message.toString()); + count++; } + } }); + + tf.start(); t.start(); - tp.start(); tf.join(); t.join(); - tp.join(); } /** @@ -405,10 +398,10 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - Manipulator manipulator = new Manipulator(inQueue, manipulations); + Manipulator manipulator = new Manipulator(bus, inQueue, manipulations); // Check whether output queue message structur complies to expected one - DataMessageMetadata outMeta = manipulator.getOutQueue().getDataMessageMetadata(); + DataMessageMetadata outMeta = manipulator.getMetadata(); // Test whether only the expected components are within the outgoing queue if(outMeta.getComponents().size()!=3){ @@ -429,26 +422,22 @@ public class ManipulatorTest { fail("Id of the second component does not match the expected id 'cid'"); } - - manipulator.run(); - - Message message = manipulator.getOutQueue().getQueue().take(); - while(!(message instanceof EndOfStreamMessage)){ - - logger.info(message.toString()); - - if(message instanceof DataMessage){ - DataMessage dm = (DataMessage) message; - dm.getData().get(0); - double res = ((Double)dm.getData().get(2)) - (Math.cos(((Double)dm.getData().get(1)))+Math.sin(((Double)dm.getData().get(0)))); - if( Math.abs(res) > 0.000000001){ - fail("Calculation failed"); + bus.register(new Object(){ + @Subscribe + public void onMessage(Message message){ + logger.info(message.toString()); + + if(message instanceof DataMessage){ + DataMessage dm = (DataMessage) message; + dm.getData().get(0); + double res = ((Double)dm.getData().get(2)) - (Math.cos(((Double)dm.getData().get(1)))+Math.sin(((Double)dm.getData().get(0)))); + if( Math.abs(res) > 0.000000001){ + fail("Calculation failed"); + } } } - - - message = manipulator.getOutQueue().getQueue().take(); - } + }); + manipulator.run(); logger.info(""+(Math.cos(0.2)+Math.sin(10))); @@ -488,10 +477,10 @@ public class ManipulatorTest { List manipulations = new ArrayList(); manipulations.add(manipulation); - Manipulator manipulator = new Manipulator(inQueue, manipulations); + Manipulator manipulator = new Manipulator(bus, inQueue, manipulations); // Check whether output queue message structur complies to expected one - DataMessageMetadata outMeta = manipulator.getOutQueue().getDataMessageMetadata(); + DataMessageMetadata outMeta = manipulator.getMetadata(); // Test whether only the expected components are within the outgoing queue if(outMeta.getComponents().size()!=3){ @@ -515,25 +504,23 @@ public class ManipulatorTest { // Change something different on the channel than the value that will be set in the manipulator script cbean.setValue(setValue+1); - manipulator.run(); - - Message message = manipulator.getOutQueue().getQueue().take(); - while(!(message instanceof EndOfStreamMessage)){ - - logger.info(message.toString()); - - if(message instanceof DataMessage){ - DataMessage dm = (DataMessage) message; - dm.getData().get(0); - double res = ((Double)dm.getData().get(2)) - (Math.cos(((Double)dm.getData().get(1)))+Math.sin(((Double)dm.getData().get(0)))); - if( Math.abs(res) > 0.000000001){ - fail("Calculation failed"); + bus.register(new Object(){ + @Subscribe + public void onMessage(Message message){ + logger.info(message.toString()); + + if(message instanceof DataMessage){ + DataMessage dm = (DataMessage) message; + dm.getData().get(0); + double res = ((Double)dm.getData().get(2)) - (Math.cos(((Double)dm.getData().get(1)))+Math.sin(((Double)dm.getData().get(0)))); + if( Math.abs(res) > 0.000000001){ + fail("Calculation failed"); + } } } - - - message = manipulator.getOutQueue().getQueue().take(); - } + }); + + manipulator.run(); logger.info(""+(Math.cos(0.2)+Math.sin(10))); diff --git a/ch.psi.fda/src/test/java/ch/psi/fda/deserializer/DataDeserializerMDATest.java b/ch.psi.fda/src/test/java/ch/psi/fda/deserializer/DataDeserializerMDATest.java index ebfebe8..519f4b1 100644 --- a/ch.psi.fda/src/test/java/ch/psi/fda/deserializer/DataDeserializerMDATest.java +++ b/ch.psi.fda/src/test/java/ch/psi/fda/deserializer/DataDeserializerMDATest.java @@ -22,18 +22,18 @@ package ch.psi.fda.deserializer; import java.io.File; import java.net.URI; import java.net.URL; -import java.util.concurrent.BlockingQueue; -import java.util.logging.Level; import java.util.logging.Logger; import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; + import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.ControlMessage; import ch.psi.fda.core.messages.DataMessage; -import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; /** @@ -44,84 +44,59 @@ import ch.psi.fda.core.messages.Message; */ public class DataDeserializerMDATest { - // Get Logger private static Logger logger = Logger.getLogger(DataDeserializerMDATest.class.getName()); - + + private EventBus bus; private DataDeserializerMDA deserializer; - /** - * @throws java.lang.Exception - */ @Before public void setUp() throws Exception { + bus = new EventBus(); URL url = this.getClass().getClassLoader().getResource("testdata/mda/mdadata7.mda"); - deserializer = new DataDeserializerMDA(new File(new URI(url.toString()))); + deserializer = new DataDeserializerMDA(bus, new File(new URI(url.toString()))); } - /** - * @throws java.lang.Exception - */ @After public void tearDown() throws Exception { } - /** - * Test method for {@link ch.psi.fda.deserializer.DataDeserializerTXT#run()}. - * @throws InterruptedException - */ @Test - public void testRun() throws InterruptedException { + public void testRead() throws InterruptedException { - Thread t = new Thread(new Runnable() { - @Override - public void run() { - try { - BlockingQueue q = deserializer.getQueue().getQueue(); - - while(true){ - Message m = q.take(); - if(m instanceof DataMessage){ - DataMessage x = (DataMessage) m; - logger.info( x.toString() ); - } - else if(m instanceof ControlMessage){ - if(m instanceof EndOfStreamMessage){ - break; - } - logger.info("---- "+m.toString()+" ----"); - } - } - - StringBuilder b = new StringBuilder(); - b.append("["); - StringBuilder b1 = new StringBuilder(); - b1.append("["); - for(ComponentMetadata cm : deserializer.getQueue().getDataMessageMetadata().getComponents()){ - b.append(" "); - b.append(cm.getId()); - b1.append(" "); - b1.append(cm.getDimension()); - } - b.append(" ]"); - b1.append(" ]"); - - logger.info("Metadata "+b.toString()); - logger.info("Metadata "+b1.toString()); - } catch (InterruptedException e) { - logger.log(Level.SEVERE, "An Exception occured while reading data from the data queue", e); + // Visualize metadata + StringBuilder b = new StringBuilder(); + b.append("["); + StringBuilder b1 = new StringBuilder(); + b1.append("["); + for(ComponentMetadata cm : deserializer.getMetadata().getComponents()){ + b.append(" "); + b.append(cm.getId()); + b1.append(" "); + b1.append(cm.getDimension()); + } + b.append(" ]"); + b1.append(" ]"); + + logger.info("Metadata "+b.toString()); + logger.info("Metadata "+b1.toString()); + + + // Do "read" data + bus.register(new Object(){ + @Subscribe + public void onMessage(Message m){ + if(m instanceof DataMessage){ + DataMessage x = (DataMessage) m; + logger.info( x.toString() ); + } + else if(m instanceof ControlMessage){ + logger.info("---- "+m.toString()+" ----"); } } }); + deserializer.read(); - Thread tt = new Thread(deserializer); - tt.start(); - t.start(); - - tt.join(); - t.join(); - - } } diff --git a/ch.psi.fda/src/test/java/ch/psi/fda/deserializer/DataDeserializerTest.java b/ch.psi.fda/src/test/java/ch/psi/fda/deserializer/DataDeserializerTest.java index dd81ba3..80ebd85 100644 --- a/ch.psi.fda/src/test/java/ch/psi/fda/deserializer/DataDeserializerTest.java +++ b/ch.psi.fda/src/test/java/ch/psi/fda/deserializer/DataDeserializerTest.java @@ -22,38 +22,36 @@ package ch.psi.fda.deserializer; import java.io.File; import java.net.URI; import java.net.URL; -import java.util.logging.Level; import java.util.logging.Logger; import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; + import ch.psi.fda.core.messages.ControlMessage; import ch.psi.fda.core.messages.DataMessage; -import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; import ch.psi.fda.deserializer.DataDeserializer; import ch.psi.fda.deserializer.DataDeserializerTXT; -/** - * @author ebner - * - */ public class DataDeserializerTest { - // Get Logger private static Logger logger = Logger.getLogger(DataDeserializerTest.class.getName()); private DataDeserializer deserializer; + private EventBus bus; /** * @throws java.lang.Exception */ @Before public void setUp() throws Exception { + bus = new EventBus(); URL url = this.getClass().getClassLoader().getResource("testdata/text/textdata2.txt"); - deserializer = new DataDeserializerTXT(new File(new URI(url.toString()))); + deserializer = new DataDeserializerTXT(bus, new File(new URI(url.toString()))); } /** @@ -70,38 +68,18 @@ public class DataDeserializerTest { @Test public void testRun() throws InterruptedException { - Thread t = new Thread(new Runnable() { - @Override - public void run() { - try { - while(true){ - Message m = deserializer.getQueue().getQueue().take(); - if(m instanceof DataMessage){ - DataMessage x = (DataMessage) m; - logger.info( x.toString() ); - } - else if(m instanceof ControlMessage){ - if(m instanceof EndOfStreamMessage){ - break; - } - logger.info("---- "+m.toString()+" ----"); - } - } - } catch (InterruptedException e) { - logger.log(Level.SEVERE, "An Exception occured while reading data from the data queue", e); + bus.register(new Object(){ + @Subscribe + public void onMessage(Message m){ + if(m instanceof DataMessage){ + DataMessage x = (DataMessage) m; + logger.info( x.toString() ); + } + else if(m instanceof ControlMessage){ + logger.info("---- "+m.toString()+" ----"); } } }); - - Thread tt = new Thread(deserializer); - - tt.start(); - t.start(); - - tt.join(); - t.join(); - - + deserializer.read(); } - } diff --git a/ch.psi.fda/src/test/java/ch/psi/fda/serializer/DataSerializerTest.java b/ch.psi.fda/src/test/java/ch/psi/fda/serializer/DataSerializerTest.java index d6bb89a..405707d 100644 --- a/ch.psi.fda/src/test/java/ch/psi/fda/serializer/DataSerializerTest.java +++ b/ch.psi.fda/src/test/java/ch/psi/fda/serializer/DataSerializerTest.java @@ -20,20 +20,18 @@ package ch.psi.fda.serializer; import java.io.File; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.common.eventbus.EventBus; + import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; import ch.psi.fda.core.messages.DataMessageMetadata; -import ch.psi.fda.core.messages.DataQueue; import ch.psi.fda.core.messages.StreamDelimiterMessage; import ch.psi.fda.core.messages.EndOfStreamMessage; -import ch.psi.fda.core.messages.Message; import ch.psi.fda.serializer.DataSerializer; import ch.psi.fda.serializer.DataSerializerMAT; import ch.psi.fda.serializer.DataSerializerMAT2D; @@ -41,16 +39,13 @@ import ch.psi.fda.serializer.DataSerializerTXT; import ch.psi.fda.serializer.DataSerializerTXT2D; import ch.psi.fda.serializer.DataSerializerTXTSplit; -/** - * @author ebner - * - */ + public class DataSerializerTest { private static final String tmpDirectory = "target/tmp"; - - private DataQueue queue; + private DataMessageMetadata metadata; + private EventBus bus; /** * @throws java.lang.Exception @@ -58,11 +53,8 @@ public class DataSerializerTest { @Before public void setUp() throws Exception { new File(tmpDirectory).mkdirs(); - BlockingQueue q3 = new LinkedBlockingQueue(); - DataMessageMetadata m3 = new DataMessageMetadata(); - - this.queue = new DataQueue(q3, m3); - + metadata = new DataMessageMetadata(); + bus = new EventBus(); } /** @@ -71,23 +63,23 @@ public class DataSerializerTest { */ private void generate1DData() throws InterruptedException{ - queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id0", 0)); - queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id1", 0)); - queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id2", 0)); + metadata.getComponents().add(new ComponentMetadata("id0", 0)); + metadata.getComponents().add(new ComponentMetadata("id1", 0)); + metadata.getComponents().add(new ComponentMetadata("id2", 0)); // Dimension DataMessage m = new DataMessage(); m.getData().add(0.000000000000000001); m.getData().add(0.1); m.getData().add(1d); // have this value as double - queue.getQueue().put(m); + bus.post(m); m = new DataMessage(); m.getData().add(0.02); m.getData().add(0.2); m.getData().add(2d); // have this value as double - queue.getQueue().put(m); - queue.getQueue().put(new EndOfStreamMessage()); + bus.post(m); + bus.post(new EndOfStreamMessage()); } /** @@ -96,9 +88,9 @@ public class DataSerializerTest { */ private void generate2DData() throws InterruptedException{ - queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id0", 1)); - queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id1", 0)); - queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id2", 0)); + metadata.getComponents().add(new ComponentMetadata("id0", 1)); + metadata.getComponents().add(new ComponentMetadata("id1", 0)); + metadata.getComponents().add(new ComponentMetadata("id2", 0)); for(double i=0;i<5;i++){ for(double t=0.1; t<10; t=t+0.1){ @@ -107,14 +99,14 @@ public class DataSerializerTest { m.getData().add(i); m.getData().add(t); m.getData().add(Math.log(t)); // have this value as double - queue.getQueue().put(m); + bus.post(m); } - queue.getQueue().put(new StreamDelimiterMessage(0)); + bus.post(new StreamDelimiterMessage(0)); } - queue.getQueue().put(new StreamDelimiterMessage(1)); + bus.post(new StreamDelimiterMessage(1)); - queue.getQueue().put(new EndOfStreamMessage()); + bus.post(new EndOfStreamMessage()); } /** @@ -123,10 +115,10 @@ public class DataSerializerTest { */ private void generate3DData() throws InterruptedException{ - queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id0", 2)); - queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id1", 1)); - queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id2", 0)); - queue.getDataMessageMetadata().getComponents().add(new ComponentMetadata("id3", 0)); + metadata.getComponents().add(new ComponentMetadata("id0", 2)); + metadata.getComponents().add(new ComponentMetadata("id1", 1)); + metadata.getComponents().add(new ComponentMetadata("id2", 0)); + metadata.getComponents().add(new ComponentMetadata("id3", 0)); for(double z=30;z<36;z++){ for(double i=0;i<6;i++){ @@ -137,16 +129,15 @@ public class DataSerializerTest { m.getData().add(i); m.getData().add(t); m.getData().add(Math.log(t)); // have this value as double - queue.getQueue().put(m); + bus.post(m); } - queue.getQueue().put(new StreamDelimiterMessage(0)); + bus.post(new StreamDelimiterMessage(0)); } - queue.getQueue().put(new StreamDelimiterMessage(1)); + bus.post(new StreamDelimiterMessage(1)); } - queue.getQueue().put(new StreamDelimiterMessage(2)); + bus.post(new StreamDelimiterMessage(2)); - - queue.getQueue().put(new EndOfStreamMessage()); + bus.post(new EndOfStreamMessage()); } /** @@ -162,9 +153,9 @@ public class DataSerializerTest { */ @Test public void testRunTXT() throws InterruptedException { + DataSerializer serializer = new DataSerializerTXT(metadata, new File(tmpDirectory+"/test.txt"), true); + bus.register(serializer); generate1DData(); - DataSerializer serializer = new DataSerializerTXT(queue, new File(tmpDirectory+"/test.txt"), true); - serializer.run(); } /** @@ -173,9 +164,10 @@ public class DataSerializerTest { */ @Test public void testRunMAT() throws InterruptedException { + + DataSerializer serializer = new DataSerializerMAT(metadata, new File(tmpDirectory+"/test.mat")); + bus.register(serializer); generate1DData(); - DataSerializer serializer = new DataSerializerMAT(queue, new File(tmpDirectory+"/test.mat")); - serializer.run(); } /** @@ -184,9 +176,9 @@ public class DataSerializerTest { */ @Test public void testRunMAT2D() throws InterruptedException { + DataSerializer serializer = new DataSerializerMAT2D(metadata, new File(tmpDirectory+"/test-2d.mat")); + bus.register(serializer); generate2DData(); - DataSerializer serializer = new DataSerializerMAT2D(queue, new File(tmpDirectory+"/test-2d.mat")); - serializer.run(); } /** @@ -195,9 +187,9 @@ public class DataSerializerTest { */ @Test public void testRunTXT2D() throws InterruptedException { + DataSerializer serializer = new DataSerializerTXT2D(metadata, new File(tmpDirectory+"/test-2d.txt")); + bus.register(serializer); generate2DData(); - DataSerializer serializer = new DataSerializerTXT2D(queue, new File(tmpDirectory+"/test-2d.txt")); - serializer.run(); } /** @@ -206,9 +198,9 @@ public class DataSerializerTest { */ @Test public void testRunSplitTXT() throws InterruptedException { + DataSerializer serializer = new DataSerializerTXTSplit(metadata, new File(tmpDirectory+"/test-2d-split.txt")); + bus.register(serializer); generate2DData(); - DataSerializer serializer = new DataSerializerTXTSplit(queue, new File(tmpDirectory+"/test-2d-split.txt")); - serializer.run(); } /** @@ -217,9 +209,9 @@ public class DataSerializerTest { */ @Test public void testRun2D() throws InterruptedException { + DataSerializer serializer = new DataSerializerMDA(metadata, new File(tmpDirectory+"/test-2d.mda")); + bus.register(serializer); generate2DData(); - DataSerializer serializer = new DataSerializerMDA(queue, new File(tmpDirectory+"/test-2d.mda")); - serializer.run(); } /** @@ -228,8 +220,8 @@ public class DataSerializerTest { */ @Test public void testRun3D() throws InterruptedException { + DataSerializer serializer = new DataSerializerMDA(metadata, new File(tmpDirectory+"/test-3d.mda")); + bus.register(serializer); generate3DData(); - DataSerializer serializer = new DataSerializerMDA(queue, new File(tmpDirectory+"/test-3d.mda")); - serializer.run(); } }