Files
dev/plugins/Sender.java
2018-01-19 10:56:53 +01:00

281 lines
11 KiB
Java
Executable File

/*
* Copyright (c) 2014 Paul Scherrer Institute. All rights reserved.
*/
package ch.psi.pshell.data;
import ch.psi.bsread.DataChannel;
import ch.psi.bsread.Utils;
import ch.psi.bsread.common.allocator.ByteBufferAllocator;
import ch.psi.bsread.common.helper.ByteBufferHelper;
import ch.psi.bsread.compression.Compression;
import ch.psi.bsread.converter.ByteConverter;
import ch.psi.bsread.converter.MatlabByteConverter;
import ch.psi.bsread.message.ChannelConfig;
import ch.psi.bsread.message.DataHeader;
import ch.psi.bsread.message.MainHeader;
import ch.psi.bsread.message.Timestamp;
import ch.psi.bsread.message.Type;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.IntFunction;
/**
 * Encodes the current values of a set of {@link DataChannel}s into a single JSON string.
 *
 * <p>The produced JSON object contains the entries {@code "header"} (the serialized
 * {@link MainHeader}), {@code "data_header"} (the serialized, optionally compressed
 * {@link DataHeader}) and, for every registered channel, one entry named after the channel
 * holding its value bytes plus one entry named {@code <channel>_timestamp} holding its timestamp
 * bytes.
 *
 * <p>All byte payloads are carried as JSON strings using ISO-8859-1, i.e. one char per byte, so
 * that a receiver can recover the exact bytes with {@code getBytes(StandardCharsets.ISO_8859_1)}.
 *
 * <p>Instances keep mutable state (channel list, cached data header, auto-incremented pulse id)
 * without any synchronization.
 */
public class Sender {

    /**
     * Charset used to transport raw bytes inside JSON strings. ISO-8859-1 maps every byte value to
     * exactly one char, so the conversion is lossless in both directions, unlike the platform
     * default charset, which may replace byte sequences that are invalid in it.
     */
    private static final Charset BINARY_CHARSET = StandardCharsets.ISO_8859_1;

    final MainHeader mainHeader;
    final Compression dataHeaderCompression;
    final ByteConverter byteConverter;
    final ObjectMapper objectMapper;
    final IntFunction<ByteBuffer> compressedValueAllocator;
    final IntFunction<ByteBuffer> valueAllocator;

    /** Serialized (and, if configured, compressed) data header; null until first generated. */
    private byte[] dataHeaderBytes;
    /** MD5 of {@link #dataHeaderBytes}, published in the main header as hash. */
    private String dataHeaderMD5 = "";
    private List<DataChannel<?>> channels = new ArrayList<>();

    /** Pulse id used by {@link #encode()}; incremented on every call of that method. */
    int autoPulseId;

    /**
     * Creates a sender that does not compress the data header.
     *
     * @param objectMapper mapper used to serialize the headers and the final message
     */
    public Sender(ObjectMapper objectMapper) {
        this(objectMapper, Compression.none);
    }

    /**
     * Creates a sender using a {@link MatlabByteConverter} and the default buffer allocators.
     *
     * @param objectMapper mapper used to serialize the headers and the final message
     * @param dataHeaderCompression compression applied to the serialized data header
     */
    public Sender(ObjectMapper objectMapper, Compression dataHeaderCompression) {
        this(objectMapper, dataHeaderCompression, new MatlabByteConverter(),
                ByteBufferAllocator.DEFAULT_ALLOCATOR, ByteBufferAllocator.DEFAULT_ALLOCATOR);
    }

    /**
     * Creates a fully configured sender.
     *
     * @param objectMapper mapper used to serialize the headers and the final message
     * @param dataHeaderCompression compression applied to the serialized data header
     * @param byteConverter converter turning channel values and timestamps into bytes
     * @param compressedValueAllocator allocator handed to the compressors for their output
     * @param valueAllocator allocator handed to the byte converter for uncompressed values
     */
    public Sender(ObjectMapper objectMapper, Compression dataHeaderCompression, ByteConverter byteConverter,
            IntFunction<ByteBuffer> compressedValueAllocator, IntFunction<ByteBuffer> valueAllocator) {
        this.dataHeaderCompression = dataHeaderCompression;
        this.byteConverter = byteConverter;
        this.objectMapper = objectMapper;
        this.compressedValueAllocator = compressedValueAllocator;
        this.valueAllocator = valueAllocator;
        mainHeader = new MainHeader();
    }

