/*
 * Copyright (c) 2014 Paul Scherrer Institute. All rights reserved.
 */
package ch.psi.pshell.data;

import ch.psi.bsread.DataChannel;
import ch.psi.bsread.Utils;
import ch.psi.bsread.common.allocator.ByteBufferAllocator;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import com.fasterxml.jackson.core.JsonProcessingException;
import ch.psi.bsread.common.helper.ByteBufferHelper;
import ch.psi.bsread.compression.Compression;
import ch.psi.bsread.converter.ByteConverter;
import ch.psi.bsread.converter.MatlabByteConverter;
import ch.psi.bsread.message.ChannelConfig;
import ch.psi.bsread.message.DataHeader;
import ch.psi.bsread.message.MainHeader;
import ch.psi.bsread.message.Timestamp;
import ch.psi.bsread.message.Type;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Array;
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntFunction;

/**
 * Encodes the values of a set of registered {@link DataChannel}s into a single
 * JSON string.
 *
 * <p>The produced JSON object contains the entries {@code "header"} (the
 * serialized {@link MainHeader}), {@code "data_header"} (the serialized
 * {@link DataHeader}) and, for every channel, one entry named after the channel
 * holding its value bytes plus one entry {@code <name>_timestamp} holding its
 * timestamp bytes.
 *
 * <p>NOTE(review): value, timestamp and data-header bytes are turned into
 * {@code String}s with the platform default charset. That conversion is not
 * loss-free for arbitrary binary data; switching to an explicit binary-safe
 * encoding would change the wire format and has to be agreed with the consumers.
 */
public class Sender {

    final MainHeader mainHeader;
    final Compression dataHeaderCompression;
    final ByteConverter byteConverter;
    final ObjectMapper objectMapper;
    final IntFunction<ByteBuffer> compressedValueAllocator;
    final IntFunction<ByteBuffer> valueAllocator;

    /** Serialized (and optionally compressed) data header; rebuilt whenever the channel list changes. */
    private byte[] dataHeaderBytes;
    /** MD5 of {@link #dataHeaderBytes}, published in the main header. */
    private String dataHeaderMD5 = "";
    private List<DataChannel<?>> channels = new ArrayList<>();

    /** Pulse id used (and then incremented) by {@link #encode()}. */
    int autoPulseId;

    /**
     * Creates a sender that does not compress the data header.
     *
     * @param objectMapper mapper used to serialize headers and the final message
     */
    public Sender(ObjectMapper objectMapper) {
        this(objectMapper, Compression.none);
    }

    /**
     * Creates a sender using a {@link MatlabByteConverter} and the default
     * buffer allocators.
     *
     * @param objectMapper          mapper used to serialize headers and the final message
     * @param dataHeaderCompression compression applied to the data header
     */
    public Sender(ObjectMapper objectMapper, Compression dataHeaderCompression) {
        this(objectMapper, dataHeaderCompression, new MatlabByteConverter(),
                ByteBufferAllocator.DEFAULT_ALLOCATOR, ByteBufferAllocator.DEFAULT_ALLOCATOR);
    }

    /**
     * Creates a fully configured sender.
     *
     * @param objectMapper             mapper used to serialize headers and the final message
     * @param dataHeaderCompression    compression applied to the data header
     * @param byteConverter            converter turning values and timestamps into bytes
     * @param compressedValueAllocator allocator handed to the compressors
     * @param valueAllocator           allocator handed to the byte converter
     */
    public Sender(ObjectMapper objectMapper, Compression dataHeaderCompression, ByteConverter byteConverter,
            IntFunction<ByteBuffer> compressedValueAllocator, IntFunction<ByteBuffer> valueAllocator) {
        this.dataHeaderCompression = dataHeaderCompression;
        this.byteConverter = byteConverter;
        this.objectMapper = objectMapper;
        this.compressedValueAllocator = compressedValueAllocator;
        this.valueAllocator = valueAllocator;
        mainHeader = new MainHeader();
    }

