New ScreenPanel
This commit is contained in:
280
plugins/Sender.java
Executable file
280
plugins/Sender.java
Executable file
@@ -0,0 +1,280 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Paul Scherrer Institute. All rights reserved.
|
||||
*/
|
||||
package ch.psi.pshell.data;
|
||||
|
||||
import ch.psi.bsread.DataChannel;
|
||||
import ch.psi.bsread.Utils;
|
||||
import ch.psi.bsread.common.allocator.ByteBufferAllocator;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
|
||||
import ch.psi.bsread.common.helper.ByteBufferHelper;
|
||||
import ch.psi.bsread.compression.Compression;
|
||||
import ch.psi.bsread.converter.ByteConverter;
|
||||
import ch.psi.bsread.converter.MatlabByteConverter;
|
||||
import ch.psi.bsread.message.ChannelConfig;
|
||||
import ch.psi.bsread.message.DataHeader;
|
||||
import ch.psi.bsread.message.MainHeader;
|
||||
import ch.psi.bsread.message.Timestamp;
|
||||
import ch.psi.bsread.message.Type;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.IntFunction;
|
||||
|
||||
public class Sender {
|
||||
|
||||
final MainHeader mainHeader;
|
||||
|
||||
final Compression dataHeaderCompression;
|
||||
final ByteConverter byteConverter;
|
||||
final ObjectMapper objectMapper;
|
||||
final IntFunction<ByteBuffer> compressedValueAllocator;
|
||||
final IntFunction<ByteBuffer> valueAllocator;
|
||||
|
||||
private byte[] dataHeaderBytes;
|
||||
private String dataHeaderMD5 = "";
|
||||
|
||||
private List<DataChannel<?>> channels = new ArrayList<>();
|
||||
|
||||
public Sender(ObjectMapper objectMapper) {
|
||||
this(objectMapper, Compression.none);
|
||||
}
|
||||
|
||||
public Sender(ObjectMapper objectMapper,Compression dataHeaderCompression) {
|
||||
this(objectMapper, dataHeaderCompression, new MatlabByteConverter(), ByteBufferAllocator.DEFAULT_ALLOCATOR, ByteBufferAllocator.DEFAULT_ALLOCATOR);
|
||||
}
|
||||
|
||||
public Sender(ObjectMapper objectMapper, Compression dataHeaderCompression, ByteConverter byteConverter, IntFunction<ByteBuffer> compressedValueAllocator, IntFunction<ByteBuffer> valueAllocator) {
|
||||
this.dataHeaderCompression = dataHeaderCompression;
|
||||
this.byteConverter = byteConverter;
|
||||
this.objectMapper = objectMapper;
|
||||
this.compressedValueAllocator = compressedValueAllocator;
|
||||
this.valueAllocator = valueAllocator;
|
||||
mainHeader = new MainHeader();
|
||||
}
|
||||
|
||||
int autoPulseId;
|
||||
public String encode() throws JsonProcessingException {
|
||||
long nanos = System.nanoTime();
|
||||
return encode(autoPulseId++, new Timestamp((long)(nanos/1e9), (long)(nanos%1e9)));
|
||||
}
|
||||
|
||||
public String encode(long pulseId, Timestamp globalTimestamp) throws JsonProcessingException {
|
||||
DataChannel<?> channel;
|
||||
ByteOrder byteOrder;
|
||||
|
||||
mainHeader.setPulseId(pulseId);
|
||||
mainHeader.setGlobalTimestamp(globalTimestamp);
|
||||
mainHeader.setHash(dataHeaderMD5);
|
||||
mainHeader.setDataHeaderCompression(dataHeaderCompression);
|
||||
|
||||
try {
|
||||
Map ret = new HashMap();
|
||||
ret.put("header", objectMapper.writeValueAsString(mainHeader));
|
||||
ret.put("data_header", new String(dataHeaderBytes));
|
||||
|
||||
for (int i = 0; i < channels.size(); ++i) {
|
||||
channel = channels.get(i);
|
||||
byteOrder = channel.getConfig().getByteOrder();
|
||||
final Object value = channel.getValue(pulseId);
|
||||
|
||||
ByteBuffer valueBuffer = byteConverter.getBytes(value, channel.getConfig().getType(), byteOrder, valueAllocator);
|
||||
valueBuffer = channel
|
||||
.getConfig()
|
||||
.getCompression()
|
||||
.getCompressor()
|
||||
.compressData(valueBuffer, valueBuffer.position(), valueBuffer.remaining(), 0,
|
||||
compressedValueAllocator, channel.getConfig().getType().getBytes());
|
||||
System.out.println((new String(valueBuffer.array())).length());
|
||||
System.out.println(channel.getConfig().getName() + ": " + new String(valueBuffer.array()));
|
||||
ret.put(channel.getConfig().getName(), new String(valueBuffer.array()));
|
||||
|
||||
Timestamp timestamp = channel.getTime(pulseId);
|
||||
ByteBuffer timeBuffer = byteConverter.getBytes(timestamp.getAsLongArray(), Type.Int64, byteOrder, valueAllocator);
|
||||
ret.put(channel.getConfig().getName() + "_timestamp", new String(timeBuffer.array()));
|
||||
}
|
||||
return objectMapper.writeValueAsString(ret);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Unable to serialize message", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* (Re)Generate the data header based on the configured data channels
|
||||
*/
|
||||
private void generateDataHeader() {
|
||||
DataHeader dataHeader = new DataHeader();
|
||||
|
||||
for (DataChannel<?> channel : channels) {
|
||||
dataHeader.addChannel(channel.getConfig());
|
||||
}
|
||||
|
||||
try {
|
||||
dataHeaderBytes = objectMapper.writeValueAsBytes(dataHeader);
|
||||
if (!Compression.none.equals(dataHeaderCompression)) {
|
||||
ByteBuffer tmpBuf = dataHeaderCompression.getCompressor().compressDataHeader(ByteBuffer.wrap(dataHeaderBytes),
|
||||
compressedValueAllocator);
|
||||
dataHeaderBytes = ByteBufferHelper.copyToByteArray(tmpBuf);
|
||||
}
|
||||
// decided to compute hash from the bytes that are send to Receivers
|
||||
// (allows to check consistency without uncompressing the bytes at
|
||||
// receivers side)
|
||||
dataHeaderMD5 = Utils.computeMD5(dataHeaderBytes);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException("Unable to generate data header", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void addChannel(DataChannel<?> channel) {
|
||||
channels.add(channel);
|
||||
generateDataHeader();
|
||||
}
|
||||
|
||||
public void removeChannel(DataChannel<?> channel) {
|
||||
channels.remove(channel);
|
||||
generateDataHeader();
|
||||
}
|
||||
|
||||
|
||||
public DataChannel addConstant(String name, Object data){
|
||||
return addConstant(name, data, Compression.none);
|
||||
}
|
||||
|
||||
public DataChannel addConstant(String name, Object data, boolean unsigned){
|
||||
return addConstant(name, data, unsigned, Compression.none);
|
||||
}
|
||||
|
||||
public DataChannel addConstant(String name, Object data, Compression compression){
|
||||
return addConstant(name, data, false, Compression.none);
|
||||
}
|
||||
|
||||
public DataChannel addConstant(String name, Object data, boolean unsigned, Compression compression){
|
||||
DataChannel channel = null;
|
||||
Class cls = data.getClass();
|
||||
Type type = classToType(cls, unsigned);
|
||||
int[] shape = {1};
|
||||
if (cls.isArray()){
|
||||
shape = new int[] {Array.getLength(data)};
|
||||
}
|
||||
|
||||
channel = new DataChannel(new ChannelConfig(name, type, shape, 1, 0, ChannelConfig.DEFAULT_ENCODING, compression)) {
|
||||
@Override
|
||||
public Object getValue(long pulseId) {
|
||||
return data;
|
||||
}
|
||||
};
|
||||
|
||||
addChannel(channel);
|
||||
return channel;
|
||||
}
|
||||
|
||||
public static Type classToType(Class cls, boolean unsigned){
|
||||
if ((cls == Double.class) ||(cls == double[].class)){
|
||||
return Type.Float64;
|
||||
}
|
||||
if ((cls == Float.class) ||(cls == float[].class)){
|
||||
return Type.Float32;
|
||||
}
|
||||
if ((cls == Byte.class) ||(cls == byte[].class)){
|
||||
return Type.Int8;
|
||||
}
|
||||
if ((cls == Short.class) ||(cls == short[].class)){
|
||||
return unsigned ? Type.UInt8 : Type.Int16;
|
||||
}
|
||||
if ((cls == Integer.class) ||(cls == int[].class)){
|
||||
return unsigned ? Type.UInt16 : Type.Int32;
|
||||
}
|
||||
if ((cls == Long.class) ||(cls == long[].class)){
|
||||
return unsigned ? Type.UInt32 : Type.Int64;
|
||||
}
|
||||
if ((cls == String.class) ||(cls == String[].class)){
|
||||
return Type.String;
|
||||
}
|
||||
if ((cls == Boolean.class) ||(cls == Boolean[].class)){
|
||||
return Type.Bool;
|
||||
}
|
||||
throw new IllegalArgumentException("Invalid class: " + cls);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the currently configured data channels as an unmodifiable list
|
||||
*
|
||||
* @return Unmodifiable list of data channels
|
||||
*/
|
||||
public List<DataChannel<?>> getChannels() {
|
||||
return Collections.unmodifiableList(channels);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
ByteConverter byteConverter = new MatlabByteConverter();
|
||||
|
||||
|
||||
Sender sender = new Sender(new ObjectMapper(),Compression.none);
|
||||
|
||||
// Register data sources ...
|
||||
sender.addConstant("Double", 10.0);
|
||||
/*
|
||||
sender.addConstant("Float", 10.0f);
|
||||
sender.addConstant("Byte", (byte)-10);
|
||||
sender.addConstant("Short", (short)-10);
|
||||
sender.addConstant("Int", (int)-10);
|
||||
sender.addConstant("Long", (long)-10);
|
||||
sender.addConstant("UByte", (short)-10, true);
|
||||
sender.addConstant("UShort", (int)-10, true);
|
||||
sender.addConstant("UInt", (long)-10, true);
|
||||
sender.addConstant("Bool", true);
|
||||
sender.addConstant("String", "Test");
|
||||
sender.addConstant("Double Arr", new double[]{10.0, 20.0, 30.0}, Compression.bitshuffle_lz4);
|
||||
sender.addConstant("Byte Arr", new byte[]{10, 20, 30}, Compression.bitshuffle_lz4);
|
||||
*/
|
||||
|
||||
/*sender.addChannel(new DataChannel<Double>(new ChannelConfig("ABC", Type.Float64, 1, 0)) {
|
||||
@Override
|
||||
public Double getValue(long pulseId) {
|
||||
return (double) 10.0;
|
||||
}
|
||||
});
|
||||
*/
|
||||
//sender.addConstant("Double Arr", new double[3]);
|
||||
|
||||
String json = sender.encode();
|
||||
System.out.println(json.length());
|
||||
System.out.println(json);
|
||||
|
||||
|
||||
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
Map message = mapper.readValue(json, Map.class);
|
||||
MainHeader mainHeader = mapper.readValue((String)message.get("header"), MainHeader.class);
|
||||
System.out.println(mainHeader);
|
||||
DataHeader dataHeader = mapper.readValue(((String)message.get("data_header")).getBytes(),DataHeader.class);
|
||||
System.out.println(dataHeader);
|
||||
|
||||
|
||||
for (ChannelConfig channelConfig : dataHeader.getChannels()) {
|
||||
ChannelConfig timestampConfig = new ChannelConfig();
|
||||
timestampConfig.setType(Type.Int64);
|
||||
timestampConfig.setShape(new int[]{2});
|
||||
timestampConfig.setByteOrder(channelConfig.getByteOrder());
|
||||
ByteBuffer timestampBuffer = ByteBuffer.wrap(((String)message.get(channelConfig.getName()+"_timestamp")).getBytes());
|
||||
long[] timestampArr = byteConverter.getValue(mainHeader, dataHeader, timestampConfig, timestampBuffer, mainHeader.getGlobalTimestamp());
|
||||
Timestamp timestamp = new Timestamp(timestampArr[0], timestampArr[1]);
|
||||
System.out.println(timestamp);
|
||||
|
||||
ByteBuffer valueBuffer = ByteBuffer.wrap(((String)message.get(channelConfig.getName())).getBytes());
|
||||
Object data = byteConverter.getValue(mainHeader, dataHeader, channelConfig, valueBuffer, timestamp);
|
||||
System.out.println(data);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user