diff --git a/ch.psi.fda.xscan/pom.xml b/ch.psi.fda.xscan/pom.xml index 98f5ef5..4eaf1e6 100644 --- a/ch.psi.fda.xscan/pom.xml +++ b/ch.psi.fda.xscan/pom.xml @@ -3,7 +3,7 @@ 4.0.0 ch.psi.fda ch.psi.fda.xscan - 2.3.5 + 2.3.9 diff --git a/ch.psi.fda.xscan/src/main/java/ch/psi/fda/core/loops/cr/CrlogicLoop.java b/ch.psi.fda.xscan/src/main/java/ch/psi/fda/core/loops/cr/CrlogicLoop.java index 46d4481..ec51393 100644 --- a/ch.psi.fda.xscan/src/main/java/ch/psi/fda/core/loops/cr/CrlogicLoop.java +++ b/ch.psi.fda.xscan/src/main/java/ch/psi/fda/core/loops/cr/CrlogicLoop.java @@ -20,6 +20,8 @@ package ch.psi.fda.core.loops.cr; import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; @@ -187,8 +189,9 @@ public class CrlogicLoop implements ActionLoop { */ private void collectData(String tmpDir, String tmpFileName) throws InterruptedException, IOException { semaphore.acquire(); + + InputStreamReader inreader; if (tmpDir.startsWith("smb:")) { - SmbFile tmpFile = new SmbFile(tmpDir, tmpFileName); logger.info("Collect data from " + tmpFile.getCanonicalPath()); @@ -210,75 +213,123 @@ public class CrlogicLoop implements ActionLoop { } } - InputStreamReader inreader = new InputStreamReader(tmpFile.getInputStream()); - BufferedReader in = new BufferedReader(inreader); - String line; - boolean firstline = true; - int linecount=0; - int mcounter=0; + inreader = new InputStreamReader(tmpFile.getInputStream()); + readData(inreader); + // Remove temporary file + if(!keepTmpFiles){ + tmpFile.delete(); + } + } + else{ + File tmpFile = new File(tmpDir, tmpFileName); + logger.info("Collect data from " + tmpFile.getCanonicalPath()); + + File lockfile = new File(tmpFile.getCanonicalPath() + ".lock"); + + logger.info("Wait until file is written [lock file: " + lockfile.getCanonicalPath() + "]"); + // Wait until file is created + while ((!tmpFile.exists()) || lockfile.exists()) { + try{ + Thread.sleep(100); + } + catch(InterruptedException e){ + abort=true; + } + if(abort){ + // If abort is issued while waiting for data immediately return without + // trying to read the data + return; + } + } + + inreader = new InputStreamReader(new FileInputStream(tmpFile)); + readData(inreader); + + // Remove temporary file + if(!keepTmpFiles){ + tmpFile.delete(); + } + } + + // Issue end of loop control message + eventbus.post(new EndOfStreamMessage(dataGroup)); + semaphore.release(); + } + + /** + * @param inreader + * @throws IOException + */ + private void readData(InputStreamReader inreader) throws IOException { + BufferedReader in = new BufferedReader(inreader); + String line; + boolean firstline = true; + int linecount=0; + int mcounter=0; + // boolean wasInRangeBefore = false; - boolean discardAnyway = false; - - while (true) { - line = in.readLine(); - linecount++; - if (line == null) { - break; - } else { - // if(line.matches("^\\[.*")){ - if (line.matches("^ *#.*")) { - // Skip header/comment lines + boolean discardAnyway = false; + + while (true) { + line = in.readLine(); + linecount++; + if (line == null) { + break; + } else { + // if(line.matches("^\\[.*")){ + if (line.matches("^ *#.*")) { + // Skip header/comment lines // logger.info("HEADER: " + line); - } else { - if (firstline) { - firstline = false; - continue; - } + } else { + if (firstline) { + firstline = false; + continue; + } // logger.info(line); - // Write into queue - DataMessage message = new DataMessage(metadata); - String[] tokens = line.split("\t"); - boolean use = true; + // Write into queue + DataMessage message = new DataMessage(metadata); + String[] tokens = line.split("\t"); + boolean use = true; + + for(int i=0;i