    /**
     * Encodes the current channel values using an automatically incremented
     * pulse id and the current wall-clock time as global timestamp.
     *
     * @return the JSON message
     * @throws JsonProcessingException declared for API compatibility
     * @throws IllegalStateException   if the message cannot be serialized
     */
    public String encode() throws JsonProcessingException {
        // System.nanoTime() has an arbitrary origin and must not be used as a
        // timestamp; derive seconds/nanoseconds from the wall clock instead.
        long millis = System.currentTimeMillis();
        long seconds = millis / 1000L;
        long nanoOfSecond = (millis % 1000L) * 1_000_000L;
        return encode(autoPulseId++, new Timestamp(seconds, nanoOfSecond));
    }

    /**
     * Encodes the values of all registered channels for the given pulse.
     *
     * @param pulseId         pulse id written to the main header and passed to the channels
     * @param globalTimestamp timestamp written to the main header
     * @return the JSON message
     * @throws JsonProcessingException declared for API compatibility
     * @throws IllegalStateException   if the message cannot be serialized
     */
    public String encode(long pulseId, Timestamp globalTimestamp) throws JsonProcessingException {
        if (dataHeaderBytes == null) {
            // No channel has been added or removed yet: build the (empty) data
            // header now instead of failing on a null byte array below.
            generateDataHeader();
        }

        mainHeader.setPulseId(pulseId);
        mainHeader.setGlobalTimestamp(globalTimestamp);
        mainHeader.setHash(dataHeaderMD5);
        mainHeader.setDataHeaderCompression(dataHeaderCompression);

        try {
            Map<String, String> ret = new HashMap<>();
            ret.put("header", objectMapper.writeValueAsString(mainHeader));
            ret.put("data_header", new String(dataHeaderBytes));

            for (DataChannel<?> channel : channels) {
                ChannelConfig config = channel.getConfig();
                ByteOrder byteOrder = config.getByteOrder();

                Object value = channel.getValue(pulseId);
                ByteBuffer valueBuffer = byteConverter.getBytes(value, config.getType(), byteOrder, valueAllocator);
                valueBuffer = config.getCompression().getCompressor().compressData(
                        valueBuffer, valueBuffer.position(), valueBuffer.remaining(), 0,
                        compressedValueAllocator, config.getType().getBytes());
                ret.put(config.getName(), bufferToString(valueBuffer));

                Timestamp timestamp = channel.getTime(pulseId);
                ByteBuffer timeBuffer = byteConverter.getBytes(
                        timestamp.getAsLongArray(), Type.Int64, byteOrder, valueAllocator);
                ret.put(config.getName() + "_timestamp", bufferToString(timeBuffer));
            }
            return objectMapper.writeValueAsString(ret);
        } catch (Exception e) {
            throw new IllegalStateException("Unable to serialize message", e);
        }
    }

    /**
     * Converts the readable content of a buffer into a string.
     *
     * <p>Uses {@link ByteBufferHelper#copyToByteArray(ByteBuffer)} (as
     * {@link #generateDataHeader()} does) rather than {@link ByteBuffer#array()},
     * which ignores position/limit and is unavailable for direct buffers.
     *
     * <p>NOTE(review): decoding with the platform default charset is lossy for
     * bytes that are not valid in that charset - confirm the intended encoding.
     */
    private static String bufferToString(ByteBuffer buffer) {
        return new String(ByteBufferHelper.copyToByteArray(buffer));
    }

    /**
     * (Re)Generate the data header based on the configured data channels
     */
    private void generateDataHeader() {
        DataHeader dataHeader = new DataHeader();
        for (DataChannel<?> channel : channels) {
            dataHeader.addChannel(channel.getConfig());
        }

        try {
            dataHeaderBytes = objectMapper.writeValueAsBytes(dataHeader);
            if (!Compression.none.equals(dataHeaderCompression)) {
                ByteBuffer tmpBuf = dataHeaderCompression.getCompressor()
                        .compressDataHeader(ByteBuffer.wrap(dataHeaderBytes), compressedValueAllocator);
                dataHeaderBytes = ByteBufferHelper.copyToByteArray(tmpBuf);
            }
            // decided to compute hash from the bytes that are sent to Receivers
            // (allows to check consistency without uncompressing the bytes at
            // receivers side)
            dataHeaderMD5 = Utils.computeMD5(dataHeaderBytes);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Unable to generate data header", e);
        }
    }

