Filestorage: first (incomplete) implementation.

Author: Fabian Märki
Date: 2016-09-16 18:01:39 +02:00
Parent: 510197769c
Commit: 80ffb88946
8 changed files with 514 additions and 444 deletions

File: DaqWebMvcConfig.java

@@ -19,13 +19,17 @@ import ch.psi.daq.cassandra.reader.CassandraReader;
 import ch.psi.daq.domain.backend.Backend;
 import ch.psi.daq.domain.backend.BackendAccess;
 import ch.psi.daq.domain.config.DomainConfig;
+import ch.psi.daq.domain.events.ChannelEvent;
 import ch.psi.daq.domain.query.processor.QueryProcessor;
 import ch.psi.daq.domain.reader.DataReader;
+import ch.psi.daq.domain.reader.StreamEventReader;
+import ch.psi.daq.filestorage.config.FileStorageConfig;
 import ch.psi.daq.query.config.QueryConfig;
 import ch.psi.daq.query.processor.QueryProcessorLocal;
 import ch.psi.daq.test.query.config.LocalQueryTestConfig;
 import ch.psi.daq.test.queryrest.query.DummyArchiverApplianceReader;
 import ch.psi.daq.test.queryrest.query.DummyCassandraReader;
+import ch.psi.daq.test.queryrest.query.DummyFilestorageReader;
 @Configuration
 @ComponentScan
@@ -49,6 +53,9 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport {
 backendAccess.addStreamEventReaderSupplier(Backend.SF_DATABUFFER, () -> cassandraReader());
 backendAccess.addChannelInfoReaderSupplier(Backend.SF_DATABUFFER, () -> cassandraReader());
+backendAccess.addStreamEventReaderSupplier(Backend.SF_IMAGESTORAGE, () -> filestorageReader());
+backendAccess.addChannelInfoReaderSupplier(Backend.SF_IMAGESTORAGE, () -> filestorageReader());
 backendAccess.addDataReaderSupplier(Backend.SF_ARCHIVERAPPLIANCE, () -> archiverApplianceReader());
 }
@@ -65,6 +72,12 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport {
 return new DummyCassandraReader();
 }
+@Bean(name = FileStorageConfig.BEAN_NAME_FILESTORAGE_READER)
+@Lazy
+public StreamEventReader<ChannelEvent> filestorageReader() {
+return new DummyFilestorageReader();
+}
 @Bean(name = ArchiverApplianceConfig.BEAN_NAME_ARCHIVER_APPLIANCE_READER)
 @Lazy
 public DataReader archiverApplianceReader() {

File: QueryRestControllerChannelInfoTest.java

@@ -6,6 +6,8 @@ import static org.junit.Assert.assertEquals;
 import java.util.List;
 import java.util.stream.Collectors;
+import javax.annotation.Resource;
 import org.junit.After;
 import org.junit.Test;
 import org.springframework.http.MediaType;
@@ -35,6 +37,9 @@ public class QueryRestControllerChannelInfoTest extends AbstractDaqRestTest {
 private ObjectMapper objectMapper = new ObjectMapper();
+@Resource(name = DomainConfig.BEAN_NAME_BACKEND_DEFAULT)
+private Backend backend;
 @After
 public void tearDown() throws Exception {}
@@ -44,7 +49,7 @@ public class QueryRestControllerChannelInfoTest extends AbstractDaqRestTest {
 new RequestRangePulseId(
 100,
 101),
-"DataBuffer1", "DataBuffer2");
+backend.getKey() + "1", backend.getKey() + "2");
 String content = mapper.writeValueAsString(query);
 System.out.println(content);
@@ -64,40 +69,40 @@ public class QueryRestControllerChannelInfoTest extends AbstractDaqRestTest {
 List<? extends ChannelInfos> infosList = objectMapper.readValue(response, ChannelInfosList.class);
 assertEquals(2, infosList.size());
 ChannelInfos cInfos = infosList.get(0);
-assertEquals("DataBuffer1", cInfos.getChannel().getName());
-assertEquals(Backend.SF_DATABUFFER, cInfos.getChannel().getBackend());
+assertEquals(backend.getKey() + "1", cInfos.getChannel().getName());
+assertEquals(backend, cInfos.getChannel().getBackend());
 List<ChannelInfo> infos = cInfos.getChannelInfos().collect(Collectors.toList());
 assertEquals(2, infos.size());
 ChannelInfo info = infos.get(0);
-assertEquals("DataBuffer1", info.getChannel());
-assertEquals(Backend.SF_DATABUFFER, info.getBackend());
+assertEquals(backend.getKey() + "1", info.getChannel());
+assertEquals(backend, info.getBackend());
 assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getStartPulseId() * 10, 0), info.getGlobalTime());
 assertEquals(query.getRange().getStartPulseId(), info.getPulseId());
 assertArrayEquals(new int[] {1}, info.getShape());
 assertEquals(Type.Int32.getKey(), info.getType());
 info = infos.get(1);
-assertEquals("DataBuffer1", info.getChannel());
-assertEquals(Backend.SF_DATABUFFER, info.getBackend());
+assertEquals(backend.getKey() + "1", info.getChannel());
+assertEquals(backend, info.getBackend());
 assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getEndPulseId() * 10, 0), info.getGlobalTime());
 assertEquals(query.getRange().getEndPulseId(), info.getPulseId());
 assertArrayEquals(new int[] {1}, info.getShape());
 assertEquals(Type.Int32.getKey(), info.getType());
 cInfos = infosList.get(1);
-assertEquals("DataBuffer2", cInfos.getChannel().getName());
-assertEquals(Backend.SF_DATABUFFER, cInfos.getChannel().getBackend());
+assertEquals(backend.getKey() + "2", cInfos.getChannel().getName());
+assertEquals(backend, cInfos.getChannel().getBackend());
 infos = cInfos.getChannelInfos().collect(Collectors.toList());
 assertEquals(2, infos.size());
 info = infos.get(0);
-assertEquals("DataBuffer2", info.getChannel());
-assertEquals(Backend.SF_DATABUFFER, info.getBackend());
+assertEquals(backend.getKey() + "2", info.getChannel());
+assertEquals(backend, info.getBackend());
 assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getStartPulseId() * 10, 0), info.getGlobalTime());
 assertEquals(query.getRange().getStartPulseId(), info.getPulseId());
 assertArrayEquals(new int[] {1}, info.getShape());
 assertEquals(Type.Int32.getKey(), info.getType());
 info = infos.get(1);
-assertEquals("DataBuffer2", info.getChannel());
-assertEquals(Backend.SF_DATABUFFER, info.getBackend());
+assertEquals(backend.getKey() + "2", info.getChannel());
+assertEquals(backend, info.getBackend());
 assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getEndPulseId() * 10, 0), info.getGlobalTime());
 assertEquals(query.getRange().getEndPulseId(), info.getPulseId());
 assertArrayEquals(new int[] {1}, info.getShape());
@@ -110,7 +115,7 @@ public class QueryRestControllerChannelInfoTest extends AbstractDaqRestTest {
 new RequestRangePulseId(
 100,
 101),
-"DataBuffer1", "DataBuffer2");
+backend.getKey() + "1", backend.getKey() + "2");
 query.setOrdering(Ordering.desc);
 String content = mapper.writeValueAsString(query);
@@ -131,40 +136,40 @@ public class QueryRestControllerChannelInfoTest extends AbstractDaqRestTest {
 List<? extends ChannelInfos> infosList = objectMapper.readValue(response, ChannelInfosList.class);
 assertEquals(2, infosList.size());
 ChannelInfos cInfos = infosList.get(0);
-assertEquals("DataBuffer1", cInfos.getChannel().getName());
-assertEquals(Backend.SF_DATABUFFER, cInfos.getChannel().getBackend());
+assertEquals(backend.getKey() + "1", cInfos.getChannel().getName());
+assertEquals(backend, cInfos.getChannel().getBackend());
 List<ChannelInfo> infos = cInfos.getChannelInfos().collect(Collectors.toList());
 assertEquals(2, infos.size());
 ChannelInfo info = infos.get(0);
-assertEquals("DataBuffer1", info.getChannel());
-assertEquals(Backend.SF_DATABUFFER, info.getBackend());
+assertEquals(backend.getKey() + "1", info.getChannel());
+assertEquals(backend, info.getBackend());
 assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getEndPulseId() * 10, 0), info.getGlobalTime());
 assertEquals(query.getRange().getEndPulseId(), info.getPulseId());
 assertArrayEquals(new int[] {1}, info.getShape());
 assertEquals(Type.Int32.getKey(), info.getType());
 info = infos.get(1);
-assertEquals("DataBuffer1", info.getChannel());
-assertEquals(Backend.SF_DATABUFFER, info.getBackend());
+assertEquals(backend.getKey() + "1", info.getChannel());
+assertEquals(backend, info.getBackend());
 assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getStartPulseId() * 10, 0), info.getGlobalTime());
 assertEquals(query.getRange().getStartPulseId(), info.getPulseId());
 assertArrayEquals(new int[] {1}, info.getShape());
 assertEquals(Type.Int32.getKey(), info.getType());
 cInfos = infosList.get(1);
-assertEquals("DataBuffer2", cInfos.getChannel().getName());
-assertEquals(Backend.SF_DATABUFFER, cInfos.getChannel().getBackend());
+assertEquals(backend.getKey() + "2", cInfos.getChannel().getName());
+assertEquals(backend, cInfos.getChannel().getBackend());
 infos = cInfos.getChannelInfos().collect(Collectors.toList());
 assertEquals(2, infos.size());
 info = infos.get(0);
-assertEquals("DataBuffer2", info.getChannel());
-assertEquals(Backend.SF_DATABUFFER, info.getBackend());
+assertEquals(backend.getKey() + "2", info.getChannel());
+assertEquals(backend, info.getBackend());
 assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getEndPulseId() * 10, 0), info.getGlobalTime());
 assertEquals(query.getRange().getEndPulseId(), info.getPulseId());
 assertArrayEquals(new int[] {1}, info.getShape());
 assertEquals(Type.Int32.getKey(), info.getType());
 info = infos.get(1);
-assertEquals("DataBuffer2", info.getChannel());
-assertEquals(Backend.SF_DATABUFFER, info.getBackend());
+assertEquals(backend.getKey() + "2", info.getChannel());
+assertEquals(backend, info.getBackend());
 assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getStartPulseId() * 10, 0), info.getGlobalTime());
 assertEquals(query.getRange().getStartPulseId(), info.getPulseId());
 assertArrayEquals(new int[] {1}, info.getShape());

File: QueryRestControllerJsonTest.java

@@ -2,6 +2,8 @@ package ch.psi.daq.test.queryrest.controller;
 import java.util.Arrays;
+import javax.annotation.Resource;
 import org.junit.After;
 import org.junit.Test;
 import org.springframework.http.MediaType;
@@ -43,6 +45,9 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 public static final String TEST_CHANNEL_WAVEFORM_01 = "testChannelWaveform1";
 public static final String[] TEST_CHANNEL_NAMES = new String[] {TEST_CHANNEL_01, TEST_CHANNEL_02};
+@Resource(name = DomainConfig.BEAN_NAME_BACKEND_DEFAULT)
+private Backend backend;
 @After
 public void tearDown() throws Exception {}
@@ -64,16 +69,16 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").value("BoolScalar"))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").exists())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").value("BoolWaveform"))
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name()))
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[0]").exists())
-// .andExpect(
-// MockMvcResultMatchers.jsonPath("$[1].channels[0]").value(DummyArchiverApplianceReader.TEST_CHANNEL_1))
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[1]").exists())
-// .andExpect(
-// MockMvcResultMatchers.jsonPath("$[1].channels[1]").value(DummyArchiverApplianceReader.TEST_CHANNEL_2))
-;
+.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
+.andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.SF_ARCHIVERAPPLIANCE.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2]").exists())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].backend").value(Backend.SF_IMAGESTORAGE.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels").isArray())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[0]").exists())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[0]").value("BoolScalar"))
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[1]").exists())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[1]").value("BoolWaveform"));
 }
@@ -97,16 +102,25 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[2]").value("UInt32Scalar"))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").exists())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").value("UInt32Waveform"))
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name()))
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[0]").doesNotExist())
-;
+.andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
+.andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.SF_ARCHIVERAPPLIANCE.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2]").exists())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].backend").value(Backend.SF_IMAGESTORAGE.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels").isArray())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[0]").exists())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[0]").value("Int32Scalar"))
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[1]").exists())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[1]").value("Int32Waveform"))
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[2]").exists())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[2]").value("UInt32Scalar"))
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[3]").exists())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[3]").value("UInt32Waveform"));
 }
 @Test
 public void testChannelNameQueryBackendOrder() throws Exception {
-ChannelsRequest request = new ChannelsRequest("int64", Ordering.desc, Backend.SF_DATABUFFER);
+ChannelsRequest request = new ChannelsRequest("int64", Ordering.desc, backend);
 String content = mapper.writeValueAsString(request);
 System.out.println(content);
@@ -120,7 +134,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 .andExpect(MockMvcResultMatchers.status().isOk())
 .andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
-.andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.SF_DATABUFFER.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(backend.getKey()))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").exists())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").value("UInt64Waveform"))
@@ -152,13 +166,13 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.SF_DATABUFFER.getKey()))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[23]").exists())
-.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[25]").doesNotExist())
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name()))
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[2]").exists())
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[3]").doesNotExist())
-;
+.andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[24]").doesNotExist())
+.andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.SF_ARCHIVERAPPLIANCE.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].backend").value(Backend.SF_IMAGESTORAGE.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels").isArray())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[23]").exists())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[24]").doesNotExist());
 // each reload add another channel
 request.setReload(true);
@@ -179,12 +193,12 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[24]").exists())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[25]").doesNotExist())
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name()))
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[3]").exists())
-// .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[4]").doesNotExist())
-;
+.andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.SF_ARCHIVERAPPLIANCE.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].backend").value(Backend.SF_IMAGESTORAGE.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels").isArray())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[24]").exists())
+.andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[25]").doesNotExist());
 }
 @Test
@@ -335,7 +349,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 new RequestRangePulseId(
 100,
 101),
-new ChannelName(TEST_CHANNEL_01, Backend.SF_DATABUFFER),
+new ChannelName(TEST_CHANNEL_01, backend),
 new ChannelName(TEST_CHANNEL_02, Backend.SF_ARCHIVERAPPLIANCE));
 String content = mapper.writeValueAsString(request);
@@ -539,7 +553,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 .andExpect(MockMvcResultMatchers.jsonPath("$").isArray())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
-.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getKey()))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(
@@ -549,7 +563,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 TestTimeUtils.getTimeStr(1, 10000000)))
 .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists())
 .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value(TEST_CHANNEL_02))
-.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.backend").value("sf-databuffer"))
+.andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.backend").value(backend.getKey()))
 .andExpect(MockMvcResultMatchers.jsonPath("$[1].data").isArray())
 .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(100))
 .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].globalSeconds").value(
@@ -630,7 +644,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
-.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getKey()))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(
@@ -682,7 +696,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
-.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getKey()))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(
@@ -750,7 +764,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01))
-.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getKey()))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(1000))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(
@@ -866,7 +880,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_WAVEFORM_01))
-.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getKey()))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(
@@ -927,7 +941,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest {
 .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_WAVEFORM_01))
-.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey()))
+.andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getKey()))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray())
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100))
 .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value(

File: AbstractStreamEventReader.java (new)

@@ -0,0 +1,391 @@
package ch.psi.daq.test.queryrest.query;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import javax.annotation.Resource;
import org.apache.commons.lang3.ArrayUtils;
import com.google.common.collect.Lists;
import ch.psi.bsread.message.Type;
import ch.psi.daq.common.ordering.Ordering;
import ch.psi.daq.common.time.TimeUtils;
import ch.psi.daq.domain.DataEvent;
import ch.psi.daq.domain.FieldNames;
import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;
import ch.psi.daq.domain.events.ChannelConfiguration;
import ch.psi.daq.domain.events.ChannelEvent;
import ch.psi.daq.domain.events.impl.ChannelConfigurationImpl;
import ch.psi.daq.domain.events.impl.ChannelEventImpl;
import ch.psi.daq.domain.json.channels.info.ChannelInfo;
import ch.psi.daq.domain.json.channels.info.ChannelInfoImpl;
import ch.psi.daq.domain.query.event.EventQuery;
import ch.psi.daq.domain.query.event.StreamEventQuery;
import ch.psi.daq.domain.query.range.PulseIdRangeQuery;
import ch.psi.daq.domain.query.range.TimeRangeQuery;
import ch.psi.daq.domain.reader.MetaStreamEventQuery;
import ch.psi.daq.domain.reader.StreamEventReader;
import ch.psi.daq.domain.test.backend.TestBackendAccess;
import ch.psi.daq.domain.test.gen.TestDataGen;
import ch.psi.daq.domain.utils.PropertiesUtils;
public abstract class AbstractStreamEventReader implements StreamEventReader<ChannelEvent> {
private static final Random random = new Random(0);
private static final int KEYSPACE = 1;
private List<String> channels;
private AtomicLong channelNameCallCounter = new AtomicLong();
private TestDataGen dataGen;
private Backend backend;
private String testChannelName;
@Resource(name = DomainConfig.BEAN_NAME_TEST_BACKEND_ACCESS)
private TestBackendAccess testBackendAccess;
public AbstractStreamEventReader() {
this.channels = Lists.newArrayList(
"BoolScalar",
"BoolWaveform",
"Int8Scalar",
"Int8Waveform",
"UInt8Scalar",
"UInt8Waveform",
"Int16Scalar",
"Int16Waveform",
"UInt16Scalar",
"UInt16Waveform",
"Int32Scalar",
"Int32Waveform",
"UInt32Scalar",
"UInt32Waveform",
"Int64Scalar",
"Int64Waveform",
"UInt64Scalar",
"UInt64Waveform",
"Float32Scalar",
"Float32Waveform",
"Float64Scalar",
"Float64Waveform",
"StringScalar");
}
protected void init(Backend backend){
this.backend = backend;
this.dataGen = testBackendAccess.getTestDataGen(backend);
this.testChannelName = backend.getKey() + "_TestChannel_";
}
@Override
public Backend getBackend() {
return backend;
}
/**
* @{inheritDoc
*/
@Override
public Stream<String> getChannelStream(String regex) {
channels.add(testChannelName + channelNameCallCounter.incrementAndGet());
Stream<String> channelStream = channels.stream();
if (regex != null) {
Pattern pattern = Pattern.compile(regex.toLowerCase());
channelStream = channelStream.filter(channel -> pattern.matcher(channel.toLowerCase()).find());
}
return channelStream;
}
// @Override
// public Stream<ChannelEvent> getEventStream(PulseIdRangeQuery query) {
// return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(),
// query.getEventColumns())
// .filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER));
// }
@Override
public Stream<ChannelEvent> getEventStream(TimeRangeQuery query) {
return getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10)
.filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER));
}
public Stream<ChannelEvent> getEventStream(EventQuery eventQuery, Stream<? extends StreamEventQuery> queryProviders) {
Stream<ChannelEvent> result = queryProviders.map(ceq -> {
if (ceq instanceof MetaStreamEventQuery) {
return getEvent((MetaStreamEventQuery<ChannelEvent>) ceq);
} else {
throw new UnsupportedOperationException("This is not yet implemented!");
}
});
return result;
}
public static Stream<ChannelEvent> getDummyEventStream(String channelParam, long startIndex, long endIndex,
String... columns) {
String channelLower = channelParam.toLowerCase();
String channel =
(columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam
: null;
Stream<Long> rangeStream;
if (channelParam.contains("[") && channelParam.contains("]")) {
rangeStream =
Arrays.stream(
channelParam.substring(
channelParam.indexOf("[") + 1,
channelParam.indexOf("]"))
.split(",")
)
.map(str -> str.trim())
.map(Long::parseLong);
} else {
rangeStream = LongStream.rangeClosed(startIndex, endIndex).boxed();
}
Stream<ChannelEvent> eventStream =
rangeStream.map(
i -> {
BigDecimal iocTime =
(columns == null || columns.length == 0 || ArrayUtils.contains(columns,
FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0)
: PropertiesUtils.DEFAULT_VALUE_DECIMAL;
BigDecimal globalTime =
(columns == null || columns.length == 0 || ArrayUtils.contains(columns,
FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0)
: PropertiesUtils.DEFAULT_VALUE_DECIMAL;
long pulseId =
(columns == null || columns.length == 0 || ArrayUtils.contains(columns,
FieldNames.FIELD_PULSE_ID)) ? i : PropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE;
if (channelLower.contains("waveform")) {
long[] value = random.longs(8).toArray();
value[0] = i;
value[1] = i;
return new ChannelEventImpl(
channel,
iocTime,
pulseId,
globalTime,
KEYSPACE,
value
);
} else if (channelLower.contains("image")) {
int x = 4;
int y = 8;
int[] shape = new int[] {x, y};
long[] value = random.longs(x * y).toArray();
value[0] = i;
value[1] = i;
return new ChannelEventImpl(
channel,
iocTime,
pulseId,
globalTime,
KEYSPACE,
value,
shape
);
} else {
return new ChannelEventImpl(
channel,
iocTime,
pulseId,
globalTime,
KEYSPACE,
i
);
}
});
return eventStream;
}
private List<? extends DataEvent> getDummyEvents(String channel, long startIndex, long endIndex, String... columns) {
return getDummyEventStream(channel, startIndex, endIndex, columns).collect(Collectors.toList());
}
/**
* @{inheritDoc
*/
@Override
public List<String> getChannels() {
return Lists.newArrayList(channels);
}
/**
* @{inheritDoc
*/
@Override
public List<String> getChannels(String regex) {
return Lists.newArrayList(channels).stream().filter(s -> {
return s.contains(regex);
}).collect(Collectors.toList());
}
/**
* @{inheritDoc
*/
@Override
public ChannelEvent getEvent(MetaStreamEventQuery<ChannelEvent> queryInfo, String... columns) {
if (queryInfo.getPulseId() > 0) {
return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId(),
columns)
.get(0);
}
return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getGlobalMillis() / 10,
queryInfo.getGlobalMillis() / 10).get(0);
}
/**
* @{inheritDoc
*/
@Override
public CompletableFuture<ChannelEvent> getEventAsync(MetaStreamEventQuery<ChannelEvent> queryInfo, String... columns) {
// implement when needed
throw new UnsupportedOperationException();
}
// /**
// * @{inheritDoc
// */
// @Override
// public Stream<? extends StreamEventQuery> getStreamEventQueryStream(PulseIdRangeQuery query) {
//
// return dataGen.generateMetaPulseId(
// query.getStartPulseId(),
// (query.getEndPulseId() - query.getStartPulseId() + 1),
// i -> i * 10,
// i -> 0,
// i -> i,
// query.getChannel())
// .stream()
// .map(metaPulse -> {
// metaPulse.setKeyspace(KEYSPACE);
// return metaPulse;
// });
// }
public Stream<? extends StreamEventQuery> getStreamEventQueryStream(TimeRangeQuery query) {
return dataGen.generateMetaTime(
KEYSPACE,
3600,
query.getStartMillis() / 10,
((query.getEndMillis() - query.getStartMillis()) / 10 + 1),
i -> i * 10,
i -> 0,
i -> i,
query.getChannel()).stream();
}
// /**
// * @{inheritDoc
// */
// @Override
// public Stream<MetaPulseId> getMetaStream(PulseIdRangeQuery query) {
//
// return getStreamEventQueryStream(query).map(r -> {
// return (MetaPulseId) r;
// });
//
// }
/**
* @{inheritDoc
*/
@Override
public Stream<? extends MetaStreamEventQuery<ChannelEvent>> getMetaStream(TimeRangeQuery query) {
return getStreamEventQueryStream(query).map(r -> {
return (MetaStreamEventQuery<ChannelEvent>) r;
});
}
@Override
public Stream<ChannelEvent> getEventStream(Stream<? extends MetaStreamEventQuery<ChannelEvent>> queryInfos,
String... columns) {
return getEventStream(null, queryInfos);
}
@Override
public Stream<ChannelConfiguration> getChannelConfiguration(TimeRangeQuery query) {
List<ChannelConfiguration> configs = new ArrayList<>();
BigDecimal time = query.getStartTime();
configs.add(
new ChannelConfigurationImpl(
query.getChannel(),
time,
TimeUtils.getMillis(time) / 10,
0,
Type.Int32.getKey(),
new int[] {1},
false,
ChannelConfiguration.DEFAULT_LOCAL_WRITE,
ChannelConfiguration.DEFAULT_BIN_SIZE_IN_MILLIS,
ChannelConfiguration.SPLIT_COUNT,
ChannelConfiguration.DEFAULT_SOURCE,
ChannelConfiguration.DEFAULT_MODULO,
ChannelConfiguration.DEFAULT_OFFSET,
Backend.SF_DATABUFFER));
if (query.getEndMillis() > query.getStartMillis()) {
time = query.getEndTime();
configs.add(
new ChannelConfigurationImpl(
query.getChannel(),
time,
TimeUtils.getMillis(time) / 10,
1,
Type.Int32.getKey(),
new int[] {1},
false,
ChannelConfiguration.DEFAULT_LOCAL_WRITE,
ChannelConfiguration.DEFAULT_BIN_SIZE_IN_MILLIS,
ChannelConfiguration.SPLIT_COUNT,
ChannelConfiguration.DEFAULT_SOURCE,
ChannelConfiguration.DEFAULT_MODULO,
ChannelConfiguration.DEFAULT_OFFSET,
Backend.SF_DATABUFFER));
}
if (Ordering.desc.equals(query.getOrdering())) {
Collections.reverse(configs);
}
return configs.stream();
}
@Override
public TimeRangeQuery getTimeRangeQuery(PulseIdRangeQuery query) {
return new TimeRangeQuery(
TimeUtils.getTimeFromMillis(query.getStartPulseId() * 10, 0),
TimeUtils.getTimeFromMillis(query.getEndPulseId() * 10, 0),
query);
}
@Override
public Stream<? extends ChannelInfo> getChannelInfoStream(TimeRangeQuery query) {
return getChannelConfiguration(query)
.map(channelConfiguration -> new ChannelInfoImpl(channelConfiguration));
}
@Override
public void truncateCache() {}
}

File: DummyArchiverApplianceReader.java

@@ -16,10 +16,10 @@ import ch.psi.daq.domain.query.range.TimeRangeQuery;
 import ch.psi.daq.domain.reader.DataReader;
 public class DummyArchiverApplianceReader implements DataReader {
-public static final String ARCHIVER_TEST_CHANNEL = "ArchiverTestChannel_";
-public static final String TEST_CHANNEL_1 = "ArchiverChannel_1";
-public static final String TEST_CHANNEL_2 = "ArchiverChannel_2";
+public static final String TEST_CHANNEL = Backend.SF_ARCHIVERAPPLIANCE.getKey() + "_TestChannel_";
+public static final String TEST_CHANNEL_1 = Backend.SF_ARCHIVERAPPLIANCE.getKey() + "_Channel_1";
+public static final String TEST_CHANNEL_2 = Backend.SF_ARCHIVERAPPLIANCE.getKey() + "_Channel_2";
 private List<String> channels = Lists.newArrayList(TEST_CHANNEL_1, TEST_CHANNEL_2);
 private AtomicLong channelNameCallCounter = new AtomicLong();
@@ -32,7 +32,7 @@ public class DummyArchiverApplianceReader implements DataReader {
 @Override
 public Stream<String> getChannelStream(String regex) {
-channels.add(ARCHIVER_TEST_CHANNEL + channelNameCallCounter.incrementAndGet());
+channels.add(TEST_CHANNEL + channelNameCallCounter.incrementAndGet());
 Stream<String> channelStream = channels.stream();
 if (regex != null) {
@@ -67,6 +67,5 @@ public class DummyArchiverApplianceReader implements DataReader {
 }
 @Override
-public void truncateCache() {
-}
+public void truncateCache() {}
 }

File: DummyCassandraReader.java

@@ -1,378 +1,27 @@
 package ch.psi.daq.test.queryrest.query;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
-import java.util.stream.Stream;
 import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
-import org.apache.commons.lang3.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import ch.psi.bsread.message.Type;
 import ch.psi.daq.cassandra.reader.CassandraReader;
-import ch.psi.daq.common.ordering.Ordering;
-import ch.psi.daq.common.time.TimeUtils;
-import ch.psi.daq.domain.DataEvent;
-import ch.psi.daq.domain.FieldNames;
 import ch.psi.daq.domain.backend.Backend;
-import ch.psi.daq.domain.config.DomainConfig;
 import ch.psi.daq.domain.events.ChannelConfiguration;
-import ch.psi.daq.domain.events.ChannelEvent;
-import ch.psi.daq.domain.events.impl.ChannelConfigurationImpl;
-import ch.psi.daq.domain.events.impl.ChannelEventImpl;
-import ch.psi.daq.domain.json.channels.info.ChannelInfo;
-import ch.psi.daq.domain.json.channels.info.ChannelInfoImpl;
-import ch.psi.daq.domain.query.event.EventQuery;
-import ch.psi.daq.domain.query.event.StreamEventQuery;
-import ch.psi.daq.domain.query.range.PulseIdRangeQuery;
 import ch.psi.daq.domain.query.range.TimeRangeQuery;
-import ch.psi.daq.domain.reader.MetaStreamEventQuery;
-import ch.psi.daq.domain.test.backend.TestBackendAccess;
-import ch.psi.daq.domain.test.gen.TestDataGen;
-import ch.psi.daq.domain.utils.PropertiesUtils;
-public class DummyCassandraReader implements CassandraReader {
+public class DummyCassandraReader extends AbstractStreamEventReader implements CassandraReader {
 private static final Logger LOGGER = LoggerFactory.getLogger(DummyCassandraReader.class);
-public static final String DATABUFFER_TEST_CHANNEL = "DataBufferTestChannel_";
-private static final Random random = new Random(0);
-private static final int KEYSPACE = 1;
-private List<String> channels;
-private AtomicLong channelNameCallCounter = new AtomicLong();
-private TestDataGen dataGen;
-private Backend backend = Backend.SF_DATABUFFER;
-@Resource(name = DomainConfig.BEAN_NAME_TEST_BACKEND_ACCESS)
-private TestBackendAccess testBackendAccess;
 public DummyCassandraReader() {
-this.channels = Lists.newArrayList(
-"BoolScalar",
-"BoolWaveform",
-"Int8Scalar",
-"Int8Waveform",
-"UInt8Scalar",
-"UInt8Waveform",
-"Int16Scalar",
-"Int16Waveform",
-"UInt16Scalar",
-"UInt16Waveform",
-"Int32Scalar",
-"Int32Waveform",
-"UInt32Scalar",
-"UInt32Waveform",
-"Int64Scalar",
-"Int64Waveform",
-"UInt64Scalar",
-"UInt64Waveform",
-"Float32Scalar",
-"Float32Waveform",
-"Float64Scalar",
-"Float64Waveform",
-"StringScalar");
 }
 @PostConstruct
 public void afterPropertiesSet() {
-dataGen = testBackendAccess.getTestDataGen(backend);
+init(Backend.SF_DATABUFFER);
 }
-@Override
-public Backend getBackend() {
-return backend;
-}
-/**
-* @{inheritDoc
-*/
-@Override
-public Stream<String> getChannelStream(String regex) {
-channels.add(DATABUFFER_TEST_CHANNEL + channelNameCallCounter.incrementAndGet());
-Stream<String> channelStream = channels.stream();
-if (regex != null) {
-Pattern pattern = Pattern.compile(regex.toLowerCase());
-channelStream = channelStream.filter(channel -> pattern.matcher(channel.toLowerCase()).find());
-}
-return channelStream;
-}
-// @Override
-// public Stream<ChannelEvent> getEventStream(PulseIdRangeQuery query) {
-// return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(),
-// query.getEventColumns())
-// .filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER));
-// }
-@Override
-public Stream<ChannelEvent> getEventStream(TimeRangeQuery query) {
-return getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10)
-.filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER));
-}
-public Stream<ChannelEvent> getEventStream(EventQuery eventQuery, Stream<? extends StreamEventQuery> queryProviders) {
-Stream<ChannelEvent> result = queryProviders.map(ceq -> {
-if (ceq instanceof MetaStreamEventQuery) {
-return getEvent((MetaStreamEventQuery<ChannelEvent>) ceq);
-} else {
-throw new UnsupportedOperationException("This is not yet implemented!");
-}
-});
-return result;
-}
-public static Stream<ChannelEvent> getDummyEventStream(String channelParam, long startIndex, long endIndex,
-String... columns) {
-String channelLower = channelParam.toLowerCase();
-String channel =
-(columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam
-: null;
-Stream<Long> rangeStream;
-if (channelParam.contains("[") && channelParam.contains("]")) {
-rangeStream =
-Arrays.stream(
-channelParam.substring(
-channelParam.indexOf("[") + 1,
-channelParam.indexOf("]"))
-.split(",")
-)
-.map(str -> str.trim())
-.map(Long::parseLong);
-} else {
-rangeStream = LongStream.rangeClosed(startIndex, endIndex).boxed();
-}
-Stream<ChannelEvent> eventStream =
-rangeStream.map(
-i -> {
-BigDecimal iocTime =
-(columns == null || columns.length == 0 || ArrayUtils.contains(columns,
-FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0)
-: PropertiesUtils.DEFAULT_VALUE_DECIMAL;
-BigDecimal globalTime =
-(columns == null || columns.length == 0 || ArrayUtils.contains(columns,
-FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0)
-: PropertiesUtils.DEFAULT_VALUE_DECIMAL;
-long pulseId =
-(columns == null || columns.length == 0 || ArrayUtils.contains(columns,
-FieldNames.FIELD_PULSE_ID)) ? i : PropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE;
-if (channelLower.contains("waveform")) {
-long[] value = random.longs(8).toArray();
-value[0] = i;
-value[1] = i;
-return new ChannelEventImpl(
-channel,
-iocTime,
-pulseId,
-globalTime,
-KEYSPACE,
-value
-);
-} else if (channelLower.contains("image")) {
-int x = 4;
-int y = 8;
-int[] shape = new int[] {x, y};
-long[] value = random.longs(x * y).toArray();
-value[0] = i;
-value[1] = i;
-return new ChannelEventImpl(
-channel,
-iocTime,
-pulseId,
-globalTime,
-KEYSPACE,
-value,
-shape
-);
-} else {
-return new ChannelEventImpl(
-channel,
-iocTime,
-pulseId,
-globalTime,
-KEYSPACE,
-i
-);
-}
-});
-return eventStream;
-}
-private List<? extends DataEvent> getDummyEvents(String channel, long startIndex, long endIndex, String... columns) {
-return getDummyEventStream(channel, startIndex, endIndex, columns).collect(Collectors.toList());
-}
-/**
-* @{inheritDoc
-*/
-@Override
-public List<String> getChannels() {
-return Lists.newArrayList(channels);
-}
-/**
-* @{inheritDoc
-*/
-@Override
-public List<String> getChannels(String regex) {
-return Lists.newArrayList(channels).stream().filter(s -> {
-return s.contains(regex);
-}).collect(Collectors.toList());
-}
-/**
-* @{inheritDoc
-*/
-@Override
-public ChannelEvent getEvent(MetaStreamEventQuery<ChannelEvent> queryInfo, String... columns) {
-if (queryInfo.getPulseId() > 0) {
-return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId(),
-columns)
-.get(0);
-}
-return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getGlobalMillis() / 10,
-queryInfo.getGlobalMillis() / 10).get(0);
-}
-/**
-* @{inheritDoc
-*/
-@Override
-public CompletableFuture<ChannelEvent> getEventAsync(MetaStreamEventQuery<ChannelEvent> queryInfo, String... columns) {
-// implement when needed
-throw new UnsupportedOperationException();
-}
-// /**
-// * @{inheritDoc
-// */
-// @Override
-// public Stream<? extends StreamEventQuery> getStreamEventQueryStream(PulseIdRangeQuery query) {
-//
-// return dataGen.generateMetaPulseId(
-// query.getStartPulseId(),
-// (query.getEndPulseId() - query.getStartPulseId() + 1),
-// i -> i * 10,
-// i -> 0,
-// i -> i,
-// query.getChannel())
-// .stream()
-// .map(metaPulse -> {
-// metaPulse.setKeyspace(KEYSPACE);
-// return metaPulse;
-// });
-// }
-public Stream<? extends StreamEventQuery> getStreamEventQueryStream(TimeRangeQuery query) {
-return dataGen.generateMetaTime(
-KEYSPACE,
-3600,
-query.getStartMillis() / 10,
-((query.getEndMillis() - query.getStartMillis()) / 10 + 1),
-i -> i * 10,
-i -> 0,
-i -> i,
-query.getChannel()).stream();
-}
-// /**
-// * @{inheritDoc
-// */
-// @Override
-// public Stream<MetaPulseId> getMetaStream(PulseIdRangeQuery query) {
-//
-// return getStreamEventQueryStream(query).map(r -> {
-// return (MetaPulseId) r;
-// });
-//
-// }
-/**
-* @{inheritDoc
-*/
-@Override
-public Stream<? extends MetaStreamEventQuery<ChannelEvent>> getMetaStream(TimeRangeQuery query) {
-return getStreamEventQueryStream(query).map(r -> {
-return (MetaStreamEventQuery<ChannelEvent>) r;
-});
-}
-@Override
-public Stream<ChannelEvent> getEventStream(Stream<? extends MetaStreamEventQuery<ChannelEvent>> queryInfos) {
-return getEventStream(null, queryInfos);
-}
-@Override
-public Stream<ChannelConfiguration> getChannelConfiguration(TimeRangeQuery query) {
-List<ChannelConfiguration> configs = new ArrayList<>();
-BigDecimal time = query.getStartTime();
-configs.add(
-new ChannelConfigurationImpl(
-query.getChannel(),
-time,
-TimeUtils.getMillis(time) / 10,
-0,
-Type.Int32.getKey(),
-new int[] {1},
-false,
-ChannelConfiguration.DEFAULT_LOCAL_WRITE,
-ChannelConfiguration.DEFAULT_BIN_SIZE_IN_MILLIS,
-ChannelConfiguration.SPLIT_COUNT,
-ChannelConfiguration.DEFAULT_SOURCE,
-ChannelConfiguration.DEFAULT_MODULO,
-ChannelConfiguration.DEFAULT_OFFSET,
-Backend.SF_DATABUFFER));
-if (query.getEndMillis() > query.getStartMillis()) {
-time = query.getEndTime();
-configs.add(
-new ChannelConfigurationImpl(
-query.getChannel(),
-time,
-TimeUtils.getMillis(time) / 10,
-1,
-Type.Int32.getKey(),
-new int[] {1},
-false,
-ChannelConfiguration.DEFAULT_LOCAL_WRITE,
-ChannelConfiguration.DEFAULT_BIN_SIZE_IN_MILLIS,
-ChannelConfiguration.SPLIT_COUNT,
-ChannelConfiguration.DEFAULT_SOURCE,
-ChannelConfiguration.DEFAULT_MODULO,
-ChannelConfiguration.DEFAULT_OFFSET,
-Backend.SF_DATABUFFER));
-}
-if(Ordering.desc.equals(query.getOrdering())){
-Collections.reverse(configs);
-}
-return configs.stream();
 }
 @Override
@@ -408,22 +57,4 @@ public class DummyCassandraReader implements CassandraReader {
 // implement when needed
 throw new UnsupportedOperationException();
 }
-@Override
-public TimeRangeQuery getTimeRangeQuery(PulseIdRangeQuery query) {
-return new TimeRangeQuery(
-TimeUtils.getTimeFromMillis(query.getStartPulseId() * 10, 0),
-TimeUtils.getTimeFromMillis(query.getEndPulseId() * 10, 0),
-query);
-}
-@Override
-public Stream<? extends ChannelInfo> getChannelInfoStream(TimeRangeQuery query) {
-return getChannelConfiguration(query)
-.map(channelConfiguration -> new ChannelInfoImpl(channelConfiguration));
-}
-@Override
-public void truncateCache() {
-}
 }

File: DummyFilestorageReader.java (new)

@@ -0,0 +1,16 @@
package ch.psi.daq.test.queryrest.query;
import javax.annotation.PostConstruct;
import ch.psi.daq.domain.backend.Backend;
public class DummyFilestorageReader extends AbstractStreamEventReader {
public DummyFilestorageReader() {
}
@PostConstruct
public void afterPropertiesSet() {
init(Backend.SF_IMAGESTORAGE);
}
}

File: new properties file (default backend)

@@ -0,0 +1 @@
backend.default=sf-databuffer
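
The tests in this commit resolve the default backend through Spring instead of hard-coding Backend.SF_DATABUFFER. Below is a minimal sketch of that pattern, assuming the repository's Spring test context loads this property and exposes the resulting Backend under DomainConfig.BEAN_NAME_BACKEND_DEFAULT (as the @Resource injections in the test classes above suggest); the class and method names in the sketch are illustrative only, not part of this commit.

import javax.annotation.Resource;

import ch.psi.daq.domain.backend.Backend;
import ch.psi.daq.domain.config.DomainConfig;

// Illustrative fragment only; relies on the repository's Spring test context.
public class DefaultBackendUsageExample {

    // Injected by the test context; with backend.default=sf-databuffer this
    // resolves to Backend.SF_DATABUFFER.
    @Resource(name = DomainConfig.BEAN_NAME_BACKEND_DEFAULT)
    private Backend backend;

    public String[] exampleChannelNames() {
        // Channel names are derived from the backend key rather than literals
        // such as "DataBuffer1"/"DataBuffer2", e.g. "sf-databuffer1".
        return new String[] {backend.getKey() + "1", backend.getKey() + "2"};
    }
}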