package io.confluent.connect.elasticsearch.bulk;

import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import io.confluent.connect.elasticsearch.LogContext;
import io.confluent.connect.elasticsearch.RetryUtil;
import java.lang.Thread;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/elasticsearch/bulk/BulkProcessor.class */
public class BulkProcessor<R, B> {
    private static final Logger log;
    private static final AtomicLong BATCH_ID_GEN;
    private final Time time;
    private final BulkClient<R, B> bulkClient;
    private final int maxBufferedRecords;
    private final int batchSize;
    private final long lingerMs;
    private final int maxRetries;
    private final long retryBackoffMs;
    private final BehaviorOnMalformedDoc behaviorOnMalformedDoc;
    private final Thread farmer;
    private final ExecutorService executor;
    private final Deque<R> unsentRecords;
    static final /* synthetic */ boolean $assertionsDisabled;
    private volatile boolean stopRequested = false;
    private volatile boolean flushRequested = false;
    private final AtomicReference<ConnectException> error = new AtomicReference<>();
    private int inFlightRecords = 0;
    private final LogContext logContext = new LogContext();

    /* loaded from: input_file:io/confluent/connect/elasticsearch/bulk/BulkProcessor$BehaviorOnMalformedDoc.class */
    public enum BehaviorOnMalformedDoc {
        IGNORE,
        WARN,
        FAIL;

        public static final BehaviorOnMalformedDoc DEFAULT = FAIL;
        public static final ConfigDef.Validator VALIDATOR = new ConfigDef.Validator() { // from class: io.confluent.connect.elasticsearch.bulk.BulkProcessor.BehaviorOnMalformedDoc.1
            private final ConfigDef.ValidString validator = ConfigDef.ValidString.in(BehaviorOnMalformedDoc.names());

            public void ensureValid(String str, Object obj) {
                if (obj instanceof String) {
                    obj = ((String) obj).toLowerCase(Locale.ROOT);
                }
                this.validator.ensureValid(str, obj);
            }

            public String toString() {
                return this.validator.toString();
            }
        };

        public static String[] names() {
            BehaviorOnMalformedDoc[] values = values();
            String[] strArr = new String[values.length];
            for (int i = 0; i < values.length; i++) {
                strArr[i] = values[i].toString();
            }
            return strArr;
        }

        public static BehaviorOnMalformedDoc forValue(String str) {
            return valueOf(str.toUpperCase(Locale.ROOT));
        }

        @Override // java.lang.Enum
        public String toString() {
            return name().toLowerCase(Locale.ROOT);
        }
    }

    /* loaded from: input_file:io/confluent/connect/elasticsearch/bulk/BulkProcessor$BulkProcessorThread.class */
    private static final class BulkProcessorThread extends Thread {
        private final LogContext parentContext;
        private final int threadId;

