reorganized package structure for new dataacquisition engine

This commit is contained in:
2013-07-30 11:36:26 +02:00
parent e64d341a38
commit f84a5696b0
9 changed files with 197 additions and 37 deletions

View File

@@ -35,6 +35,8 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import ch.psi.fda.aq.ng.AcquisitionEngineNG;
import ch.psi.fda.aq.ng.ScanMapperNG;
import ch.psi.fda.core.ActionLoop;
import ch.psi.fda.core.Actor;
import ch.psi.fda.core.EngineConfiguration;
@@ -127,6 +129,8 @@ import ch.psi.jcae.impl.DefaultChannelService;
*/
public class Acquisition {
private static boolean NEW_ENGINE = true;
// Get Logger
private static Logger logger = Logger.getLogger(Acquisition.class.getName());
@@ -134,6 +138,8 @@ public class Acquisition {
private ActionLoop actionLoop;
private Collector collector;
private ScanMapperNG mapper;
private AcquisitionEngineNG acquisitionEngine;
private DataDispatcher dispatcher;
private Manipulator manipulator;
private DataSerializerTXT serializer;
@@ -240,20 +246,29 @@ public class Acquisition {
// Configure core engine
EngineConfiguration.getInstance().setFailOnSensorError(smodel.isFailOnSensorError());
if(NEW_ENGINE){
acquisitionEngine = new AcquisitionEngineNG(cservice);
mapper = new ScanMapperNG();
mapper.map(smodel);
this.manipulator = new Manipulator(new DataQueue( acquisitionEngine.getOutQueue(), mapper.getDataMessageMetadata()), this.manipulations);
}
else{
logger.fine("Map Model to internal logic");
// Map scan to base model
// After this call actionLoop and collector will be initialized
mapScan(smodel);
logger.fine("ActionLoop and Collector initialized");
// TODO Remove this workaround
Collections.reverse(collector.getQueues());
// Add manipulator into processing chain
this.manipulator = new Manipulator(collector.getOutQueue(), this.manipulations);
}
logger.fine("Map Model to internal logic");
// Map scan to base model
// After this call actionLoop and collector will be initialized
mapScan(smodel);
logger.fine("ActionLoop and Collector initialized");
// TODO Remove this workaround
Collections.reverse(collector.getQueues());
// Add manipulator into processing chain
this.manipulator = new Manipulator(collector.getOutQueue(), this.manipulations);
// // Insert dispatcher into processing chain
this.dispatcher = new DataDispatcher(manipulator.getOutQueue());
@@ -287,8 +302,11 @@ public class Acquisition {
try{
active = true;
Thread tc = new Thread(collector);
tc.start();
Thread tc = null;
if(!NEW_ENGINE){
tc = new Thread(collector);
tc.start();
}
Thread tm = new Thread(manipulator);
tm.start();
@@ -299,15 +317,25 @@ public class Acquisition {
Thread t = new Thread(serializer);
t.start();
actionLoop.prepare();
actionLoop.execute();
actionLoop.cleanup();
if(NEW_ENGINE){
acquisitionEngine.execute(mapper.getResourceDescriptors(), mapper.getScript());
}
else {
actionLoop.prepare();
actionLoop.execute();
actionLoop.cleanup();
}
// Wait for data collector threads
// Do this with a Latch or something like that
// Give the threads 1 minute to catch up
tc.join(60000);
if(!NEW_ENGINE){
tc.join(60000);
}
tm.join(60000);
td.join(60000);
t.join(60000);

View File

@@ -16,11 +16,13 @@
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda.aq;
package ch.psi.fda.aq.ng;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
@@ -28,6 +30,9 @@ import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.core.messages.Message;
import ch.psi.jcae.Channel;
import ch.psi.jcae.ChannelDescriptor;
import ch.psi.jcae.ChannelException;
@@ -42,9 +47,11 @@ public class AcquisitionEngineNG {
private static final Logger logger = Logger.getLogger(AcquisitionEngineNG.class.getName());
private ChannelService cservice;
private BlockingQueue<Message> outQueue;
public AcquisitionEngineNG(ChannelService cservice) {
this.cservice = cservice;
this.outQueue = new LinkedBlockingQueue<Message>(1000); // Create bounded queue to prevent running out of memory ...
// Workaround for Jython memory leak
// http://blog.hillbrecht.de/2009/07/11/jython-memory-leakout-of-memory-problem/
@@ -52,6 +59,12 @@ public class AcquisitionEngineNG {
}
public void execute(Map<String, ?> resourceDescriptors, String script){
// TODO Remove debugging messages
// For debugging purpose only
logger.info("Resources to create: "+resourceDescriptors);
logger.info("Script to execute: "+script);
try {
ScriptEngine engine = new ScriptEngineManager().getEngineByName("python");
@@ -73,16 +86,17 @@ public class AcquisitionEngineNG {
}
else if(resourceDescriptors.get(k) instanceof ProbeDescriptor){
ProbeDescriptor descriptor = (ProbeDescriptor) resourceDescriptors.get(k);
List<Channel<?>> descriptors = new ArrayList<>();
List<Channel<?>> channels = new ArrayList<>();
for(ChannelDescriptor<?> s: descriptor.getSensors()){
try {
descriptors.add(cservice.createChannel(s));
channels.add(cservice.createChannel(s));
} catch (ChannelException | InterruptedException | TimeoutException e) {
throw new RuntimeException("Unable to create resource for channel: "+s.getName(),e);
}
}
engine.put(k, new ChannelProbeResource(descriptors));
engine.put(k, new ChannelProbeResource(channels, outQueue));
}
else{
throw new RuntimeException("Resource type not supported: "+resourceDescriptors.get(k).getClass().getName());
@@ -97,5 +111,9 @@ public class AcquisitionEngineNG {
throw new RuntimeException("Action failed while executing the Jython script",e);
}
}
public BlockingQueue<Message> getOutQueue(){
return outQueue;
}
}

View File

@@ -16,14 +16,17 @@
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda.aq;
package ch.psi.fda.aq.ng;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import ch.psi.fda.core.messages.DataMessage;
import ch.psi.fda.core.messages.EndOfStreamMessage;
import ch.psi.fda.core.messages.Message;
import ch.psi.fda.core.messages.StreamDelimiterMessage;
import ch.psi.jcae.Channel;
import ch.psi.jcae.ChannelException;
@@ -34,12 +37,16 @@ import ch.psi.jcae.ChannelException;
public class ChannelProbeResource {
private final List<Channel<?>> channels;
private BlockingQueue<Message> queue;
public ChannelProbeResource(List<Channel<?>> channels){
public ChannelProbeResource(List<Channel<?>> channels, BlockingQueue<Message> queue){
this.channels = channels;
this.queue = queue;
}
/**
* Read configured channels and send out data message
*/
public void read(){
try{
DataMessage message = new DataMessage();
@@ -50,9 +57,38 @@ public class ChannelProbeResource {
message.getData().add(o);
}
System.out.println("MESSAGE: "+message);
queue.put(message);
} catch (InterruptedException | TimeoutException | ChannelException | ExecutionException e) {
throw new RuntimeException(e);
}
}
/**
* Send a stream delimiter message for the given index
* @param index
* @param iflag
*/
public void delimiter(int index, boolean iflag){
try {
StreamDelimiterMessage message = new StreamDelimiterMessage(index, iflag);
System.out.println(message);
queue.put(message);
} catch (InterruptedException e) {
throw new RuntimeException("Unable to send delimiter message", e);
}
}
/**
* Send end of stream message
*/
public void terminateStream(){
try {
EndOfStreamMessage message = new EndOfStreamMessage();
System.out.println(message);
queue.put(message);
} catch (InterruptedException e) {
throw new RuntimeException("Unable to send end of stream message", e);
}
}
}

View File

@@ -16,7 +16,7 @@
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda.aq;
package ch.psi.fda.aq.ng;
import java.util.ArrayList;
import java.util.List;

View File

@@ -16,7 +16,7 @@
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda.aq;
package ch.psi.fda.aq.ng;
import java.util.ArrayList;
import java.util.HashMap;
@@ -27,6 +27,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.logging.Logger;
import ch.psi.fda.core.messages.ComponentMetadata;
import ch.psi.fda.core.messages.DataMessageMetadata;
import ch.psi.fda.model.v1.Action;
import ch.psi.fda.model.v1.ArrayPositioner;
import ch.psi.fda.model.v1.ChannelAction;
@@ -37,6 +39,8 @@ import ch.psi.fda.model.v1.DiscreteStepDimension;
import ch.psi.fda.model.v1.DiscreteStepPositioner;
import ch.psi.fda.model.v1.FunctionPositioner;
import ch.psi.fda.model.v1.LinearPositioner;
import ch.psi.fda.model.v1.PseudoPositioner;
import ch.psi.fda.model.v1.RegionPositioner;
import ch.psi.fda.model.v1.Scan;
import ch.psi.fda.model.v1.ScriptAction;
import ch.psi.fda.model.v1.ShellAction;
@@ -65,6 +69,8 @@ public class ScanMapperNG {
private Map<String, Object> resourceDescriptors = new HashMap<>();
private ProbeDescriptor probeDescriptor = new ProbeDescriptor();
private DataMessageMetadata dataMessageMetadata = new DataMessageMetadata();
private String indentation = "";
@@ -124,6 +130,14 @@ public class ScanMapperNG {
return resourceDescriptors;
}
/**
* TODO need to be moved somewhere else
* @return
*/
public DataMessageMetadata getDataMessageMetadata(){
return dataMessageMetadata;
}
private void mapDimensions(List<Dimension> dimensions, int index) {
if(dimensions.size()<1){
@@ -161,12 +175,29 @@ public class ScanMapperNG {
// Use readback
probeDescriptor.getSensors().add(new ChannelDescriptor<>(type, positioner.getReadback(), false));
}
System.out.println(positioner.getId());
dataMessageMetadata.getComponents().add(new ComponentMetadata(positioner.getId(), (dimensions.size()-1-index)));
// TODO Done resource
if(positioner instanceof LinearPositioner){
// Calculate steps and add it to position array
LinearPositioner lpositioner = (LinearPositioner) positioner;
boolean first = true;
StringBuffer b = new StringBuffer();
b.append("[");
for(double i=0;(lpositioner.getStart()+i*lpositioner.getStepSize())<=lpositioner.getEnd();i++){
if(first){
first=false;
}
else{
b.append(",");
}
b.append(String.format("%f", (lpositioner.getStart()+i*lpositioner.getStepSize())));
}
b.append("]");
script.append(indentation + var+"_positions="+b.toString()+"\n");
}
else if(positioner instanceof ArrayPositioner){
ArrayPositioner apositioner = (ArrayPositioner) positioner;
@@ -192,6 +223,12 @@ public class ScanMapperNG {
FunctionPositioner fpositioner = (FunctionPositioner) positioner;
// TODO take variable mappings into account !
}
else if(positioner instanceof PseudoPositioner){
// TODO implement
}
else if(positioner instanceof RegionPositioner){
// TODO implement
}
}
// Create dimension loop
@@ -208,6 +245,7 @@ public class ScanMapperNG {
mapActions(d.getAction());
// TODO map guard and sensors
// Check whether final dimension reached
if ((index + 1) < dimensions.size()) {
@@ -219,8 +257,17 @@ public class ScanMapperNG {
script.append(indentation + PROBE_RESOURCE_ID+".read()\n");
}
indentation = indentation.replaceFirst(INDENT, ""); // decrease
// indentation
indentation = indentation.replaceFirst(INDENT, ""); // decrease indentation
script.append(indentation + PROBE_RESOURCE_ID+".delimiter("+(dimensions.size()-1-index)+", "+(d.isDataGroup()?"True":"False")+")\n");
// send delimiter for this dimension
// if lowest dimension end of stream message
// if highest dimension no delimiter
if(index==0){
script.append(indentation + PROBE_RESOURCE_ID+".terminateStream()\n");
}
mapActions(d.getPostAction());
} else if (dimension instanceof ContinuousDimension) {
// ContinuousDimension d = (ContinuousDimension) dimension;
@@ -353,4 +400,5 @@ public class ScanMapperNG {
resourceDescriptors.put(varname, new ChannelDescriptor<>(type, channelName, monitor));
return varname;
}
}

View File

@@ -16,7 +16,7 @@
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda.aq;
package ch.psi.fda.aq.ng;
/**
* Resource descriptor of an operating system shell.

View File

@@ -16,7 +16,7 @@
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda.aq;
package ch.psi.fda.aq.ng;
import java.io.BufferedReader;
import java.io.IOException;

View File

@@ -16,7 +16,7 @@
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda.aq;
package ch.psi.fda.aq.ng;
import java.util.logging.Logger;
@@ -25,6 +25,8 @@ import org.junit.Before;
import org.junit.Test;
import ch.psi.fda.TestChannels;
import ch.psi.fda.aq.ng.AcquisitionEngineNG;
import ch.psi.fda.aq.ng.ScanMapperNG;
import ch.psi.fda.model.v1.ArrayPositioner;
import ch.psi.fda.model.v1.ChannelAction;
import ch.psi.fda.model.v1.Configuration;
@@ -60,7 +62,7 @@ public class AcquisitionEngineNGTest {
}
/**
* Test method for {@link ch.psi.fda.aq.AcquisitionEngineNG#execute(java.lang.String)}.
* Test method for {@link ch.psi.fda.aq.ng.AcquisitionEngineNG#execute(java.lang.String)}.
*/
@Test
public void testExecuteChannelAction() {

View File

@@ -16,7 +16,7 @@
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda.aq;
package ch.psi.fda.aq.ng;
import static org.junit.Assert.*;
@@ -28,10 +28,12 @@ import org.junit.Before;
import org.junit.Test;
import ch.psi.fda.TestChannels;
import ch.psi.fda.aq.ng.ScanMapperNG;
import ch.psi.fda.model.v1.ArrayPositioner;
import ch.psi.fda.model.v1.ChannelAction;
import ch.psi.fda.model.v1.Configuration;
import ch.psi.fda.model.v1.DiscreteStepDimension;
import ch.psi.fda.model.v1.LinearPositioner;
import ch.psi.fda.model.v1.Scan;
import ch.psi.fda.model.v1.ScriptAction;
import ch.psi.fda.model.v1.ShellAction;
@@ -61,7 +63,7 @@ public class ScanMapperNGTest {
}
/**
* Test method for {@link ch.psi.fda.aq.ScanMapperNG#mapActions(java.util.List)}.
* Test method for {@link ch.psi.fda.aq.ng.ScanMapperNG#mapActions(java.util.List)}.
*/
@Test
public void testMap() {
@@ -101,6 +103,32 @@ public class ScanMapperNGTest {
config.getScan().getDimension().add(d);
mapper.map(config);
String script = mapper.getScript();
System.out.println(script);
}
@Test
public void testMapLinearPositioner() {
Configuration config = new Configuration();
config.setScan(new Scan());
DiscreteStepDimension d = new DiscreteStepDimension();
LinearPositioner positioner = new LinearPositioner();
positioner.setName(TestChannels.ANALOG_OUT);
positioner.setStart(0.1);
positioner.setEnd(2.2);
positioner.setStepSize(0.1);
d.getPositioner().add(positioner);
config.getScan().getDimension().add(d);
// config.getScan().getDimension().add(d);
mapper.map(config);
String script = mapper.getScript();