From f84a5696b07cfa13be5e11f18919ad76d2daa88d Mon Sep 17 00:00:00 2001 From: Simon Ebner Date: Tue, 30 Jul 2013 11:36:26 +0200 Subject: [PATCH] reorganized package structure for new dataacquisition engine --- .../main/java/ch/psi/fda/aq/Acquisition.java | 66 +++++++++++++------ .../fda/aq/{ => ng}/AcquisitionEngineNG.java | 26 ++++++-- .../fda/aq/{ => ng}/ChannelProbeResource.java | 44 +++++++++++-- .../psi/fda/aq/{ => ng}/ProbeDescriptor.java | 2 +- .../ch/psi/fda/aq/{ => ng}/ScanMapperNG.java | 54 ++++++++++++++- .../psi/fda/aq/{ => ng}/ShellDescriptor.java | 2 +- .../ch/psi/fda/aq/{ => ng}/ShellResource.java | 2 +- .../aq/{ => ng}/AcquisitionEngineNGTest.java | 6 +- .../psi/fda/aq/{ => ng}/ScanMapperNGTest.java | 32 ++++++++- 9 files changed, 197 insertions(+), 37 deletions(-) rename ch.psi.fda/src/main/java/ch/psi/fda/aq/{ => ng}/AcquisitionEngineNG.java (78%) rename ch.psi.fda/src/main/java/ch/psi/fda/aq/{ => ng}/ChannelProbeResource.java (56%) rename ch.psi.fda/src/main/java/ch/psi/fda/aq/{ => ng}/ProbeDescriptor.java (97%) rename ch.psi.fda/src/main/java/ch/psi/fda/aq/{ => ng}/ScanMapperNG.java (87%) rename ch.psi.fda/src/main/java/ch/psi/fda/aq/{ => ng}/ShellDescriptor.java (97%) rename ch.psi.fda/src/main/java/ch/psi/fda/aq/{ => ng}/ShellResource.java (98%) rename ch.psi.fda/src/test/java/ch/psi/fda/aq/{ => ng}/AcquisitionEngineNGTest.java (96%) rename ch.psi.fda/src/test/java/ch/psi/fda/aq/{ => ng}/ScanMapperNGTest.java (76%) diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java index 6a1ef4e..9478a4c 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/Acquisition.java @@ -35,6 +35,8 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.logging.SimpleFormatter; +import ch.psi.fda.aq.ng.AcquisitionEngineNG; +import ch.psi.fda.aq.ng.ScanMapperNG; import ch.psi.fda.core.ActionLoop; import ch.psi.fda.core.Actor; import ch.psi.fda.core.EngineConfiguration; @@ -127,6 +129,8 @@ import ch.psi.jcae.impl.DefaultChannelService; */ public class Acquisition { + private static boolean NEW_ENGINE = true; + // Get Logger private static Logger logger = Logger.getLogger(Acquisition.class.getName()); @@ -134,6 +138,8 @@ public class Acquisition { private ActionLoop actionLoop; private Collector collector; + private ScanMapperNG mapper; + private AcquisitionEngineNG acquisitionEngine; private DataDispatcher dispatcher; private Manipulator manipulator; private DataSerializerTXT serializer; @@ -240,20 +246,29 @@ public class Acquisition { // Configure core engine EngineConfiguration.getInstance().setFailOnSensorError(smodel.isFailOnSensorError()); + if(NEW_ENGINE){ + acquisitionEngine = new AcquisitionEngineNG(cservice); + mapper = new ScanMapperNG(); + mapper.map(smodel); + + this.manipulator = new Manipulator(new DataQueue( acquisitionEngine.getOutQueue(), mapper.getDataMessageMetadata()), this.manipulations); + } + else{ + logger.fine("Map Model to internal logic"); + + // Map scan to base model + // After this call actionLoop and collector will be initialized + mapScan(smodel); + + logger.fine("ActionLoop and Collector initialized"); + + // TODO Remove this workaround + Collections.reverse(collector.getQueues()); + + // Add manipulator into processing chain + this.manipulator = new Manipulator(collector.getOutQueue(), this.manipulations); + } - logger.fine("Map Model to internal logic"); - - // Map scan to base model - // After this call actionLoop and collector will be initialized - mapScan(smodel); - - logger.fine("ActionLoop and Collector initialized"); - - // TODO Remove this workaround - Collections.reverse(collector.getQueues()); - - // Add manipulator into processing chain - this.manipulator = new Manipulator(collector.getOutQueue(), this.manipulations); // // Insert dispatcher into processing chain this.dispatcher = new DataDispatcher(manipulator.getOutQueue()); @@ -287,8 +302,11 @@ public class Acquisition { try{ active = true; - Thread tc = new Thread(collector); - tc.start(); + Thread tc = null; + if(!NEW_ENGINE){ + tc = new Thread(collector); + tc.start(); + } Thread tm = new Thread(manipulator); tm.start(); @@ -299,15 +317,25 @@ public class Acquisition { Thread t = new Thread(serializer); t.start(); - actionLoop.prepare(); - actionLoop.execute(); - actionLoop.cleanup(); + if(NEW_ENGINE){ + acquisitionEngine.execute(mapper.getResourceDescriptors(), mapper.getScript()); + } + else { + actionLoop.prepare(); + actionLoop.execute(); + actionLoop.cleanup(); + } + + + // Wait for data collector threads // Do this with a Latch or something like that // Give the threads 1 minute to catch up - tc.join(60000); + if(!NEW_ENGINE){ + tc.join(60000); + } tm.join(60000); td.join(60000); t.join(60000); diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/AcquisitionEngineNG.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/AcquisitionEngineNG.java similarity index 78% rename from ch.psi.fda/src/main/java/ch/psi/fda/aq/AcquisitionEngineNG.java rename to ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/AcquisitionEngineNG.java index 0d8dd87..e26ab53 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/AcquisitionEngineNG.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/AcquisitionEngineNG.java @@ -16,11 +16,13 @@ * along with this code. If not, see . * */ -package ch.psi.fda.aq; +package ch.psi.fda.aq.ng; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeoutException; import java.util.logging.Logger; @@ -28,6 +30,9 @@ import javax.script.ScriptEngine; import javax.script.ScriptEngineManager; import javax.script.ScriptException; +import ch.psi.fda.core.messages.ComponentMetadata; +import ch.psi.fda.core.messages.DataMessageMetadata; +import ch.psi.fda.core.messages.Message; import ch.psi.jcae.Channel; import ch.psi.jcae.ChannelDescriptor; import ch.psi.jcae.ChannelException; @@ -42,9 +47,11 @@ public class AcquisitionEngineNG { private static final Logger logger = Logger.getLogger(AcquisitionEngineNG.class.getName()); private ChannelService cservice; + private BlockingQueue outQueue; public AcquisitionEngineNG(ChannelService cservice) { this.cservice = cservice; + this.outQueue = new LinkedBlockingQueue(1000); // Create bounded queue to prevent running out of memory ... // Workaround for Jython memory leak // http://blog.hillbrecht.de/2009/07/11/jython-memory-leakout-of-memory-problem/ @@ -52,6 +59,12 @@ public class AcquisitionEngineNG { } public void execute(Map resourceDescriptors, String script){ + + // TODO Remove debugging messages + // For debugging purpose only + logger.info("Resources to create: "+resourceDescriptors); + logger.info("Script to execute: "+script); + try { ScriptEngine engine = new ScriptEngineManager().getEngineByName("python"); @@ -73,16 +86,17 @@ public class AcquisitionEngineNG { } else if(resourceDescriptors.get(k) instanceof ProbeDescriptor){ ProbeDescriptor descriptor = (ProbeDescriptor) resourceDescriptors.get(k); - List> descriptors = new ArrayList<>(); + List> channels = new ArrayList<>(); for(ChannelDescriptor s: descriptor.getSensors()){ try { - descriptors.add(cservice.createChannel(s)); + channels.add(cservice.createChannel(s)); + } catch (ChannelException | InterruptedException | TimeoutException e) { throw new RuntimeException("Unable to create resource for channel: "+s.getName(),e); } } - engine.put(k, new ChannelProbeResource(descriptors)); + engine.put(k, new ChannelProbeResource(channels, outQueue)); } else{ throw new RuntimeException("Resource type not supported: "+resourceDescriptors.get(k).getClass().getName()); @@ -97,5 +111,9 @@ public class AcquisitionEngineNG { throw new RuntimeException("Action failed while executing the Jython script",e); } } + + public BlockingQueue getOutQueue(){ + return outQueue; + } } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/ChannelProbeResource.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ChannelProbeResource.java similarity index 56% rename from ch.psi.fda/src/main/java/ch/psi/fda/aq/ChannelProbeResource.java rename to ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ChannelProbeResource.java index 36b69bf..a816f1e 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/ChannelProbeResource.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ChannelProbeResource.java @@ -16,14 +16,17 @@ * along with this code. If not, see . * */ -package ch.psi.fda.aq; +package ch.psi.fda.aq.ng; -import java.util.ArrayList; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import ch.psi.fda.core.messages.DataMessage; +import ch.psi.fda.core.messages.EndOfStreamMessage; +import ch.psi.fda.core.messages.Message; +import ch.psi.fda.core.messages.StreamDelimiterMessage; import ch.psi.jcae.Channel; import ch.psi.jcae.ChannelException; @@ -34,12 +37,16 @@ import ch.psi.jcae.ChannelException; public class ChannelProbeResource { private final List> channels; - + private BlockingQueue queue; - public ChannelProbeResource(List> channels){ + public ChannelProbeResource(List> channels, BlockingQueue queue){ this.channels = channels; + this.queue = queue; } + /** + * Read configured channels and send out data message + */ public void read(){ try{ DataMessage message = new DataMessage(); @@ -50,9 +57,38 @@ public class ChannelProbeResource { message.getData().add(o); } System.out.println("MESSAGE: "+message); + queue.put(message); } catch (InterruptedException | TimeoutException | ChannelException | ExecutionException e) { throw new RuntimeException(e); } } + + /** + * Send a stream delimiter message for the given index + * @param index + * @param iflag + */ + public void delimiter(int index, boolean iflag){ + try { + StreamDelimiterMessage message = new StreamDelimiterMessage(index, iflag); + System.out.println(message); + queue.put(message); + } catch (InterruptedException e) { + throw new RuntimeException("Unable to send delimiter message", e); + } + } + + /** + * Send end of stream message + */ + public void terminateStream(){ + try { + EndOfStreamMessage message = new EndOfStreamMessage(); + System.out.println(message); + queue.put(message); + } catch (InterruptedException e) { + throw new RuntimeException("Unable to send end of stream message", e); + } + } } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/ProbeDescriptor.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ProbeDescriptor.java similarity index 97% rename from ch.psi.fda/src/main/java/ch/psi/fda/aq/ProbeDescriptor.java rename to ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ProbeDescriptor.java index b2a1309..67e6325 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/ProbeDescriptor.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ProbeDescriptor.java @@ -16,7 +16,7 @@ * along with this code. If not, see . * */ -package ch.psi.fda.aq; +package ch.psi.fda.aq.ng; import java.util.ArrayList; import java.util.List; diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/ScanMapperNG.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ScanMapperNG.java similarity index 87% rename from ch.psi.fda/src/main/java/ch/psi/fda/aq/ScanMapperNG.java rename to ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ScanMapperNG.java index 4104543..6d12c1e 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/ScanMapperNG.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ScanMapperNG.java @@ -16,7 +16,7 @@ * along with this code. If not, see . * */ -package ch.psi.fda.aq; +package ch.psi.fda.aq.ng; import java.util.ArrayList; import java.util.HashMap; @@ -27,6 +27,8 @@ import java.util.Set; import java.util.UUID; import java.util.logging.Logger; +import ch.psi.fda.core.messages.ComponentMetadata; +import ch.psi.fda.core.messages.DataMessageMetadata; import ch.psi.fda.model.v1.Action; import ch.psi.fda.model.v1.ArrayPositioner; import ch.psi.fda.model.v1.ChannelAction; @@ -37,6 +39,8 @@ import ch.psi.fda.model.v1.DiscreteStepDimension; import ch.psi.fda.model.v1.DiscreteStepPositioner; import ch.psi.fda.model.v1.FunctionPositioner; import ch.psi.fda.model.v1.LinearPositioner; +import ch.psi.fda.model.v1.PseudoPositioner; +import ch.psi.fda.model.v1.RegionPositioner; import ch.psi.fda.model.v1.Scan; import ch.psi.fda.model.v1.ScriptAction; import ch.psi.fda.model.v1.ShellAction; @@ -65,6 +69,8 @@ public class ScanMapperNG { private Map resourceDescriptors = new HashMap<>(); private ProbeDescriptor probeDescriptor = new ProbeDescriptor(); + private DataMessageMetadata dataMessageMetadata = new DataMessageMetadata(); + private String indentation = ""; @@ -124,6 +130,14 @@ public class ScanMapperNG { return resourceDescriptors; } + /** + * TODO need to be moved somewhere else + * @return + */ + public DataMessageMetadata getDataMessageMetadata(){ + return dataMessageMetadata; + } + private void mapDimensions(List dimensions, int index) { if(dimensions.size()<1){ @@ -161,12 +175,29 @@ public class ScanMapperNG { // Use readback probeDescriptor.getSensors().add(new ChannelDescriptor<>(type, positioner.getReadback(), false)); } + System.out.println(positioner.getId()); + dataMessageMetadata.getComponents().add(new ComponentMetadata(positioner.getId(), (dimensions.size()-1-index))); // TODO Done resource if(positioner instanceof LinearPositioner){ + // Calculate steps and add it to position array LinearPositioner lpositioner = (LinearPositioner) positioner; + boolean first = true; + StringBuffer b = new StringBuffer(); + b.append("["); + for(double i=0;(lpositioner.getStart()+i*lpositioner.getStepSize())<=lpositioner.getEnd();i++){ + if(first){ + first=false; + } + else{ + b.append(","); + } + b.append(String.format("%f", (lpositioner.getStart()+i*lpositioner.getStepSize()))); + } + b.append("]"); + script.append(indentation + var+"_positions="+b.toString()+"\n"); } else if(positioner instanceof ArrayPositioner){ ArrayPositioner apositioner = (ArrayPositioner) positioner; @@ -192,6 +223,12 @@ public class ScanMapperNG { FunctionPositioner fpositioner = (FunctionPositioner) positioner; // TODO take variable mappings into account ! } + else if(positioner instanceof PseudoPositioner){ + // TODO implement + } + else if(positioner instanceof RegionPositioner){ + // TODO implement + } } // Create dimension loop @@ -208,6 +245,7 @@ public class ScanMapperNG { mapActions(d.getAction()); // TODO map guard and sensors + // Check whether final dimension reached if ((index + 1) < dimensions.size()) { @@ -219,8 +257,17 @@ public class ScanMapperNG { script.append(indentation + PROBE_RESOURCE_ID+".read()\n"); } - indentation = indentation.replaceFirst(INDENT, ""); // decrease - // indentation + indentation = indentation.replaceFirst(INDENT, ""); // decrease indentation + + + script.append(indentation + PROBE_RESOURCE_ID+".delimiter("+(dimensions.size()-1-index)+", "+(d.isDataGroup()?"True":"False")+")\n"); + // send delimiter for this dimension + // if lowest dimension end of stream message + // if highest dimension no delimiter + if(index==0){ + script.append(indentation + PROBE_RESOURCE_ID+".terminateStream()\n"); + } + mapActions(d.getPostAction()); } else if (dimension instanceof ContinuousDimension) { // ContinuousDimension d = (ContinuousDimension) dimension; @@ -353,4 +400,5 @@ public class ScanMapperNG { resourceDescriptors.put(varname, new ChannelDescriptor<>(type, channelName, monitor)); return varname; } + } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/ShellDescriptor.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ShellDescriptor.java similarity index 97% rename from ch.psi.fda/src/main/java/ch/psi/fda/aq/ShellDescriptor.java rename to ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ShellDescriptor.java index a1f8e0a..900b3a7 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/ShellDescriptor.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ShellDescriptor.java @@ -16,7 +16,7 @@ * along with this code. If not, see . * */ -package ch.psi.fda.aq; +package ch.psi.fda.aq.ng; /** * Resource descriptor of an operating system shell. diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/aq/ShellResource.java b/ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ShellResource.java similarity index 98% rename from ch.psi.fda/src/main/java/ch/psi/fda/aq/ShellResource.java rename to ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ShellResource.java index 14f15ca..5e516db 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/aq/ShellResource.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/aq/ng/ShellResource.java @@ -16,7 +16,7 @@ * along with this code. If not, see . * */ -package ch.psi.fda.aq; +package ch.psi.fda.aq.ng; import java.io.BufferedReader; import java.io.IOException; diff --git a/ch.psi.fda/src/test/java/ch/psi/fda/aq/AcquisitionEngineNGTest.java b/ch.psi.fda/src/test/java/ch/psi/fda/aq/ng/AcquisitionEngineNGTest.java similarity index 96% rename from ch.psi.fda/src/test/java/ch/psi/fda/aq/AcquisitionEngineNGTest.java rename to ch.psi.fda/src/test/java/ch/psi/fda/aq/ng/AcquisitionEngineNGTest.java index ca14fe9..682e339 100644 --- a/ch.psi.fda/src/test/java/ch/psi/fda/aq/AcquisitionEngineNGTest.java +++ b/ch.psi.fda/src/test/java/ch/psi/fda/aq/ng/AcquisitionEngineNGTest.java @@ -16,7 +16,7 @@ * along with this code. If not, see . * */ -package ch.psi.fda.aq; +package ch.psi.fda.aq.ng; import java.util.logging.Logger; @@ -25,6 +25,8 @@ import org.junit.Before; import org.junit.Test; import ch.psi.fda.TestChannels; +import ch.psi.fda.aq.ng.AcquisitionEngineNG; +import ch.psi.fda.aq.ng.ScanMapperNG; import ch.psi.fda.model.v1.ArrayPositioner; import ch.psi.fda.model.v1.ChannelAction; import ch.psi.fda.model.v1.Configuration; @@ -60,7 +62,7 @@ public class AcquisitionEngineNGTest { } /** - * Test method for {@link ch.psi.fda.aq.AcquisitionEngineNG#execute(java.lang.String)}. + * Test method for {@link ch.psi.fda.aq.ng.AcquisitionEngineNG#execute(java.lang.String)}. */ @Test public void testExecuteChannelAction() { diff --git a/ch.psi.fda/src/test/java/ch/psi/fda/aq/ScanMapperNGTest.java b/ch.psi.fda/src/test/java/ch/psi/fda/aq/ng/ScanMapperNGTest.java similarity index 76% rename from ch.psi.fda/src/test/java/ch/psi/fda/aq/ScanMapperNGTest.java rename to ch.psi.fda/src/test/java/ch/psi/fda/aq/ng/ScanMapperNGTest.java index fa6c5c6..ec1e220 100644 --- a/ch.psi.fda/src/test/java/ch/psi/fda/aq/ScanMapperNGTest.java +++ b/ch.psi.fda/src/test/java/ch/psi/fda/aq/ng/ScanMapperNGTest.java @@ -16,7 +16,7 @@ * along with this code. If not, see . * */ -package ch.psi.fda.aq; +package ch.psi.fda.aq.ng; import static org.junit.Assert.*; @@ -28,10 +28,12 @@ import org.junit.Before; import org.junit.Test; import ch.psi.fda.TestChannels; +import ch.psi.fda.aq.ng.ScanMapperNG; import ch.psi.fda.model.v1.ArrayPositioner; import ch.psi.fda.model.v1.ChannelAction; import ch.psi.fda.model.v1.Configuration; import ch.psi.fda.model.v1.DiscreteStepDimension; +import ch.psi.fda.model.v1.LinearPositioner; import ch.psi.fda.model.v1.Scan; import ch.psi.fda.model.v1.ScriptAction; import ch.psi.fda.model.v1.ShellAction; @@ -61,7 +63,7 @@ public class ScanMapperNGTest { } /** - * Test method for {@link ch.psi.fda.aq.ScanMapperNG#mapActions(java.util.List)}. + * Test method for {@link ch.psi.fda.aq.ng.ScanMapperNG#mapActions(java.util.List)}. */ @Test public void testMap() { @@ -101,6 +103,32 @@ public class ScanMapperNGTest { config.getScan().getDimension().add(d); + mapper.map(config); + String script = mapper.getScript(); + + System.out.println(script); + } + + @Test + public void testMapLinearPositioner() { + + Configuration config = new Configuration(); + config.setScan(new Scan()); + + DiscreteStepDimension d = new DiscreteStepDimension(); + + LinearPositioner positioner = new LinearPositioner(); + positioner.setName(TestChannels.ANALOG_OUT); + positioner.setStart(0.1); + positioner.setEnd(2.2); + positioner.setStepSize(0.1); + + d.getPositioner().add(positioner); + + config.getScan().getDimension().add(d); +// config.getScan().getDimension().add(d); + + mapper.map(config); String script = mapper.getScript();