package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
import org.apache.flink.api.java.tuple.Tuple2;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/MySqlAdapter.class */
public class MySqlAdapter implements DbAdapter {
    private static final long serialVersionUID = 1;

    @Override // org.apache.flink.contrib.streaming.state.DbAdapter
    public void createCheckpointsTable(String str, Connection connection) throws SQLException {
        Statement createStatement = connection.createStatement();
        Throwable th = null;
        try {
            try {
                createStatement.executeUpdate("CREATE TABLE IF NOT EXISTS checkpoints_" + str + " (checkpointId bigint, timestamp bigint, handleId bigint,checkpoint blob,PRIMARY KEY (handleId))");
                if (createStatement != null) {
                    if (0 == 0) {
                        createStatement.close();
                        return;
                    }
                    try {
                        createStatement.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createStatement != null) {
                if (th != null) {
                    try {
                        createStatement.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createStatement.close();
                }
            }
            throw th4;
        }
    }

    @Override // org.apache.flink.contrib.streaming.state.DbAdapter
    public PreparedStatement prepareCheckpointInsert(String str, Connection connection) throws SQLException {
        return connection.prepareStatement("INSERT INTO checkpoints_" + str + " (checkpointId, timestamp, handleId, checkpoint) VALUES (?,?,?,?)");
    }

    @Override // org.apache.flink.contrib.streaming.state.DbAdapter
    public void setCheckpointInsertParams(String str, PreparedStatement preparedStatement, long j, long j2, long j3, byte[] bArr) throws SQLException {
        preparedStatement.setLong(1, j);
        preparedStatement.setLong(2, j2);
        preparedStatement.setLong(3, j3);
        preparedStatement.setBytes(4, bArr);
    }

    @Override // org.apache.flink.contrib.streaming.state.DbAdapter
    public byte[] getCheckpoint(String str, Connection connection, long j, long j2, long j3) throws SQLException {
        Statement createStatement = connection.createStatement();
        Throwable th = null;
        try {
            ResultSet executeQuery = createStatement.executeQuery("SELECT checkpoint FROM checkpoints_" + str + " WHERE handleId = " + j3);
            if (!executeQuery.next()) {
                throw new SQLException("Checkpoint cannot be found in the database.");
            }
            byte[] bytes = executeQuery.getBytes(1);
            if (createStatement != null) {
                if (0 != 0) {
                    try {
                        createStatement.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    createStatement.close();
                }
            }
            return bytes;
        } catch (Throwable th3) {
            if (createStatement != null) {
                if (0 != 0) {
                    try {
                        createStatement.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createStatement.close();
                }
            }
            throw th3;
        }
    }

    @Override // org.apache.flink.contrib.streaming.state.DbAdapter
    public void deleteCheckpoint(String str, Connection connection, long j, long j2, long j3) throws SQLException {
        Statement createStatement = connection.createStatement();
        Throwable th = null;
        try {
            try {
                createStatement.executeUpdate("DELETE FROM checkpoints_" + str + " WHERE handleId = " + j3);
                if (createStatement != null) {
                    if (0 == 0) {
                        createStatement.close();
                        return;
                    }
                    try {
                        createStatement.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createStatement != null) {
                if (th != null) {
                    try {
                        createStatement.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createStatement.close();
                }
            }
            throw th4;
        }
    }

    @Override // org.apache.flink.contrib.streaming.state.DbAdapter
    public void disposeAllStateForJob(String str, Connection connection) throws SQLException {
        Statement createStatement = connection.createStatement();
        Throwable th = null;
        try {
            try {
                createStatement.executeUpdate("DROP TABLE checkpoints_" + str);
                if (createStatement != null) {
                    if (0 == 0) {
                        createStatement.close();
                        return;
                    }
                    try {
                        createStatement.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createStatement != null) {
                if (th != null) {
                    try {
                        createStatement.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createStatement.close();
                }
            }
            throw th4;
        }
    }

    @Override // org.apache.flink.contrib.streaming.state.DbAdapter
    public void createKVStateTable(String str, Connection connection) throws SQLException {
        validateStateId(str);
        Statement createStatement = connection.createStatement();
        Throwable th = null;
        try {
            try {
                createStatement.executeUpdate("CREATE TABLE IF NOT EXISTS " + str + " (timestamp bigint, k varbinary(256), v blob, PRIMARY KEY (k, timestamp) )");
                if (createStatement != null) {
                    if (0 == 0) {
                        createStatement.close();
                        return;
                    }
                    try {
                        createStatement.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createStatement != null) {
                if (th != null) {
                    try {
                        createStatement.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createStatement.close();
                }
            }
            throw th4;
        }
    }

    @Override // org.apache.flink.contrib.streaming.state.DbAdapter
    public String prepareKVCheckpointInsert(String str) throws SQLException {
        validateStateId(str);
        return "INSERT INTO " + str + " (timestamp, k, v) VALUES (?,?,?) ON DUPLICATE KEY UPDATE v=? ";
    }

    @Override // org.apache.flink.contrib.streaming.state.DbAdapter
    public String prepareKeyLookup(String str) throws SQLException {
        validateStateId(str);
        return "SELECT v FROM " + str + " WHERE k = ? AND timestamp <= ? ORDER BY timestamp DESC LIMIT 1";
    }

    @Override // org.apache.flink.contrib.streaming.state.DbAdapter
    public byte[] lookupKey(String str, PreparedStatement preparedStatement, byte[] bArr, long j) throws SQLException {
        preparedStatement.setBytes(1, bArr);
        preparedStatement.setLong(2, j);
        ResultSet executeQuery = preparedStatement.executeQuery();
        if (executeQuery.next()) {
            return executeQuery.getBytes(1);
        }
        return null;
    }

    @Override // org.apache.flink.contrib.streaming.state.DbAdapter
    public void cleanupFailedCheckpoints(String str, Connection connection, long j, long j2) throws SQLException {
        validateStateId(str);
        Statement createStatement = connection.createStatement();
        Throwable th = null;
        try {
            createStatement.executeUpdate("DELETE FROM " + str + " WHERE timestamp > " + j + " AND timestamp < " + j2);
            if (createStatement != null) {
                if (0 == 0) {
                    createStatement.close();
                    return;
                }
                try {
                    createStatement.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createStatement != null) {
                if (0 != 0) {
                    try {
                        createStatement.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createStatement.close();
                }
            }
            throw th3;
        }
    }

    @Override // org.apache.flink.contrib.streaming.state.DbAdapter
    public void compactKvStates(String str, Connection connection, long j, long j2) throws SQLException {
        validateStateId(str);
        Statement createStatement = connection.createStatement();
        Throwable th = null;
        try {
            createStatement.executeUpdate("DELETE state.* FROM " + str + " AS state JOIN ( \tSELECT MAX(timestamp) AS maxts, k FROM " + str + " \tWHERE timestamp BETWEEN " + j + " AND " + j2 + " \tGROUP BY k ) m ON state.k = m.k AND state.timestamp >= " + j);
            if (createStatement != null) {
                if (0 == 0) {
                    createStatement.close();
                    return;
                }
                try {
                    createStatement.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createStatement != null) {
                if (0 != 0) {
                    try {
                        createStatement.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createStatement.close();
                }
            }
            throw th3;
        }
    }

    protected static void validateStateId(String str) {
        if (!str.matches("[a-zA-Z0-9_]+")) {
            throw new RuntimeException("State name contains invalid characters: " + str);
        }
    }

    @Override // org.apache.flink.contrib.streaming.state.DbAdapter
    public void insertBatch(final String str, DbBackendConfig dbBackendConfig, Connection connection, final PreparedStatement preparedStatement, final long j, final List<Tuple2<byte[], byte[]>> list) throws IOException {
        SQLRetrier.retry(new Callable<Void>() { // from class: org.apache.flink.contrib.streaming.state.MySqlAdapter.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                for (Tuple2 tuple2 : list) {
                    MySqlAdapter.this.setKvInsertParams(str, preparedStatement, j, (byte[]) tuple2.f0, (byte[]) tuple2.f1);
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
                preparedStatement.clearBatch();
                return null;
            }
        }, new Callable<Void>() { // from class: org.apache.flink.contrib.streaming.state.MySqlAdapter.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                preparedStatement.clearBatch();
                return null;
            }
        }, dbBackendConfig.getMaxNumberOfSqlRetries(), dbBackendConfig.getSleepBetweenSqlRetries());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setKvInsertParams(String str, PreparedStatement preparedStatement, long j, byte[] bArr, byte[] bArr2) throws SQLException {
        preparedStatement.setLong(1, j);
        preparedStatement.setBytes(2, bArr);
        if (bArr2 != null) {
            preparedStatement.setBytes(3, bArr2);
            preparedStatement.setBytes(4, bArr2);
        } else {
            preparedStatement.setNull(3, 2004);
            preparedStatement.setNull(4, 2004);
        }
    }

    @Override // org.apache.flink.contrib.streaming.state.DbAdapter
    public void keepAlive(Connection connection) throws SQLException {
        Statement createStatement = connection.createStatement();
        Throwable th = null;
        try {
            createStatement.executeQuery("SELECT 1");
            if (createStatement != null) {
                if (0 == 0) {
                    createStatement.close();
                    return;
                }
                try {
                    createStatement.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createStatement != null) {
                if (0 != 0) {
                    try {
                        createStatement.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createStatement.close();
                }
            }
            throw th3;
        }
    }
}
