added local file support for crlogic temp file read

This commit is contained in:
2014-10-10 11:03:51 +02:00
parent 33fc243671
commit d5cc43c2d1
2 changed files with 119 additions and 81 deletions
+1 -1
View File
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ch.psi.fda</groupId>
<artifactId>ch.psi.fda.xscan</artifactId>
<version>2.3.5</version>
<version>2.3.9</version>
<dependencies>
@@ -20,6 +20,8 @@
package ch.psi.fda.core.loops.cr;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
@@ -187,8 +189,9 @@ public class CrlogicLoop implements ActionLoop {
*/
private void collectData(String tmpDir, String tmpFileName) throws InterruptedException, IOException {
semaphore.acquire();
InputStreamReader inreader;
if (tmpDir.startsWith("smb:")) {
SmbFile tmpFile = new SmbFile(tmpDir, tmpFileName);
logger.info("Collect data from " + tmpFile.getCanonicalPath());
@@ -210,75 +213,123 @@ public class CrlogicLoop implements ActionLoop {
}
}
InputStreamReader inreader = new InputStreamReader(tmpFile.getInputStream());
BufferedReader in = new BufferedReader(inreader);
String line;
boolean firstline = true;
int linecount=0;
int mcounter=0;
inreader = new InputStreamReader(tmpFile.getInputStream());
readData(inreader);
// Remove temporary file
if(!keepTmpFiles){
tmpFile.delete();
}
}
else{
File tmpFile = new File(tmpDir, tmpFileName);
logger.info("Collect data from " + tmpFile.getCanonicalPath());
File lockfile = new File(tmpFile.getCanonicalPath() + ".lock");
logger.info("Wait until file is written [lock file: " + lockfile.getCanonicalPath() + "]");
// Wait until file is created
while ((!tmpFile.exists()) || lockfile.exists()) {
try{
Thread.sleep(100);
}
catch(InterruptedException e){
abort=true;
}
if(abort){
// If abort is issued while waiting for data immediately return without
// trying to read the data
return;
}
}
inreader = new InputStreamReader(new FileInputStream(tmpFile));
readData(inreader);
// Remove temporary file
if(!keepTmpFiles){
tmpFile.delete();
}
}
// Issue end of loop control message
eventbus.post(new EndOfStreamMessage(dataGroup));
semaphore.release();
}
/**
* @param inreader
* @throws IOException
*/
private void readData(InputStreamReader inreader) throws IOException {
BufferedReader in = new BufferedReader(inreader);
String line;
boolean firstline = true;
int linecount=0;
int mcounter=0;
// boolean wasInRangeBefore = false;
boolean discardAnyway = false;
while (true) {
line = in.readLine();
linecount++;
if (line == null) {
break;
} else {
// if(line.matches("^\\[.*")){
if (line.matches("^ *#.*")) {
// Skip header/comment lines
boolean discardAnyway = false;
while (true) {
line = in.readLine();
linecount++;
if (line == null) {
break;
} else {
// if(line.matches("^\\[.*")){
if (line.matches("^ *#.*")) {
// Skip header/comment lines
// logger.info("HEADER: " + line);
} else {
if (firstline) {
firstline = false;
continue;
}
} else {
if (firstline) {
firstline = false;
continue;
}
// logger.info(line);
// Write into queue
DataMessage message = new DataMessage(metadata);
String[] tokens = line.split("\t");
boolean use = true;
// Write into queue
DataMessage message = new DataMessage(metadata);
String[] tokens = line.split("\t");
boolean use = true;
for(int i=0;i<tokens.length;i++){
String t = tokens[i];
Double val;
for(int i=0;i<tokens.length;i++){
String t = tokens[i];
Double val;
if(i==0){
Double raw = new Double(t);
if(i==0){
Double raw = new Double(t);
if(useEncoder){
val = crlogicDataFilter.calculatePositionMotorUseEncoder(raw);
}
else if(useReadback){
val = crlogicDataFilter.calculatePositionMotorUseReadback(raw);
}
else{
val = crlogicDataFilter.calculatePositionMotor(raw);
}
// Check whether data is within the configured range - otherwise drop data
use = crlogicDataFilter.filter(val);
if(useEncoder){
val = crlogicDataFilter.calculatePositionMotorUseEncoder(raw);
}
else if(useReadback){
val = crlogicDataFilter.calculatePositionMotorUseReadback(raw);
}
else{
val = crlogicDataFilter.calculatePositionMotor(raw);
}
// Check whether data is within the configured range - otherwise drop data
use = crlogicDataFilter.filter(val);
// if(!use){
// break;
// }
}
else if(scalerIndices.containsKey(i)){
CrlogicDeltaDataFilter f = scalerIndices.get(i);
val = f.delta(new Double(t));
}
else{
val = new Double(t);
}
message.getData().add(val);
}
else if(scalerIndices.containsKey(i)){
CrlogicDeltaDataFilter f = scalerIndices.get(i);
val = f.delta(new Double(t));
}
else{
val = new Double(t);
}
// Does not work if zigzag, ...
message.getData().add(val);
}
// Does not work if zigzag, ...
// // Use this to filter out motor retry movements at the end of the scan
// wasInRangeBefore = wasInRangeBefore | use;
// if(!use && wasInRangeBefore){
@@ -287,34 +338,21 @@ public class CrlogicLoop implements ActionLoop {
// logger.info("Terminate read loop because point is outside range");
// break;
// }
// Filter data
if(use && !discardAnyway){
eventbus.post(message);
mcounter++;
}
// Filter data
if(use && !discardAnyway){
eventbus.post(message);
mcounter++;
}
}
}
in.close();
inreader.close();
logger.info("Lines read: "+linecount+" Messages generated (after filtering): "+mcounter);
// Remove temporary file
if(!keepTmpFiles){
tmpFile.delete();
}
}
else{
// TODO - File in local file system
}
in.close();
inreader.close();
// Issue end of loop control message
eventbus.post(new EndOfStreamMessage(dataGroup));
semaphore.release();
logger.info("Lines read: "+linecount+" Messages generated (after filtering): "+mcounter);
}