Improved server and client
This commit is contained in:
2013-12-20 13:43:16 +01:00
parent 244001fd99
commit 76e9ea1bed
8 changed files with 390 additions and 454 deletions

View File

@@ -1,426 +0,0 @@
/**
*
* Copyright 2010 Paul Scherrer Institute. All rights reserved.
*
* This code is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This code is distributed in the hope that it will be useful,
* but without any warranty; without even the implied warranty of
* merchantability or fitness for a particular purpose. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda;
import java.awt.FlowLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent;
import java.io.File;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JSplitPane;
import javax.swing.JTabbedPane;
import javax.swing.ScrollPaneLayout;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import sun.misc.Signal;
import sun.misc.SignalHandler;
import ch.psi.fda.aq.VisualizationMapper;
import ch.psi.fda.gui.ProgressPanel;
import ch.psi.fda.gui.ScrollableFlowPanel;
import ch.psi.fda.install.ApplicationConfigurator;
import ch.psi.fda.model.ModelManager;
import ch.psi.fda.model.v1.Configuration;
import ch.psi.fda.model.v1.Data;
import ch.psi.fda.rest.RestClient;
import ch.psi.fda.visualizer.Visualizer;
/**
* Entry class for command line based data acquisition
*/
@SuppressWarnings("restriction")
public class AcquisitionRMain {
private static Logger logger = Logger.getLogger(AcquisitionRMain.class.getName());
private static boolean abortedViaSignal = false;
/**
* Main Program
* Process exit code: -1 if wrong number of arguments are passed
* Process exit code: 3 if aborted via Ctrl+C
*
* @param args Arguments of the program
*/
public static void main(String[] args) {
String scriptname = "fda_scan";
Integer iterations = null;
boolean autoclose = false;
boolean nogui = false;
String files[] = null;
HashMap<String,String> varTable = new HashMap<String,String>();
// Iterations option
OptionBuilder.hasArg();
OptionBuilder.withArgName("iterations");
OptionBuilder.withDescription("Number of iterations");
OptionBuilder.withType(new Integer(1));
Option o_iterations = OptionBuilder.create( "iterations");
// Variables option
OptionBuilder.hasArg();
OptionBuilder.withArgName("variables");
OptionBuilder.withDescription("Scan variables - variables are specified in the form var=value,var2=value2");
OptionBuilder.withType(new Integer(1));
Option o_variables = OptionBuilder.create( "variables");
Option o_autoclose = new Option( "autoclose", "Close down application after scan" );
Option o_init = new Option( "initialize", "Initialize application directories and configuration files" );
Option o_nogui = new Option( "nogui", "Do not show scan GUI" );
Options options = new Options();
options.addOption(o_variables);
options.addOption(o_iterations);
options.addOption(o_autoclose);
options.addOption(o_init);
options.addOption(o_nogui);
CommandLineParser parser = new GnuParser();
// Parse the command line arguments
try {
CommandLine line = parser.parse( options, args );
// Initialize application
if( line.hasOption(o_init.getOpt()) ){
// Initialize application
ApplicationConfigurator ac = new ApplicationConfigurator();
ac.initializeApplication();
System.exit(0);
}
if(line.getArgs().length<1){
throw new ParseException("One argument is required");
}
files=line.getArgs();
// Iterations option
if( line.hasOption(o_iterations.getOpt()) ){
iterations = Integer.parseInt(line.getOptionValue(o_iterations.getOpt()));
}
// Variables
if( line.hasOption(o_variables.getOpt()) ){
String variables = line.getOptionValue(o_variables.getOpt() );
String[] vars = variables.split(",");
for(String varp:vars){
String[] pair = varp.split("=");
if(pair.length!=2){
throw new ParseException("Variables are not specified the correct way. -variables var1=val1,var2=val2");
}
varTable.put(pair[0], pair[1]);
}
}
// Autoclose option
if( line.hasOption( o_autoclose.getOpt() ) ) {
autoclose = true;
}
// No GUI option
if( line.hasOption( o_nogui.getOpt() ) ) {
nogui = true;
}
} catch (ParseException e) {
System.err.println(e.getMessage());
HelpFormatter formatter = new HelpFormatter();
formatter.printUsage(new PrintWriter(System.out, true), HelpFormatter.DEFAULT_WIDTH, scriptname, options);
System.exit(-1);
}
// Run application
try{
for(String file: files){
run(new File(file), iterations, autoclose, nogui);
}
// Close application automatically if autoclose option is set (and visualizations are specified)
if(nogui || autoclose ){
System.exit(0);
}
}
catch(Exception ee){
System.out.println("Acquisition failed due to: "+ee.getMessage());
logger.log(Level.SEVERE, "Acquisition failed due to: ", ee); // Do not print stack trace
System.exit(-1);
}
}
/**
* Run scan
* @param file Scan file
* @param iterations Number of iterations
* @param autoclose Flag whether to close the application automatically after the scan
* @param nogui Flag whether to run the scan with a GUI
*/
public static void run(File file, Integer iterations, boolean autoclose, boolean nogui){
// // Initialize application
// ApplicationConfigurator ac = new ApplicationConfigurator();
// ac.initializeApplication();
if(!file.exists()){
throw new RuntimeException("File "+file.getAbsolutePath()+" does not exist");
}
Configuration c;
try {
c = ModelManager.unmarshall(file);
} catch (Exception e) {
throw new RuntimeException("Unable to deserialize configuration: "+e.getMessage(), e);
}
// Set data file name
// Determine name used for the data file
String name = file.getName();
name = name.replaceAll("\\.xml$", "");
if(c.getData()!=null){
Data data = c.getData();
// Only update filename if no name is specified in xml file
if(data.getFileName()==null){
data.setFileName(name);
}
}
else{
Data data = new Data();
data.setFileName(name);
c.setData(data);
}
// Override number of executions
if(iterations != null){
c.setNumberOfExecution(iterations);
}
// Fix configuration if iterations is specified with 0 and no iterations option is specified
if(c.getNumberOfExecution()==0){
c.setNumberOfExecution(1);
}
// Create/get acquisition engine
// final Acquisition acquisition = new Acquisition(new DefaultChannelService(), new AcquisitionConfiguration());
boolean vis = false;
// Only register data visualization task/processor if there are visualizations
if(c.getVisualization().size()>0 && !nogui){
vis=true;
}
EventBus b = new AsyncEventBus(Executors.newSingleThreadExecutor());
final RestClient client = new RestClient();
// acquisition.initalize(b, c);
Visualizer visualizer = null;
// Only register data visualization task/processor if there are visualizations
if(vis){
final StreamClient sclient = new StreamClient(b);
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
sclient.listen();
}
});
visualizer = new Visualizer(VisualizationMapper.mapVisualizations(c.getVisualization()));
b.register(visualizer);
// TODO eventually set update on delimiter/dim boundary here
// If there is a continous dimension only update plot at the end of a line
if(c.getScan() != null && c.getScan().getCdimension()!=null){
visualizer.setUpdateAtStreamElement(false);
visualizer.setUpdateAtStreamDelimiter(true);
visualizer.setUpdateAtEndOfStream(true);
}
}
// GUI GUI GUI GUI GUI GUI GUI
ProgressPanel progressPanel = null;
if(visualizer != null){ // Only bring up GUI if there are some plots ...
// Visualizations
JPanel opanel = new ScrollableFlowPanel();
opanel.setLayout(new FlowLayout());
JScrollPane spane = new JScrollPane(opanel, ScrollPaneLayout.VERTICAL_SCROLLBAR_AS_NEEDED, ScrollPaneLayout.HORIZONTAL_SCROLLBAR_NEVER);
JTabbedPane tpane = new JTabbedPane();
tpane.addTab("Overview", spane);
for(JPanel p: visualizer.getPlotPanels()){
opanel.add(p);
}
final JFrame frame = new JFrame("FDA: "+file);
frame.setSize(1200,800);
// Create progress panel
progressPanel = new ProgressPanel();
progressPanel.addActionListener(new ActionListener() {
@Override
public void actionPerformed(ActionEvent e) {
try {
client.stop();
} catch (Exception e1) {
logger.log(Level.SEVERE, "Exception occured while aborting scan", e1);
}
}
});
JSplitPane splitPane = new JSplitPane();
splitPane.setLeftComponent(progressPanel);
splitPane.setRightComponent(tpane);
frame.add(splitPane);
frame.addWindowListener(new WindowAdapter(){
@Override
public void windowClosing(WindowEvent we){
if(client.isActive()){
// Abort acquisition
client.stop();
}
// Wait until acquisition is aborted. Maximum wait 10*100milliseconds before forcefully
// terminate application
int count=0;
while(client.isActive()){
if(count == 10){
break;
}
// Sleep 100 milliseconds
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
count++;
}
// Terminate program
System.exit(0);
}
});
frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);//.DO_NOTHING_ON_CLOSE);
// frame.setVisible(true);
java.awt.EventQueue.invokeLater(new Runnable() {
public void run() {
frame.setVisible(true);
}
});
}
// GUI GUI GUI GUI GUI GUI GUI
// CLI CLI CLI CLI
// Register the scan engine as Signal Handler
Signal.handle(new Signal("INT"), new SignalHandler() {
/**
* Thread save signal counter
*/
private AtomicInteger signalCount= new AtomicInteger(0);
/**
* Testing signal handler (in Eclipse) use this after starting scan:
*
* SL5: A=`ps -ef | tail -10 | grep jav[a] | awk '{printf $2}'`;kill -2 $A
* MacOS X: A=`ps -ef | grep AcquisitionMai[n] | awk '{printf $2}'`;kill -2 $A
*
* on the command line use CTRL-C
*/
@Override
public void handle(Signal signal) {
logger.finest("Received signal: "+signal);
int count = signalCount.incrementAndGet();
// If signal is received more than 1 time forcefully abort application
if(count>1){
logger.info("Terminate application");
System.exit(2);
}
abortedViaSignal = true;
// Abort acquisition engine
if(client.isActive()){
// Abort acquisition
client.stop();
}
}
});
// CLI CLI CLI CLI
if(visualizer != null){
visualizer.configure();
}
client.acquire(c);
// GUI GUI GUI GUI GUI GUI GUI
// Set progress panel to done
// if(progressPanel != null){
// // Can this be done via a Listener?
// progressPanel.done();
// }
// GUI GUI GUI GUI GUI GUI GUI
if(abortedViaSignal){
System.exit(3);
}
}
}

