From 80ffb88946be527ab8286b3bb84e16953ee0a4bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=A4rki?= Date: Fri, 16 Sep 2016 18:01:39 +0200 Subject: [PATCH] Filestorage first incomplete implementation. --- .../queryrest/config/DaqWebMvcConfig.java | 13 + .../QueryRestControllerChannelInfoTest.java | 57 +-- .../QueryRestControllerJsonTest.java | 92 +++-- .../query/AbstractStreamEventReader.java | 391 ++++++++++++++++++ .../query/DummyArchiverApplianceReader.java | 13 +- .../queryrest/query/DummyCassandraReader.java | 375 +---------------- .../query/DummyFilestorageReader.java | 16 + src/test/resources/queryrest-test.properties | 1 + 8 files changed, 514 insertions(+), 444 deletions(-) create mode 100644 src/test/java/ch/psi/daq/test/queryrest/query/AbstractStreamEventReader.java create mode 100644 src/test/java/ch/psi/daq/test/queryrest/query/DummyFilestorageReader.java diff --git a/src/test/java/ch/psi/daq/test/queryrest/config/DaqWebMvcConfig.java b/src/test/java/ch/psi/daq/test/queryrest/config/DaqWebMvcConfig.java index 768b5d0..65dc612 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/config/DaqWebMvcConfig.java +++ b/src/test/java/ch/psi/daq/test/queryrest/config/DaqWebMvcConfig.java @@ -19,13 +19,17 @@ import ch.psi.daq.cassandra.reader.CassandraReader; import ch.psi.daq.domain.backend.Backend; import ch.psi.daq.domain.backend.BackendAccess; import ch.psi.daq.domain.config.DomainConfig; +import ch.psi.daq.domain.events.ChannelEvent; import ch.psi.daq.domain.query.processor.QueryProcessor; import ch.psi.daq.domain.reader.DataReader; +import ch.psi.daq.domain.reader.StreamEventReader; +import ch.psi.daq.filestorage.config.FileStorageConfig; import ch.psi.daq.query.config.QueryConfig; import ch.psi.daq.query.processor.QueryProcessorLocal; import ch.psi.daq.test.query.config.LocalQueryTestConfig; import ch.psi.daq.test.queryrest.query.DummyArchiverApplianceReader; import ch.psi.daq.test.queryrest.query.DummyCassandraReader; +import ch.psi.daq.test.queryrest.query.DummyFilestorageReader; @Configuration @ComponentScan @@ -48,6 +52,9 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport { public void afterPropertiesSet() { backendAccess.addStreamEventReaderSupplier(Backend.SF_DATABUFFER, () -> cassandraReader()); backendAccess.addChannelInfoReaderSupplier(Backend.SF_DATABUFFER, () -> cassandraReader()); + + backendAccess.addStreamEventReaderSupplier(Backend.SF_IMAGESTORAGE, () -> filestorageReader()); + backendAccess.addChannelInfoReaderSupplier(Backend.SF_IMAGESTORAGE, () -> filestorageReader()); backendAccess.addDataReaderSupplier(Backend.SF_ARCHIVERAPPLIANCE, () -> archiverApplianceReader()); } @@ -64,6 +71,12 @@ public class DaqWebMvcConfig extends WebMvcConfigurationSupport { public CassandraReader cassandraReader() { return new DummyCassandraReader(); } + + @Bean(name = FileStorageConfig.BEAN_NAME_FILESTORAGE_READER) + @Lazy + public StreamEventReader filestorageReader() { + return new DummyFilestorageReader(); + } @Bean(name = ArchiverApplianceConfig.BEAN_NAME_ARCHIVER_APPLIANCE_READER) @Lazy diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerChannelInfoTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerChannelInfoTest.java index ab3bab3..e6cbe35 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerChannelInfoTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerChannelInfoTest.java @@ -6,6 +6,8 @@ import static org.junit.Assert.assertEquals; import java.util.List; import java.util.stream.Collectors; +import javax.annotation.Resource; + import org.junit.After; import org.junit.Test; import org.springframework.http.MediaType; @@ -34,6 +36,9 @@ import ch.psi.daq.test.queryrest.AbstractDaqRestTest; public class QueryRestControllerChannelInfoTest extends AbstractDaqRestTest { private ObjectMapper objectMapper = new ObjectMapper(); + + @Resource(name = DomainConfig.BEAN_NAME_BACKEND_DEFAULT) + private Backend backend; @After public void tearDown() throws Exception {} @@ -44,7 +49,7 @@ public class QueryRestControllerChannelInfoTest extends AbstractDaqRestTest { new RequestRangePulseId( 100, 101), - "DataBuffer1", "DataBuffer2"); + backend.getKey() + "1", backend.getKey() + "2"); String content = mapper.writeValueAsString(query); System.out.println(content); @@ -64,40 +69,40 @@ public class QueryRestControllerChannelInfoTest extends AbstractDaqRestTest { List infosList = objectMapper.readValue(response, ChannelInfosList.class); assertEquals(2, infosList.size()); ChannelInfos cInfos = infosList.get(0); - assertEquals("DataBuffer1", cInfos.getChannel().getName()); - assertEquals(Backend.SF_DATABUFFER, cInfos.getChannel().getBackend()); + assertEquals(backend.getKey() + "1", cInfos.getChannel().getName()); + assertEquals(backend, cInfos.getChannel().getBackend()); List infos = cInfos.getChannelInfos().collect(Collectors.toList()); assertEquals(2, infos.size()); ChannelInfo info = infos.get(0); - assertEquals("DataBuffer1", info.getChannel()); - assertEquals(Backend.SF_DATABUFFER, info.getBackend()); + assertEquals(backend.getKey() + "1", info.getChannel()); + assertEquals(backend, info.getBackend()); assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getStartPulseId() * 10, 0), info.getGlobalTime()); assertEquals(query.getRange().getStartPulseId(), info.getPulseId()); assertArrayEquals(new int[] {1}, info.getShape()); assertEquals(Type.Int32.getKey(), info.getType()); info = infos.get(1); - assertEquals("DataBuffer1", info.getChannel()); - assertEquals(Backend.SF_DATABUFFER, info.getBackend()); + assertEquals(backend.getKey() + "1", info.getChannel()); + assertEquals(backend, info.getBackend()); assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getEndPulseId() * 10, 0), info.getGlobalTime()); assertEquals(query.getRange().getEndPulseId(), info.getPulseId()); assertArrayEquals(new int[] {1}, info.getShape()); assertEquals(Type.Int32.getKey(), info.getType()); cInfos = infosList.get(1); - assertEquals("DataBuffer2", cInfos.getChannel().getName()); - assertEquals(Backend.SF_DATABUFFER, cInfos.getChannel().getBackend()); + assertEquals(backend.getKey() + "2", cInfos.getChannel().getName()); + assertEquals(backend, cInfos.getChannel().getBackend()); infos = cInfos.getChannelInfos().collect(Collectors.toList()); assertEquals(2, infos.size()); info = infos.get(0); - assertEquals("DataBuffer2", info.getChannel()); - assertEquals(Backend.SF_DATABUFFER, info.getBackend()); + assertEquals(backend.getKey() + "2", info.getChannel()); + assertEquals(backend, info.getBackend()); assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getStartPulseId() * 10, 0), info.getGlobalTime()); assertEquals(query.getRange().getStartPulseId(), info.getPulseId()); assertArrayEquals(new int[] {1}, info.getShape()); assertEquals(Type.Int32.getKey(), info.getType()); info = infos.get(1); - assertEquals("DataBuffer2", info.getChannel()); - assertEquals(Backend.SF_DATABUFFER, info.getBackend()); + assertEquals(backend.getKey() + "2", info.getChannel()); + assertEquals(backend, info.getBackend()); assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getEndPulseId() * 10, 0), info.getGlobalTime()); assertEquals(query.getRange().getEndPulseId(), info.getPulseId()); assertArrayEquals(new int[] {1}, info.getShape()); @@ -110,7 +115,7 @@ public class QueryRestControllerChannelInfoTest extends AbstractDaqRestTest { new RequestRangePulseId( 100, 101), - "DataBuffer1", "DataBuffer2"); + backend.getKey() + "1", backend.getKey() + "2"); query.setOrdering(Ordering.desc); String content = mapper.writeValueAsString(query); @@ -131,40 +136,40 @@ public class QueryRestControllerChannelInfoTest extends AbstractDaqRestTest { List infosList = objectMapper.readValue(response, ChannelInfosList.class); assertEquals(2, infosList.size()); ChannelInfos cInfos = infosList.get(0); - assertEquals("DataBuffer1", cInfos.getChannel().getName()); - assertEquals(Backend.SF_DATABUFFER, cInfos.getChannel().getBackend()); + assertEquals(backend.getKey() + "1", cInfos.getChannel().getName()); + assertEquals(backend, cInfos.getChannel().getBackend()); List infos = cInfos.getChannelInfos().collect(Collectors.toList()); assertEquals(2, infos.size()); ChannelInfo info = infos.get(0); - assertEquals("DataBuffer1", info.getChannel()); - assertEquals(Backend.SF_DATABUFFER, info.getBackend()); + assertEquals(backend.getKey() + "1", info.getChannel()); + assertEquals(backend, info.getBackend()); assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getEndPulseId() * 10, 0), info.getGlobalTime()); assertEquals(query.getRange().getEndPulseId(), info.getPulseId()); assertArrayEquals(new int[] {1}, info.getShape()); assertEquals(Type.Int32.getKey(), info.getType()); info = infos.get(1); - assertEquals("DataBuffer1", info.getChannel()); - assertEquals(Backend.SF_DATABUFFER, info.getBackend()); + assertEquals(backend.getKey() + "1", info.getChannel()); + assertEquals(backend, info.getBackend()); assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getStartPulseId() * 10, 0), info.getGlobalTime()); assertEquals(query.getRange().getStartPulseId(), info.getPulseId()); assertArrayEquals(new int[] {1}, info.getShape()); assertEquals(Type.Int32.getKey(), info.getType()); cInfos = infosList.get(1); - assertEquals("DataBuffer2", cInfos.getChannel().getName()); - assertEquals(Backend.SF_DATABUFFER, cInfos.getChannel().getBackend()); + assertEquals(backend.getKey() + "2", cInfos.getChannel().getName()); + assertEquals(backend, cInfos.getChannel().getBackend()); infos = cInfos.getChannelInfos().collect(Collectors.toList()); assertEquals(2, infos.size()); info = infos.get(0); - assertEquals("DataBuffer2", info.getChannel()); - assertEquals(Backend.SF_DATABUFFER, info.getBackend()); + assertEquals(backend.getKey() + "2", info.getChannel()); + assertEquals(backend, info.getBackend()); assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getEndPulseId() * 10, 0), info.getGlobalTime()); assertEquals(query.getRange().getEndPulseId(), info.getPulseId()); assertArrayEquals(new int[] {1}, info.getShape()); assertEquals(Type.Int32.getKey(), info.getType()); info = infos.get(1); - assertEquals("DataBuffer2", info.getChannel()); - assertEquals(Backend.SF_DATABUFFER, info.getBackend()); + assertEquals(backend.getKey() + "2", info.getChannel()); + assertEquals(backend, info.getBackend()); assertEquals(TimeUtils.getTimeFromMillis(query.getRange().getStartPulseId() * 10, 0), info.getGlobalTime()); assertEquals(query.getRange().getStartPulseId(), info.getPulseId()); assertArrayEquals(new int[] {1}, info.getShape()); diff --git a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java index 4262c9f..524ce83 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java +++ b/src/test/java/ch/psi/daq/test/queryrest/controller/QueryRestControllerJsonTest.java @@ -2,6 +2,8 @@ package ch.psi.daq.test.queryrest.controller; import java.util.Arrays; +import javax.annotation.Resource; + import org.junit.After; import org.junit.Test; import org.springframework.http.MediaType; @@ -43,6 +45,9 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { public static final String TEST_CHANNEL_WAVEFORM_01 = "testChannelWaveform1"; public static final String[] TEST_CHANNEL_NAMES = new String[] {TEST_CHANNEL_01, TEST_CHANNEL_02}; + @Resource(name = DomainConfig.BEAN_NAME_BACKEND_DEFAULT) + private Backend backend; + @After public void tearDown() throws Exception {} @@ -64,16 +69,16 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").value("BoolScalar")) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").exists()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[1]").value("BoolWaveform")) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name())) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray()) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[0]").exists()) - // .andExpect( - // MockMvcResultMatchers.jsonPath("$[1].channels[0]").value(DummyArchiverApplianceReader.TEST_CHANNEL_1)) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[1]").exists()) - // .andExpect( - // MockMvcResultMatchers.jsonPath("$[1].channels[1]").value(DummyArchiverApplianceReader.TEST_CHANNEL_2)) - ; + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.SF_ARCHIVERAPPLIANCE.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].backend").value(Backend.SF_IMAGESTORAGE.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[0]").value("BoolScalar")) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[1]").value("BoolWaveform")); } @@ -97,16 +102,25 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[2]").value("UInt32Scalar")) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").exists()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[3]").value("UInt32Waveform")) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name())) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray()) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[0]").doesNotExist()) - ; + .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.SF_ARCHIVERAPPLIANCE.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].backend").value(Backend.SF_IMAGESTORAGE.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[0]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[0]").value("Int32Scalar")) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[1]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[1]").value("Int32Waveform")) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[2]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[2]").value("UInt32Scalar")) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[3]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[3]").value("UInt32Waveform")); } @Test public void testChannelNameQueryBackendOrder() throws Exception { - ChannelsRequest request = new ChannelsRequest("int64", Ordering.desc, Backend.SF_DATABUFFER); + ChannelsRequest request = new ChannelsRequest("int64", Ordering.desc, backend); String content = mapper.writeValueAsString(request); System.out.println(content); @@ -120,7 +134,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.status().isOk()) .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) - .andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.SF_DATABUFFER.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(backend.getKey())) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").exists()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[0]").value("UInt64Waveform")) @@ -152,13 +166,13 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.jsonPath("$[0].backend").value(Backend.SF_DATABUFFER.getKey())) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[23]").exists()) - .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[25]").doesNotExist()) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name())) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray()) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[2]").exists()) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[3]").doesNotExist()) - ; + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[24]").doesNotExist()) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.SF_ARCHIVERAPPLIANCE.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].backend").value(Backend.SF_IMAGESTORAGE.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[23]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[24]").doesNotExist()); // each reload add another channel request.setReload(true); @@ -179,12 +193,12 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels").isArray()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[24]").exists()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channels[25]").doesNotExist()) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.archiverappliance.name())) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray()) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[3]").exists()) - // .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels[4]").doesNotExist()) - ; + .andExpect(MockMvcResultMatchers.jsonPath("$[1].backend").value(Backend.SF_ARCHIVERAPPLIANCE.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].backend").value(Backend.SF_IMAGESTORAGE.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels").isArray()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[24]").exists()) + .andExpect(MockMvcResultMatchers.jsonPath("$[2].channels[25]").doesNotExist()); } @Test @@ -335,7 +349,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { new RequestRangePulseId( 100, 101), - new ChannelName(TEST_CHANNEL_01, Backend.SF_DATABUFFER), + new ChannelName(TEST_CHANNEL_01, backend), new ChannelName(TEST_CHANNEL_02, Backend.SF_ARCHIVERAPPLIANCE)); String content = mapper.writeValueAsString(request); @@ -483,7 +497,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { String response = result.getResponse().getContentAsString(); System.out.println("Response: " + response); - + this.mockMvc.perform(MockMvcRequestBuilders .post(DomainConfig.PATH_QUERY) .contentType(MediaType.APPLICATION_JSON) @@ -539,7 +553,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.jsonPath("$").isArray()) .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01)) - .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getKey())) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100)) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value( @@ -549,7 +563,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { TestTimeUtils.getTimeStr(1, 10000000))) .andExpect(MockMvcResultMatchers.jsonPath("$[1]").exists()) .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.name").value(TEST_CHANNEL_02)) - .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.backend").value("sf-databuffer")) + .andExpect(MockMvcResultMatchers.jsonPath("$[1].channel.backend").value(backend.getKey())) .andExpect(MockMvcResultMatchers.jsonPath("$[1].data").isArray()) .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].pulseId").value(100)) .andExpect(MockMvcResultMatchers.jsonPath("$[1].data[0].globalSeconds").value( @@ -630,7 +644,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01)) - .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getKey())) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100)) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value( @@ -682,7 +696,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01)) - .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getKey())) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100)) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value( @@ -750,7 +764,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_01)) - .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getKey())) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(1000)) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value( @@ -866,7 +880,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_WAVEFORM_01)) - .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getKey())) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100)) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value( @@ -927,7 +941,7 @@ public class QueryRestControllerJsonTest extends AbstractDaqRestTest { .andExpect(MockMvcResultMatchers.jsonPath("$[0]").exists()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel").isMap()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.name").value(TEST_CHANNEL_WAVEFORM_01)) - .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(Backend.SF_DATABUFFER.getKey())) + .andExpect(MockMvcResultMatchers.jsonPath("$[0].channel.backend").value(backend.getKey())) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data").isArray()) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].pulseId").value(100)) .andExpect(MockMvcResultMatchers.jsonPath("$[0].data[0].globalSeconds").value( diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/AbstractStreamEventReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/AbstractStreamEventReader.java new file mode 100644 index 0000000..a78f7ec --- /dev/null +++ b/src/test/java/ch/psi/daq/test/queryrest/query/AbstractStreamEventReader.java @@ -0,0 +1,391 @@ +package ch.psi.daq.test.queryrest.query; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +import javax.annotation.Resource; + +import org.apache.commons.lang3.ArrayUtils; + +import com.google.common.collect.Lists; + +import ch.psi.bsread.message.Type; +import ch.psi.daq.common.ordering.Ordering; +import ch.psi.daq.common.time.TimeUtils; +import ch.psi.daq.domain.DataEvent; +import ch.psi.daq.domain.FieldNames; +import ch.psi.daq.domain.backend.Backend; +import ch.psi.daq.domain.config.DomainConfig; +import ch.psi.daq.domain.events.ChannelConfiguration; +import ch.psi.daq.domain.events.ChannelEvent; +import ch.psi.daq.domain.events.impl.ChannelConfigurationImpl; +import ch.psi.daq.domain.events.impl.ChannelEventImpl; +import ch.psi.daq.domain.json.channels.info.ChannelInfo; +import ch.psi.daq.domain.json.channels.info.ChannelInfoImpl; +import ch.psi.daq.domain.query.event.EventQuery; +import ch.psi.daq.domain.query.event.StreamEventQuery; +import ch.psi.daq.domain.query.range.PulseIdRangeQuery; +import ch.psi.daq.domain.query.range.TimeRangeQuery; +import ch.psi.daq.domain.reader.MetaStreamEventQuery; +import ch.psi.daq.domain.reader.StreamEventReader; +import ch.psi.daq.domain.test.backend.TestBackendAccess; +import ch.psi.daq.domain.test.gen.TestDataGen; +import ch.psi.daq.domain.utils.PropertiesUtils; + +public abstract class AbstractStreamEventReader implements StreamEventReader { + private static final Random random = new Random(0); + private static final int KEYSPACE = 1; + private List channels; + private AtomicLong channelNameCallCounter = new AtomicLong(); + + private TestDataGen dataGen; + private Backend backend; + private String testChannelName; + + @Resource(name = DomainConfig.BEAN_NAME_TEST_BACKEND_ACCESS) + private TestBackendAccess testBackendAccess; + + + public AbstractStreamEventReader() { + this.channels = Lists.newArrayList( + "BoolScalar", + "BoolWaveform", + "Int8Scalar", + "Int8Waveform", + "UInt8Scalar", + "UInt8Waveform", + "Int16Scalar", + "Int16Waveform", + "UInt16Scalar", + "UInt16Waveform", + "Int32Scalar", + "Int32Waveform", + "UInt32Scalar", + "UInt32Waveform", + "Int64Scalar", + "Int64Waveform", + "UInt64Scalar", + "UInt64Waveform", + "Float32Scalar", + "Float32Waveform", + "Float64Scalar", + "Float64Waveform", + "StringScalar"); + } + + protected void init(Backend backend){ + this.backend = backend; + this.dataGen = testBackendAccess.getTestDataGen(backend); + this.testChannelName = backend.getKey() + "_TestChannel_"; + } + + @Override + public Backend getBackend() { + return backend; + } + + /** + * @{inheritDoc + */ + @Override + public Stream getChannelStream(String regex) { + channels.add(testChannelName + channelNameCallCounter.incrementAndGet()); + + Stream channelStream = channels.stream(); + if (regex != null) { + Pattern pattern = Pattern.compile(regex.toLowerCase()); + channelStream = channelStream.filter(channel -> pattern.matcher(channel.toLowerCase()).find()); + } + return channelStream; + } + + // @Override + // public Stream getEventStream(PulseIdRangeQuery query) { + // return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(), + // query.getEventColumns()) + // .filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER)); + // } + + @Override + public Stream getEventStream(TimeRangeQuery query) { + return getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10) + .filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER)); + } + + public Stream getEventStream(EventQuery eventQuery, Stream queryProviders) { + Stream result = queryProviders.map(ceq -> { + if (ceq instanceof MetaStreamEventQuery) { + return getEvent((MetaStreamEventQuery) ceq); + } else { + throw new UnsupportedOperationException("This is not yet implemented!"); + } + }); + + return result; + } + + public static Stream getDummyEventStream(String channelParam, long startIndex, long endIndex, + String... columns) { + String channelLower = channelParam.toLowerCase(); + String channel = + (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam + : null; + + Stream rangeStream; + + if (channelParam.contains("[") && channelParam.contains("]")) { + rangeStream = + Arrays.stream( + channelParam.substring( + channelParam.indexOf("[") + 1, + channelParam.indexOf("]")) + .split(",") + ) + .map(str -> str.trim()) + .map(Long::parseLong); + } else { + rangeStream = LongStream.rangeClosed(startIndex, endIndex).boxed(); + } + + Stream eventStream = + rangeStream.map( + i -> { + BigDecimal iocTime = + (columns == null || columns.length == 0 || ArrayUtils.contains(columns, + FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) + : PropertiesUtils.DEFAULT_VALUE_DECIMAL; + BigDecimal globalTime = + (columns == null || columns.length == 0 || ArrayUtils.contains(columns, + FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) + : PropertiesUtils.DEFAULT_VALUE_DECIMAL; + long pulseId = + (columns == null || columns.length == 0 || ArrayUtils.contains(columns, + FieldNames.FIELD_PULSE_ID)) ? i : PropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE; + + if (channelLower.contains("waveform")) { + long[] value = random.longs(8).toArray(); + value[0] = i; + value[1] = i; + return new ChannelEventImpl( + channel, + iocTime, + pulseId, + globalTime, + KEYSPACE, + value + ); + + } else if (channelLower.contains("image")) { + int x = 4; + int y = 8; + int[] shape = new int[] {x, y}; + long[] value = random.longs(x * y).toArray(); + value[0] = i; + value[1] = i; + return new ChannelEventImpl( + channel, + iocTime, + pulseId, + globalTime, + KEYSPACE, + value, + shape + ); + } else { + return new ChannelEventImpl( + channel, + iocTime, + pulseId, + globalTime, + KEYSPACE, + i + ); + } + }); + + return eventStream; + } + + private List getDummyEvents(String channel, long startIndex, long endIndex, String... columns) { + return getDummyEventStream(channel, startIndex, endIndex, columns).collect(Collectors.toList()); + } + + /** + * @{inheritDoc + */ + @Override + public List getChannels() { + return Lists.newArrayList(channels); + } + + /** + * @{inheritDoc + */ + @Override + public List getChannels(String regex) { + return Lists.newArrayList(channels).stream().filter(s -> { + return s.contains(regex); + }).collect(Collectors.toList()); + } + + /** + * @{inheritDoc + */ + @Override + public ChannelEvent getEvent(MetaStreamEventQuery queryInfo, String... columns) { + if (queryInfo.getPulseId() > 0) { + return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId(), + columns) + .get(0); + } + return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getGlobalMillis() / 10, + queryInfo.getGlobalMillis() / 10).get(0); + } + + /** + * @{inheritDoc + */ + @Override + public CompletableFuture getEventAsync(MetaStreamEventQuery queryInfo, String... columns) { + // implement when needed + throw new UnsupportedOperationException(); + } + + // /** + // * @{inheritDoc + // */ + // @Override + // public Stream getStreamEventQueryStream(PulseIdRangeQuery query) { + // + // return dataGen.generateMetaPulseId( + // query.getStartPulseId(), + // (query.getEndPulseId() - query.getStartPulseId() + 1), + // i -> i * 10, + // i -> 0, + // i -> i, + // query.getChannel()) + // .stream() + // .map(metaPulse -> { + // metaPulse.setKeyspace(KEYSPACE); + // return metaPulse; + // }); + // } + + public Stream getStreamEventQueryStream(TimeRangeQuery query) { + + return dataGen.generateMetaTime( + KEYSPACE, + 3600, + query.getStartMillis() / 10, + ((query.getEndMillis() - query.getStartMillis()) / 10 + 1), + i -> i * 10, + i -> 0, + i -> i, + query.getChannel()).stream(); + } + + // /** + // * @{inheritDoc + // */ + // @Override + // public Stream getMetaStream(PulseIdRangeQuery query) { + // + // return getStreamEventQueryStream(query).map(r -> { + // return (MetaPulseId) r; + // }); + // + // } + + + /** + * @{inheritDoc + */ + @Override + public Stream> getMetaStream(TimeRangeQuery query) { + + return getStreamEventQueryStream(query).map(r -> { + return (MetaStreamEventQuery) r; + }); + } + + @Override + public Stream getEventStream(Stream> queryInfos, + String... columns) { + return getEventStream(null, queryInfos); + } + + @Override + public Stream getChannelConfiguration(TimeRangeQuery query) { + List configs = new ArrayList<>(); + + BigDecimal time = query.getStartTime(); + configs.add( + new ChannelConfigurationImpl( + query.getChannel(), + time, + TimeUtils.getMillis(time) / 10, + 0, + Type.Int32.getKey(), + new int[] {1}, + false, + ChannelConfiguration.DEFAULT_LOCAL_WRITE, + ChannelConfiguration.DEFAULT_BIN_SIZE_IN_MILLIS, + ChannelConfiguration.SPLIT_COUNT, + ChannelConfiguration.DEFAULT_SOURCE, + ChannelConfiguration.DEFAULT_MODULO, + ChannelConfiguration.DEFAULT_OFFSET, + Backend.SF_DATABUFFER)); + if (query.getEndMillis() > query.getStartMillis()) { + time = query.getEndTime(); + configs.add( + new ChannelConfigurationImpl( + query.getChannel(), + time, + TimeUtils.getMillis(time) / 10, + 1, + Type.Int32.getKey(), + new int[] {1}, + false, + ChannelConfiguration.DEFAULT_LOCAL_WRITE, + ChannelConfiguration.DEFAULT_BIN_SIZE_IN_MILLIS, + ChannelConfiguration.SPLIT_COUNT, + ChannelConfiguration.DEFAULT_SOURCE, + ChannelConfiguration.DEFAULT_MODULO, + ChannelConfiguration.DEFAULT_OFFSET, + Backend.SF_DATABUFFER)); + } + + if (Ordering.desc.equals(query.getOrdering())) { + Collections.reverse(configs); + } + + return configs.stream(); + } + + @Override + public TimeRangeQuery getTimeRangeQuery(PulseIdRangeQuery query) { + return new TimeRangeQuery( + TimeUtils.getTimeFromMillis(query.getStartPulseId() * 10, 0), + TimeUtils.getTimeFromMillis(query.getEndPulseId() * 10, 0), + query); + } + + @Override + public Stream getChannelInfoStream(TimeRangeQuery query) { + return getChannelConfiguration(query) + .map(channelConfiguration -> new ChannelInfoImpl(channelConfiguration)); + } + + @Override + public void truncateCache() {} +} diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java index c89db38..334d316 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyArchiverApplianceReader.java @@ -16,10 +16,10 @@ import ch.psi.daq.domain.query.range.TimeRangeQuery; import ch.psi.daq.domain.reader.DataReader; public class DummyArchiverApplianceReader implements DataReader { - public static final String ARCHIVER_TEST_CHANNEL = "ArchiverTestChannel_"; + public static final String TEST_CHANNEL = Backend.SF_ARCHIVERAPPLIANCE.getKey() + "_TestChannel_"; - public static final String TEST_CHANNEL_1 = "ArchiverChannel_1"; - public static final String TEST_CHANNEL_2 = "ArchiverChannel_2"; + public static final String TEST_CHANNEL_1 = Backend.SF_ARCHIVERAPPLIANCE.getKey() + "_Channel_1"; + public static final String TEST_CHANNEL_2 = Backend.SF_ARCHIVERAPPLIANCE.getKey() + "_Channel_2"; private List channels = Lists.newArrayList(TEST_CHANNEL_1, TEST_CHANNEL_2); private AtomicLong channelNameCallCounter = new AtomicLong(); @@ -32,7 +32,7 @@ public class DummyArchiverApplianceReader implements DataReader { @Override public Stream getChannelStream(String regex) { - channels.add(ARCHIVER_TEST_CHANNEL + channelNameCallCounter.incrementAndGet()); + channels.add(TEST_CHANNEL + channelNameCallCounter.incrementAndGet()); Stream channelStream = channels.stream(); if (regex != null) { @@ -65,8 +65,7 @@ public class DummyArchiverApplianceReader implements DataReader { TimeUtils.getTimeFromMillis(query.getEndPulseId() * 10, 0), query); } - + @Override - public void truncateCache() { - } + public void truncateCache() {} } diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java index 7f1595a..b740e57 100644 --- a/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyCassandraReader.java @@ -1,378 +1,27 @@ package ch.psi.daq.test.queryrest.query; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.LongStream; -import java.util.stream.Stream; import javax.annotation.PostConstruct; -import javax.annotation.Resource; -import org.apache.commons.lang3.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - -import ch.psi.bsread.message.Type; import ch.psi.daq.cassandra.reader.CassandraReader; -import ch.psi.daq.common.ordering.Ordering; -import ch.psi.daq.common.time.TimeUtils; -import ch.psi.daq.domain.DataEvent; -import ch.psi.daq.domain.FieldNames; import ch.psi.daq.domain.backend.Backend; -import ch.psi.daq.domain.config.DomainConfig; import ch.psi.daq.domain.events.ChannelConfiguration; -import ch.psi.daq.domain.events.ChannelEvent; -import ch.psi.daq.domain.events.impl.ChannelConfigurationImpl; -import ch.psi.daq.domain.events.impl.ChannelEventImpl; -import ch.psi.daq.domain.json.channels.info.ChannelInfo; -import ch.psi.daq.domain.json.channels.info.ChannelInfoImpl; -import ch.psi.daq.domain.query.event.EventQuery; -import ch.psi.daq.domain.query.event.StreamEventQuery; -import ch.psi.daq.domain.query.range.PulseIdRangeQuery; import ch.psi.daq.domain.query.range.TimeRangeQuery; -import ch.psi.daq.domain.reader.MetaStreamEventQuery; -import ch.psi.daq.domain.test.backend.TestBackendAccess; -import ch.psi.daq.domain.test.gen.TestDataGen; -import ch.psi.daq.domain.utils.PropertiesUtils; -public class DummyCassandraReader implements CassandraReader { +public class DummyCassandraReader extends AbstractStreamEventReader implements CassandraReader { private static final Logger LOGGER = LoggerFactory.getLogger(DummyCassandraReader.class); - public static final String DATABUFFER_TEST_CHANNEL = "DataBufferTestChannel_"; - private static final Random random = new Random(0); - private static final int KEYSPACE = 1; - private List channels; - private AtomicLong channelNameCallCounter = new AtomicLong(); - - private TestDataGen dataGen; - private Backend backend = Backend.SF_DATABUFFER; - - @Resource(name = DomainConfig.BEAN_NAME_TEST_BACKEND_ACCESS) - private TestBackendAccess testBackendAccess; - - + public DummyCassandraReader() { - this.channels = Lists.newArrayList( - "BoolScalar", - "BoolWaveform", - "Int8Scalar", - "Int8Waveform", - "UInt8Scalar", - "UInt8Waveform", - "Int16Scalar", - "Int16Waveform", - "UInt16Scalar", - "UInt16Waveform", - "Int32Scalar", - "Int32Waveform", - "UInt32Scalar", - "UInt32Waveform", - "Int64Scalar", - "Int64Waveform", - "UInt64Scalar", - "UInt64Waveform", - "Float32Scalar", - "Float32Waveform", - "Float64Scalar", - "Float64Waveform", - "StringScalar"); } @PostConstruct public void afterPropertiesSet() { - dataGen = testBackendAccess.getTestDataGen(backend); - } - - @Override - public Backend getBackend() { - return backend; - } - - /** - * @{inheritDoc - */ - @Override - public Stream getChannelStream(String regex) { - channels.add(DATABUFFER_TEST_CHANNEL + channelNameCallCounter.incrementAndGet()); - - Stream channelStream = channels.stream(); - if (regex != null) { - Pattern pattern = Pattern.compile(regex.toLowerCase()); - channelStream = channelStream.filter(channel -> pattern.matcher(channel.toLowerCase()).find()); - } - return channelStream; - } - - // @Override - // public Stream getEventStream(PulseIdRangeQuery query) { - // return getDummyEventStream(query.getChannel(), query.getStartPulseId(), query.getEndPulseId(), - // query.getEventColumns()) - // .filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER)); - // } - - @Override - public Stream getEventStream(TimeRangeQuery query) { - return getDummyEventStream(query.getChannel(), query.getStartMillis() / 10, query.getEndMillis() / 10) - .filter(query.getFilterOrDefault(EventQuery.NO_OP_FILTER)); - } - - public Stream getEventStream(EventQuery eventQuery, Stream queryProviders) { - Stream result = queryProviders.map(ceq -> { - if (ceq instanceof MetaStreamEventQuery) { - return getEvent((MetaStreamEventQuery) ceq); - } else { - throw new UnsupportedOperationException("This is not yet implemented!"); - } - }); - - return result; - } - - public static Stream getDummyEventStream(String channelParam, long startIndex, long endIndex, - String... columns) { - String channelLower = channelParam.toLowerCase(); - String channel = - (columns == null || columns.length == 0 || ArrayUtils.contains(columns, FieldNames.FIELD_CHANNEL)) ? channelParam - : null; - - Stream rangeStream; - - if (channelParam.contains("[") && channelParam.contains("]")) { - rangeStream = - Arrays.stream( - channelParam.substring( - channelParam.indexOf("[") + 1, - channelParam.indexOf("]")) - .split(",") - ) - .map(str -> str.trim()) - .map(Long::parseLong); - } else { - rangeStream = LongStream.rangeClosed(startIndex, endIndex).boxed(); - } - - Stream eventStream = - rangeStream.map( - i -> { - BigDecimal iocTime = - (columns == null || columns.length == 0 || ArrayUtils.contains(columns, - FieldNames.FIELD_IOC_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) - : PropertiesUtils.DEFAULT_VALUE_DECIMAL; - BigDecimal globalTime = - (columns == null || columns.length == 0 || ArrayUtils.contains(columns, - FieldNames.FIELD_GLOBAL_TIME)) ? TimeUtils.getTimeFromMillis(i * 10, 0) - : PropertiesUtils.DEFAULT_VALUE_DECIMAL; - long pulseId = - (columns == null || columns.length == 0 || ArrayUtils.contains(columns, - FieldNames.FIELD_PULSE_ID)) ? i : PropertiesUtils.DEFAULT_VALUE_BIGINT_PRIMITIVE; - - if (channelLower.contains("waveform")) { - long[] value = random.longs(8).toArray(); - value[0] = i; - value[1] = i; - return new ChannelEventImpl( - channel, - iocTime, - pulseId, - globalTime, - KEYSPACE, - value - ); - - } else if (channelLower.contains("image")) { - int x = 4; - int y = 8; - int[] shape = new int[] {x, y}; - long[] value = random.longs(x * y).toArray(); - value[0] = i; - value[1] = i; - return new ChannelEventImpl( - channel, - iocTime, - pulseId, - globalTime, - KEYSPACE, - value, - shape - ); - } else { - return new ChannelEventImpl( - channel, - iocTime, - pulseId, - globalTime, - KEYSPACE, - i - ); - } - }); - - return eventStream; - } - - private List getDummyEvents(String channel, long startIndex, long endIndex, String... columns) { - return getDummyEventStream(channel, startIndex, endIndex, columns).collect(Collectors.toList()); - } - - /** - * @{inheritDoc - */ - @Override - public List getChannels() { - return Lists.newArrayList(channels); - } - - /** - * @{inheritDoc - */ - @Override - public List getChannels(String regex) { - return Lists.newArrayList(channels).stream().filter(s -> { - return s.contains(regex); - }).collect(Collectors.toList()); - } - - /** - * @{inheritDoc - */ - @Override - public ChannelEvent getEvent(MetaStreamEventQuery queryInfo, String... columns) { - if (queryInfo.getPulseId() > 0) { - return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getPulseId(), queryInfo.getPulseId(), - columns) - .get(0); - } - return (ChannelEvent) getDummyEvents(queryInfo.getChannel(), queryInfo.getGlobalMillis() / 10, - queryInfo.getGlobalMillis() / 10).get(0); - } - - /** - * @{inheritDoc - */ - @Override - public CompletableFuture getEventAsync(MetaStreamEventQuery queryInfo, String... columns) { - // implement when needed - throw new UnsupportedOperationException(); - } - - // /** - // * @{inheritDoc - // */ - // @Override - // public Stream getStreamEventQueryStream(PulseIdRangeQuery query) { - // - // return dataGen.generateMetaPulseId( - // query.getStartPulseId(), - // (query.getEndPulseId() - query.getStartPulseId() + 1), - // i -> i * 10, - // i -> 0, - // i -> i, - // query.getChannel()) - // .stream() - // .map(metaPulse -> { - // metaPulse.setKeyspace(KEYSPACE); - // return metaPulse; - // }); - // } - - public Stream getStreamEventQueryStream(TimeRangeQuery query) { - - return dataGen.generateMetaTime( - KEYSPACE, - 3600, - query.getStartMillis() / 10, - ((query.getEndMillis() - query.getStartMillis()) / 10 + 1), - i -> i * 10, - i -> 0, - i -> i, - query.getChannel()).stream(); - } - - // /** - // * @{inheritDoc - // */ - // @Override - // public Stream getMetaStream(PulseIdRangeQuery query) { - // - // return getStreamEventQueryStream(query).map(r -> { - // return (MetaPulseId) r; - // }); - // - // } - - - /** - * @{inheritDoc - */ - @Override - public Stream> getMetaStream(TimeRangeQuery query) { - - return getStreamEventQueryStream(query).map(r -> { - return (MetaStreamEventQuery) r; - }); - } - - @Override - public Stream getEventStream(Stream> queryInfos) { - return getEventStream(null, queryInfos); - } - - @Override - public Stream getChannelConfiguration(TimeRangeQuery query) { - List configs = new ArrayList<>(); - - BigDecimal time = query.getStartTime(); - configs.add( - new ChannelConfigurationImpl( - query.getChannel(), - time, - TimeUtils.getMillis(time) / 10, - 0, - Type.Int32.getKey(), - new int[] {1}, - false, - ChannelConfiguration.DEFAULT_LOCAL_WRITE, - ChannelConfiguration.DEFAULT_BIN_SIZE_IN_MILLIS, - ChannelConfiguration.SPLIT_COUNT, - ChannelConfiguration.DEFAULT_SOURCE, - ChannelConfiguration.DEFAULT_MODULO, - ChannelConfiguration.DEFAULT_OFFSET, - Backend.SF_DATABUFFER)); - if (query.getEndMillis() > query.getStartMillis()) { - time = query.getEndTime(); - configs.add( - new ChannelConfigurationImpl( - query.getChannel(), - time, - TimeUtils.getMillis(time) / 10, - 1, - Type.Int32.getKey(), - new int[] {1}, - false, - ChannelConfiguration.DEFAULT_LOCAL_WRITE, - ChannelConfiguration.DEFAULT_BIN_SIZE_IN_MILLIS, - ChannelConfiguration.SPLIT_COUNT, - ChannelConfiguration.DEFAULT_SOURCE, - ChannelConfiguration.DEFAULT_MODULO, - ChannelConfiguration.DEFAULT_OFFSET, - Backend.SF_DATABUFFER)); - } - - if(Ordering.desc.equals(query.getOrdering())){ - Collections.reverse(configs); - } - - return configs.stream(); + init(Backend.SF_DATABUFFER); } @Override @@ -408,22 +57,4 @@ public class DummyCassandraReader implements CassandraReader { // implement when needed throw new UnsupportedOperationException(); } - - @Override - public TimeRangeQuery getTimeRangeQuery(PulseIdRangeQuery query) { - return new TimeRangeQuery( - TimeUtils.getTimeFromMillis(query.getStartPulseId() * 10, 0), - TimeUtils.getTimeFromMillis(query.getEndPulseId() * 10, 0), - query); - } - - @Override - public Stream getChannelInfoStream(TimeRangeQuery query) { - return getChannelConfiguration(query) - .map(channelConfiguration -> new ChannelInfoImpl(channelConfiguration)); - } - - @Override - public void truncateCache() { - } } diff --git a/src/test/java/ch/psi/daq/test/queryrest/query/DummyFilestorageReader.java b/src/test/java/ch/psi/daq/test/queryrest/query/DummyFilestorageReader.java new file mode 100644 index 0000000..f0782f8 --- /dev/null +++ b/src/test/java/ch/psi/daq/test/queryrest/query/DummyFilestorageReader.java @@ -0,0 +1,16 @@ +package ch.psi.daq.test.queryrest.query; + +import javax.annotation.PostConstruct; + +import ch.psi.daq.domain.backend.Backend; + +public class DummyFilestorageReader extends AbstractStreamEventReader { + + public DummyFilestorageReader() { + } + + @PostConstruct + public void afterPropertiesSet() { + init(Backend.SF_IMAGESTORAGE); + } +} diff --git a/src/test/resources/queryrest-test.properties b/src/test/resources/queryrest-test.properties index e69de29..9f9176e 100644 --- a/src/test/resources/queryrest-test.properties +++ b/src/test/resources/queryrest-test.properties @@ -0,0 +1 @@ +backend.default=sf-databuffer \ No newline at end of file