    /**
     * Encodes a message using the next automatic pulse id and the current wall-clock time as
     * global timestamp.
     *
     * @return the message as JSON string
     * @throws JsonProcessingException declared for compatibility; serialization failures are
     *         reported as {@link IllegalStateException} by {@link #encode(long, Timestamp)}
     */
    public String encode() throws JsonProcessingException {
        // Wall-clock time is required here: System.nanoTime() has an arbitrary origin and
        // therefore cannot be used as an absolute timestamp.
        Instant now = Instant.now();
        return encode(autoPulseId++, new Timestamp(now.getEpochSecond(), now.getNano()));
    }

    /**
     * Encodes a message for the given pulse.
     *
     * @param pulseId pulse id written to the main header and used to query every channel
     * @param globalTimestamp global timestamp written to the main header
     * @return the message as JSON string
     * @throws JsonProcessingException declared for compatibility; not thrown directly
     * @throws IllegalStateException if any step of the serialization fails (the cause is attached)
     */
    public String encode(long pulseId, Timestamp globalTimestamp) throws JsonProcessingException {
        if (dataHeaderBytes == null) {
            // No channel has been added/removed yet: build the (empty) data header on demand
            // instead of failing on a null header.
            generateDataHeader();
        }
        mainHeader.setPulseId(pulseId);
        mainHeader.setGlobalTimestamp(globalTimestamp);
        mainHeader.setHash(dataHeaderMD5);
        mainHeader.setDataHeaderCompression(dataHeaderCompression);
        try {
            Map<String, String> message = new HashMap<>();
            message.put("header", objectMapper.writeValueAsString(mainHeader));
            message.put("data_header", toBinaryString(dataHeaderBytes));
            for (DataChannel<?> channel : channels) {
                ChannelConfig config = channel.getConfig();
                ByteOrder byteOrder = config.getByteOrder();

                Object value = channel.getValue(pulseId);
                ByteBuffer valueBuffer = byteConverter.getBytes(value, config.getType(), byteOrder, valueAllocator);
                valueBuffer = config
                        .getCompression()
                        .getCompressor()
                        .compressData(valueBuffer, valueBuffer.position(), valueBuffer.remaining(), 0,
                                compressedValueAllocator, config.getType().getBytes());
                message.put(config.getName(), toBinaryString(valueBuffer));

                Timestamp timestamp = channel.getTime(pulseId);
                ByteBuffer timeBuffer = byteConverter.getBytes(timestamp.getAsLongArray(), Type.Int64, byteOrder,
                        valueAllocator);
                message.put(config.getName() + "_timestamp", toBinaryString(timeBuffer));
            }
            return objectMapper.writeValueAsString(message);
        } catch (Exception e) {
            throw new IllegalStateException("Unable to serialize message", e);
        }
    }

    /** Converts raw bytes into a string with one char per byte (see {@link #BINARY_CHARSET}). */
    private static String toBinaryString(byte[] bytes) {
        return new String(bytes, BINARY_CHARSET);
    }

    /**
     * Converts the bytes between position and limit of the buffer into a string with one char per
     * byte. Works for heap and direct buffers and leaves the buffer's position untouched.
     */
    private static String toBinaryString(ByteBuffer buffer) {
        // Read through a duplicate so the caller's position/limit are not modified; array() is
        // avoided because it ignores position/limit and is unsupported for direct buffers.
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return toBinaryString(bytes);
    }

