package ch.psi.daq.rest;

import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import ch.psi.daq.cassandra.writer.CassandraWriter;
import ch.psi.daq.domain.cassandra.ChannelEvent;
import ch.psi.daq.domain.cassandra.DataEvent;
import ch.psi.daq.hazelcast.query.processor.QueryProcessor;
import ch.psi.daq.rest.queries.AbstractQuery;
import ch.psi.daq.rest.queries.PulseRangeQuery;
import ch.psi.daq.rest.queries.TimeRangeQuery;

/**
 * REST entry point that accepts range queries, hands them to the
 * {@link QueryProcessor} and streams the resulting events back to the client.
 * Also exposes a {@code /write} endpoint that stores a single dummy event.
 */
@RestController
public class DaqRestController {

    private static final Logger logger = LoggerFactory.getLogger(DaqRestController.class);

    @Autowired
    private CassandraWriter cassandraWriter;

    @Autowired
    private ResponseStreamWriter responseStreamWriter;

    @Autowired
    private QueryProcessor queryProcessor;

    /**
     * Handles requests to {@code /pulserange}: logs the received query at debug
     * level and executes it.
     *
     * @param query the query deserialized from the request body
     * @param res the servlet response the result is written to
     * @throws IOException if executing the query or writing the response fails with an I/O error
     */
    @RequestMapping(value = "/pulserange")
    public void pulseRange(@RequestBody PulseRangeQuery query, HttpServletResponse res) throws IOException {
        logger.debug("PulseRangeQuery received: {}", query);
        executeQuery(query, res);
    }

    /**
     * Handles requests to {@code /timerange}: logs the received query at debug
     * level and executes it.
     *
     * @param query the query deserialized from the request body
     * @param res the servlet response the result is written to
     * @throws IOException if executing the query or writing the response fails with an I/O error
     */
    @RequestMapping(value = "/timerange")
    public void timeRange(@RequestBody TimeRangeQuery query, HttpServletResponse res) throws IOException {
        logger.debug("TimeRangeQuery received: {}", query);
        executeQuery(query, res);
    }

    /**
     * Runs the query through the {@link QueryProcessor}, concatenates the
     * streams found in the values of the returned map into one stream and
     * passes that stream to the {@link ResponseStreamWriter}.
     *
     * @param query the query to execute
     * @param res the servlet response the result is written to
     * @throws IOException if writing the response fails with an I/O error
     */
    private void executeQuery(AbstractQuery query, HttpServletResponse res) throws IOException {
        // The processor returns a map whose values are streams; the keys are not
        // needed here, so the call is chained and only the values are flattened.
        // NOTE(review): element type DataEvent is reconstructed from the imports
        // -- confirm against the return type of QueryProcessor#process.
        Stream<DataEvent> flatStreams = queryProcessor.process(query)
                .values()
                .stream()
                .flatMap(s -> s);

        // Write the response back to the client using Java 8 streams.
        responseStreamWriter.respond(flatStreams, query, res);
    }

    /**
     * Handles requests to {@code /write}: builds one {@link ChannelEvent} for
     * channel {@code "dummy-test"} with a random payload and writes it through
     * the {@link CassandraWriter}, blocking until the asynchronous write has
     * finished.
     */
    @RequestMapping(value = "/write")
    public void writeDummyEntry() {
        int pulseId = 100;
        ChannelEvent event = new ChannelEvent(
                "dummy-test",
                pulseId,
                0,
                pulseId,
                pulseId,
                0,
                "data_" + UUID.randomUUID().toString());

        // NOTE(review): the meaning of the leading arguments 3 and 0 is defined by
        // CassandraWriter#writeAsync and is not visible here -- confirm.
        CompletableFuture<?> future = cassandraWriter.writeAsync(3, 0, event);

        // join() waits for completion and rethrows a failed write as an
        // unchecked CompletionException.
        future.join();
    }
}