Added some things ...

This commit is contained in:
2015-08-17 08:11:40 +02:00
parent 301b8cb2be
commit fee224f6f2
4 changed files with 36 additions and 132 deletions
+1 -1
View File
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ch.psi.fda</groupId>
<artifactId>ch.psi.fda.xscan</artifactId>
<version>2.4.0_zmq</version>
<version>2.4.1_zmq</version>
<dependencies>
@@ -1080,7 +1080,7 @@ public class Acquisition {
if(hcrOnly){
// There are no additional channels to be read out while taking data via hcrlogic
// Therefor we just register the hcr loop as action loop
// Therefore we just register the hcr loop as action loop
aLoop = actionLoop;
}
@@ -66,7 +66,7 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop {
*/
private boolean dataGroup = false;
private boolean keepTmpFiles = false;
// private boolean keepTmpFiles = false;
private volatile boolean stopReadoutThread = false;
private Thread readoutThread;
@@ -157,7 +157,7 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop {
* @param inreader
* @throws IOException
*/
private void receive() {
public void receive() {
MainHeader mainHeader;
@@ -166,13 +166,18 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop {
} catch (IOException e) {
throw new RuntimeException("Unable to decode main header", e);
}
if(!socket.hasReceiveMore()){
throw new RuntimeException("There is no data submessage");
}
byte[] bytes = socket.recv();
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
DataMessage message = new DataMessage(metadata);
boolean use = true;
for(int i=0; i<mainHeader.getElements() && socket.hasReceiveMore(); i++){
byte[] bytes = socket.recv();
Double raw = ByteBuffer.wrap(bytes).getDouble();
for(int i=0; i<mainHeader.getElements(); i++){
Double raw = byteBuffer.getDouble();
Double val;
if(i==0){
@@ -199,8 +204,9 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop {
message.getData().add(val);
}
// Drain remaining messages
int n = drain();
int n = drainHangingSubmessages();
if(n>1){
throw new RuntimeException("More than 1 message drained from stream: "+n);
}
@@ -209,99 +215,6 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop {
eventbus.post(message);
}
// BufferedReader in = new BufferedReader(inreader);
// String line;
// boolean firstline = true;
// int linecount=0;
//// int mcounter=0;
//
//// boolean wasInRangeBefore = false;
// boolean discardAnyway = false;
//
// while (true) {
// line = in.readLine();
// linecount++;
// if (line == null) {
// break;
// } else {
// // if(line.matches("^\\[.*")){
// if (line.matches("^ *#.*")) {
// // Skip header/comment lines
//// logger.info("HEADER: " + line);
// } else {
// if (firstline) {
// firstline = false;
// continue;
// }
//
//// logger.info(line);
//
// // Write into queue
// DataMessage message = new DataMessage(metadata);
// String[] tokens = line.split("\t");
// boolean use = true;
//
// for(int i=0;i<tokens.length;i++){
// String t = tokens[i];
// Double val;
//
// if(i==0){
// Double raw = new Double(t);
//
// if(useEncoder){
// val = crlogicDataFilter.calculatePositionMotorUseEncoder(raw);
// }
// else if(useReadback){
// val = crlogicDataFilter.calculatePositionMotorUseReadback(raw);
// }
// else{
// val = crlogicDataFilter.calculatePositionMotor(raw);
// }
//
// // Check whether data is within the configured range - otherwise drop data
// use = crlogicDataFilter.filter(val);
//// if(!use){
//// break;
//// }
// }
// else if(scalerIndices.containsKey(i)){
// CrlogicDeltaDataFilter f = scalerIndices.get(i);
// val = f.delta(new Double(t));
// }
// else{
// val = new Double(t);
// }
//
//
// message.getData().add(val);
// }
//
// // Does not work if zigzag, ...
//// // Use this to filter out motor retry movements at the end of the scan
//// wasInRangeBefore = wasInRangeBefore | use;
//// if(!use && wasInRangeBefore){
//// discardAnyway=true;
//// // Optimization - terminate read loop once range is left
//// logger.info("Terminate read loop because point is outside range");
//// break;
//// }
//
// // Filter data
// if(use && !discardAnyway){
// eventbus.post(message);
// mcounter++;
// }
//
//
// }
// }
// }
// in.close();
// inreader.close();
// logger.info("Lines read: "+linecount+" Messages generated (after filtering): "+mcounter);
}
@@ -327,14 +240,6 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop {
executionThread = Thread.currentThread();
}
// TODO each actuator will result in an additional sensor (at the beginning)
// Dependent on actuator settings (readback use encoder, ...)
// TODO filename generation?
// final String tmpFileName = "tmp-"+System.currentTimeMillis()+".txt";
Long timeout = 600000l; // 10 minutes move timeout
// Check if logic is inactive, otherwise return early
@@ -358,7 +263,6 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop {
}
}
logger.info("Set parameters");
template.getNfsServer().setValue("");
template.getNfsShare().setValue("");
@@ -544,9 +448,7 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop {
motortemplate.getVelocity().setValue(backupSpeed);
motortemplate.getBacklashDistance().setValue(backupBacklash);
}
// Request read of data file
// readQueue.put(tmpFileName);
if(zigZag){
// reverse start/end
@@ -590,7 +492,8 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop {
metadata.add(new Metadata(s.getId()));
}
// Clear interrupted state
Thread.interrupted();
stopReadoutThread = false;
// Start readout Thread
readoutThread = new Thread(new Runnable() {
@@ -600,21 +503,6 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop {
connect(ioc);
while(!stopReadoutThread){
receive();
// String file;
// try {
//// file = readQueue.take();
// } catch (InterruptedException e) {
// break;
// }
// TODO Read file and
// try {
// collectData(smbShare, file);
// } catch (InterruptedException e) {
// throw new RuntimeException("Unable to read CRLOGIC raw data file",e);
// } catch (IOException e) {
// throw new RuntimeException("Unable to read CRLOGIC raw data file",e);
// }
}
close();
@@ -742,6 +630,9 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop {
}
public void connect(String address) {
// Clear interrupted state
Thread.interrupted();
logger.info("Connecting with IOC"+address);
context = ZMQ.context(1);
socket = context.socket(ZMQ.PULL);
@@ -750,6 +641,9 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop {
}
public void close() {
// Interrupt the readout thread to release the read block
readoutThread.interrupt();
logger.info("Closing stream from IOC "+ioc);
socket.close();
context.close();
@@ -757,7 +651,7 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop {
context = null;
}
private int drain() {
private int drainHangingSubmessages() {
int count = 0;
while (socket.hasReceiveMore()) {
// is there a way to avoid copying data to user space here?
@@ -808,11 +702,10 @@ public class CrlogicLoopStream extends CrlogicLoop implements ActionLoop {
}
public boolean isKeepTmpFiles() {
return keepTmpFiles;
return false;
}
public void setKeepTmpFiles(boolean keepTmpFiles) {
this.keepTmpFiles = keepTmpFiles;
}
public EventBus getEventBus(){
@@ -95,4 +95,15 @@ public class CrlogicLoopStreamTest {
cservice.destroy();
}
@Test
public void testReceive(){
ChannelService cservice = new DefaultChannelService();
CrlogicLoopStream crlogic = new CrlogicLoopStream(cservice, "X05LA-ES2-CRL", "localhost", false);
crlogic.connect("localhost");
crlogic.receive();
crlogic.close();
}
}