From c3238e4d225fcf3bdb876db53edc790a433e4b90 Mon Sep 17 00:00:00 2001 From: Simon Ebner Date: Thu, 3 Oct 2013 10:57:20 +0200 Subject: [PATCH] Unfinished removal of the dispatcher class. tested the migration of the serializer to EventBus. Test is/was successfull but things now need to be cleaned up properly! There are various compilation errors in the code --- .../main/java/ch/psi/fda/aq/Acquisition.java | 39 +- .../java/ch/psi/fda/aq/AcquisitionMain.java | 5 +- .../fda/core/collector/DataDispatcher.java | 109 ----- .../psi/fda/core/manipulator/Manipulator.java | 43 +- .../psi/fda/serializer/DataSerializerTXT.java | 427 ++++++++++++------ 5 files changed, 304 insertions(+), 319 deletions(-) delete mode 100644 ch.psi.fda/src/main/java/ch/psi/fda/core/collector/DataDispatcher.java diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java index e1dfa1d..67d1bde 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java @@ -57,7 +57,6 @@ import ch.psi.fda.core.actors.JythonFunction; import ch.psi.fda.core.actors.OTFActuator; import ch.psi.fda.core.actors.PseudoActuatorSensor; import ch.psi.fda.core.collector.Collector; -import ch.psi.fda.core.collector.DataDispatcher; import ch.psi.fda.core.guard.ChannelAccessGuard; import ch.psi.fda.core.guard.ChannelAccessGuardCondition; import ch.psi.fda.core.loops.ActorSensorLoop; @@ -68,6 +67,7 @@ import ch.psi.fda.core.loops.cr.ScrlogicLoop; import ch.psi.fda.core.manipulator.JythonManipulation; import ch.psi.fda.core.manipulator.Manipulation; import ch.psi.fda.core.manipulator.Manipulator; +import ch.psi.fda.core.messages.DataMessageMetadata; import ch.psi.fda.core.messages.DataQueue; import ch.psi.fda.core.messages.Message; import ch.psi.fda.core.scripting.JythonGlobalVariable; @@ -137,7 +137,6 @@ public class Acquisition { private ActionLoop actionLoop; private Collector collector; - private DataDispatcher dispatcher; private Manipulator manipulator; private DataSerializerTXT serializer; @@ -182,7 +181,7 @@ public class Acquisition { * @param getQueue Flag whether to return a queue or not. If false the return value of the function will be null. * @throws InterruptedException */ - public DataQueue initalize(EventBus bus, Configuration smodel, boolean getQueue) { + public DataMessageMetadata initalize(EventBus bus, Configuration smodel, boolean getQueue) { // Create notification agent with globally configured recipients notificationAgent = new NotificationAgent(configuration.getSmptServer(), "fda.notification@psi.ch"); @@ -252,29 +251,15 @@ public class Acquisition { Collections.reverse(collector.getQueues()); // Add manipulator into processing chain - this.manipulator = new Manipulator(collector.getOutQueue(), this.manipulations); + this.manipulator = new Manipulator(bus, collector.getOutQueue(), this.manipulations); - // // Insert dispatcher into processing chain - this.dispatcher = new DataDispatcher(bus, manipulator.getOutQueue()); - - DataQueue dq = new DataQueue(new LinkedBlockingQueue(1000), manipulator.getOutQueue().getDataMessageMetadata()); // Create bounded queue to - // prevent running out of - // memory ... - this.serializer = new DataSerializerTXT(dq, datafile, true); - - DataQueue vdq = null; - if (getQueue) { - vdq = new DataQueue(new LinkedBlockingQueue(1000), manipulator.getOutQueue().getDataMessageMetadata()); // Create bounded queue to prevent - // running out of memory ... -// dispatcher.getOutQueues().add(vdq); - } - - // Add queue for serializer to dispatcher - dispatcher.getOutQueues().add(dq); - - return (vdq); + DataMessageMetadata metadata = manipulator.getMetadata(); + this.serializer = new DataSerializerTXT(metadata, datafile, true); + bus.register(serializer); + + return (metadata); } /** @@ -294,12 +279,6 @@ public class Acquisition { Thread tm = new Thread(manipulator); tm.start(); - Thread td = new Thread(dispatcher); - td.start(); - - Thread t = new Thread(serializer); - t.start(); - actionLoop.prepare(); actionLoop.execute(); actionLoop.cleanup(); @@ -310,8 +289,6 @@ public class Acquisition { // Give the threads 1 minute to catch up tc.join(60000); tm.join(60000); - td.join(60000); - t.join(60000); // Send notifications out to all recipients that want to have success notifications try { diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/AcquisitionMain.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/AcquisitionMain.java index 2af7f91..e286750 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/AcquisitionMain.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/AcquisitionMain.java @@ -53,6 +53,7 @@ import com.google.common.eventbus.EventBus; import sun.misc.Signal; import sun.misc.SignalHandler; +import ch.psi.fda.core.messages.DataMessageMetadata; import ch.psi.fda.core.messages.DataQueue; import ch.psi.fda.gui.ProgressPanel; import ch.psi.fda.gui.ScrollableFlowPanel; @@ -264,13 +265,13 @@ public class AcquisitionMain { EventBus b = new AsyncEventBus(Executors.newSingleThreadExecutor()); - DataQueue vdq = acquisition.initalize(b, c, vis); + DataMessageMetadata dmeta = acquisition.initalize(b, c, vis); Visualizer visualizer = null; // Only register data visualization task/processor if there are visualizations if(vis){ - visualizer = new Visualizer(vdq.getDataMessageMetadata(), c.getVisualization()); + visualizer = new Visualizer(dmeta, c.getVisualization()); b.register(visualizer); // If there is a continous dimension only update plot at the end of a line if(c.getScan() != null && c.getScan().getCdimension()!=null){ diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/collector/DataDispatcher.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/collector/DataDispatcher.java deleted file mode 100644 index e2a8c30..0000000 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/collector/DataDispatcher.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * - * Copyright 2010 Paul Scherrer Institute. All rights reserved. - * - * This code is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This code is distributed in the hope that it will be useful, - * but without any warranty; without even the implied warranty of - * merchantability or fitness for a particular purpose. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this code. If not, see . - * - */ - -package ch.psi.fda.core.collector; - -import java.util.ArrayList; -import java.util.List; - -//import org.apache.commons.lang.SerializationUtils; -//import org.jeromq.ZMQ; - -//import com.google.common.eventbus.AsyncEventBus; -import com.google.common.eventbus.EventBus; - -import ch.psi.fda.core.messages.DataQueue; -import ch.psi.fda.core.messages.EndOfStreamMessage; -import ch.psi.fda.core.messages.Message; - -/** - * Serialize data received by a DataQueue - * @author ebner - * - */ -public class DataDispatcher implements Runnable{ - - private DataQueue queue; - private List outQueues; - - private EventBus bus; - - public DataDispatcher(EventBus b, DataQueue queue){ - this.bus = b; - this.queue = queue; - this.outQueues = new ArrayList(); - } - - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ - @Override - public void run() { - try{ - - -// ZMQ.Context context = ZMQ.context(); -// ZMQ.Socket socket = context.socket(ZMQ.PUB); -// socket.bind("tcp://*:9090"); -// socket.bind("inproc://visualize"); - - // TODO Need to synchronize message metadata -// for(DataQueue q: outQueues){ -// } - - // Dispatch Messages - Message message = queue.getQueue().take(); - while(!(message instanceof EndOfStreamMessage)){ - // Clone message ... - for(DataQueue q: outQueues){ - q.getQueue().put(message); - } - bus.post(message); -// socket.send(SerializationUtils.serialize(message)); - - // Read next message - message = queue.getQueue().take(); - } - - // Write end of stream message - for(DataQueue q: outQueues){ - q.getQueue().put(message); - } -// socket.send(SerializationUtils.serialize(message)); - bus.post(message); - -// socket.close(); -// context.term(); - - } catch (InterruptedException e) { - // TODO Stop loop and exit logic instead of throwing an Exception - throw new RuntimeException("Data serializer was interrupted while writing data to file",e); - } - } - - /** - * @return the outQueues - */ - public List getOutQueues() { - return outQueues; - } - - - -} diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/core/manipulator/Manipulator.java b/ch.psi.fda/src/main/java/ch/psi/fda/core/manipulator/Manipulator.java index e05a101..8315546 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/core/manipulator/Manipulator.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/core/manipulator/Manipulator.java @@ -20,7 +20,7 @@ package ch.psi.fda.core.manipulator; import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; +import com.google.common.eventbus.EventBus; import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; @@ -35,19 +35,10 @@ import ch.psi.fda.core.messages.Message; */ public class Manipulator implements Runnable{ - /** - * Outgoing data queue - */ - private final DataQueue outQueue; + private EventBus bus; + private DataMessageMetadata metadata; - /** - * Incomming data queue - */ private final DataQueue queue; - - /** - * List of manipulations - */ private final List manipulations; /** @@ -56,32 +47,25 @@ public class Manipulator implements Runnable{ * @param manipulations */ // TODO need to support multiple (a list of) manipulation(s) - public Manipulator(DataQueue queue, List manipulations){ - + public Manipulator(EventBus b, DataQueue queue, List manipulations){ + this.bus = b; this.manipulations = manipulations; + this.queue = queue; // Create outgoing data metadata - DataMessageMetadata dmetadata = queue.getDataMessageMetadata().clone(); + metadata = queue.getDataMessageMetadata().clone(); // Initialize manipulations and create outgoing metadata for(Manipulation manipulation: this.manipulations){ - // Initialize manipulation -// manipulation.initialize(queue.getDataMessageMetadata()); - manipulation.initialize(dmetadata); + manipulation.initialize(metadata); // Add manipulation id to metadata - dmetadata.getComponents().add(new ComponentMetadata(manipulation.getId(),0)); // Calculated component always belongs to lowes dimension + metadata.getComponents().add(new ComponentMetadata(manipulation.getId(),0)); // Calculated component always belongs to lowes dimension } - - this.queue = queue; - this.outQueue = new DataQueue(new LinkedBlockingQueue(1000) , dmetadata ); // Create bounded queue to prevent running out of memory ... } - /** - * @return the outQueue - */ - public DataQueue getOutQueue() { - return outQueue; + public DataMessageMetadata getMetadata() { + return metadata; } /* (non-Javadoc) @@ -102,15 +86,14 @@ public class Manipulator implements Runnable{ } } - // Put message to outgoing queue ... - outQueue.getQueue().put(message); + bus.post(message); // Read next message message = queue.getQueue().take(); } // Write end of stream message - outQueue.getQueue().put(message); + bus.post(message); } catch (InterruptedException e) { diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXT.java b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXT.java index f517090..cc6db33 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXT.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/serializer/DataSerializerTXT.java @@ -25,8 +25,11 @@ import java.io.FileWriter; import java.io.IOException; import java.util.logging.Logger; +import com.google.common.eventbus.Subscribe; + import ch.psi.fda.core.messages.ComponentMetadata; import ch.psi.fda.core.messages.DataMessage; +import ch.psi.fda.core.messages.DataMessageMetadata; import ch.psi.fda.core.messages.DataQueue; import ch.psi.fda.core.messages.EndOfStreamMessage; import ch.psi.fda.core.messages.Message; @@ -43,19 +46,25 @@ public class DataSerializerTXT implements DataSerializer{ // Get Logger private static final Logger logger = Logger.getLogger(DataSerializerTXT.class.getName()); - private DataQueue queue; +// private DataQueue queue; private File file; private boolean appendSuffix = true; + + private boolean first = true; + private File outfile; + + private DataMessageMetadata meta; + /** * * @param queue * @param file * @param appendSuffix Flag whether to append a _0000 suffix after the original file name */ - public DataSerializerTXT(DataQueue queue, File file, boolean appendSuffix){ - this.queue = queue; + public DataSerializerTXT(DataMessageMetadata meta, File file, boolean appendSuffix){ + this.meta = meta; this.file = file; this.appendSuffix = appendSuffix; } @@ -65,171 +74,295 @@ public class DataSerializerTXT implements DataSerializer{ */ @Override public void run() { - try{ - - // WORKAROUND BEGIN - File outfile; -// if(appendSuffix){ -// // Append a count suffix to the file. If there is already a file with -// // this suffix increase the counter for the suffix -// int cnt = 0; -// String fname = this.file.getAbsolutePath(); // Determine file name -// String extension = fname.replaceAll("^.*\\.", ""); // Determine extension -// fname = fname.replaceAll("\\."+extension+"$", ""); +// try{ +// +// // WORKAROUND BEGIN +// +//// if(appendSuffix){ +//// // Append a count suffix to the file. If there is already a file with +//// // this suffix increase the counter for the suffix +//// int cnt = 0; +//// String fname = this.file.getAbsolutePath(); // Determine file name +//// String extension = fname.replaceAll("^.*\\.", ""); // Determine extension +//// fname = fname.replaceAll("\\."+extension+"$", ""); +//// +//// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); +//// +//// while(outfile.exists()){ +//// cnt++; +//// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); +//// } +//// } +//// else{ +//// outfile = this.file; +//// } +// // WORKAROUND END +// +// +// +// +// +// // Write header +// StringBuffer b = new StringBuffer(); +// StringBuffer b1 = new StringBuffer(); +// b.append("#"); +// b1.append("#"); +// for(ComponentMetadata c: queue.getDataMessageMetadata().getComponents()){ +// +// b.append(c.getId()); +// b.append("\t"); // -// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); -// -// while(outfile.exists()){ -// cnt++; -// outfile = new File(String.format("%s_%04d.%s", fname, cnt, extension)); +// b1.append(c.getDimension()); +// b1.append("\t"); +// } +// b.setCharAt(b.length()-1, '\n'); +// b1.setCharAt(b1.length()-1, '\n'); +// +// +// int icount = 0; +// boolean newfile = true; +// boolean dataInBetween = false; +// BufferedWriter writer = null; +// +// // Get basename of the file +// String basename = this.file.getAbsolutePath(); // Determine file name +// String extension = basename.replaceAll("^.*\\.", ""); // Determine extension +// basename = basename.replaceAll("\\."+extension+"$", ""); +// +// // Write data +// // Read Message +// Message message = queue.getQueue().take(); +// while(!(message instanceof EndOfStreamMessage)){ +// if(message instanceof DataMessage){ +// dataInBetween = true; +// if(newfile){ +// // Open new file and write header +// // Construct file name +// if(appendSuffix){ +// outfile = new File(String.format("%s_%04d.%s", basename, icount, extension)); +// } +// else{ +// outfile = new File(String.format("%s.%s", basename, extension)); +// } +// +// // Open file +// logger.fine("Open new data file: "+outfile.getAbsolutePath()); +// writer = new BufferedWriter(new FileWriter(outfile)); +// +// // Write header +// writer.write(b.toString()); +// writer.write(b1.toString()); +// +// newfile=false; +// } +// +// // Write message to file - each message will result in one line +// DataMessage m = (DataMessage) message; +// StringBuffer buffer = new StringBuffer(); +// for(Object o: m.getData()){ +// if(o.getClass().isArray()){ +// // If the array object is of type double[] display its content +// if(o instanceof double[]){ +// double[] oa = (double[]) o; +// for(double o1 : oa){ +// buffer.append(o1); +// buffer.append(" "); // Use space instead of tab +// } +// buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab +// } +// else if(o instanceof Object[]){ +// // TODO need to be recursive ... +// Object[] oa = (Object[])o; +// for(Object o1 : oa){ +// buffer.append(o1); +// buffer.append(" "); // Use space instead of tab +// } +// buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab +// } +// else{ +// buffer.append("-"); // Not supported +// } +// } +// else{ +// buffer.append(o); +// buffer.append("\t"); +// } +// } +// +// if(buffer.length()>0){ +// buffer.deleteCharAt(buffer.length()-1); // Remove last character (i.e. \t) +// buffer.append("\n"); // Append newline +// } +// writer.write(buffer.toString()); // } +// else if(message instanceof StreamDelimiterMessage){ +// StreamDelimiterMessage m = (StreamDelimiterMessage) message; +// logger.info("Delimiter - number: "+m.getNumber()+" iflag: "+m.isIflag()); +// if(m.isIflag() && appendSuffix){ +// // Only increase iflag counter if there was data in between +// // subsequent StreamDelimiterMessages. +// if(dataInBetween){ +// icount++; +// } +// dataInBetween = false; +// +// // Set flag to open new file +// newfile = true; +// +// // Close file +// writer.close(); +// } +// } +// +// // Read next message +// message = queue.getQueue().take(); // } -// else{ -// outfile = this.file; +// +// if(writer!=null){ +// // Close file +// writer.close(); //If the stream was closed previously this has no effect // } - // WORKAROUND END - - - - - +// // Writer can be null if a scan is defined without a dimension +// +// } catch (InterruptedException e) { +// // TODO Stop loop and exit logic instead of throwing an Exception +// throw new RuntimeException("Data serializer was interrupted while writing data to file",e); +// } catch (IOException e) { +// throw new RuntimeException("Data serializer had a problem writing to the specified file",e); +// } +// + } + + int icount; + String basename; + String extension; + boolean newfile; + boolean dataInBetween; + BufferedWriter writer; + StringBuffer b; + StringBuffer b1; + + @Subscribe + public void onMessage(Message message){ + try{ + if(first){ + first=false; // Write header - StringBuffer b = new StringBuffer(); - StringBuffer b1 = new StringBuffer(); - b.append("#"); - b1.append("#"); - for(ComponentMetadata c: queue.getDataMessageMetadata().getComponents()){ + b = new StringBuffer(); + b1 = new StringBuffer(); + b.append("#"); + b1.append("#"); + for(ComponentMetadata c: meta.getComponents()){ - b.append(c.getId()); - b.append("\t"); - - b1.append(c.getDimension()); - b1.append("\t"); - } - b.setCharAt(b.length()-1, '\n'); - b1.setCharAt(b1.length()-1, '\n'); - - - int icount = 0; - boolean newfile = true; - boolean dataInBetween = false; - BufferedWriter writer = null; - - // Get basename of the file - String basename = this.file.getAbsolutePath(); // Determine file name - String extension = basename.replaceAll("^.*\\.", ""); // Determine extension - basename = basename.replaceAll("\\."+extension+"$", ""); - - // Write data - // Read Message - Message message = queue.getQueue().take(); - while(!(message instanceof EndOfStreamMessage)){ - if(message instanceof DataMessage){ - dataInBetween = true; - if(newfile){ - // Open new file and write header - // Construct file name - if(appendSuffix){ - outfile = new File(String.format("%s_%04d.%s", basename, icount, extension)); - } - else{ - outfile = new File(String.format("%s.%s", basename, extension)); + b.append(c.getId()); + b.append("\t"); + + b1.append(c.getDimension()); + b1.append("\t"); } + b.setCharAt(b.length()-1, '\n'); + b1.setCharAt(b1.length()-1, '\n'); - // Open file - logger.fine("Open new data file: "+outfile.getAbsolutePath()); - writer = new BufferedWriter(new FileWriter(outfile)); - // Write header - writer.write(b.toString()); - writer.write(b1.toString()); - - newfile=false; - } - - // Write message to file - each message will result in one line - DataMessage m = (DataMessage) message; - StringBuffer buffer = new StringBuffer(); - for(Object o: m.getData()){ - if(o.getClass().isArray()){ - // If the array object is of type double[] display its content - if(o instanceof double[]){ - double[] oa = (double[]) o; - for(double o1 : oa){ - buffer.append(o1); - buffer.append(" "); // Use space instead of tab - } - buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab - } - else if(o instanceof Object[]){ - // TODO need to be recursive ... - Object[] oa = (Object[])o; - for(Object o1 : oa){ - buffer.append(o1); - buffer.append(" "); // Use space instead of tab - } - buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab - } - else{ - buffer.append("-"); // Not supported - } - } - else{ - buffer.append(o); - buffer.append("\t"); - } - } - - if(buffer.length()>0){ - buffer.deleteCharAt(buffer.length()-1); // Remove last character (i.e. \t) - buffer.append("\n"); // Append newline - } - writer.write(buffer.toString()); - } - else if(message instanceof StreamDelimiterMessage){ - StreamDelimiterMessage m = (StreamDelimiterMessage) message; - logger.info("Delimiter - number: "+m.getNumber()+" iflag: "+m.isIflag()); - if(m.isIflag() && appendSuffix){ - // Only increase iflag counter if there was data in between - // subsequent StreamDelimiterMessages. - if(dataInBetween){ - icount++; - } - dataInBetween = false; - - // Set flag to open new file + icount = 0; newfile = true; + dataInBetween = false; + writer = null; - // Close file - writer.close(); - } + // Get basename of the file + basename = this.file.getAbsolutePath(); // Determine file name + extension = basename.replaceAll("^.*\\.", ""); // Determine extension + basename = basename.replaceAll("\\."+extension+"$", ""); + } + + if(message instanceof DataMessage){ + dataInBetween = true; + if(newfile){ + // Open new file and write header + // Construct file name + if(appendSuffix){ + outfile = new File(String.format("%s_%04d.%s", basename, icount, extension)); + } + else{ + outfile = new File(String.format("%s.%s", basename, extension)); } - // Read next message - message = queue.getQueue().take(); + // Open file + logger.fine("Open new data file: "+outfile.getAbsolutePath()); + writer = new BufferedWriter(new FileWriter(outfile)); + + // Write header + writer.write(b.toString()); + writer.write(b1.toString()); + + newfile=false; } + // Write message to file - each message will result in one line + DataMessage m = (DataMessage) message; + StringBuffer buffer = new StringBuffer(); + for(Object o: m.getData()){ + if(o.getClass().isArray()){ + // If the array object is of type double[] display its content + if(o instanceof double[]){ + double[] oa = (double[]) o; + for(double o1 : oa){ + buffer.append(o1); + buffer.append(" "); // Use space instead of tab + } + buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab + } + else if(o instanceof Object[]){ + // TODO need to be recursive ... + Object[] oa = (Object[])o; + for(Object o1 : oa){ + buffer.append(o1); + buffer.append(" "); // Use space instead of tab + } + buffer.replace(buffer.length()-1,buffer.length()-1 , "\t"); // Replace last space with tab + } + else{ + buffer.append("-"); // Not supported + } + } + else{ + buffer.append(o); + buffer.append("\t"); + } + } + + if(buffer.length()>0){ + buffer.deleteCharAt(buffer.length()-1); // Remove last character (i.e. \t) + buffer.append("\n"); // Append newline + } + writer.write(buffer.toString()); + } + else if(message instanceof StreamDelimiterMessage){ + StreamDelimiterMessage m = (StreamDelimiterMessage) message; + logger.info("Delimiter - number: "+m.getNumber()+" iflag: "+m.isIflag()); + if(m.isIflag() && appendSuffix){ + // Only increase iflag counter if there was data in between + // subsequent StreamDelimiterMessages. + if(dataInBetween){ + icount++; + } + dataInBetween = false; + + // Set flag to open new file + newfile = true; + + // Close file + writer.close(); + } + } + else if (message instanceof EndOfStreamMessage){ if(writer!=null){ // Close file writer.close(); //If the stream was closed previously this has no effect } - // Writer can be null if a scan is defined without a dimension - - } catch (InterruptedException e) { - // TODO Stop loop and exit logic instead of throwing an Exception - throw new RuntimeException("Data serializer was interrupted while writing data to file",e); + } } catch (IOException e) { throw new RuntimeException("Data serializer had a problem writing to the specified file",e); } - } - - -// /** -// * Enable/disable the generation of the _0000 suffix in front of the extension of the out file -// * @param appendSuffix the appendSuffix to set -// */ -// public void setAppendSuffix(boolean appendSuffix) { -// this.appendSuffix = appendSuffix; -// } }