diff --git a/src/main/java/info/peper/vz/rest/Calculator.java b/src/main/java/info/peper/vz/rest/Calculator.java index 7b0d518..4a0c74d 100644 --- a/src/main/java/info/peper/vz/rest/Calculator.java +++ b/src/main/java/info/peper/vz/rest/Calculator.java @@ -4,13 +4,172 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Types; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.JdbcTemplate; + import info.peper.vz.rest.bo.db.Aggregate2; +import info.peper.vz.rest.bo.db.Data; public final class Calculator { + private static final Logger LOG = LoggerFactory.getLogger(Calculator.class); + public static final long INTERVAL = 60*1000; + + public static void processHouseMulti(final int houseId, + final long startTimestamp, + final long endTimestamp, + final long interval, + final int[] channelIdsSolar, + final int[] channelIdsMeter, + final JdbcTemplate jdbcTemplate) { + + LOG.info("Processing house " + houseId + "..."); + + final List paramsForSave = new LinkedList(); + + @SuppressWarnings("unchecked") + Iterator[] iterSolar = new Iterator[channelIdsSolar.length]; + @SuppressWarnings("unchecked") + Iterator[] iterMeter = new Iterator[channelIdsMeter.length]; + long[] currentTsSolar = new long[channelIdsSolar.length]; + long[] currentTsMeter = new long[channelIdsMeter.length]; + boolean[] noNextRecordSolar = new boolean[channelIdsSolar.length]; + boolean[] noNextRecordMeter = new boolean[channelIdsMeter.length]; + Data[] currentDataSolar = new Data[channelIdsSolar.length]; + Data[] currentDataMeter = new Data[channelIdsMeter.length]; + + for (int i = 0; i < channelIdsSolar.length; i++) { + + List dataSolar = jdbcTemplate.query("SELECT * FROM volkszaehler.data WHERE channel_id=? AND timestamp>? ORDER BY timestamp;", + (rs, rowNum) -> new Data(rs.getInt("channel_id"), rs.getLong("timestamp"), rs.getDouble("value")), + channelIdsSolar[i], startTimestamp); + iterSolar[i] = dataSolar.iterator(); + currentTsSolar[i] = startTimestamp; + noNextRecordSolar[i] = false; + } + + for (int i = 0; i < channelIdsMeter.length; i++) { + List dataMeter = jdbcTemplate.query("SELECT * FROM volkszaehler.data WHERE channel_id=? AND timestamp>? ORDER BY timestamp;", + (rs, rowNum) -> new Data(rs.getInt("channel_id"), rs.getLong("timestamp"), rs.getDouble("value")), + channelIdsMeter[i], startTimestamp); + iterMeter[i] = dataMeter.iterator(); + currentTsMeter[i] = startTimestamp; + noNextRecordMeter[i] = false; + } + + long intervalProduced = 0; + long intervalObtained = 0; + long intervalInjected = 0; + + long intervalStartTimestamp = startTimestamp; + long intervalEndTimestamp = startTimestamp + interval; + + // Eigentliche Verarbeitung + + while (getMinOfArray(currentTsSolar) <= endTimestamp || getMinOfArray(currentTsMeter) <= endTimestamp) { + LOG.debug(intervalStartTimestamp + "..."); + + for (int iSolar = 0; iSolar < channelIdsSolar.length; iSolar++) { + currentTsSolar[iSolar] = intervalStartTimestamp; + + while (currentTsSolar[iSolar] < intervalEndTimestamp && (noNextRecordSolar[iSolar] || iterSolar[iSolar].hasNext())) { + if (noNextRecordSolar[iSolar]) { + noNextRecordSolar[iSolar] = false; + } else { + currentDataSolar[iSolar] = iterSolar[iSolar].next(); + } + + final long tsSolar = currentDataSolar[iSolar].getTimestamp(); + final double value = currentDataSolar[iSolar].getValue() > 1.0 ? currentDataSolar[iSolar].getValue() : 0.0; + final long tsDiff = (tsSolar < intervalEndTimestamp) ? tsSolar - currentTsSolar[iSolar] : intervalEndTimestamp - currentTsSolar[iSolar]; + + currentTsSolar[iSolar] = tsSolar; + + intervalProduced += Math.round(value * tsDiff); + + if (currentTsSolar[iSolar] >= intervalEndTimestamp) { + noNextRecordSolar[iSolar] = true; + } + } + } + + double meterValue = 0; + for (int iMeter = 0; iMeter < channelIdsMeter.length; iMeter++) { + currentTsMeter[iMeter] = intervalStartTimestamp; + + while (currentTsMeter[iMeter] < intervalEndTimestamp && (noNextRecordMeter[iMeter] || iterMeter[iMeter].hasNext())) { + if (noNextRecordMeter[iMeter]) { + noNextRecordMeter[iMeter] = false; + } else { + currentDataMeter[iMeter] = iterMeter[iMeter].next(); + } + final long tsMeter = currentDataMeter[iMeter].getTimestamp(); + final double value = currentDataMeter[iMeter].getValue(); + final long tsDiff = (tsMeter < intervalEndTimestamp) ? tsMeter - currentTsMeter[iMeter] : intervalEndTimestamp - currentTsMeter[iMeter]; + + currentTsMeter[iMeter] = tsMeter; + + meterValue += Math.round(value * tsDiff); + + if (currentTsMeter[iMeter] >= intervalEndTimestamp) { + noNextRecordMeter[iMeter] = true; + } + } + } + if (meterValue > 0.0) { + intervalObtained += Math.round(meterValue); + } else { + intervalInjected += Math.abs(Math.round(meterValue)); + } + + boolean readyToSave = true; + for (long endTsSolar : currentTsSolar) { + if (endTsSolar < intervalEndTimestamp) { + readyToSave = false; + } + } + for (long endTsMeter : currentTsMeter) { + if (endTsMeter < intervalEndTimestamp) { + readyToSave = false; + } + } + + LOG.debug(" " + intervalProduced + " / " + intervalObtained + " / " + intervalInjected); + + if (readyToSave) { + paramsForSave.add(new Object[] {houseId, intervalStartTimestamp, intervalEndTimestamp, intervalProduced, intervalObtained, intervalInjected}); + } else { + LOG.warn("Not enough data at the end"); + } + + intervalStartTimestamp += interval; + intervalEndTimestamp += interval; + + intervalInjected = 0; + intervalObtained = 0; + intervalProduced = 0; + } + + LOG.info("Saving " + paramsForSave.size() + " new records..."); + + final int[] batchUpdateResults = jdbcTemplate.batchUpdate( + "INSERT INTO volkszaehler.tobias_aggregate2 (house_id, timestamp_start, timestamp_end, produced_energy, obtained_energy, injected_energy) VALUES (?, ?, ?, ?, ?, ?);", + paramsForSave, new int[] {Types.INTEGER, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT}); + + for (int batchUpdateResult : batchUpdateResults) { + if (batchUpdateResult != 1) { + LOG.error("Something went wrong while saving!"); + } + } + } + + public static List processHouseMulti(final int houseId, final long startTimestamp, final long endTimestamp, diff --git a/src/main/java/info/peper/vz/rest/LoadDatabase.java b/src/main/java/info/peper/vz/rest/LoadDatabase.java deleted file mode 100644 index fdb4a8d..0000000 --- a/src/main/java/info/peper/vz/rest/LoadDatabase.java +++ /dev/null @@ -1,79 +0,0 @@ -package info.peper.vz.rest; - -import java.time.Instant; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; -import java.time.temporal.ChronoUnit; -import java.time.temporal.TemporalAccessor; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.CommandLineRunner; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.jdbc.core.JdbcTemplate; - -@Configuration -class LoadDatabase { - - private static final DateTimeFormatter DTF = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.systemDefault()); - private static final int[] CHANNELS_TO_USE = new int[] {1, 4, }; - private static final Logger LOG = LoggerFactory.getLogger(LoadDatabase.class); - - static class Data { - final int channelId; - final long timestamp; - final int value; - - Data(int channelId, long timestamp, int value) { - super(); - this.channelId = channelId; - this.timestamp = timestamp; - this.value = value; - } - - } - - @Autowired - JdbcTemplate jdbcTemplate; - - @Bean - CommandLineRunner initDatabase(AggregateRepository repository) { - - return args -> { - LOG.info("################ jdbcTemplate: " + jdbcTemplate.getClass().getName()); -// final long startTimestamp = getTimestamp("2022-06-01T00:00:00"); -// final long endTimestamp = getTimestamp("2022-06-02T00:00:00"); -// final List data = jdbcTemplate.query( -// "SELECT * FROM volkszaehler.data WHERE channel_id=? AND timestamp>? AND timestamp<=? ORDER BY timestamp;", -// (rs, rowNum) -> new Data(rs.getInt("channel_id"), rs.getLong("timestamp"), rs.getInt("value")), -// 1 ,startTimestamp, endTimestamp); -// -// LOG.info("########## Count: " + data.size()); -// long currentTimestamp = startTimestamp; -// long wattMillisecondsPos = 0; -// long wattMillisecondsNeg = 0; -// for (Data d : data) { -// final long rsTimestamp = d.timestamp; -// final long rsValue = d.value; -// final long tsDiff = rsTimestamp - Math.min(endTimestamp, currentTimestamp); -// currentTimestamp = rsTimestamp; -// if (rsValue > 0) { -// wattMillisecondsPos += (rsValue * tsDiff); -// } else if (rsValue < 0) { -// wattMillisecondsNeg += (-rsValue * tsDiff); -// } -// } -// repository.save(new Aggregate(1, startTimestamp, endTimestamp, wattMillisecondsPos, wattMillisecondsNeg)); - }; - } - - long getTimestamp(final String dateTime) { - final TemporalAccessor tempAccessor = DTF.parse(dateTime); - final Instant instant = Instant.from(tempAccessor); - return Instant.EPOCH.until(instant, ChronoUnit.MILLIS); - } - -} diff --git a/src/main/java/info/peper/vz/rest/ScheduledTasks.java b/src/main/java/info/peper/vz/rest/ScheduledTasks.java index 475422d..3299837 100644 --- a/src/main/java/info/peper/vz/rest/ScheduledTasks.java +++ b/src/main/java/info/peper/vz/rest/ScheduledTasks.java @@ -1,5 +1,6 @@ package info.peper.vz.rest; +import java.util.Iterator; import java.util.List; import org.slf4j.Logger; @@ -17,8 +18,8 @@ public class ScheduledTasks { @Autowired private JdbcTemplate jdbcTemplate; - @Scheduled(fixedRate = 5000) - public void printSomething() { + @Scheduled(fixedDelay = 60000) + public void calculateAggregates() { final List houseIds = jdbcTemplate.query("SELECT house_id FROM tobias_house GROUP BY house_id ORDER BY house_id;", (rs, rowNum) -> rs.getInt("house_id")); for (int houseId : houseIds) { @@ -27,11 +28,27 @@ public class ScheduledTasks { } private void processHouse(final int houseId) { - LOG.info("Processing house " + houseId + "..."); final long lastTimestamp = jdbcTemplate.queryForObject("SELECT MAX(timestamp_end) FROM tobias_aggregate2 WHERE house_id=?", Long.class, houseId); - LOG.info("- last timestamp: " + lastTimestamp); - long currentTimeRound = (System.currentTimeMillis()/60000)*60000; -// LOG.info(System.currentTimeMillis() + ""); -// LOG.info(currentTimeRound + ""); + + final List channelIdsMeter = jdbcTemplate.query("SELECT channel_id FROM tobias_house WHERE house_id=? AND type=1;", (rs, rowNum) -> rs.getInt("channel_id"), houseId); + final List channelIdsSolar = jdbcTemplate.query("SELECT channel_id FROM tobias_house WHERE house_id=? AND type=2;", (rs, rowNum) -> rs.getInt("channel_id"), houseId); + + long currentTimeLastInterval = (System.currentTimeMillis()/Calculator.INTERVAL)*Calculator.INTERVAL; + + if (currentTimeLastInterval > lastTimestamp) { + Calculator.processHouseMulti(houseId, lastTimestamp, currentTimeLastInterval, Calculator.INTERVAL, getAsArray(channelIdsSolar), getAsArray(channelIdsMeter), jdbcTemplate); + } else { + LOG.warn("Nothing to do for house " + houseId + "..."); + } + + } + + private int[] getAsArray(final List ints) { + final int[] result = new int[ints.size()]; + final Iterator iter = ints.iterator(); + for (int i = 0; iter.hasNext(); i++) { + result[i] = iter.next().intValue(); + } + return result; } }