package org.apache.flink.streaming.connectors.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;

/* loaded from: input_file:org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.class */
public class CassandraCommitter extends CheckpointCommitter {
    private static final long serialVersionUID = 1;
    private final ClusterBuilder builder;
    private transient Cluster cluster;
    private transient Session session;
    private String keySpace;
    private String table;
    private final Map<Integer, Long> lastCommittedCheckpoints;

    public CassandraCommitter(ClusterBuilder clusterBuilder) {
        this.keySpace = "flink_auxiliary";
        this.table = "checkpoints_";
        this.lastCommittedCheckpoints = new HashMap();
        this.builder = clusterBuilder;
        ClosureCleaner.clean(clusterBuilder, true);
    }

    public CassandraCommitter(ClusterBuilder clusterBuilder, String str) {
        this(clusterBuilder);
        this.keySpace = str;
    }

    public void setJobId(String str) throws Exception {
        super.setJobId(str);
        this.table += str;
    }

    public void createResource() throws Exception {
        this.cluster = this.builder.getCluster();
        this.session = this.cluster.connect();
        this.session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':1};", this.keySpace));
        this.session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", this.keySpace, this.table));
        try {
            this.session.close();
        } catch (Exception e) {
            LOG.error("Error while closing session.", e);
        }
        try {
            this.cluster.close();
        } catch (Exception e2) {
            LOG.error("Error while closing cluster.", e2);
        }
    }

    public void open() throws Exception {
        if (this.builder == null) {
            throw new RuntimeException("No ClusterBuilder was set.");
        }
        this.cluster = this.builder.getCluster();
        this.session = this.cluster.connect();
    }

    public void close() throws Exception {
        this.lastCommittedCheckpoints.clear();
        try {
            this.session.close();
        } catch (Exception e) {
            LOG.error("Error while closing session.", e);
        }
        try {
            this.cluster.close();
        } catch (Exception e2) {
            LOG.error("Error while closing cluster.", e2);
        }
    }

    public void commitCheckpoint(int i, long j) {
        this.session.execute(String.format("UPDATE %s.%s set checkpoint_id=%d where sink_id='%s' and sub_id=%d;", this.keySpace, this.table, Long.valueOf(j), this.operatorId, Integer.valueOf(i)));
        this.lastCommittedCheckpoints.put(Integer.valueOf(i), Long.valueOf(j));
    }

    public boolean isCheckpointCommitted(int i, long j) {
        Long l = this.lastCommittedCheckpoints.get(Integer.valueOf(i));
        if (l == null) {
            Iterator<Row> it = this.session.execute(String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", this.keySpace, this.table, this.operatorId, Integer.valueOf(i))).iterator();
            if (it.hasNext()) {
                l = Long.valueOf(it.next().getLong("checkpoint_id"));
                this.lastCommittedCheckpoints.put(Integer.valueOf(i), l);
            }
        }
        return l != null && j <= l.longValue();
    }
}
