Mit funktionierendem ScheduledTask für Berechnen der neuen Aggregates

This commit is contained in:
tobias 2025-03-13 22:50:56 +01:00
parent 05b93e03fb
commit 00153ed90b
3 changed files with 183 additions and 86 deletions

View File

@ -4,13 +4,172 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import info.peper.vz.rest.bo.db.Aggregate2;
import info.peper.vz.rest.bo.db.Data;
public final class Calculator {
private static final Logger LOG = LoggerFactory.getLogger(Calculator.class);
public static final long INTERVAL = 60*1000;
public static void processHouseMulti(final int houseId,
final long startTimestamp,
final long endTimestamp,
final long interval,
final int[] channelIdsSolar,
final int[] channelIdsMeter,
final JdbcTemplate jdbcTemplate) {
LOG.info("Processing house " + houseId + "...");
final List<Object[]> paramsForSave = new LinkedList<Object[]>();
@SuppressWarnings("unchecked")
Iterator<Data>[] iterSolar = new Iterator[channelIdsSolar.length];
@SuppressWarnings("unchecked")
Iterator<Data>[] iterMeter = new Iterator[channelIdsMeter.length];
long[] currentTsSolar = new long[channelIdsSolar.length];
long[] currentTsMeter = new long[channelIdsMeter.length];
boolean[] noNextRecordSolar = new boolean[channelIdsSolar.length];
boolean[] noNextRecordMeter = new boolean[channelIdsMeter.length];
Data[] currentDataSolar = new Data[channelIdsSolar.length];
Data[] currentDataMeter = new Data[channelIdsMeter.length];
for (int i = 0; i < channelIdsSolar.length; i++) {
List<Data> dataSolar = jdbcTemplate.query("SELECT * FROM volkszaehler.data WHERE channel_id=? AND timestamp>? ORDER BY timestamp;",
(rs, rowNum) -> new Data(rs.getInt("channel_id"), rs.getLong("timestamp"), rs.getDouble("value")),
channelIdsSolar[i], startTimestamp);
iterSolar[i] = dataSolar.iterator();
currentTsSolar[i] = startTimestamp;
noNextRecordSolar[i] = false;
}
for (int i = 0; i < channelIdsMeter.length; i++) {
List<Data> dataMeter = jdbcTemplate.query("SELECT * FROM volkszaehler.data WHERE channel_id=? AND timestamp>? ORDER BY timestamp;",
(rs, rowNum) -> new Data(rs.getInt("channel_id"), rs.getLong("timestamp"), rs.getDouble("value")),
channelIdsMeter[i], startTimestamp);
iterMeter[i] = dataMeter.iterator();
currentTsMeter[i] = startTimestamp;
noNextRecordMeter[i] = false;
}
long intervalProduced = 0;
long intervalObtained = 0;
long intervalInjected = 0;
long intervalStartTimestamp = startTimestamp;
long intervalEndTimestamp = startTimestamp + interval;
// Eigentliche Verarbeitung
while (getMinOfArray(currentTsSolar) <= endTimestamp || getMinOfArray(currentTsMeter) <= endTimestamp) {
LOG.debug(intervalStartTimestamp + "...");
for (int iSolar = 0; iSolar < channelIdsSolar.length; iSolar++) {
currentTsSolar[iSolar] = intervalStartTimestamp;
while (currentTsSolar[iSolar] < intervalEndTimestamp && (noNextRecordSolar[iSolar] || iterSolar[iSolar].hasNext())) {
if (noNextRecordSolar[iSolar]) {
noNextRecordSolar[iSolar] = false;
} else {
currentDataSolar[iSolar] = iterSolar[iSolar].next();
}
final long tsSolar = currentDataSolar[iSolar].getTimestamp();
final double value = currentDataSolar[iSolar].getValue() > 1.0 ? currentDataSolar[iSolar].getValue() : 0.0;
final long tsDiff = (tsSolar < intervalEndTimestamp) ? tsSolar - currentTsSolar[iSolar] : intervalEndTimestamp - currentTsSolar[iSolar];
currentTsSolar[iSolar] = tsSolar;
intervalProduced += Math.round(value * tsDiff);
if (currentTsSolar[iSolar] >= intervalEndTimestamp) {
noNextRecordSolar[iSolar] = true;
}
}
}
double meterValue = 0;
for (int iMeter = 0; iMeter < channelIdsMeter.length; iMeter++) {
currentTsMeter[iMeter] = intervalStartTimestamp;
while (currentTsMeter[iMeter] < intervalEndTimestamp && (noNextRecordMeter[iMeter] || iterMeter[iMeter].hasNext())) {
if (noNextRecordMeter[iMeter]) {
noNextRecordMeter[iMeter] = false;
} else {
currentDataMeter[iMeter] = iterMeter[iMeter].next();
}
final long tsMeter = currentDataMeter[iMeter].getTimestamp();
final double value = currentDataMeter[iMeter].getValue();
final long tsDiff = (tsMeter < intervalEndTimestamp) ? tsMeter - currentTsMeter[iMeter] : intervalEndTimestamp - currentTsMeter[iMeter];
currentTsMeter[iMeter] = tsMeter;
meterValue += Math.round(value * tsDiff);
if (currentTsMeter[iMeter] >= intervalEndTimestamp) {
noNextRecordMeter[iMeter] = true;
}
}
}
if (meterValue > 0.0) {
intervalObtained += Math.round(meterValue);
} else {
intervalInjected += Math.abs(Math.round(meterValue));
}
boolean readyToSave = true;
for (long endTsSolar : currentTsSolar) {
if (endTsSolar < intervalEndTimestamp) {
readyToSave = false;
}
}
for (long endTsMeter : currentTsMeter) {
if (endTsMeter < intervalEndTimestamp) {
readyToSave = false;
}
}
LOG.debug(" " + intervalProduced + " / " + intervalObtained + " / " + intervalInjected);
if (readyToSave) {
paramsForSave.add(new Object[] {houseId, intervalStartTimestamp, intervalEndTimestamp, intervalProduced, intervalObtained, intervalInjected});
} else {
LOG.warn("Not enough data at the end");
}
intervalStartTimestamp += interval;
intervalEndTimestamp += interval;
intervalInjected = 0;
intervalObtained = 0;
intervalProduced = 0;
}
LOG.info("Saving " + paramsForSave.size() + " new records...");
final int[] batchUpdateResults = jdbcTemplate.batchUpdate(
"INSERT INTO volkszaehler.tobias_aggregate2 (house_id, timestamp_start, timestamp_end, produced_energy, obtained_energy, injected_energy) VALUES (?, ?, ?, ?, ?, ?);",
paramsForSave, new int[] {Types.INTEGER, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT});
for (int batchUpdateResult : batchUpdateResults) {
if (batchUpdateResult != 1) {
LOG.error("Something went wrong while saving!");
}
}
}
public static List<Aggregate2> processHouseMulti(final int houseId,
final long startTimestamp,
final long endTimestamp,

View File

@ -1,79 +0,0 @@
package info.peper.vz.rest;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAccessor;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
@Configuration
class LoadDatabase {
private static final DateTimeFormatter DTF = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.systemDefault());
private static final int[] CHANNELS_TO_USE = new int[] {1, 4, };
private static final Logger LOG = LoggerFactory.getLogger(LoadDatabase.class);
static class Data {
final int channelId;
final long timestamp;
final int value;
Data(int channelId, long timestamp, int value) {
super();
this.channelId = channelId;
this.timestamp = timestamp;
this.value = value;
}
}
@Autowired
JdbcTemplate jdbcTemplate;
@Bean
CommandLineRunner initDatabase(AggregateRepository repository) {
return args -> {
LOG.info("################ jdbcTemplate: " + jdbcTemplate.getClass().getName());
// final long startTimestamp = getTimestamp("2022-06-01T00:00:00");
// final long endTimestamp = getTimestamp("2022-06-02T00:00:00");
// final List<Data> data = jdbcTemplate.query(
// "SELECT * FROM volkszaehler.data WHERE channel_id=? AND timestamp>? AND timestamp<=? ORDER BY timestamp;",
// (rs, rowNum) -> new Data(rs.getInt("channel_id"), rs.getLong("timestamp"), rs.getInt("value")),
// 1 ,startTimestamp, endTimestamp);
//
// LOG.info("########## Count: " + data.size());
// long currentTimestamp = startTimestamp;
// long wattMillisecondsPos = 0;
// long wattMillisecondsNeg = 0;
// for (Data d : data) {
// final long rsTimestamp = d.timestamp;
// final long rsValue = d.value;
// final long tsDiff = rsTimestamp - Math.min(endTimestamp, currentTimestamp);
// currentTimestamp = rsTimestamp;
// if (rsValue > 0) {
// wattMillisecondsPos += (rsValue * tsDiff);
// } else if (rsValue < 0) {
// wattMillisecondsNeg += (-rsValue * tsDiff);
// }
// }
// repository.save(new Aggregate(1, startTimestamp, endTimestamp, wattMillisecondsPos, wattMillisecondsNeg));
};
}
long getTimestamp(final String dateTime) {
final TemporalAccessor tempAccessor = DTF.parse(dateTime);
final Instant instant = Instant.from(tempAccessor);
return Instant.EPOCH.until(instant, ChronoUnit.MILLIS);
}
}

View File

@ -1,5 +1,6 @@
package info.peper.vz.rest;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
@ -17,8 +18,8 @@ public class ScheduledTasks {
@Autowired
private JdbcTemplate jdbcTemplate;
@Scheduled(fixedRate = 5000)
public void printSomething() {
@Scheduled(fixedDelay = 60000)
public void calculateAggregates() {
final List<Integer> houseIds = jdbcTemplate.query("SELECT house_id FROM tobias_house GROUP BY house_id ORDER BY house_id;",
(rs, rowNum) -> rs.getInt("house_id"));
for (int houseId : houseIds) {
@ -27,11 +28,27 @@ public class ScheduledTasks {
}
private void processHouse(final int houseId) {
LOG.info("Processing house " + houseId + "...");
final long lastTimestamp = jdbcTemplate.queryForObject("SELECT MAX(timestamp_end) FROM tobias_aggregate2 WHERE house_id=?", Long.class, houseId);
LOG.info("- last timestamp: " + lastTimestamp);
long currentTimeRound = (System.currentTimeMillis()/60000)*60000;
// LOG.info(System.currentTimeMillis() + "");
// LOG.info(currentTimeRound + "");
final List<Integer> channelIdsMeter = jdbcTemplate.query("SELECT channel_id FROM tobias_house WHERE house_id=? AND type=1;", (rs, rowNum) -> rs.getInt("channel_id"), houseId);
final List<Integer> channelIdsSolar = jdbcTemplate.query("SELECT channel_id FROM tobias_house WHERE house_id=? AND type=2;", (rs, rowNum) -> rs.getInt("channel_id"), houseId);
long currentTimeLastInterval = (System.currentTimeMillis()/Calculator.INTERVAL)*Calculator.INTERVAL;
if (currentTimeLastInterval > lastTimestamp) {
Calculator.processHouseMulti(houseId, lastTimestamp, currentTimeLastInterval, Calculator.INTERVAL, getAsArray(channelIdsSolar), getAsArray(channelIdsMeter), jdbcTemplate);
} else {
LOG.warn("Nothing to do for house " + houseId + "...");
}
}
private int[] getAsArray(final List<Integer> ints) {
final int[] result = new int[ints.size()];
final Iterator<Integer> iter = ints.iterator();
for (int i = 0; iter.hasNext(); i++) {
result[i] = iter.next().intValue();
}
return result;
}
}