    /**
     * (Re)Generate the data header based on the configured data channels
     */
    private void generateDataHeader() {
        DataHeader dataHeader = new DataHeader();
        for (DataChannel<?> channel : channels) {
            dataHeader.addChannel(channel.getConfig());
        }
        try {
            dataHeaderBytes = objectMapper.writeValueAsBytes(dataHeader);
            if (!Compression.none.equals(dataHeaderCompression)) {
                ByteBuffer tmpBuf = dataHeaderCompression.getCompressor().compressDataHeader(
                        ByteBuffer.wrap(dataHeaderBytes), compressedValueAllocator);
                dataHeaderBytes = ByteBufferHelper.copyToByteArray(tmpBuf);
            }
            // decided to compute hash from the bytes that are sent to Receivers
            // (allows to check consistency without uncompressing the bytes at
            // receivers side)
            dataHeaderMD5 = Utils.computeMD5(dataHeaderBytes);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Unable to generate data header", e);
        }
    }

    /**
     * Registers a channel and regenerates the data header.
     *
     * @param channel channel to add
     */
    public void addChannel(DataChannel<?> channel) {
        channels.add(channel);
        generateDataHeader();
    }

    /**
     * Unregisters a channel and regenerates the data header.
     *
     * @param channel channel to remove
     */
    public void removeChannel(DataChannel<?> channel) {
        channels.remove(channel);
        generateDataHeader();
    }

    /**
     * Adds a signed, uncompressed constant channel.
     *
     * @see #addConstant(String, Object, boolean, Compression)
     */
    public DataChannel addConstant(String name, Object data) {
        return addConstant(name, data, Compression.none);
    }

    /**
     * Adds an uncompressed constant channel.
     *
     * @see #addConstant(String, Object, boolean, Compression)
     */
    public DataChannel addConstant(String name, Object data, boolean unsigned) {
        return addConstant(name, data, unsigned, Compression.none);
    }

    /**
     * Adds a signed constant channel using the given compression.
     *
     * @see #addConstant(String, Object, boolean, Compression)
     */
    public DataChannel addConstant(String name, Object data, Compression compression) {
        // Forward the requested compression (it must not be replaced by Compression.none).
        return addConstant(name, data, false, compression);
    }

    /**
     * Adds a channel that returns the same value for every pulse.
     *
     * @param name channel name
     * @param data constant value; a boxed scalar, a String or a one-dimensional array of a type
     *        supported by {@link #classToType(Class, boolean)}
     * @param unsigned whether the value is to be published as unsigned type (see
     *        {@link #classToType(Class, boolean)})
     * @param compression compression of the channel value
     * @return the created and registered channel
     * @throws NullPointerException if {@code data} is null
     * @throws IllegalArgumentException if the class of {@code data} is not supported
     */
    public DataChannel addConstant(String name, Object data, boolean unsigned, Compression compression) {
        Objects.requireNonNull(data, "data");
        Class<?> cls = data.getClass();
        Type type = classToType(cls, unsigned);
        // Scalars are published with shape {1}, arrays with their length as single dimension.
        int[] shape = cls.isArray() ? new int[] {Array.getLength(data)} : new int[] {1};
        ChannelConfig config = new ChannelConfig(name, type, shape, 1, 0, ChannelConfig.DEFAULT_ENCODING,
                compression);
        DataChannel<Object> channel = new DataChannel<Object>(config) {
            @Override
            public Object getValue(long pulseId) {
                return data;
            }
        };
        addChannel(channel);
        return channel;
    }

    /**
     * Maps a Java class (boxed scalar or array class) to the channel {@link Type}.
     *
     * <p>When {@code unsigned} is set, Short maps to UInt8, Integer to UInt16 and Long to UInt32,
     * i.e. the Java type is the next wider signed type able to hold the unsigned range. The flag
     * is ignored for all other classes.
     *
     * @param cls class of the value
     * @param unsigned whether the unsigned mapping is requested
     * @return the matching type
     * @throws IllegalArgumentException if the class is not supported
     */
    public static Type classToType(Class<?> cls, boolean unsigned) {
        if ((cls == Double.class) || (cls == double[].class)) {
            return Type.Float64;
        }
        if ((cls == Float.class) || (cls == float[].class)) {
            return Type.Float32;
        }
        if ((cls == Byte.class) || (cls == byte[].class)) {
            return Type.Int8;
        }
        if ((cls == Short.class) || (cls == short[].class)) {
            return unsigned ? Type.UInt8 : Type.Int16;
        }
        if ((cls == Integer.class) || (cls == int[].class)) {
            return unsigned ? Type.UInt16 : Type.Int32;
        }
        if ((cls == Long.class) || (cls == long[].class)) {
            return unsigned ? Type.UInt32 : Type.Int64;
        }
        if ((cls == String.class) || (cls == String[].class)) {
            return Type.String;
        }
        // boolean[] is accepted like the primitive arrays of the numeric types; Boolean[] is kept
        // for backward compatibility.
        if ((cls == Boolean.class) || (cls == boolean[].class) || (cls == Boolean[].class)) {
            return Type.Bool;
        }
        throw new IllegalArgumentException("Invalid class: " + cls);
    }

