diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/AcquisitionRMain.java b/ch.psi.fda/src/main/java/ch/psi/fda/AcquisitionRMain.java deleted file mode 100644 index 37f8888..0000000 --- a/ch.psi.fda/src/main/java/ch/psi/fda/AcquisitionRMain.java +++ /dev/null @@ -1,426 +0,0 @@ -/** - * - * Copyright 2010 Paul Scherrer Institute. All rights reserved. - * - * This code is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This code is distributed in the hope that it will be useful, - * but without any warranty; without even the implied warranty of - * merchantability or fitness for a particular purpose. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this code. If not, see . - * - */ - -package ch.psi.fda; - -import java.awt.FlowLayout; -import java.awt.event.ActionEvent; -import java.awt.event.ActionListener; -import java.awt.event.WindowAdapter; -import java.awt.event.WindowEvent; -import java.io.File; -import java.io.PrintWriter; -import java.util.HashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.swing.JFrame; -import javax.swing.JPanel; -import javax.swing.JScrollPane; -import javax.swing.JSplitPane; -import javax.swing.JTabbedPane; -import javax.swing.ScrollPaneLayout; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; - -import com.google.common.eventbus.AsyncEventBus; -import com.google.common.eventbus.EventBus; - -import sun.misc.Signal; -import sun.misc.SignalHandler; -import ch.psi.fda.aq.VisualizationMapper; -import ch.psi.fda.gui.ProgressPanel; -import ch.psi.fda.gui.ScrollableFlowPanel; -import ch.psi.fda.install.ApplicationConfigurator; -import ch.psi.fda.model.ModelManager; -import ch.psi.fda.model.v1.Configuration; -import ch.psi.fda.model.v1.Data; -import ch.psi.fda.rest.RestClient; -import ch.psi.fda.visualizer.Visualizer; - -/** - * Entry class for command line based data acquisition - */ -@SuppressWarnings("restriction") -public class AcquisitionRMain { - - private static Logger logger = Logger.getLogger(AcquisitionRMain.class.getName()); - - private static boolean abortedViaSignal = false; - - /** - * Main Program - * Process exit code: -1 if wrong number of arguments are passed - * Process exit code: 3 if aborted via Ctrl+C - * - * @param args Arguments of the program - */ - public static void main(String[] args) { - - String scriptname = "fda_scan"; - - Integer iterations = null; - boolean autoclose = false; - boolean nogui = false; - String files[] = null; - - HashMap varTable = new HashMap(); - - // Iterations option - OptionBuilder.hasArg(); - OptionBuilder.withArgName("iterations"); - OptionBuilder.withDescription("Number of iterations"); - OptionBuilder.withType(new Integer(1)); - Option o_iterations = OptionBuilder.create( "iterations"); - - // Variables option - OptionBuilder.hasArg(); - OptionBuilder.withArgName("variables"); - OptionBuilder.withDescription("Scan variables - variables are specified in the form var=value,var2=value2"); - OptionBuilder.withType(new Integer(1)); - Option o_variables = OptionBuilder.create( "variables"); - - Option o_autoclose = new Option( "autoclose", "Close down application after scan" ); - Option o_init = new Option( "initialize", "Initialize application directories and configuration files" ); - Option o_nogui = new Option( "nogui", "Do not show scan GUI" ); - - Options options = new Options(); - options.addOption(o_variables); - options.addOption(o_iterations); - options.addOption(o_autoclose); - options.addOption(o_init); - options.addOption(o_nogui); - - CommandLineParser parser = new GnuParser(); - // Parse the command line arguments - try { - CommandLine line = parser.parse( options, args ); - - // Initialize application - if( line.hasOption(o_init.getOpt()) ){ - // Initialize application - ApplicationConfigurator ac = new ApplicationConfigurator(); - ac.initializeApplication(); - System.exit(0); - } - - if(line.getArgs().length<1){ - throw new ParseException("One argument is required"); - } - - files=line.getArgs(); - - // Iterations option - if( line.hasOption(o_iterations.getOpt()) ){ - iterations = Integer.parseInt(line.getOptionValue(o_iterations.getOpt())); - } - - // Variables - if( line.hasOption(o_variables.getOpt()) ){ - String variables = line.getOptionValue(o_variables.getOpt() ); - String[] vars = variables.split(","); - for(String varp:vars){ - String[] pair = varp.split("="); - if(pair.length!=2){ - throw new ParseException("Variables are not specified the correct way. -variables var1=val1,var2=val2"); - } - varTable.put(pair[0], pair[1]); - } - } - - // Autoclose option - if( line.hasOption( o_autoclose.getOpt() ) ) { - autoclose = true; - } - - // No GUI option - if( line.hasOption( o_nogui.getOpt() ) ) { - nogui = true; - } - - } catch (ParseException e) { - System.err.println(e.getMessage()); - HelpFormatter formatter = new HelpFormatter(); - formatter.printUsage(new PrintWriter(System.out, true), HelpFormatter.DEFAULT_WIDTH, scriptname, options); - System.exit(-1); - } - - // Run application - try{ - for(String file: files){ - run(new File(file), iterations, autoclose, nogui); - } - - // Close application automatically if autoclose option is set (and visualizations are specified) - if(nogui || autoclose ){ - System.exit(0); - } - - } - catch(Exception ee){ - System.out.println("Acquisition failed due to: "+ee.getMessage()); - logger.log(Level.SEVERE, "Acquisition failed due to: ", ee); // Do not print stack trace - System.exit(-1); - } - - } - - /** - * Run scan - * @param file Scan file - * @param iterations Number of iterations - * @param autoclose Flag whether to close the application automatically after the scan - * @param nogui Flag whether to run the scan with a GUI - */ - public static void run(File file, Integer iterations, boolean autoclose, boolean nogui){ - -// // Initialize application -// ApplicationConfigurator ac = new ApplicationConfigurator(); -// ac.initializeApplication(); - - if(!file.exists()){ - throw new RuntimeException("File "+file.getAbsolutePath()+" does not exist"); - } - - Configuration c; - try { - c = ModelManager.unmarshall(file); - } catch (Exception e) { - throw new RuntimeException("Unable to deserialize configuration: "+e.getMessage(), e); - } - - // Set data file name - // Determine name used for the data file - String name = file.getName(); - name = name.replaceAll("\\.xml$", ""); - - if(c.getData()!=null){ - Data data = c.getData(); - // Only update filename if no name is specified in xml file - if(data.getFileName()==null){ - data.setFileName(name); - } - } - else{ - Data data = new Data(); - data.setFileName(name); - c.setData(data); - } - - - // Override number of executions - if(iterations != null){ - c.setNumberOfExecution(iterations); - } - // Fix configuration if iterations is specified with 0 and no iterations option is specified - if(c.getNumberOfExecution()==0){ - c.setNumberOfExecution(1); - } - - // Create/get acquisition engine -// final Acquisition acquisition = new Acquisition(new DefaultChannelService(), new AcquisitionConfiguration()); - - boolean vis = false; - // Only register data visualization task/processor if there are visualizations - if(c.getVisualization().size()>0 && !nogui){ - vis=true; - } - - EventBus b = new AsyncEventBus(Executors.newSingleThreadExecutor()); - - final RestClient client = new RestClient(); - - -// acquisition.initalize(b, c); - - Visualizer visualizer = null; - // Only register data visualization task/processor if there are visualizations - if(vis){ - final StreamClient sclient = new StreamClient(b); - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - sclient.listen(); - } - }); - - visualizer = new Visualizer(VisualizationMapper.mapVisualizations(c.getVisualization())); - b.register(visualizer); - - // TODO eventually set update on delimiter/dim boundary here - - // If there is a continous dimension only update plot at the end of a line - if(c.getScan() != null && c.getScan().getCdimension()!=null){ - visualizer.setUpdateAtStreamElement(false); - visualizer.setUpdateAtStreamDelimiter(true); - visualizer.setUpdateAtEndOfStream(true); - } - } - - // GUI GUI GUI GUI GUI GUI GUI - ProgressPanel progressPanel = null; - if(visualizer != null){ // Only bring up GUI if there are some plots ... - // Visualizations - JPanel opanel = new ScrollableFlowPanel(); - opanel.setLayout(new FlowLayout()); - - JScrollPane spane = new JScrollPane(opanel, ScrollPaneLayout.VERTICAL_SCROLLBAR_AS_NEEDED, ScrollPaneLayout.HORIZONTAL_SCROLLBAR_NEVER); - JTabbedPane tpane = new JTabbedPane(); - tpane.addTab("Overview", spane); - - for(JPanel p: visualizer.getPlotPanels()){ - opanel.add(p); - } - - final JFrame frame = new JFrame("FDA: "+file); - frame.setSize(1200,800); - - // Create progress panel - progressPanel = new ProgressPanel(); - progressPanel.addActionListener(new ActionListener() { - - @Override - public void actionPerformed(ActionEvent e) { - try { - client.stop(); - } catch (Exception e1) { - logger.log(Level.SEVERE, "Exception occured while aborting scan", e1); - } - - } - }); - - - JSplitPane splitPane = new JSplitPane(); - splitPane.setLeftComponent(progressPanel); - splitPane.setRightComponent(tpane); - - frame.add(splitPane); - frame.addWindowListener(new WindowAdapter(){ - @Override - public void windowClosing(WindowEvent we){ - if(client.isActive()){ - // Abort acquisition - client.stop(); - } - - // Wait until acquisition is aborted. Maximum wait 10*100milliseconds before forcefully - // terminate application - int count=0; - while(client.isActive()){ - if(count == 10){ - break; - } - - // Sleep 100 milliseconds - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - count++; - } - - // Terminate program - System.exit(0); - } - }); - frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);//.DO_NOTHING_ON_CLOSE); -// frame.setVisible(true); - - java.awt.EventQueue.invokeLater(new Runnable() { - public void run() { - frame.setVisible(true); - } - }); - } - // GUI GUI GUI GUI GUI GUI GUI - - // CLI CLI CLI CLI - // Register the scan engine as Signal Handler - Signal.handle(new Signal("INT"), new SignalHandler() { - /** - * Thread save signal counter - */ - private AtomicInteger signalCount= new AtomicInteger(0); - - /** - * Testing signal handler (in Eclipse) use this after starting scan: - * - * SL5: A=`ps -ef | tail -10 | grep jav[a] | awk '{printf $2}'`;kill -2 $A - * MacOS X: A=`ps -ef | grep AcquisitionMai[n] | awk '{printf $2}'`;kill -2 $A - * - * on the command line use CTRL-C - */ - @Override - public void handle(Signal signal) { - - logger.finest("Received signal: "+signal); - - int count = signalCount.incrementAndGet(); - - // If signal is received more than 1 time forcefully abort application - if(count>1){ - logger.info("Terminate application"); - System.exit(2); - } - - abortedViaSignal = true; - // Abort acquisition engine - if(client.isActive()){ - // Abort acquisition - client.stop(); - } - } - }); - // CLI CLI CLI CLI - - - if(visualizer != null){ - visualizer.configure(); - } - client.acquire(c); - - - // GUI GUI GUI GUI GUI GUI GUI - // Set progress panel to done -// if(progressPanel != null){ -// // Can this be done via a Listener? -// progressPanel.done(); -// } - // GUI GUI GUI GUI GUI GUI GUI - - if(abortedViaSignal){ - System.exit(3); - } - } - - - -} diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/ViewerMain.java b/ch.psi.fda/src/main/java/ch/psi/fda/ViewerMain.java index 16436ba..227fb80 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/ViewerMain.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/ViewerMain.java @@ -36,6 +36,7 @@ import javax.swing.ScrollPaneLayout; import com.google.common.eventbus.EventBus; import ch.psi.fda.gui.ScrollableFlowPanel; +import ch.psi.fda.rest.client.StreamClient; import ch.psi.fda.visualizer.SeriesDataFilter; import ch.psi.fda.visualizer.Visualizer; import ch.psi.fda.visualizer.XYSeriesDataFilter; @@ -101,7 +102,7 @@ public class ViewerMain { bus.register(visualizer); visualizer.configure(); - new StreamClient(bus).listen(); + new StreamClient(bus).listen("tcp://emac:10000"); } /** diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionEngine.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionEngine.java index e489076..50ba81e 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionEngine.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/AcquisitionEngine.java @@ -18,7 +18,6 @@ */ package ch.psi.fda.rest; -import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -111,20 +110,32 @@ public class AcquisitionEngine { * @param configuration * @return */ - public String submit(Configuration configuration){ - ExecutionRequest r = new ExecutionRequest(UUID.randomUUID().toString(), configuration); + public void submit(String trackingId, Configuration configuration){ + + // Check whether trackingId already exists + for(ExecutionRequest req: requests){ + if(req.getTrackingId().equals(trackingId)){ + throw new RuntimeException("TrackingId "+trackingId+" is already submitted"); + } + } + + ExecutionRequest r = new ExecutionRequest(trackingId, configuration); try{ requests.put(r); } catch (InterruptedException e) { } - return r.getTrackingId(); } + public void terminateAll(){ + requests.clear(); + terminate(); + } + /** * Terminate the currently executed request */ - public void terminate(){ + private void terminate(){ synchronized(this){ if(acquisition==null){ return; diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/ZMQDataService.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/ZMQDataService.java index facd02b..8e03424 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/rest/ZMQDataService.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/ZMQDataService.java @@ -66,7 +66,7 @@ public class ZMQDataService { @Subscribe public void onMessage(Message m) { logger.info(m.toString()); - socket.sendMore("{\"trackingId\":\"" + trackingId + "\"}"); + socket.sendMore("{\"htype\": [\"fda-2.1\"], \"trackingId\":\"" + trackingId + "\"}"); try ( ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b); diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/client/RemoteAcquisitionMain.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/client/RemoteAcquisitionMain.java new file mode 100644 index 0000000..32257a5 --- /dev/null +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/client/RemoteAcquisitionMain.java @@ -0,0 +1,316 @@ +/** + * + * Copyright 2010 Paul Scherrer Institute. All rights reserved. + * + * This code is free software: you can redistribute it and/or modify it under + * the terms of the GNU Lesser General Public License as published by the Free + * Software Foundation, either version 3 of the License, or (at your option) any + * later version. + * + * This code is distributed in the hope that it will be useful, but without any + * warranty; without even the implied warranty of merchantability or fitness for + * a particular purpose. See the GNU Lesser General Public License for more + * details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this code. If not, see . + * + */ + +package ch.psi.fda.rest.client; + +import java.awt.FlowLayout; +import java.awt.event.ActionEvent; +import java.awt.event.ActionListener; +import java.awt.event.WindowAdapter; +import java.awt.event.WindowEvent; +import java.io.File; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.swing.JFrame; +import javax.swing.JPanel; +import javax.swing.JScrollPane; +import javax.swing.JSplitPane; +import javax.swing.JTabbedPane; +import javax.swing.ScrollPaneLayout; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; + +import com.google.common.eventbus.AsyncEventBus; +import com.google.common.eventbus.EventBus; + +import sun.misc.Signal; +import sun.misc.SignalHandler; +import ch.psi.fda.aq.VisualizationMapper; +import ch.psi.fda.gui.ProgressPanel; +import ch.psi.fda.gui.ScrollableFlowPanel; +import ch.psi.fda.model.ModelManager; +import ch.psi.fda.model.v1.Configuration; +import ch.psi.fda.model.v1.Data; +import ch.psi.fda.visualizer.Visualizer; + +@SuppressWarnings("restriction") +public class RemoteAcquisitionMain { + + private static Logger logger = Logger.getLogger(RemoteAcquisitionMain.class.getName()); + + /** + * Main Program Process exit code: -1 if wrong number of arguments are + * passed Process exit code: 3 if aborted via Ctrl+C + * + * @param args + * Arguments of the program + */ + public static void main(String[] args) { + + String scriptname = "fda_scan"; + + Integer iterations = null; + boolean nogui = false; + List files = new ArrayList<>(); + + OptionBuilder.hasArg(); + OptionBuilder.withArgName("iterations"); + OptionBuilder.withDescription("Number of iterations"); + OptionBuilder.withType(new Integer(1)); + Option o_iterations = OptionBuilder.create("iterations"); + + Option o_nogui = new Option("nogui", "Do not show scan GUI"); + + Options options = new Options(); + options.addOption(o_iterations); + options.addOption(o_nogui); + + CommandLineParser parser = new GnuParser(); + try { + CommandLine line = parser.parse(options, args); + + if (line.getArgs().length < 1) { + throw new ParseException("One argument is required"); + } + + if (line.hasOption(o_iterations.getOpt())) { + iterations = Integer.parseInt(line.getOptionValue(o_iterations.getOpt())); + } + + if (line.hasOption(o_nogui.getOpt())) { + nogui = true; + } + + for (String f : line.getArgs()) { + File file = new File(f); + if (!file.exists()) { + throw new RuntimeException("File " + file.getAbsolutePath() + " does not exist"); + } + files.add(file); + } + + } catch (ParseException e) { + System.err.println(e.getMessage()); + HelpFormatter formatter = new HelpFormatter(); + formatter.printUsage(new PrintWriter(System.out, true), HelpFormatter.DEFAULT_WIDTH, scriptname, options); + System.exit(-1); + } + + try { + for (File file : files) { + executeScan(file, iterations, nogui); + } + } catch (Exception ee) { + logger.log(Level.SEVERE, "Acquisition failed due to: ", ee); + System.exit(-1); + } + } + + + /** + * Execute the given scan on the server + * @param file + * @param iterations + * @param nogui + */ + public static void executeScan(File file, Integer iterations, boolean nogui) { + + Configuration c; + try { + c = ModelManager.unmarshall(file); + } catch (Exception e) { + throw new RuntimeException("Unable to deserialize configuration: " + e.getMessage(), e); + } + + // Set data file name + // Determine name used for the data file + String name = file.getName(); + name = name.replaceAll("\\.xml$", ""); + + if (c.getData() != null) { + Data data = c.getData(); + // Only update filename if no name is specified in xml file + if (data.getFileName() == null) { + data.setFileName(name); + } + } else { + Data data = new Data(); + data.setFileName(name); + c.setData(data); + } + + // Override number of executions + if (iterations != null) { + c.setNumberOfExecution(iterations); + } + // Fix configuration if iterations is specified with 0 and no iterations + // option is specified + if (c.getNumberOfExecution() == 0) { + c.setNumberOfExecution(1); + } + + + EventBus b = new AsyncEventBus(Executors.newSingleThreadExecutor()); + final RestClient client = new RestClient(); + StreamClient streamClient = null; + + + if (!nogui && c.getVisualization().size() > 0) { + + streamClient = new StreamClient(b); + + Visualizer visualizer = new Visualizer(VisualizationMapper.mapVisualizations(c.getVisualization())); + visualizer.configure(); + b.register(visualizer); + + // If there is a continuous dimension only update plot at the end of a line + if (c.getScan() != null && c.getScan().getCdimension() != null) { + visualizer.setUpdateAtStreamElement(false); + visualizer.setUpdateAtStreamDelimiter(true); + visualizer.setUpdateAtEndOfStream(true); + } + + JPanel opanel = new ScrollableFlowPanel(); + opanel.setLayout(new FlowLayout()); + + JScrollPane spane = new JScrollPane(opanel, ScrollPaneLayout.VERTICAL_SCROLLBAR_AS_NEEDED, ScrollPaneLayout.HORIZONTAL_SCROLLBAR_NEVER); + JTabbedPane tpane = new JTabbedPane(); + tpane.addTab("Overview", spane); + + for (JPanel p : visualizer.getPlotPanels()) { + opanel.add(p); + } + + final JFrame frame = new JFrame("FDA: " + file); + frame.setSize(1200, 800); + + final ProgressPanel progressPanel = new ProgressPanel(); + progressPanel.addActionListener(new ActionListener() { + + @Override + public void actionPerformed(ActionEvent e) { + try { + client.stop(); + } catch (Exception e1) { + logger.log(Level.SEVERE, "Exception occured while aborting scan", e1); + } finally { + progressPanel.done(); + } + + } + }); + + JSplitPane splitPane = new JSplitPane(); + splitPane.setLeftComponent(progressPanel); + splitPane.setRightComponent(tpane); + + frame.add(splitPane); + frame.addWindowListener(new WindowAdapter() { + @Override + public void windowClosing(WindowEvent we) { + if (client.isActive()) { + client.stop(); + } + + int count = 0; + while (client.isActive()) { + if (count == 10) { + break; + } + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + count++; + } + + System.exit(0); + } + }); + frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE); // .DO_NOTHING_ON_CLOSE); + + java.awt.EventQueue.invokeLater(new Runnable() { + public void run() { + frame.setVisible(true); + } + }); + } + + + + Signal.handle(new Signal("INT"), new SignalHandler() { + + private AtomicInteger signalCount = new AtomicInteger(0); + + @Override + public void handle(Signal signal) { + + logger.finest("Received signal: " + signal); + + try{ + client.stop(); + } + catch(Exception e){ + logger.log(Level.WARNING, "Stopping scan failed with exception", e); + } + + if (signalCount.incrementAndGet() > 1) { + logger.info("Terminate application"); + System.exit(2); + } + } + }); + + + + String trackingId = UUID.randomUUID().toString(); + logger.info("TrackingID of job: " + trackingId); + + if(streamClient!=null){ + streamClient.filterTrackingId(trackingId); + final StreamClient sclient = streamClient; + Executors.newSingleThreadExecutor().execute(new Runnable() { + @Override + public void run() { + sclient.listen("tcp://emac:10000"); + } + }); + } + + client.acquire(trackingId, c); + + + // TODO Need to be informed when stopped! - Progess Panel + } +} diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/RestClient.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/client/RestClient.java similarity index 85% rename from ch.psi.fda/src/main/java/ch/psi/fda/rest/RestClient.java rename to ch.psi.fda/src/main/java/ch/psi/fda/rest/client/RestClient.java index c03b4f3..dff2139 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/rest/RestClient.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/client/RestClient.java @@ -16,7 +16,7 @@ * along with this code. If not, see . * */ -package ch.psi.fda.rest; +package ch.psi.fda.rest.client; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; @@ -42,13 +42,14 @@ public class RestClient { - public String acquire(Configuration c){ + public String acquire(String trackingId, Configuration c){ // Wrap configuration in JAXBElement as there is no @XmlRootElement available within the generated Configuration class JAXBElement jaxbElement = new JAXBElement<>(new QName("ROOT"), Configuration.class, c); - return target.request().post(Entity.entity(jaxbElement, MediaType.APPLICATION_XML), String.class); + return target.path(trackingId).request().put(Entity.entity(jaxbElement, MediaType.APPLICATION_XML), String.class); } public void stop(){ + // TODO in future only the actual request should be aborted! target.request().delete(); } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/StreamClient.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/client/StreamClient.java similarity index 66% rename from ch.psi.fda/src/main/java/ch/psi/fda/StreamClient.java rename to ch.psi.fda/src/main/java/ch/psi/fda/rest/client/StreamClient.java index c867296..6d59dad 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/StreamClient.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/client/StreamClient.java @@ -16,43 +16,56 @@ * along with this code. If not, see . * */ -package ch.psi.fda; +package ch.psi.fda.rest.client; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectInputStream; +import java.util.HashMap; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; import org.jeromq.ZMQ; import com.google.common.eventbus.EventBus; -/** - * - */ public class StreamClient { - - private static final Logger logger = Logger.getLogger(StreamClient.class.getName()); - private final EventBus bus; + private static final Logger logger = Logger.getLogger(StreamClient.class.getName()); + private ObjectMapper mapper = new ObjectMapper(new JsonFactory()); - public StreamClient(EventBus bus){ + + private final EventBus bus; + private String trackingIdFilter = null; + + public StreamClient(EventBus bus) { this.bus = bus; } - public void listen() { + public void listen(String endpoint) { ZMQ.Context context = ZMQ.context(); zmq.ZError.clear(); // Clear error code ZMQ.Socket socket = context.socket(ZMQ.SUB); - socket.connect("tcp://emac:10000"); + socket.connect(endpoint); socket.subscribe(""); // SUBSCRIBE ! while (true) { + String tid = null; + byte[] content = null; - // String header = socket.recvStr(); // header - socket.recvStr(); // header + byte[] header = socket.recv(); // header + if(trackingIdFilter!=null){ + try { + Map m = mapper.readValue(header, new TypeReference>(){}); + tid = (String) m.get("trackingId"); + } catch (IOException e) { + } + } while (socket.hasReceiveMore()) { content = socket.recv(); } @@ -61,6 +74,10 @@ public class StreamClient { continue; } + if(tid!=null && !tid.matches(trackingIdFilter)){ + continue; + } + try (ByteArrayInputStream bis = new ByteArrayInputStream(content); ObjectInput in = new ObjectInputStream(bis);) { Object o = in.readObject(); bus.post(o); @@ -70,4 +87,12 @@ public class StreamClient { } } + + public String getTrackingIdFilter() { + return trackingIdFilter; + } + + public void filterTrackingId(String trackingId) { + this.trackingIdFilter = trackingId; + } } diff --git a/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/ScanService.java b/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/ScanService.java index 7e5449e..8ec277b 100644 --- a/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/ScanService.java +++ b/ch.psi.fda/src/main/java/ch/psi/fda/rest/services/ScanService.java @@ -23,8 +23,9 @@ import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.GET; -import javax.ws.rs.POST; +import javax.ws.rs.PUT; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.core.MediaType; import ch.psi.fda.model.v1.Configuration; @@ -36,15 +37,22 @@ public class ScanService { @Inject private AcquisitionEngine aengine; - @POST + @PUT + @Path("{trackingId}") @Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) - public String execute(Configuration configuration) throws InterruptedException{ - return aengine.submit(configuration); + public void execute(@PathParam("trackingId") String trackingId, Configuration configuration) throws InterruptedException{ + aengine.submit(trackingId, configuration); } @DELETE - public void stop(){ - aengine.terminate(); + @Path("{trackingId}") + public void stop(@PathParam("trackingId") String trackingId){ + aengine.terminate(trackingId); + } + + @DELETE + public void terminateAll(){ + aengine.terminateAll(); } @GET