package org.apache.kudu.flume.sink;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.List;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.kudu.client.AsyncKuduClient;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/kudu/flume/sink/KuduSink.class */
public class KuduSink extends AbstractSink implements Configurable {
    private static final int DEFAULT_BATCH_SIZE = 1000;
    private static final String DEFAULT_KUDU_OPERATION_PRODUCER = "org.apache.kudu.flume.sink.SimpleKuduOperationsProducer";
    private static final boolean DEFAULT_IGNORE_DUPLICATE_ROWS = true;
    private String masterAddresses;
    private String tableName;
    private int batchSize;
    private long timeoutMillis;
    private boolean ignoreDuplicateRows;
    private KuduTable table;
    private KuduSession session;
    private KuduClient client;
    private KuduOperationsProducer operationsProducer;
    private SinkCounter sinkCounter;
    private static final Logger logger = LoggerFactory.getLogger(KuduSink.class);
    private static final Long DEFAULT_TIMEOUT_MILLIS = Long.valueOf(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);

    public KuduSink() {
        this(null);
    }

    @VisibleForTesting
    @InterfaceAudience.Private
    public KuduSink(KuduClient kuduClient) {
        this.client = kuduClient;
    }

    public void start() {
        Preconditions.checkState(this.table == null && this.session == null, "Please call stop before calling start on an old instance.");
        if (this.client == null) {
            this.client = new KuduClient.KuduClientBuilder(this.masterAddresses).build();
        }
        this.session = this.client.newSession();
        this.session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
        this.session.setTimeoutMillis(this.timeoutMillis);
        this.session.setIgnoreAllDuplicateRows(this.ignoreDuplicateRows);
        this.session.setMutationBufferSpace(this.batchSize);
        try {
            this.table = this.client.openTable(this.tableName);
            this.operationsProducer.initialize(this.table);
            super.start();
            this.sinkCounter.incrementConnectionCreatedCount();
            this.sinkCounter.start();
        } catch (Exception e) {
            this.sinkCounter.incrementConnectionFailedCount();
            String format = String.format("Could not open Kudu table '%s'", this.tableName);
            logger.error(format, e);
            throw new FlumeException(format, e);
        }
    }

    public void stop() {
        Exception exc = null;
        try {
            this.operationsProducer.close();
        } catch (Exception e) {
            exc = e;
            logger.error("Error closing operations producer", e);
        }
        try {
            if (this.client != null) {
                this.client.shutdown();
            }
            this.client = null;
            this.table = null;
            this.session = null;
        } catch (Exception e2) {
            exc = e2;
            logger.error("Error closing client", e2);
        }
        this.sinkCounter.incrementConnectionClosedCount();
        this.sinkCounter.stop();
        if (exc != null) {
            throw new FlumeException("Error stopping sink", exc);
        }
    }

    public void configure(Context context) {
        this.masterAddresses = context.getString(KuduSinkConfigurationConstants.MASTER_ADDRESSES);
        Preconditions.checkNotNull(this.masterAddresses, "Missing master addresses. Please specify property '$s'.", KuduSinkConfigurationConstants.MASTER_ADDRESSES);
        this.tableName = context.getString(KuduSinkConfigurationConstants.TABLE_NAME);
        Preconditions.checkNotNull(this.tableName, "Missing table name. Please specify property '%s'", KuduSinkConfigurationConstants.TABLE_NAME);
        this.batchSize = context.getInteger(KuduSinkConfigurationConstants.BATCH_SIZE, Integer.valueOf(DEFAULT_BATCH_SIZE)).intValue();
        this.timeoutMillis = context.getLong(KuduSinkConfigurationConstants.TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS).longValue();
        this.ignoreDuplicateRows = context.getBoolean(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS, true).booleanValue();
        String string = context.getString(KuduSinkConfigurationConstants.PRODUCER);
        if (string == null || string.isEmpty()) {
            string = DEFAULT_KUDU_OPERATION_PRODUCER;
            logger.warn("No Kudu operations producer provided, using default");
        }
        Context context2 = new Context();
        context2.putAll(context.getSubProperties(KuduSinkConfigurationConstants.PRODUCER_PREFIX));
        try {
            this.operationsProducer = (KuduOperationsProducer) Class.forName(string).newInstance();
            this.operationsProducer.configure(context2);
            this.sinkCounter = new SinkCounter(getName());
        } catch (Exception e) {
            logger.error("Could not instantiate Kudu operations producer", e);
            throw new RuntimeException(e);
        }
    }

    public KuduClient getClient() {
        return this.client;
    }

    public Sink.Status process() throws EventDeliveryException {
        Event take;
        if (this.session.hasPendingOperations()) {
            return Sink.Status.BACKOFF;
        }
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            long j = 0;
            while (j < this.batchSize && (take = channel.take()) != null) {
                try {
                    Iterator<Operation> it = this.operationsProducer.getOperations(take).iterator();
                    while (it.hasNext()) {
                        this.session.apply(it.next());
                    }
                    j++;
                } catch (Throwable th) {
                    transaction.rollback();
                    logger.error("Failed to commit transaction. Transaction rolled back.", th);
                    if ((th instanceof Error) || (th instanceof RuntimeException)) {
                        throw new RuntimeException(th);
                    }
                    logger.error("Failed to commit transaction. Transaction rolled back.", th);
                    throw new EventDeliveryException("Failed to commit transaction. Transaction rolled back.", th);
                }
            }
            logger.debug("Flushing {} events", Long.valueOf(j));
            List<OperationResponse> flush = this.session.flush();
            if (flush != null) {
                for (OperationResponse operationResponse : flush) {
                    if (operationResponse.hasRowError()) {
                        throw new EventDeliveryException("Failed to flush one or more changes. Transaction rolled back: " + operationResponse.getRowError().toString());
                    }
                }
            }
            if (j == 0) {
                this.sinkCounter.incrementBatchEmptyCount();
            } else if (j == this.batchSize) {
                this.sinkCounter.incrementBatchCompleteCount();
            } else {
                this.sinkCounter.incrementBatchUnderflowCount();
            }
            transaction.commit();
            if (j == 0) {
                Sink.Status status = Sink.Status.BACKOFF;
                transaction.close();
                return status;
            }
            this.sinkCounter.addToEventDrainSuccessCount(j);
            Sink.Status status2 = Sink.Status.READY;
            transaction.close();
            return status2;
        } catch (Throwable th2) {
            transaction.close();
            throw th2;
        }
    }
}
