package info.peper.vz.rest; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalAccessor; import java.util.LinkedList; import java.util.List; import info.peper.vz.rest.bo.db.Aggregate2; public class FillAggregateTableMain3 { private static final DateTimeFormatter DTF = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.systemDefault()); public static void main(String[] args) throws Exception { try (final Connection con = DriverManager.getConnection( "jdbc:mariadb://mariadb.fritz.box/volkszaehler", "vz", getPassword())) { processKerpen(con); processRoki(con); } } private static void processKerpen(final Connection con) throws SQLException { final long startTimeStamp = getTimestamp("2022-05-20T09:00:00"); final long finalEndTimeStamp = getTimestamp("2025-03-09T15:00:00"); final List aggregates = processHouse(1, startTimeStamp, finalEndTimeStamp, 60*1000, 4, 1, con); saveValues(con, aggregates, 1); } private static void processRoki(final Connection con) throws SQLException { final long startTimeStamp = getTimestamp("2024-06-23T17:00:00"); final long finalEndTimeStamp = getTimestamp("2025-03-09T15:00:00"); final List aggregates = processHouseMulti(2, startTimeStamp, finalEndTimeStamp, 60*1000, new int[] {24}, new int[] {18, 19, 20}, con); saveValues(con, aggregates, 2); } private static List processHouse(final int houseId, final long startTimestamp, final long endTimestamp, final long interval, final int channelIdSolar, final int channelIdMeter, final Connection con) throws SQLException { final List result = new LinkedList(); try (final PreparedStatement stmtSolar = con.prepareStatement("SELECT * FROM volkszaehler.data WHERE channel_id=? AND timestamp>? ORDER BY timestamp;"); final PreparedStatement stmtMeter = con.prepareStatement("SELECT * FROM volkszaehler.data WHERE channel_id=? AND timestamp>? ORDER BY timestamp;");) { stmtSolar.setInt(1, channelIdSolar); stmtSolar.setLong(2, startTimestamp); stmtSolar.setFetchSize(1000); stmtMeter.setInt(1, channelIdMeter); stmtMeter.setLong(2, startTimestamp); stmtMeter.setFetchSize(1000); try (final ResultSet rsSolar = stmtSolar.executeQuery(); final ResultSet rsMeter = stmtMeter.executeQuery()) { System.out.println("Starting..."); long intervalProduced = 0; long intervalObtained = 0; long intervalInjected = 0; long currentTsSolar = startTimestamp; long currentTsMeter = startTimestamp; long intervalStartTimestamp = startTimestamp; long intervalEndTimestamp = startTimestamp + interval; boolean noNextRecordSolar = false; boolean noNextRecordMeter = false; while (currentTsSolar <= endTimestamp || currentTsMeter <= endTimestamp) { currentTsSolar = intervalStartTimestamp; currentTsMeter = intervalStartTimestamp; System.out.print(intervalStartTimestamp + ": "); while (currentTsSolar < intervalEndTimestamp && (noNextRecordSolar || rsSolar.next())) { if (noNextRecordSolar) { noNextRecordSolar = false; } final long tsSolar = rsSolar.getLong("timestamp"); final double value = rsSolar.getDouble("value") > 1.0 ? rsSolar.getDouble("value") : 0.0; final long tsDiff = (tsSolar < intervalEndTimestamp) ? tsSolar - currentTsSolar : intervalEndTimestamp - currentTsSolar; currentTsSolar = tsSolar; intervalProduced += Math.round(value * tsDiff); if (currentTsSolar >= intervalEndTimestamp) { noNextRecordSolar = true; } System.out.print("."); } while (currentTsMeter < intervalEndTimestamp && (noNextRecordMeter || rsMeter.next())) { if (noNextRecordMeter) { noNextRecordMeter = false; } final long tsMeter = rsMeter.getLong("timestamp"); final double value = rsMeter.getDouble("value"); final long tsDiff = (tsMeter < intervalEndTimestamp) ? tsMeter - currentTsMeter : intervalEndTimestamp - currentTsMeter; currentTsMeter = tsMeter; if (value > 0.0) { intervalObtained += Math.round(value * tsDiff); } else { intervalInjected += Math.abs(Math.round(value * tsDiff)); } if (currentTsMeter >= intervalEndTimestamp) { noNextRecordMeter = true; } System.out.print("#"); } System.out.println(" " + intervalProduced + " / " + intervalObtained + " / " + intervalInjected); result.add(new Aggregate2(houseId, intervalStartTimestamp, intervalEndTimestamp, intervalProduced, intervalObtained, intervalInjected)); intervalStartTimestamp += interval; intervalEndTimestamp += interval; intervalInjected = 0; intervalObtained = 0; intervalProduced = 0; } } } return result; } private static List processHouseMulti(final int houseId, final long startTimestamp, final long endTimestamp, final long interval, final int[] channelIdsSolar, final int[] channelIdsMeter, final Connection con) throws SQLException { final List result = new LinkedList(); final PreparedStatement[] stmtSolar = new PreparedStatement[channelIdsSolar.length]; final PreparedStatement[] stmtMeter = new PreparedStatement[channelIdsMeter.length]; final ResultSet[] rsSolar = new ResultSet[channelIdsSolar.length]; final ResultSet[] rsMeter = new ResultSet[channelIdsMeter.length]; long[] currentTsSolar = new long[channelIdsSolar.length]; long[] currentTsMeter = new long[channelIdsMeter.length]; boolean[] noNextRecordSolar = new boolean[channelIdsSolar.length]; boolean[] noNextRecordMeter = new boolean[channelIdsMeter.length]; for (int i = 0; i < channelIdsSolar.length; i++) { stmtSolar[i] = con.prepareStatement("SELECT * FROM volkszaehler.data WHERE channel_id=? AND timestamp>? ORDER BY timestamp;"); stmtSolar[i].setInt(1, channelIdsSolar[i]); stmtSolar[i].setLong(2, startTimestamp); stmtSolar[i].setFetchSize(1000); rsSolar[i] = stmtSolar[i].executeQuery(); currentTsSolar[i] = startTimestamp; noNextRecordSolar[i] = false; } for (int i = 0; i < channelIdsMeter.length; i++) { stmtMeter[i] = con.prepareStatement("SELECT * FROM volkszaehler.data WHERE channel_id=? AND timestamp>? ORDER BY timestamp;"); stmtMeter[i].setInt(1, channelIdsMeter[i]); stmtMeter[i].setLong(2, startTimestamp); stmtMeter[i].setFetchSize(1000); rsMeter[i] = stmtMeter[i].executeQuery(); currentTsMeter[i] = startTimestamp; noNextRecordMeter[i] = false; } System.out.println("Starting..."); long intervalProduced = 0; long intervalObtained = 0; long intervalInjected = 0; long intervalStartTimestamp = startTimestamp; long intervalEndTimestamp = startTimestamp + interval; // Eigentliche Verarbeitung while (getMinOfArray(currentTsSolar) <= endTimestamp || getMinOfArray(currentTsMeter) <= endTimestamp) { System.out.print(intervalStartTimestamp + ": "); for (int iSolar = 0; iSolar < channelIdsSolar.length; iSolar++) { currentTsSolar[iSolar] = intervalStartTimestamp; while (currentTsSolar[iSolar] < intervalEndTimestamp && (noNextRecordSolar[iSolar] || rsSolar[iSolar].next())) { if (noNextRecordSolar[iSolar]) { noNextRecordSolar[iSolar] = false; } final long tsSolar = rsSolar[iSolar].getLong("timestamp"); final double value = rsSolar[iSolar].getDouble("value") > 1.0 ? rsSolar[iSolar].getDouble("value") : 0.0; final long tsDiff = (tsSolar < intervalEndTimestamp) ? tsSolar - currentTsSolar[iSolar] : intervalEndTimestamp - currentTsSolar[iSolar]; currentTsSolar[iSolar] = tsSolar; intervalProduced += Math.round(value * tsDiff); if (currentTsSolar[iSolar] >= intervalEndTimestamp) { noNextRecordSolar[iSolar] = true; } System.out.print("."); } } double meterValue = 0; for (int iMeter = 0; iMeter < channelIdsMeter.length; iMeter++) { currentTsMeter[iMeter] = intervalStartTimestamp; while (currentTsMeter[iMeter] < intervalEndTimestamp && (noNextRecordMeter[iMeter] || rsMeter[iMeter].next())) { if (noNextRecordMeter[iMeter]) { noNextRecordMeter[iMeter] = false; } final long tsMeter = rsMeter[iMeter].getLong("timestamp"); final double value = rsMeter[iMeter].getDouble("value"); final long tsDiff = (tsMeter < intervalEndTimestamp) ? tsMeter - currentTsMeter[iMeter] : intervalEndTimestamp - currentTsMeter[iMeter]; currentTsMeter[iMeter] = tsMeter; meterValue += Math.round(value * tsDiff); if (currentTsMeter[iMeter] >= intervalEndTimestamp) { noNextRecordMeter[iMeter] = true; } System.out.print("#"); } } if (meterValue > 0.0) { intervalObtained += Math.round(meterValue); } else { intervalInjected += Math.abs(Math.round(meterValue)); } System.out.println(" " + intervalProduced + " / " + intervalObtained + " / " + intervalInjected); result.add(new Aggregate2(houseId, intervalStartTimestamp, intervalEndTimestamp, intervalProduced, intervalObtained, intervalInjected)); intervalStartTimestamp += interval; intervalEndTimestamp += interval; intervalInjected = 0; intervalObtained = 0; intervalProduced = 0; } // Aufräumen for (int i = 0; i < channelIdsSolar.length; i++) { rsSolar[i].close(); stmtSolar[i].close(); } for (int i = 0; i < channelIdsMeter.length; i++) { rsMeter[i].close(); stmtMeter[i].close(); } return result; } private static long getMinOfArray(final long[] input) { long result = Long.MAX_VALUE; for (long i : input) { if (i < result) { result = i; } } return result; } private static void saveValues(final Connection con, final List aggregates, final int houseId) throws SQLException { System.out.println("Saving entries... : " + aggregates.size()); try (final PreparedStatement stmt = con.prepareStatement("INSERT INTO tobias_aggregate2 (house_id, timestamp_start, timestamp_end, produced_energy, obtained_energy, injected_energy) VALUES (?, ?, ?, ?, ?, ?)")) { int i = 0; for (Aggregate2 ats : aggregates) { stmt.setInt(1, houseId); stmt.setLong(2, ats.getTimestampStart()); stmt.setLong(3, ats.getTimestampEnd()); stmt.setLong(4, ats.getProducedEnergy()); stmt.setLong(5, ats.getObtainedEnergy()); stmt.setLong(6, ats.getInjectedEnergy()); stmt.addBatch(); i++; if (i % 100000 == 0) { System.out.println(i); stmt.executeBatch(); } } stmt.executeBatch(); } } private static String getPassword() throws IOException { try (final InputStream is = new FileInputStream("db-password.txt")) { final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is)); return bufferedReader.readLine(); } } private static long getTimestamp(final String dateTime) { final TemporalAccessor tempAccessor = DTF.parse(dateTime); final Instant instant = Instant.from(tempAccessor); return Instant.EPOCH.until(instant, ChronoUnit.MILLIS); } }