View File

@@ -36,6 +36,7 @@ import javax.swing.ScrollPaneLayout;
import com.google.common.eventbus.EventBus;
import ch.psi.fda.gui.ScrollableFlowPanel;
import ch.psi.fda.rest.client.StreamClient;
import ch.psi.fda.visualizer.SeriesDataFilter;
import ch.psi.fda.visualizer.Visualizer;
import ch.psi.fda.visualizer.XYSeriesDataFilter;
@@ -101,7 +102,7 @@ public class ViewerMain {
bus.register(visualizer);
visualizer.configure();
new StreamClient(bus).listen();
new StreamClient(bus).listen("tcp://emac:10000");
}
/**

View File

@@ -18,7 +18,6 @@
*/
package ch.psi.fda.rest;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -111,20 +110,32 @@ public class AcquisitionEngine {
* @param configuration
* @return
*/
public String submit(Configuration configuration){
ExecutionRequest r = new ExecutionRequest(UUID.randomUUID().toString(), configuration);
public void submit(String trackingId, Configuration configuration){
// Check whether trackingId already exists
for(ExecutionRequest req: requests){
if(req.getTrackingId().equals(trackingId)){
throw new RuntimeException("TrackingId "+trackingId+" is already submitted");
}
}
ExecutionRequest r = new ExecutionRequest(trackingId, configuration);
try{
requests.put(r);
} catch (InterruptedException e) {
}
return r.getTrackingId();
}
public void terminateAll(){
requests.clear();
terminate();
}
/**
* Terminate the currently executed request
*/
public void terminate(){
private void terminate(){
synchronized(this){
if(acquisition==null){
return;

View File

@@ -66,7 +66,7 @@ public class ZMQDataService {
@Subscribe
public void onMessage(Message m) {
logger.info(m.toString());
socket.sendMore("{\"trackingId\":\"" + trackingId + "\"}");
socket.sendMore("{\"htype\": [\"fda-2.1\"], \"trackingId\":\"" + trackingId + "\"}");
try (
ByteArrayOutputStream b = new ByteArrayOutputStream();
ObjectOutputStream o = new ObjectOutputStream(b);

View File

@@ -0,0 +1,316 @@
/**
*
* Copyright 2010 Paul Scherrer Institute. All rights reserved.
*
* This code is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* This code is distributed in the hope that it will be useful, but without any
* warranty; without even the implied warranty of merchantability or fitness for
* a particular purpose. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda.rest.client;
import java.awt.FlowLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent;
import java.io.File;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JSplitPane;
import javax.swing.JTabbedPane;
import javax.swing.ScrollPaneLayout;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import sun.misc.Signal;
import sun.misc.SignalHandler;
import ch.psi.fda.aq.VisualizationMapper;
import ch.psi.fda.gui.ProgressPanel;
import ch.psi.fda.gui.ScrollableFlowPanel;
import ch.psi.fda.model.ModelManager;
import ch.psi.fda.model.v1.Configuration;
import ch.psi.fda.model.v1.Data;
import ch.psi.fda.visualizer.Visualizer;
@SuppressWarnings("restriction")
public class RemoteAcquisitionMain {
private static Logger logger = Logger.getLogger(RemoteAcquisitionMain.class.getName());
/**
* Main Program Process exit code: -1 if wrong number of arguments are
* passed Process exit code: 3 if aborted via Ctrl+C
*
* @param args
* Arguments of the program
*/
public static void main(String[] args) {
String scriptname = "fda_scan";
Integer iterations = null;
boolean nogui = false;
List<File> files = new ArrayList<>();
OptionBuilder.hasArg();
OptionBuilder.withArgName("iterations");
OptionBuilder.withDescription("Number of iterations");
OptionBuilder.withType(new Integer(1));
Option o_iterations = OptionBuilder.create("iterations");
Option o_nogui = new Option("nogui", "Do not show scan GUI");
Options options = new Options();
options.addOption(o_iterations);
options.addOption(o_nogui);
CommandLineParser parser = new GnuParser();
try {
CommandLine line = parser.parse(options, args);
if (line.getArgs().length < 1) {
throw new ParseException("One argument is required");
}
if (line.hasOption(o_iterations.getOpt())) {
iterations = Integer.parseInt(line.getOptionValue(o_iterations.getOpt()));
}
if (line.hasOption(o_nogui.getOpt())) {
nogui = true;
}
for (String f : line.getArgs()) {
File file = new File(f);
if (!file.exists()) {
throw new RuntimeException("File " + file.getAbsolutePath() + " does not exist");
}
files.add(file);
}
} catch (ParseException e) {
System.err.println(e.getMessage());
HelpFormatter formatter = new HelpFormatter();
formatter.printUsage(new PrintWriter(System.out, true), HelpFormatter.DEFAULT_WIDTH, scriptname, options);
System.exit(-1);
}
try {
for (File file : files) {
executeScan(file, iterations, nogui);
}
} catch (Exception ee) {
logger.log(Level.SEVERE, "Acquisition failed due to: ", ee);
System.exit(-1);
}
}
/**
* Execute the given scan on the server
* @param file
* @param iterations
* @param nogui
*/
public static void executeScan(File file, Integer iterations, boolean nogui) {
Configuration c;
try {
c = ModelManager.unmarshall(file);
} catch (Exception e) {
throw new RuntimeException("Unable to deserialize configuration: " + e.getMessage(), e);
}
// Set data file name
// Determine name used for the data file
String name = file.getName();
name = name.replaceAll("\\.xml$", "");
if (c.getData() != null) {
Data data = c.getData();
// Only update filename if no name is specified in xml file
if (data.getFileName() == null) {
data.setFileName(name);
}
} else {
Data data = new Data();
data.setFileName(name);
c.setData(data);
}
// Override number of executions
if (iterations != null) {
c.setNumberOfExecution(iterations);
}
// Fix configuration if iterations is specified with 0 and no iterations
// option is specified
if (c.getNumberOfExecution() == 0) {
c.setNumberOfExecution(1);
}
EventBus b = new AsyncEventBus(Executors.newSingleThreadExecutor());
final RestClient client = new RestClient();
StreamClient streamClient = null;
if (!nogui && c.getVisualization().size() > 0) {
streamClient = new StreamClient(b);
Visualizer visualizer = new Visualizer(VisualizationMapper.mapVisualizations(c.getVisualization()));
visualizer.configure();
b.register(visualizer);
// If there is a continuous dimension only update plot at the end of a line
if (c.getScan() != null && c.getScan().getCdimension() != null) {
visualizer.setUpdateAtStreamElement(false);
visualizer.setUpdateAtStreamDelimiter(true);
visualizer.setUpdateAtEndOfStream(true);
}
JPanel opanel = new ScrollableFlowPanel();
opanel.setLayout(new FlowLayout());
JScrollPane spane = new JScrollPane(opanel, ScrollPaneLayout.VERTICAL_SCROLLBAR_AS_NEEDED, ScrollPaneLayout.HORIZONTAL_SCROLLBAR_NEVER);
JTabbedPane tpane = new JTabbedPane();
tpane.addTab("Overview", spane);
for (JPanel p : visualizer.getPlotPanels()) {
opanel.add(p);
}
final JFrame frame = new JFrame("FDA: " + file);
frame.setSize(1200, 800);
final ProgressPanel progressPanel = new ProgressPanel();
progressPanel.addActionListener(new ActionListener() {
@Override
public void actionPerformed(ActionEvent e) {
try {
client.stop();
} catch (Exception e1) {
logger.log(Level.SEVERE, "Exception occured while aborting scan", e1);
} finally {
progressPanel.done();
}
}
});
JSplitPane splitPane = new JSplitPane();
splitPane.setLeftComponent(progressPanel);
splitPane.setRightComponent(tpane);
frame.add(splitPane);
frame.addWindowListener(new WindowAdapter() {
@Override
public void windowClosing(WindowEvent we) {
if (client.isActive()) {
client.stop();
}
int count = 0;
while (client.isActive()) {
if (count == 10) {
break;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
count++;
}
System.exit(0);
}
});
frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE); // .DO_NOTHING_ON_CLOSE);
java.awt.EventQueue.invokeLater(new Runnable() {
public void run() {
frame.setVisible(true);
}
});
}
Signal.handle(new Signal("INT"), new SignalHandler() {
private AtomicInteger signalCount = new AtomicInteger(0);
@Override
public void handle(Signal signal) {
logger.finest("Received signal: " + signal);
try{
client.stop();
}
catch(Exception e){
logger.log(Level.WARNING, "Stopping scan failed with exception", e);
}
if (signalCount.incrementAndGet() > 1) {
logger.info("Terminate application");
System.exit(2);
}
}
});
String trackingId = UUID.randomUUID().toString();
logger.info("TrackingID of job: " + trackingId);
if(streamClient!=null){
streamClient.filterTrackingId(trackingId);
final StreamClient sclient = streamClient;
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
sclient.listen("tcp://emac:10000");
}
});
}
client.acquire(trackingId, c);
// TODO Need to be informed when stopped! - Progess Panel
}
}

View File

@@ -16,7 +16,7 @@
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda.rest;
package ch.psi.fda.rest.client;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
@@ -42,13 +42,14 @@ public class RestClient {
public String acquire(Configuration c){
public String acquire(String trackingId, Configuration c){
// Wrap configuration in JAXBElement as there is no @XmlRootElement available within the generated Configuration class
JAXBElement<Configuration> jaxbElement = new JAXBElement<>(new QName("ROOT"), Configuration.class, c);
return target.request().post(Entity.entity(jaxbElement, MediaType.APPLICATION_XML), String.class);
return target.path(trackingId).request().put(Entity.entity(jaxbElement, MediaType.APPLICATION_XML), String.class);
}
public void stop(){
// TODO in future only the actual request should be aborted!
target.request().delete();
}

View File

@@ -16,43 +16,56 @@
* along with this code. If not, see <http://www.gnu.org/licenses/>.
*
*/
package ch.psi.fda;
package ch.psi.fda.rest.client;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.jeromq.ZMQ;
import com.google.common.eventbus.EventBus;
/**
*
*/
public class StreamClient {
private static final Logger logger = Logger.getLogger(StreamClient.class.getName());
private final EventBus bus;
private static final Logger logger = Logger.getLogger(StreamClient.class.getName());
private ObjectMapper mapper = new ObjectMapper(new JsonFactory());
public StreamClient(EventBus bus){
private final EventBus bus;
private String trackingIdFilter = null;
public StreamClient(EventBus bus) {
this.bus = bus;
}
public void listen() {
public void listen(String endpoint) {
ZMQ.Context context = ZMQ.context();
zmq.ZError.clear(); // Clear error code
ZMQ.Socket socket = context.socket(ZMQ.SUB);
socket.connect("tcp://emac:10000");
socket.connect(endpoint);
socket.subscribe(""); // SUBSCRIBE !
while (true) {
String tid = null;
byte[] content = null;
// String header = socket.recvStr(); // header
socket.recvStr(); // header
byte[] header = socket.recv(); // header
if(trackingIdFilter!=null){
try {
Map<String,Object> m = mapper.readValue(header, new TypeReference<HashMap<String,Object>>(){});
tid = (String) m.get("trackingId");
} catch (IOException e) {
}
}
while (socket.hasReceiveMore()) {
content = socket.recv();
}
@@ -61,6 +74,10 @@ public class StreamClient {
continue;
}
if(tid!=null && !tid.matches(trackingIdFilter)){
continue;
}
try (ByteArrayInputStream bis = new ByteArrayInputStream(content); ObjectInput in = new ObjectInputStream(bis);) {
Object o = in.readObject();
bus.post(o);
@@ -70,4 +87,12 @@ public class StreamClient {
}
}
public String getTrackingIdFilter() {
return trackingIdFilter;
}
public void filterTrackingId(String trackingId) {
this.trackingIdFilter = trackingId;
}
}

View File

@@ -23,8 +23,9 @@ import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.MediaType;
import ch.psi.fda.model.v1.Configuration;
@@ -36,15 +37,22 @@ public class ScanService {
@Inject
private AcquisitionEngine aengine;
@POST
@PUT
@Path("{trackingId}")
@Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public String execute(Configuration configuration) throws InterruptedException{
return aengine.submit(configuration);
public void execute(@PathParam("trackingId") String trackingId, Configuration configuration) throws InterruptedException{
aengine.submit(trackingId, configuration);
}
@DELETE
public void stop(){
aengine.terminate();
@Path("{trackingId}")
public void stop(@PathParam("trackingId") String trackingId){
aengine.terminate(trackingId);
}
@DELETE
public void terminateAll(){
aengine.terminateAll();
}
@GET