package org.apache.flink.batch.connectors.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import java.io.IOException;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.cassandra.shaded.com.google.common.base.Strings;
import org.apache.flink.cassandra.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.flink.cassandra.shaded.com.google.common.util.concurrent.Futures;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.class */
/**
 * Base class for output formats that write records to Cassandra by binding each record's
 * fields to a prepared insert statement and executing it asynchronously.
 *
 * <p>Subclasses supply {@link #extractFields(Object)} to turn a record into the bind values
 * for the insert query. Write failures are reported through a callback and recorded; a
 * recorded failure is rethrown as an {@link IOException} by the next
 * {@link #writeRecord(Object)} call or, at the latest, by {@link #close()}.
 *
 * @param <OUT> type of the records written by this format
 */
public abstract class CassandraOutputFormatBase<OUT> extends RichOutputFormat<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormatBase.class);

    private final String insertQuery;
    private final ClusterBuilder builder;

    private transient Cluster cluster;
    private transient Session session;
    private transient PreparedStatement prepared;
    private transient FutureCallback<ResultSet> callback;

    // Written from the write callback and read from writeRecord()/close(); volatile so a
    // failure recorded by the callback is visible to the thread calling those methods.
    private transient volatile Throwable exception = null;

    /**
     * Creates the output format.
     *
     * @param insertQuery the insert statement to prepare; must be neither null nor empty
     * @param builder builder used to obtain the {@link Cluster}; must not be null
     * @throws IllegalArgumentException if either argument is invalid
     */
    public CassandraOutputFormatBase(String insertQuery, ClusterBuilder builder) {
        Preconditions.checkArgument(
                !Strings.isNullOrEmpty(insertQuery), "Query cannot be null or empty");
        Preconditions.checkArgument(builder != null, "Builder cannot be null");
        this.insertQuery = insertQuery;
        this.builder = builder;
    }

    /**
     * Obtains the {@link Cluster} from the configured builder.
     *
     * @param parameters not used by this implementation
     */
    @Override
    public void configure(Configuration parameters) {
        this.cluster = this.builder.getCluster();
    }

    /**
     * Connects a session, prepares the insert statement and installs the callback that
     * forwards write results to {@link #onWriteSuccess(ResultSet)} and
     * {@link #onWriteFailure(Throwable)}.
     *
     * @param taskNumber not used by this implementation
     * @param numTasks not used by this implementation
     * @throws IOException declared by the overridden method; not thrown directly here
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        this.session = this.cluster.connect();
        this.prepared = this.session.prepare(this.insertQuery);
        this.callback = new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                onWriteSuccess(result);
            }

            @Override
            public void onFailure(Throwable failure) {
                onWriteFailure(failure);
            }
        };
    }

    /**
     * Binds the record's fields to the prepared statement and submits it asynchronously.
     *
     * @param record the record to write
     * @throws IOException if an earlier asynchronous write has already reported a failure
     */
    @Override
    public void writeRecord(OUT record) throws IOException {
        throwIfWriteFailed();
        Object[] fields = extractFields(record);
        Futures.addCallback(this.session.executeAsync(this.prepared.bind(fields)), this.callback);
    }

    /**
     * Extracts the bind values for the insert statement from a record.
     *
     * @param record the record being written
     * @return the values passed to {@link PreparedStatement#bind(Object...)}
     */
    protected abstract Object[] extractFields(OUT record);

    /**
     * Invoked by the write callback when a write succeeds. Does nothing by default.
     *
     * @param ignored the result of the successful write
     */
    protected void onWriteSuccess(ResultSet ignored) {
    }

    /**
     * Invoked by the write callback when a write fails. Records the failure so that it is
     * rethrown by a later {@link #writeRecord(Object)} or {@link #close()} call.
     *
     * @param t the cause of the failed write
     */
    protected void onWriteFailure(Throwable t) {
        this.exception = t;
    }

    /**
     * Closes the session and the cluster, logging (not propagating) errors raised while
     * closing, and then surfaces any recorded write failure.
     *
     * <p>Without the final check, a write that fails after the last
     * {@link #writeRecord(Object)} call would be lost silently.
     * NOTE(review): this relies on the driver letting already-submitted requests complete
     * while the session shuts down - confirm for the driver version in use.
     *
     * @throws IOException if an asynchronous write reported a failure
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.session != null) {
                this.session.close();
            }
        } catch (Exception e) {
            LOG.error("Error while closing session.", e);
        }
        try {
            if (this.cluster != null) {
                this.cluster.close();
            }
        } catch (Exception e) {
            LOG.error("Error while closing cluster.", e);
        }
        // Resources are released first so that reporting the failure cannot leak them.
        throwIfWriteFailed();
    }

    /** Rethrows a recorded asynchronous write failure, if any, wrapped in an IOException. */
    private void throwIfWriteFailed() throws IOException {
        // Read the volatile field once so the null check and the wrapped cause agree.
        Throwable failure = this.exception;
        if (failure != null) {
            throw new IOException("write record failed", failure);
        }
    }
}
