package org.apache.flink.contrib.streaming.state;

import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.Callable;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.GenericFoldingState;
import org.apache.flink.runtime.state.GenericListState;
import org.apache.flink.runtime.state.GenericReducingState;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/DbStateBackend.class */
public class DbStateBackend extends AbstractStateBackend {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(DbStateBackend.class);
    private Random rnd;
    private transient Environment env;
    private transient String jobId;
    private final DbBackendConfig dbConfig;
    private final DbAdapter dbAdapter;
    private ShardedConnection connections;
    private final int numSqlRetries;
    private final int sqlRetrySleep;
    private transient PreparedStatement insertStatement;
    private String operatorIdentifier;
    private AbstractStateBackend nonPartitionedStateBackend;

    public DbStateBackend(DbBackendConfig dbBackendConfig) {
        this.nonPartitionedStateBackend = null;
        this.dbConfig = dbBackendConfig;
        this.dbAdapter = dbBackendConfig.getDbAdapter();
        this.numSqlRetries = dbBackendConfig.getMaxNumberOfSqlRetries();
        this.sqlRetrySleep = dbBackendConfig.getSleepBetweenSqlRetries();
    }

    public DbStateBackend(DbBackendConfig dbBackendConfig, AbstractStateBackend abstractStateBackend) {
        this(dbBackendConfig);
        this.nonPartitionedStateBackend = abstractStateBackend;
    }

    public ShardedConnection getConnections() {
        return this.connections;
    }

    public boolean isInitialized() {
        return this.connections != null;
    }

    public Environment getEnvironment() {
        return this.env;
    }

    public DbBackendConfig getConfiguration() {
        return this.dbConfig;
    }

    public <S extends Serializable> StateHandle<S> checkpointStateSerializable(final S s, final long j, final long j2) throws Exception {
        return this.nonPartitionedStateBackend == null ? (StateHandle) SQLRetrier.retry(new Callable<DbStateHandle<S>>() { // from class: org.apache.flink.contrib.streaming.state.DbStateBackend.1
            @Override // java.util.concurrent.Callable
            public DbStateHandle<S> call() throws Exception {
                long nextLong = DbStateBackend.this.rnd.nextLong();
                DbStateBackend.this.dbAdapter.setCheckpointInsertParams(DbStateBackend.this.jobId, DbStateBackend.this.insertStatement, j, j2, nextLong, InstantiationUtil.serializeObject(s));
                DbStateBackend.this.insertStatement.executeUpdate();
                return new DbStateHandle<>(DbStateBackend.this.jobId, j, j2, nextLong, DbStateBackend.this.dbConfig, r0.length);
            }
        }, this.numSqlRetries, this.sqlRetrySleep) : this.nonPartitionedStateBackend.checkpointStateSerializable(s, j, j2);
    }

    public AbstractStateBackend.CheckpointStateOutputStream createCheckpointStateOutputStream(long j, long j2) throws Exception {
        if (this.nonPartitionedStateBackend == null) {
            throw new UnsupportedOperationException("Use ceckpointStateSerializable instead.");
        }
        return this.nonPartitionedStateBackend.createCheckpointStateOutputStream(j, j2);
    }

    protected <N, T> ValueState<T> createValueState(TypeSerializer<N> typeSerializer, ValueStateDescriptor<T> valueStateDescriptor) throws Exception {
        if (valueStateDescriptor.isSerializerInitialized()) {
            return new LazyDbValueState(this.operatorIdentifier + "_" + valueStateDescriptor.getName(), this.env.getTaskInfo().getIndexOfThisSubtask() == 0, getConnections(), getConfiguration(), this.keySerializer, typeSerializer, valueStateDescriptor);
        }
        throw new IllegalArgumentException("state descriptor serializer not initialized");
    }

    protected <N, T> ListState<T> createListState(TypeSerializer<N> typeSerializer, ListStateDescriptor<T> listStateDescriptor) throws Exception {
        if (listStateDescriptor.isSerializerInitialized()) {
            return new GenericListState(createValueState(typeSerializer, new ValueStateDescriptor<>(listStateDescriptor.getName(), new ArrayListSerializer(listStateDescriptor.getSerializer()), (Object) null)));
        }
        throw new IllegalArgumentException("state descriptor serializer not initialized");
    }

    protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> typeSerializer, ReducingStateDescriptor<T> reducingStateDescriptor) throws Exception {
        if (reducingStateDescriptor.isSerializerInitialized()) {
            return new GenericReducingState(createValueState(typeSerializer, new ValueStateDescriptor<>(reducingStateDescriptor.getName(), reducingStateDescriptor.getSerializer(), (Object) null)), reducingStateDescriptor.getReduceFunction());
        }
        throw new IllegalArgumentException("state descriptor serializer not initialized");
    }

    protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> typeSerializer, FoldingStateDescriptor<T, ACC> foldingStateDescriptor) throws Exception {
        if (foldingStateDescriptor.isSerializerInitialized()) {
            return new GenericFoldingState(createValueState(typeSerializer, new ValueStateDescriptor<>(foldingStateDescriptor.getName(), foldingStateDescriptor.getSerializer(), foldingStateDescriptor.getDefaultValue())), foldingStateDescriptor.getFoldFunction());
        }
        throw new IllegalArgumentException("state descriptor serializer not initialized");
    }

    public void initializeForJob(Environment environment, String str, TypeSerializer<?> typeSerializer) throws Exception {
        super.initializeForJob(environment, str, typeSerializer);
        this.operatorIdentifier = str;
        this.rnd = new Random();
        this.env = environment;
        this.jobId = environment.getJobID().toString().substring(0, 16);
        this.connections = this.dbConfig.createShardedConnection();
        this.connections.setTransactionIsolation(1);
        if (this.nonPartitionedStateBackend == null) {
            this.insertStatement = (PreparedStatement) SQLRetrier.retry(new Callable<PreparedStatement>() { // from class: org.apache.flink.contrib.streaming.state.DbStateBackend.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public PreparedStatement call() throws SQLException {
                    DbStateBackend.this.dbAdapter.createCheckpointsTable(DbStateBackend.this.jobId, DbStateBackend.this.getConnections().getFirst());
                    return DbStateBackend.this.dbAdapter.prepareCheckpointInsert(DbStateBackend.this.jobId, DbStateBackend.this.getConnections().getFirst());
                }
            }, this.numSqlRetries, this.sqlRetrySleep);
        } else {
            this.nonPartitionedStateBackend.initializeForJob(environment, str, typeSerializer);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Database state backend successfully initialized");
        }
    }

    public void close() throws Exception {
        ShardedConnection shardedConnection = this.connections;
        Throwable th = null;
        try {
            if (this.nonPartitionedStateBackend == null) {
                this.insertStatement.close();
            } else {
                this.nonPartitionedStateBackend.close();
            }
            if (shardedConnection != null) {
                if (0 == 0) {
                    shardedConnection.close();
                    return;
                }
                try {
                    shardedConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (shardedConnection != null) {
                if (0 != 0) {
                    try {
                        shardedConnection.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    shardedConnection.close();
                }
            }
            throw th3;
        }
    }

    public void disposeAllStateForCurrentJob() throws Exception {
        if (this.nonPartitionedStateBackend == null) {
            this.dbAdapter.disposeAllStateForJob(this.jobId, this.connections.getFirst());
        } else {
            this.nonPartitionedStateBackend.disposeAllStateForCurrentJob();
        }
    }
}