    /**
     * Registers a channel and regenerates the data header.
     *
     * @param channel channel to add
     */
    public void addChannel(DataChannel<?> channel) {
        channels.add(channel);
        generateDataHeader();
    }

    /**
     * Unregisters a channel and regenerates the data header.
     *
     * @param channel channel to remove
     */
    public void removeChannel(DataChannel<?> channel) {
        channels.remove(channel);
        generateDataHeader();
    }

    /**
     * Adds a signed, uncompressed channel that always returns {@code data}.
     *
     * @param name channel name
     * @param data constant value (scalar wrapper, String or array)
     * @return the created channel
     */
    public DataChannel<?> addConstant(String name, Object data) {
        return addConstant(name, data, Compression.none);
    }

    /**
     * Adds an uncompressed channel that always returns {@code data}.
     *
     * @param name     channel name
     * @param data     constant value
     * @param unsigned whether the value is to be declared as an unsigned type, see {@link #classToType}
     * @return the created channel
     */
    public DataChannel<?> addConstant(String name, Object data, boolean unsigned) {
        return addConstant(name, data, unsigned, Compression.none);
    }

    /**
     * Adds a signed channel that always returns {@code data}.
     *
     * @param name        channel name
     * @param data        constant value
     * @param compression compression configured for the channel
     * @return the created channel
     */
    public DataChannel<?> addConstant(String name, Object data, Compression compression) {
        // Forward the requested compression (it used to be replaced by Compression.none).
        return addConstant(name, data, false, compression);
    }

    /**
     * Adds a channel that always returns {@code data}. The channel type is
     * derived from the class of {@code data}; the shape is {@code {1}} for
     * non-array values and {@code {length}} for arrays.
     *
     * @param name        channel name
     * @param data        constant value; must not be {@code null}
     * @param unsigned    whether the value is to be declared as an unsigned type, see {@link #classToType}
     * @param compression compression configured for the channel
     * @return the created channel
     * @throws IllegalArgumentException if the class of {@code data} is not supported
     */
    public DataChannel<?> addConstant(String name, Object data, boolean unsigned, Compression compression) {
        Class<?> cls = data.getClass();
        Type type = classToType(cls, unsigned);
        int[] shape = cls.isArray() ? new int[]{Array.getLength(data)} : new int[]{1};

        DataChannel<Object> channel = new DataChannel<Object>(
                new ChannelConfig(name, type, shape, 1, 0, ChannelConfig.DEFAULT_ENCODING, compression)) {
            @Override
            public Object getValue(long pulseId) {
                return data;
            }
        };
        addChannel(channel);
        return channel;
    }

    /**
     * Maps a Java value class to the channel {@link Type}.
     *
     * <p>With {@code unsigned} set, a value is declared as the unsigned type one
     * size below its Java type: {@code Short -> UInt8}, {@code Integer -> UInt16},
     * {@code Long -> UInt32}. The flag has no effect on the other classes.
     *
     * @param cls      class of the value (wrapper class, String, or array class)
     * @param unsigned whether to select the unsigned type
     * @return the matching type
     * @throws IllegalArgumentException if {@code cls} is not supported
     */
    public static Type classToType(Class<?> cls, boolean unsigned) {
        if ((cls == Double.class) || (cls == double[].class)) {
            return Type.Float64;
        }
        if ((cls == Float.class) || (cls == float[].class)) {
            return Type.Float32;
        }
        if ((cls == Byte.class) || (cls == byte[].class)) {
            return Type.Int8;
        }
        if ((cls == Short.class) || (cls == short[].class)) {
            return unsigned ? Type.UInt8 : Type.Int16;
        }
        if ((cls == Integer.class) || (cls == int[].class)) {
            return unsigned ? Type.UInt16 : Type.Int32;
        }
        if ((cls == Long.class) || (cls == long[].class)) {
            return unsigned ? Type.UInt32 : Type.Int64;
        }
        if ((cls == String.class) || (cls == String[].class)) {
            return Type.String;
        }
        // NOTE(review): unlike the numeric cases this accepts Boolean[] but not
        // the primitive boolean[] - confirm whether that is intended.
        if ((cls == Boolean.class) || (cls == Boolean[].class)) {
            return Type.Bool;
        }
        throw new IllegalArgumentException("Invalid class: " + cls);
    }