    /**
     * Returns the currently configured data channels as an unmodifiable list
     *
     * @return Unmodifiable list of data channels
     */
    public List<DataChannel<?>> getChannels() {
        return Collections.unmodifiableList(channels);
    }

    /**
     * Manual round-trip check: encodes one message, prints it, then decodes the headers and every
     * channel value/timestamp from the JSON string and prints them.
     */
    public static void main(String[] args) throws Exception {
        ByteConverter byteConverter = new MatlabByteConverter();
        Sender sender = new Sender(new ObjectMapper(), Compression.none);
        // Register data sources ...
        sender.addConstant("Double", 10.0);
        /*
        sender.addConstant("Float", 10.0f);
        sender.addConstant("Byte", (byte)-10);
        sender.addConstant("Short", (short)-10);
        sender.addConstant("Int", (int)-10);
        sender.addConstant("Long", (long)-10);
        sender.addConstant("UByte", (short)-10, true);
        sender.addConstant("UShort", (int)-10, true);
        sender.addConstant("UInt", (long)-10, true);
        sender.addConstant("Bool", true);
        sender.addConstant("String", "Test");
        sender.addConstant("Double Arr", new double[]{10.0, 20.0, 30.0}, Compression.bitshuffle_lz4);
        sender.addConstant("Byte Arr", new byte[]{10, 20, 30}, Compression.bitshuffle_lz4);
        */
        /*sender.addChannel(new DataChannel<Double>(new ChannelConfig("ABC", Type.Float64, 1, 0)) {
            @Override
            public Double getValue(long pulseId) {
                return (double) 10.0;
            }
        });
        */
        //sender.addConstant("Double Arr", new double[3]);
        String json = sender.encode();
        System.out.println(json.length());
        System.out.println(json);

        ObjectMapper mapper = new ObjectMapper();
        Map<?, ?> message = mapper.readValue(json, Map.class);
        MainHeader mainHeader = mapper.readValue((String) message.get("header"), MainHeader.class);
        System.out.println(mainHeader);
        // Payload strings carry one char per byte: recover the bytes with the same charset.
        DataHeader dataHeader = mapper.readValue(
                ((String) message.get("data_header")).getBytes(BINARY_CHARSET), DataHeader.class);
        System.out.println(dataHeader);
        for (ChannelConfig channelConfig : dataHeader.getChannels()) {
            // Timestamps are encoded as two Int64 values in the byte order of their channel.
            ChannelConfig timestampConfig = new ChannelConfig();
            timestampConfig.setType(Type.Int64);
            timestampConfig.setShape(new int[] {2});
            timestampConfig.setByteOrder(channelConfig.getByteOrder());
            ByteBuffer timestampBuffer = ByteBuffer.wrap(
                    ((String) message.get(channelConfig.getName() + "_timestamp")).getBytes(BINARY_CHARSET));
            long[] timestampArr = byteConverter.getValue(mainHeader, dataHeader, timestampConfig, timestampBuffer,
                    mainHeader.getGlobalTimestamp());
            Timestamp timestamp = new Timestamp(timestampArr[0], timestampArr[1]);
            System.out.println(timestamp);

            ByteBuffer valueBuffer = ByteBuffer.wrap(
                    ((String) message.get(channelConfig.getName())).getBytes(BINARY_CHARSET));
            Object data = byteConverter.getValue(mainHeader, dataHeader, channelConfig, valueBuffer, timestamp);
            System.out.println(data);
        }
    }
}