package org.apache.kudu.flume.sink;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.util.Iterator;
import java.util.List;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.kudu.annotations.InterfaceAudience;
import org.apache.kudu.annotations.InterfaceStability;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.SessionConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
@InterfaceAudience.Public
/* loaded from: input_file:org/apache/kudu/flume/sink/KuduSink.class */
public class KuduSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(KuduSink.class);
    private static final Long DEFAULT_BATCH_SIZE = 100L;
    private static final Long DEFAULT_TIMEOUT_MILLIS = 30000L;
    private static final String DEFAULT_KUDU_EVENT_PRODUCER = "org.apache.kudu.flume.sink.SimpleKuduEventProducer";
    private static final boolean DEFAULT_IGNORE_DUPLICATE_ROWS = true;
    private String masterAddresses;
    private String tableName;
    private long batchSize;
    private long timeoutMillis;
    private boolean ignoreDuplicateRows;
    private KuduTable table;
    private KuduSession session;
    private KuduClient client;
    private KuduEventProducer eventProducer;
    private String eventProducerType;
    private Context producerContext;
    private SinkCounter sinkCounter;

    public KuduSink() {
        this(null);
    }

    @VisibleForTesting
    @InterfaceAudience.Private
    public KuduSink(KuduClient kuduClient) {
        this.client = kuduClient;
    }

    public void start() {
        Preconditions.checkState(this.table == null && this.session == null, "Please call stop before calling start on an old instance.");
        if (this.client == null) {
            this.client = new KuduClient.KuduClientBuilder(this.masterAddresses).build();
        }
        this.session = this.client.newSession();
        this.session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
        this.session.setTimeoutMillis(this.timeoutMillis);
        this.session.setIgnoreAllDuplicateRows(this.ignoreDuplicateRows);
        try {
            this.table = this.client.openTable(this.tableName);
            super.start();
            this.sinkCounter.incrementConnectionCreatedCount();
            this.sinkCounter.start();
        } catch (Exception e) {
            this.sinkCounter.incrementConnectionFailedCount();
            String format = String.format("Could not open table '%s' from Kudu", this.tableName);
            logger.error(format, e);
            throw new FlumeException(format, e);
        }
    }

    public void stop() {
        try {
            if (this.client != null) {
                this.client.shutdown();
            }
            this.client = null;
            this.table = null;
            this.session = null;
            this.sinkCounter.incrementConnectionClosedCount();
            this.sinkCounter.stop();
        } catch (Exception e) {
            throw new FlumeException("Error closing client.", e);
        }
    }

    public void configure(Context context) {
        this.masterAddresses = context.getString(KuduSinkConfigurationConstants.MASTER_ADDRESSES);
        this.tableName = context.getString(KuduSinkConfigurationConstants.TABLE_NAME);
        this.batchSize = context.getLong(KuduSinkConfigurationConstants.BATCH_SIZE, DEFAULT_BATCH_SIZE).longValue();
        this.timeoutMillis = context.getLong(KuduSinkConfigurationConstants.TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS).longValue();
        this.ignoreDuplicateRows = context.getBoolean(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS, true).booleanValue();
        this.eventProducerType = context.getString(KuduSinkConfigurationConstants.PRODUCER);
        Preconditions.checkNotNull(this.masterAddresses, "Master address cannot be empty, please specify 'masterAddresses' in configuration file");
        Preconditions.checkNotNull(this.tableName, "Table name cannot be empty, please specify 'tableName' in configuration file");
        if (this.eventProducerType == null || this.eventProducerType.isEmpty()) {
            this.eventProducerType = DEFAULT_KUDU_EVENT_PRODUCER;
            logger.info("No Kudu event producer defined, will use default");
        }
        this.producerContext = new Context();
        this.producerContext.putAll(context.getSubProperties(KuduSinkConfigurationConstants.PRODUCER_PREFIX));
        try {
            this.eventProducer = (KuduEventProducer) Class.forName(this.eventProducerType).newInstance();
            this.eventProducer.configure(this.producerContext);
        } catch (Exception e) {
            logger.error("Could not instantiate Kudu event producer.", e);
            Throwables.propagate(e);
        }
        this.sinkCounter = new SinkCounter(getName());
    }

    public KuduClient getClient() {
        return this.client;
    }

    public Sink.Status process() throws EventDeliveryException {
        Event take;
        if (this.session.hasPendingOperations()) {
            return Sink.Status.BACKOFF;
        }
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        long j = 0;
        while (j < this.batchSize && (take = channel.take()) != null) {
            try {
                try {
                    this.eventProducer.initialize(take, this.table);
                    Iterator<Operation> it = this.eventProducer.getOperations().iterator();
                    while (it.hasNext()) {
                        this.session.apply(it.next());
                    }
                    j++;
                } catch (Throwable th) {
                    transaction.rollback();
                    logger.error("Failed to commit transaction. Transaction rolled back.", th);
                    if (!(th instanceof Error) && !(th instanceof RuntimeException)) {
                        logger.error("Failed to commit transaction. Transaction rolled back.", th);
                        throw new EventDeliveryException("Failed to commit transaction. Transaction rolled back.", th);
                    }
                    Throwables.propagate(th);
                    transaction.close();
                    return Sink.Status.BACKOFF;
                }
            } catch (Throwable th2) {
                transaction.close();
                throw th2;
            }
        }
        logger.debug("Flushing {} events", Long.valueOf(j));
        List<OperationResponse> flush = this.session.flush();
        if (flush != null) {
            for (OperationResponse operationResponse : flush) {
                if (operationResponse.hasRowError()) {
                    throw new EventDeliveryException("Failed to flush one or more changes. Transaction rolled back: " + operationResponse.getRowError().toString());
                }
            }
        }
        if (j == 0) {
            this.sinkCounter.incrementBatchEmptyCount();
        } else if (j == this.batchSize) {
            this.sinkCounter.incrementBatchCompleteCount();
        } else {
            this.sinkCounter.incrementBatchUnderflowCount();
        }
        transaction.commit();
        if (j == 0) {
            Sink.Status status = Sink.Status.BACKOFF;
            transaction.close();
            return status;
        }
        this.sinkCounter.addToEventDrainSuccessCount(j);
        Sink.Status status2 = Sink.Status.READY;
        transaction.close();
        return status2;
    }

    @VisibleForTesting
    @InterfaceAudience.Private
    KuduEventProducer getEventProducer() {
        return this.eventProducer;
    }
}