        public BulkProcessorThread(LogContext logContext, Runnable runnable, int i, int i2) {
            super(runnable, String.format("BulkProcessor@%d-%d", Integer.valueOf(i), Integer.valueOf(i2)));
            this.parentContext = logContext;
            this.threadId = i2;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            LogContext create = this.parentContext.create("Thread" + this.threadId);
            Throwable th = null;
            try {
                super.run();
                if (create != null) {
                    if (0 == 0) {
                        create.close();
                        return;
                    }
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th3;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/connect/elasticsearch/bulk/BulkProcessor$BulkTask.class */
    public final class BulkTask implements Callable<BulkResponse> {
        final long batchId = BulkProcessor.BATCH_ID_GEN.incrementAndGet();
        final List<R> batch;

        BulkTask(List<R> list) {
            this.batch = list;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public BulkResponse call() throws Exception {
            try {
                BulkResponse execute = execute();
                BulkProcessor.this.onBatchCompletion(this.batch.size());
                return execute;
            } catch (Exception e) {
                BulkProcessor.this.failAndStop(e);
                throw e;
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        private BulkResponse execute() throws Exception {
            long currentTimeMillis = System.currentTimeMillis();
            try {
                Object bulkRequest = BulkProcessor.this.bulkClient.bulkRequest(this.batch);
                int i = BulkProcessor.this.maxRetries + 1;
                int i2 = 1;
                int i3 = 0;
                while (true) {
                    try {
                        BulkProcessor.log.trace("Executing batch {} of {} records with attempt {}/{}", new Object[]{Long.valueOf(this.batchId), Integer.valueOf(this.batch.size()), Integer.valueOf(i2), Integer.valueOf(i)});
                        BulkResponse execute = BulkProcessor.this.bulkClient.execute(bulkRequest);
                        if (execute.isSucceeded()) {
                            if (BulkProcessor.log.isDebugEnabled()) {
                                BulkProcessor.log.debug("Completed batch {} of {} records with attempt {}/{} in {} ms", new Object[]{Long.valueOf(this.batchId), Integer.valueOf(this.batch.size()), Integer.valueOf(i2), Integer.valueOf(i), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
                            }
                            return execute;
                        }
                        if (!BulkProcessor.this.responseContainsMalformedDocError(execute)) {
                            execute.isRetriable();
                            throw new ConnectException("Bulk request failed: " + execute.getErrorInfo());
                        }
                        execute.isRetriable();
                        handleMalformedDoc(execute);
                        return execute;
                    } catch (Exception e) {
                        if (1 == 0 || i2 >= i) {
                            BulkProcessor.log.error("Failed to execute batch {} of {} records after total of {} attempt(s)", new Object[]{Long.valueOf(this.batchId), Integer.valueOf(this.batch.size()), Integer.valueOf(i2), e});
                            throw e;
                        }
                        long computeRandomRetryWaitTimeInMillis = RetryUtil.computeRandomRetryWaitTimeInMillis(i3, BulkProcessor.this.retryBackoffMs);
                        BulkProcessor.log.warn("Failed to execute batch {} of {} records with attempt {}/{}, will attempt retry after {} ms. Failure reason: {}", new Object[]{Long.valueOf(this.batchId), Integer.valueOf(this.batch.size()), Integer.valueOf(i2), Integer.valueOf(i), Long.valueOf(computeRandomRetryWaitTimeInMillis), e.getMessage()});
                        BulkProcessor.this.time.sleep(computeRandomRetryWaitTimeInMillis);
                        if (Thread.interrupted()) {
                            BulkProcessor.log.error("Retrying batch {} of {} records interrupted after attempt {}/{}", new Object[]{Long.valueOf(this.batchId), Integer.valueOf(this.batch.size()), Integer.valueOf(i2), Integer.valueOf(i), e});
                            throw e;
                        }
                        i2++;
                        i3++;
                    }
                }
            } catch (Exception e2) {
                BulkProcessor.log.error("Failed to create bulk request from batch {} of {} records", new Object[]{Long.valueOf(this.batchId), Integer.valueOf(this.batch.size()), e2});
                throw e2;
            }
        }

        private void handleMalformedDoc(BulkResponse bulkResponse) {
            switch (BulkProcessor.this.behaviorOnMalformedDoc) {
                case IGNORE:
                    BulkProcessor.log.debug("Encountered an illegal document error when executing batch {} of {} records. Ignoring and will not index record. Error was {}", new Object[]{Long.valueOf(this.batchId), Integer.valueOf(this.batch.size()), bulkResponse.getErrorInfo()});
                    return;
                case WARN:
                    BulkProcessor.log.warn("Encountered an illegal document error when executing batch {} of {} records. Ignoring and will not index record. Error was {}", new Object[]{Long.valueOf(this.batchId), Integer.valueOf(this.batch.size()), bulkResponse.getErrorInfo()});
                    return;
                case FAIL:
                    BulkProcessor.log.error("Encountered an illegal document error when executing batch {} of {} records. Error was {} (to ignore future records like this change the configuration property '{}' from '{}' to '{}').", new Object[]{Long.valueOf(this.batchId), Integer.valueOf(this.batch.size()), bulkResponse.getErrorInfo(), ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, BehaviorOnMalformedDoc.FAIL, BehaviorOnMalformedDoc.IGNORE});
                    throw new ConnectException("Bulk request failed: " + bulkResponse.getErrorInfo());
                default:
                    throw new RuntimeException(String.format("Unknown value for %s enum: %s", BehaviorOnMalformedDoc.class.getSimpleName(), BulkProcessor.this.behaviorOnMalformedDoc));
            }
        }
    }

    public BulkProcessor(Time time, BulkClient<R, B> bulkClient, int i, int i2, int i3, long j, int i4, long j2, BehaviorOnMalformedDoc behaviorOnMalformedDoc) {
        this.time = time;
        this.bulkClient = bulkClient;
        this.maxBufferedRecords = i;
        this.batchSize = i3;
        this.lingerMs = j;
        this.maxRetries = i4;
        this.retryBackoffMs = j2;
        this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;
        this.unsentRecords = new ArrayDeque(i);
        ThreadFactory makeThreadFactory = makeThreadFactory();
        this.farmer = makeThreadFactory.newThread(farmerTask());
        this.executor = Executors.newFixedThreadPool(i2, makeThreadFactory);
    }

    private ThreadFactory makeThreadFactory() {
        final AtomicInteger atomicInteger = new AtomicInteger();
        final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() { // from class: io.confluent.connect.elasticsearch.bulk.BulkProcessor.1
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                BulkProcessor.log.error("Uncaught exception in BulkProcessor thread {}", thread, th);
                BulkProcessor.this.failAndStop(th);
            }
        };
        return new ThreadFactory() { // from class: io.confluent.connect.elasticsearch.bulk.BulkProcessor.2
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                int andIncrement = atomicInteger.getAndIncrement();
                BulkProcessorThread bulkProcessorThread = new BulkProcessorThread(BulkProcessor.this.logContext, runnable, System.identityHashCode(this), andIncrement);
                bulkProcessorThread.setDaemon(true);
                bulkProcessorThread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
                return bulkProcessorThread;
            }
        };
    }

    Runnable farmerTask() {
        return () -> {
            LogContext create = this.logContext.create("Farmer1");
            Throwable th = null;
            try {
                log.debug("Starting farmer task");
                try {
                    ArrayList<Future> arrayList = new ArrayList();
                    while (!this.stopRequested) {
                        arrayList.add(submitBatchWhenReady());
                        ArrayList arrayList2 = new ArrayList();
                        for (Future future : arrayList) {
                            if (future.isDone()) {
                                log.debug("Bulk request completed with status {}", (BulkResponse) future.get());
                            } else {
                                arrayList2.add(future);
                            }
                        }
                        log.debug("Processing next batch with {} outstanding batch requests in flight", Integer.valueOf(arrayList2.size()));
                        arrayList = arrayList2;
                    }
                    log.debug("Finished farmer task");
                    if (create != null) {
                        if (0 == 0) {
                            create.close();
                            return;
                        }
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (InterruptedException | ExecutionException e) {
                    throw new ConnectException(e);
                }
            } catch (Throwable th3) {
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th3;
            }
        };
    }

    synchronized Future<BulkResponse> submitBatchWhenReady() throws InterruptedException {
        long milliseconds = this.time.milliseconds();
        long j = 0;
        while (true) {
            long j2 = j;
            if (this.stopRequested || canSubmit(j2)) {
                break;
            }
            wait(Math.max(0L, this.lingerMs - j2));
            j = this.time.milliseconds() - milliseconds;
        }
        return this.stopRequested ? CompletableFuture.completedFuture(BulkResponse.failure(false, "request not submitted during shutdown")) : submitBatch();
    }

    private synchronized Future<BulkResponse> submitBatch() {
        int size = this.unsentRecords.size();
        if (!$assertionsDisabled && size <= 0) {
            throw new AssertionError();
        }
        int min = Math.min(this.batchSize, size);
        ArrayList arrayList = new ArrayList(min);
        for (int i = 0; i < min; i++) {
            arrayList.add(this.unsentRecords.removeFirst());
        }
        this.inFlightRecords += min;
        log.debug("Submitting batch of {} records; {} unsent and {} total in-flight records", new Object[]{Integer.valueOf(min), Integer.valueOf(size), Integer.valueOf(this.inFlightRecords)});
        return this.executor.submit(new BulkTask(arrayList));
    }

    private synchronized boolean canSubmit(long j) {
        return !this.unsentRecords.isEmpty() && (this.flushRequested || j >= this.lingerMs || this.unsentRecords.size() >= this.batchSize);
    }

    public void start() {
        this.farmer.start();
    }

    public void stop() {
        log.debug("stop");
        this.stopRequested = true;
        synchronized (this) {
            this.executor.shutdown();
            notifyAll();
        }
    }

    public void awaitStop(long j) {
        if (!$assertionsDisabled && !this.stopRequested) {
            throw new AssertionError();
        }
        try {
            try {
                if (this.executor.awaitTermination(j, TimeUnit.MILLISECONDS)) {
                } else {
                    throw new ConnectException("Timed-out waiting for executor termination");
                }
            } catch (InterruptedException e) {
                throw new ConnectException(e);
            }
        } finally {
            this.executor.shutdownNow();
        }
    }

    public boolean isStopping() {
        return this.stopRequested;
    }

    public boolean isFailed() {
        return this.error.get() != null;
    }

    public boolean isTerminal() {
        return isStopping() || isFailed();
    }

    public void throwIfStopping() {
        if (this.stopRequested) {
            throw new ConnectException("Stopping");
        }
    }

    public void throwIfFailed() {
        if (isFailed()) {
            throw this.error.get();
        }
    }

    public void throwIfTerminal() {
        throwIfFailed();
        throwIfStopping();
    }

    public synchronized void add(R r, long j) {
        throwIfTerminal();
        int bufferedRecords = bufferedRecords();
        if (bufferedRecords >= this.maxBufferedRecords) {
            log.trace("Buffer full at {} records, so waiting up to {} ms before adding", Integer.valueOf(bufferedRecords), Long.valueOf(j));
            long milliseconds = this.time.milliseconds();
            long milliseconds2 = this.time.milliseconds();
            while (true) {
                long j2 = milliseconds2 - milliseconds;
                if (isTerminal() || j2 >= j || bufferedRecords() < this.maxBufferedRecords) {
                    break;
                }
                try {
                    wait(j - j2);
                    milliseconds2 = this.time.milliseconds();
                } catch (InterruptedException e) {
                    throw new ConnectException(e);
                }
            }
            throwIfTerminal();
            if (bufferedRecords() >= this.maxBufferedRecords) {
                throw new ConnectException("Add timeout expired before buffer availability");
            }
            log.debug("Adding record to queue after waiting {} ms", Long.valueOf(this.time.milliseconds() - milliseconds));
        } else {
            log.trace("Adding record to queue");
        }
        this.unsentRecords.addLast(r);
        notifyAll();
    }

    public void flush(long j) {
        long milliseconds = this.time.milliseconds();
        try {
            try {
                this.flushRequested = true;
                synchronized (this) {
                    notifyAll();
                    long milliseconds2 = this.time.milliseconds() - milliseconds;
                    while (!isTerminal() && milliseconds2 < j && bufferedRecords() > 0) {
                        wait(j - milliseconds2);
                        milliseconds2 = this.time.milliseconds() - milliseconds;
                    }
                    throwIfTerminal();
                    if (bufferedRecords() > 0) {
                        throw new ConnectException("Flush timeout expired with unflushed records: " + bufferedRecords());
                    }
                }
                log.debug("Flushed bulk processor (total time={} ms)", Long.valueOf(this.time.milliseconds() - milliseconds));
            } catch (InterruptedException e) {
                throw new ConnectException(e);
            }
        } finally {
            this.flushRequested = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean responseContainsMalformedDocError(BulkResponse bulkResponse) {
        return bulkResponse.getErrorInfo().contains("mapper_parsing_exception") || bulkResponse.getErrorInfo().contains("illegal_argument_exception");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void onBatchCompletion(int i) {
        this.inFlightRecords -= i;
        if (!$assertionsDisabled && this.inFlightRecords < 0) {
            throw new AssertionError();
        }
        notifyAll();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void failAndStop(Throwable th) {
        this.error.compareAndSet(null, toConnectException(th));
        stop();
    }

    public synchronized int bufferedRecords() {
        return this.unsentRecords.size() + this.inFlightRecords;
    }

    private static ConnectException toConnectException(Throwable th) {
        return th instanceof ConnectException ? (ConnectException) th : new ConnectException(th);
    }

    static {
        $assertionsDisabled = !BulkProcessor.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(BulkProcessor.class);
        BATCH_ID_GEN = new AtomicLong();
    }
}
