package info.peper.vz.rest; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalAccessor; import java.util.LinkedList; import java.util.List; public class FillAggregateTableMain2 { private static final DateTimeFormatter DTF = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.systemDefault()); private static final int[] CHANNELS_TO_USE = new int[] {1, 4, }; private static final List SAVELIST = new LinkedList(); private static class AggregateToSave { private final long startTs; private final long endTs; private final long valuePos; private final long valueNeg; private final int channelId; private AggregateToSave(long startTs, long endTs, long valuePos, long valueNeg, int channelId) { super(); this.startTs = startTs; this.endTs = endTs; this.valuePos = valuePos; this.valueNeg = valueNeg; this.channelId = channelId; } private long getStartTs() { return startTs; } private long getEndTs() { return endTs; } private long getValuePos() { return valuePos; } private long getValueNeg() { return valueNeg; } private int getChannelId() { return channelId; } } public static void main(String[] args) throws Exception { try (final Connection con = DriverManager.getConnection( "jdbc:mariadb://mariadb.fritz.box/volkszaehler", "vz", getPassword())) { // long startTimeStamp = getTimestamp("2025-01-01T09:00:00"); final long startTimeStamp = getTimestamp("2022-05-20T09:00:00"); final long finalEndTimeStamp = getTimestamp("2025-03-01T00:00:00"); for (int channelId : CHANNELS_TO_USE) { processChannel(startTimeStamp, finalEndTimeStamp, 60*1000, channelId, con); } System.out.println("Saving... " + SAVELIST.size()); // saveValues(con); // for (AggregateToSave ats : SAVELIST) { // saveValues(con, ats.startTs, ats.endTs, ats.channelId, new long[] {ats.valuePos, ats.valueNeg}); // System.out.println("####"); // } } } private static void processChannel(final long startTimestamp, final long endTimestamp, final long interval, final int channelId, final Connection con) throws SQLException { long currentTimestamp = startTimestamp; long intervalStartTimestamp = startTimestamp; long intervalEndTimestamp = startTimestamp + interval; long intervalPos = 0; long intervalNeg = 0; try (final PreparedStatement stmt = con.prepareStatement("SELECT * FROM volkszaehler.data WHERE channel_id=? AND timestamp>? ORDER BY timestamp;")) { stmt.setInt(1, channelId); stmt.setLong(2, startTimestamp); stmt.setFetchSize(1000); boolean noNextRecord = false; try (final ResultSet rs = stmt.executeQuery()) { long sumDiff = 0; while (noNextRecord || (currentTimestamp <= endTimestamp && rs.next())) { noNextRecord = false; final long timestamp = rs.getLong("timestamp"); final long value; if (rs.getDouble("value") >= 0 && rs.getDouble("value") < 1) { value = 0; } else { value = rs.getLong("value"); } final long tsDiff; if (timestamp < intervalEndTimestamp) { tsDiff = timestamp - currentTimestamp; } else { tsDiff = intervalEndTimestamp - currentTimestamp; } // final long tsDiff = Math.min(timestamp - currentTimestamp, intervalEndTimestamp - currentTimestamp); sumDiff += tsDiff; currentTimestamp = timestamp; if (value > 0) { intervalPos += (value * tsDiff); } else if (value < 0) { intervalNeg += (-value * tsDiff); } if (timestamp >= intervalEndTimestamp) { if (sumDiff != interval) { System.err.println("sumDiff: " + sumDiff + " / interval: " + interval); } sumDiff = 0; if (intervalStartTimestamp % (86400000*10) == 0) { System.out.println( DTF.format(Instant.ofEpochMilli(intervalStartTimestamp)) + " - " + DTF.format(Instant.ofEpochMilli(intervalEndTimestamp)) + " / " + (intervalPos/3600) + " / " + (intervalNeg/3600)); } SAVELIST.add(new AggregateToSave(intervalStartTimestamp, intervalEndTimestamp, intervalPos/3600, intervalNeg/3600, channelId)); intervalStartTimestamp += interval; intervalEndTimestamp += interval; currentTimestamp = intervalStartTimestamp; intervalPos = 0; intervalNeg = 0; noNextRecord = true; } } } } } private static String getPassword() throws IOException { try (final InputStream is = new FileInputStream("db-password.txt")) { final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is)); return bufferedReader.readLine(); } } private static long getTimestamp(final String dateTime) { final TemporalAccessor tempAccessor = DTF.parse(dateTime); final Instant instant = Instant.from(tempAccessor); return Instant.EPOCH.until(instant, ChronoUnit.MILLIS); } private static void saveValues(final Connection con, final long startTimestamp, final long endTimestamp, final int channelId, final long[] values) throws SQLException { try (final PreparedStatement stmt = con.prepareStatement("INSERT INTO tobias_aggregate (channel_id, timestamp_start, timestamp_end, sum_positive, sum_negative) VALUES (?, ?, ?, ?, ?)")) { stmt.setInt(1, channelId); stmt.setLong(2, startTimestamp); stmt.setLong(3, endTimestamp); stmt.setLong(4, values[0]); stmt.setLong(5, values[1]); stmt.execute(); } } private static void saveValues(final Connection con) throws SQLException { try (final PreparedStatement stmt = con.prepareStatement("INSERT INTO tobias_aggregate (channel_id, timestamp_start, timestamp_end, sum_positive, sum_negative) VALUES (?, ?, ?, ?, ?)")) { int i = 0; for (AggregateToSave ats : SAVELIST) { stmt.setInt(1, ats.channelId); stmt.setLong(2, ats.startTs); stmt.setLong(3, ats.endTs); stmt.setLong(4, ats.valuePos); stmt.setLong(5, ats.valueNeg); stmt.addBatch(); i++; if (i % 100000 == 0) { System.out.println(i); stmt.executeBatch(); } } stmt.executeBatch(); } } }