This commit is contained in:
gac-x12sa
2022-12-23 15:26:41 +01:00
parent 4d38f3f6cb
commit fac8f2b120

204
plugins/Array10.java Normal file
View File

@@ -0,0 +1,204 @@
import ch.psi.pshell.device.Cacheable;
import ch.psi.pshell.device.DeviceBase;
import ch.psi.pshell.device.Readable;
import ch.psi.pshell.device.ReadonlyAsyncRegisterBase;
import ch.psi.pshell.device.ReadonlyRegister.ReadonlyRegisterArray;
import ch.psi.pshell.device.ReadonlyRegister.ReadonlyRegisterMatrix;
import ch.psi.utils.BufferConverter;
import ch.psi.utils.Convert;
import ch.psi.utils.EncoderJson;
import ch.psi.utils.State;
import ch.psi.utils.Type;
import java.io.IOException;
import java.lang.reflect.Array;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
/**
* Array streaming protocol based on ZMQs
*/
public class Array10 extends DeviceBase implements Readable, Cacheable, Readable.ReadableType{
final String address;
final int mode;
final Array10Array devArray;
final Array10Matrix devMatrix;
volatile int message_count;
Thread thread;
public Array10(String name, String address, int mode){
super(name);
this.address = address;
this.mode = mode;
this.devArray = new Array10Array();
this.devMatrix = new Array10Matrix();
addChild(devArray);
addChild(devMatrix);
message_count=0;
}
public class Array10Array extends ReadonlyAsyncRegisterBase implements ReadonlyRegisterArray{
Array10Array(){
super(Array10.this.getName() + "_array");
}
@Override
public int getSize() {
Object cache = take();
if ((cache == null) || (!cache.getClass().isArray())) {
return 0;
}
return Array.getLength(cache);
}
void set(Object data){
onReadout(data);
}
}
public class Array10Matrix extends ReadonlyAsyncRegisterBase implements ReadonlyRegisterMatrix{
int[] shape;
int modulo =1;
Array10Matrix(){
super(Array10.this.getName()+ "_matrix");
}
@Override
public int getWidth() {
return (shape==null) ? 0 : shape[1];
}
@Override
public int getHeight() {
return (shape==null) ? 0 : shape[0];
}
public int getModulo(){
return modulo;
}
public void setModulo(int modulo){
this.modulo = modulo;
}
void set(Object data, int[] shape){
if (message_count % modulo == 0){
this.shape = shape;
if ((data!=null)&&(shape!=null)&&(shape.length==2)){
onReadout(Convert.reshape(data, shape));
}
}
}
}
@Override
protected void doInitialize() throws IOException, InterruptedException{
stop();
super.doInitialize();
message_count=0;
start();
}
public void start(){
if (thread==null){
getLogger().info("Starting");
thread = new Thread(() -> {
receiverTask();
});
thread.setName("Array10 receiver: " + getName());
thread.setDaemon(true);
setState(State.Busy);
thread.start();
}
}
public void stop(){
if (thread!=null){
getLogger().info("Stopping");
try{
thread.interrupt();
} finally {
thread=null;
setState(State.Ready);
}
}
}
private void receiverTask(){
getLogger().info("Enter rx thread");
Context context = null;
Socket socket = null;
try{
context = ZMQ.context(1);
socket = context.socket(mode);
socket.connect(address);
if (mode == ZMQ.SUB){
socket.subscribe("");
}
getLogger().info("Running " + mode + " " + address);
while ((!Thread.currentThread().isInterrupted()) && (getState().isRunning())) {
byte[] header_arr = socket.recv(ZMQ.NOBLOCK);
if (header_arr !=null){
try{
String json = new String(header_arr, StandardCharsets.UTF_8);
Map<String, Object> header = (Map) EncoderJson.decode(json, Map.class);
int[] shape = (int[]) header.getOrDefault("shape", null);
String dtype = (String) header.getOrDefault("type", "int8");
byte[] data_arr = socket.recv();
if (data_arr!=null){
Object data=BufferConverter.fromArray(data_arr, Type.fromKey(dtype));
Map<String, Object> value = new HashMap<>();
value.put("header", header);
value.put("data", data);
value.put("id", message_count);
message_count++;
setCache(value);
if (devArray!=null){
devArray.set(data);
}
if (devMatrix!=null){
devMatrix.set(data, shape);
} }
} catch (Exception ex){
getLogger().log(Level.WARNING, null, ex);
}
} else {
Thread.sleep(10);
}
}
} catch (Exception ex){
getLogger().log(Level.WARNING, null, ex);
} finally {
if (socket!=null){
socket.close();
}
if (context!=null){
context.term();
}
getLogger().info("Quit rx thread");
}
}
public Array10Array getDevArray(){
return devArray;
}
@Override
protected void doClose() throws IOException {
stop();
super.doClose();
}
@Override
public Object read() throws IOException, InterruptedException {
waitCacheChange(-1);
return take();
}
}