    /**
     * Returns the currently configured data channels as an unmodifiable list
     *
     * @return Unmodifiable list of data channels
     */
    public List<DataChannel<?>> getChannels() {
        return Collections.unmodifiableList(channels);
    }

    /**
     * Demo: encodes one constant channel and decodes the resulting JSON again,
     * printing the intermediate results.
     */
    public static void main(String[] args) throws Exception {
        ByteConverter byteConverter = new MatlabByteConverter();
        Sender sender = new Sender(new ObjectMapper(), Compression.none);

        // Register data sources ...
        sender.addConstant("Double", 10.0);
        /*
        sender.addConstant("Float", 10.0f);
        sender.addConstant("Byte", (byte)-10);
        sender.addConstant("Short", (short)-10);
        sender.addConstant("Int", (int)-10);
        sender.addConstant("Long", (long)-10);
        sender.addConstant("UByte", (short)-10, true);
        sender.addConstant("UShort", (int)-10, true);
        sender.addConstant("UInt", (long)-10, true);
        sender.addConstant("Bool", true);
        sender.addConstant("String", "Test");
        sender.addConstant("Double Arr", new double[]{10.0, 20.0, 30.0}, Compression.bitshuffle_lz4);
        sender.addConstant("Byte Arr", new byte[]{10, 20, 30}, Compression.bitshuffle_lz4);
        */
        /*sender.addChannel(new DataChannel(new ChannelConfig("ABC", Type.Float64, 1, 0)) {
            @Override
            public Double getValue(long pulseId) {
                return (double) 10.0;
            }
        });
        */
        //sender.addConstant("Double Arr", new double[3]);

        String json = sender.encode();
        System.out.println(json.length());
        System.out.println(json);

        ObjectMapper mapper = new ObjectMapper();
        Map<?, ?> message = mapper.readValue(json, Map.class);

        MainHeader mainHeader = mapper.readValue((String) message.get("header"), MainHeader.class);
        System.out.println(mainHeader);

        DataHeader dataHeader = mapper.readValue(((String) message.get("data_header")).getBytes(), DataHeader.class);
        System.out.println(dataHeader);

        for (ChannelConfig channelConfig : dataHeader.getChannels()) {
            // Timestamps are encoded as two Int64 values in the channel's byte order.
            ChannelConfig timestampConfig = new ChannelConfig();
            timestampConfig.setType(Type.Int64);
            timestampConfig.setShape(new int[]{2});
            timestampConfig.setByteOrder(channelConfig.getByteOrder());

            ByteBuffer timestampBuffer = ByteBuffer.wrap(
                    ((String) message.get(channelConfig.getName() + "_timestamp")).getBytes());
            long[] timestampArr = byteConverter.getValue(
                    mainHeader, dataHeader, timestampConfig, timestampBuffer, mainHeader.getGlobalTimestamp());
            Timestamp timestamp = new Timestamp(timestampArr[0], timestampArr[1]);
            System.out.println(timestamp);

            ByteBuffer valueBuffer = ByteBuffer.wrap(((String) message.get(channelConfig.getName())).getBytes());
            Object data = byteConverter.getValue(mainHeader, dataHeader, channelConfig, valueBuffer, timestamp);
            System.out.println(data);
        }
    }
}