package org.apache.flink.streaming.connectors.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.cassandra.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.flink.cassandra.shaded.com.google.common.util.concurrent.Futures;
import org.apache.flink.cassandra.shaded.com.google.common.util.concurrent.ListenableFuture;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.class */
/**
 * Base class for sinks that write records to Cassandra asynchronously.
 *
 * <p>Each call to {@link #invoke(Object)} hands the record to {@link #send(Object)}, which
 * returns a future. The number of futures that have not completed yet is tracked in a counter;
 * {@link #close()} blocks until that counter reaches zero before closing the session and the
 * cluster. A failure reported by any future is remembered in {@link #exception} and rethrown
 * (wrapped in an {@link IOException}) by the next {@code invoke} or by {@code close}.
 *
 * @param <IN> type of the records passed to the sink
 * @param <V> result type of the future returned by {@link #send(Object)}
 */
public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
    protected static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBase.class);

    protected transient Cluster cluster;
    protected transient Session session;

    /** Most recent failure reported by a write future; {@code null} while none has failed. */
    protected volatile transient Throwable exception;

    /** Callback registered on every future returned by {@link #send(Object)}. */
    protected transient FutureCallback<V> callback;

    private final ClusterBuilder builder;

    /** Number of futures whose callback has not run yet; also the monitor {@code close} waits on. */
    private final AtomicInteger updatesPending = new AtomicInteger();

    /**
     * Creates the sink and runs the closure cleaner on the given builder.
     *
     * @param clusterBuilder builder used in {@link #open(Configuration)} to obtain the cluster
     */
    public CassandraSinkBase(ClusterBuilder clusterBuilder) {
        this.builder = clusterBuilder;
        ClosureCleaner.clean(clusterBuilder, true);
    }

    /**
     * Creates the completion callback, builds the cluster and connects a session.
     *
     * @param configuration not used by this implementation
     */
    public void open(Configuration configuration) {
        this.callback = new FutureCallback<V>() {
            @Override
            public void onSuccess(V v) {
                completePendingUpdate();
            }

            @Override
            public void onFailure(Throwable th) {
                // Publish the failure BEFORE releasing the pending slot: a close() that is woken
                // up by the release must be able to observe it when it re-checks for errors.
                CassandraSinkBase.this.exception = th;
                completePendingUpdate();
                CassandraSinkBase.LOG.error("Error while sending value.", th);
            }
        };
        this.cluster = this.builder.getCluster();
        this.session = this.cluster.connect();
    }

    /**
     * Sends one record asynchronously.
     *
     * @param in record to write
     * @throws IOException if an earlier asynchronous write has failed
     * @throws Exception declared for subclasses and callers; not thrown otherwise by this method
     */
    public void invoke(IN in) throws Exception {
        throwIfAsyncError();
        ListenableFuture<V> pendingWrite = send(in);
        // Count the write before registering the callback so the callback's decrement can never
        // precede the increment.
        this.updatesPending.incrementAndGet();
        Futures.addCallback(pendingWrite, this.callback);
    }

    /**
     * Starts the asynchronous write of a single record.
     *
     * @param in record to write
     * @return future that completes when the write has finished
     */
    public abstract ListenableFuture<V> send(IN in);

    /**
     * Waits for all outstanding writes, then closes the session and the cluster.
     *
     * <p>The session and cluster are closed even when this method exits with an exception;
     * errors raised while closing them are logged and not propagated.
     *
     * @throws IOException if an asynchronous write failed, either before or while waiting
     * @throws InterruptedException if the thread is interrupted while waiting for pending writes
     */
    public void close() throws Exception {
        try {
            throwIfAsyncError();
            awaitPendingUpdates();
            // Writes that failed while we were draining must not be silently dropped.
            throwIfAsyncError();
        } finally {
            closeConnections();
        }
    }

    /** Marks one write as finished and wakes up a waiting {@code close()} when none remain. */
    private void completePendingUpdate() {
        if (this.updatesPending.decrementAndGet() == 0) {
            synchronized (this.updatesPending) {
                this.updatesPending.notifyAll();
            }
        }
    }

    /** Throws an {@link IOException} wrapping the recorded asynchronous failure, if there is one. */
    private void throwIfAsyncError() throws IOException {
        Throwable failure = this.exception;
        if (failure != null) {
            throw new IOException("Error while sending value.", failure);
        }
    }

    /** Blocks until the pending-write counter has dropped to zero. */
    private void awaitPendingUpdates() throws InterruptedException {
        // The condition is tested while holding the monitor. Testing it outside would allow the
        // final notifyAll() to fire between the test and wait(), and the wait would never return.
        synchronized (this.updatesPending) {
            while (this.updatesPending.get() > 0) {
                this.updatesPending.wait();
            }
        }
    }

    /** Closes the session, then the cluster, logging (not rethrowing) any exception from either. */
    private void closeConnections() {
        try {
            if (this.session != null) {
                this.session.close();
            }
        } catch (Exception e) {
            LOG.error("Error while closing session.", e);
        }
        try {
            if (this.cluster != null) {
                this.cluster.close();
            }
        } catch (Exception e) {
            LOG.error("Error while closing cluster.", e);
        }
    }
}
