package io.confluent.connect.elasticsearch.bulk;

import io.confluent.connect.elasticsearch.bulk.BulkProcessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/confluent/connect/elasticsearch/bulk/BulkProcessorTest.class */
public class BulkProcessorTest {
    Client client;

    /* loaded from: input_file:io/confluent/connect/elasticsearch/bulk/BulkProcessorTest$Client.class */
    private static final class Client implements BulkClient<Integer, List<Integer>> {
        private final Queue<Expectation> expectQ;
        private volatile boolean executeMetExpectations;

        private Client() {
            this.expectQ = new LinkedList();
            this.executeMetExpectations = true;
        }

        public List<Integer> bulkRequest(List<Integer> list) {
            ArrayList arrayList = new ArrayList(list.size());
            Iterator<Integer> it = list.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
            }
            return arrayList;
        }

        public void expect(List<Integer> list, BulkResponse bulkResponse) {
            this.expectQ.add(new Expectation(list, bulkResponse));
        }

        public boolean expectationsMet() {
            return this.expectQ.isEmpty() && this.executeMetExpectations;
        }

        public BulkResponse execute(List<Integer> list) throws IOException {
            try {
                Expectation remove = this.expectQ.remove();
                Assert.assertEquals(remove.request, list);
                this.executeMetExpectations &= true;
                return remove.response;
            } catch (Throwable th) {
                this.executeMetExpectations = false;
                throw th;
            }
        }

        /* renamed from: bulkRequest, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m4bulkRequest(List list) {
            return bulkRequest((List<Integer>) list);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/connect/elasticsearch/bulk/BulkProcessorTest$Expectation.class */
    public static class Expectation {
        final List<Integer> request;
        final BulkResponse response;

        private Expectation(List<Integer> list, BulkResponse bulkResponse) {
            this.request = list;
            this.response = bulkResponse;
        }
    }

    @Before
    public void createClient() {
        this.client = new Client();
    }

    @After
    public void checkClient() {
        Assert.assertTrue(this.client.expectationsMet());
    }

    @Test
    public void batchingAndLingering() throws InterruptedException, ExecutionException {
        BulkProcessor bulkProcessor = new BulkProcessor(Time.SYSTEM, this.client, 100, 5, 5, 5L, 0, 0L, BulkProcessor.BehaviorOnMalformedDoc.DEFAULT);
        bulkProcessor.add(1, 10L);
        bulkProcessor.add(2, 10L);
        bulkProcessor.add(3, 10L);
        bulkProcessor.add(4, 10L);
        bulkProcessor.add(5, 10L);
        bulkProcessor.add(6, 10L);
        bulkProcessor.add(7, 10L);
        bulkProcessor.add(8, 10L);
        bulkProcessor.add(9, 10L);
        bulkProcessor.add(10, 10L);
        bulkProcessor.add(11, 10L);
        bulkProcessor.add(12, 10L);
        this.client.expect(Arrays.asList(1, 2, 3, 4, 5), BulkResponse.success());
        this.client.expect(Arrays.asList(6, 7, 8, 9, 10), BulkResponse.success());
        this.client.expect(Arrays.asList(11, 12), BulkResponse.success());
        Assert.assertTrue(((BulkResponse) bulkProcessor.submitBatchWhenReady().get()).succeeded);
        Assert.assertTrue(((BulkResponse) bulkProcessor.submitBatchWhenReady().get()).succeeded);
        Assert.assertTrue(((BulkResponse) bulkProcessor.submitBatchWhenReady().get()).succeeded);
    }

    @Test
    public void flushing() {
        BulkProcessor bulkProcessor = new BulkProcessor(Time.SYSTEM, this.client, 100, 5, 5, 100000L, 0, 0L, BulkProcessor.BehaviorOnMalformedDoc.DEFAULT);
        this.client.expect(Arrays.asList(1, 2, 3), BulkResponse.success());
        bulkProcessor.start();
        bulkProcessor.add(1, 10L);
        bulkProcessor.add(2, 10L);
        bulkProcessor.add(3, 10L);
        Assert.assertFalse(this.client.expectationsMet());
        bulkProcessor.flush(100L);
    }

    @Test
    public void addBlocksWhenBufferFull() {
        BulkProcessor bulkProcessor = new BulkProcessor(Time.SYSTEM, this.client, 1, 1, 1, 10L, 0, 0L, BulkProcessor.BehaviorOnMalformedDoc.DEFAULT);
        bulkProcessor.add(42, 10L);
        Assert.assertEquals(1L, bulkProcessor.bufferedRecords());
        try {
            bulkProcessor.add(43, 10L);
            Assert.fail();
        } catch (ConnectException e) {
        }
    }

    @Test
    public void retriableErrors() throws InterruptedException, ExecutionException {
        BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc = BulkProcessor.BehaviorOnMalformedDoc.DEFAULT;
        this.client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retiable error"));
        this.client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retriable error again"));
        this.client.expect(Arrays.asList(42, 43), BulkResponse.success());
        BulkProcessor bulkProcessor = new BulkProcessor(Time.SYSTEM, this.client, 100, 5, 2, 5L, 3, 1L, behaviorOnMalformedDoc);
        bulkProcessor.add(42, 10L);
        bulkProcessor.add(43, 10L);
        Assert.assertTrue(((BulkResponse) bulkProcessor.submitBatchWhenReady().get()).succeeded);
    }

    @Test
    public void retriableErrorsHitMaxRetries() throws InterruptedException, ExecutionException {
        BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc = BulkProcessor.BehaviorOnMalformedDoc.DEFAULT;
        this.client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retiable error"));
        this.client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retriable error again"));
        this.client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a final retriable error again"));
        BulkProcessor bulkProcessor = new BulkProcessor(Time.SYSTEM, this.client, 100, 5, 2, 5L, 2, 1L, behaviorOnMalformedDoc);
        bulkProcessor.add(42, 10L);
        bulkProcessor.add(43, 10L);
        try {
            bulkProcessor.submitBatchWhenReady().get();
            Assert.fail();
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause().getMessage().contains("a final retriable error again"));
        }
    }

    @Test
    public void unretriableErrors() throws InterruptedException {
        BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc = BulkProcessor.BehaviorOnMalformedDoc.DEFAULT;
        this.client.expect(Arrays.asList(42, 43), BulkResponse.failure(false, "an unretriable error"));
        BulkProcessor bulkProcessor = new BulkProcessor(Time.SYSTEM, this.client, 100, 5, 2, 5L, 3, 1L, behaviorOnMalformedDoc);
        bulkProcessor.add(42, 10L);
        bulkProcessor.add(43, 10L);
        try {
            bulkProcessor.submitBatchWhenReady().get();
            Assert.fail();
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause().getMessage().contains("an unretriable error"));
        }
    }

    @Test
    public void failOnMalformedDoc() throws InterruptedException {
        BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc = BulkProcessor.BehaviorOnMalformedDoc.FAIL;
        this.client.expect(Arrays.asList(42, 43), BulkResponse.failure(false, " [{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse\",\"caused_by\":{\"type\":\"illegal_argument_exception\",\"reason\":\"object\n field starting or ending with a [.] makes object resolution ambiguous: [avjpz{{.}}wjzse{{..}}gal9d]\"}}]"));
        BulkProcessor bulkProcessor = new BulkProcessor(Time.SYSTEM, this.client, 100, 5, 2, 5L, 3, 1L, behaviorOnMalformedDoc);
        bulkProcessor.start();
        bulkProcessor.add(42, 1L);
        bulkProcessor.add(43, 1L);
        try {
            bulkProcessor.flush(1000L);
            Assert.fail();
        } catch (ConnectException e) {
            Assert.assertTrue(e.getMessage().contains("mapper_parsing_exception"));
        }
    }

    @Test
    public void ignoreOrWarnOnMalformedDoc() throws InterruptedException {
        for (BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc : Arrays.asList(BulkProcessor.BehaviorOnMalformedDoc.WARN, BulkProcessor.BehaviorOnMalformedDoc.IGNORE)) {
            this.client.expect(Arrays.asList(42, 43), BulkResponse.failure(false, " [{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse\",\"caused_by\":{\"type\":\"illegal_argument_exception\",\"reason\":\"object\n field starting or ending with a [.] makes object resolution ambiguous: [avjpz{{.}}wjzse{{..}}gal9d]\"}}]"));
            BulkProcessor bulkProcessor = new BulkProcessor(Time.SYSTEM, this.client, 100, 5, 2, 5L, 3, 1L, behaviorOnMalformedDoc);
            bulkProcessor.start();
            bulkProcessor.add(42, 1L);
            bulkProcessor.add(43, 1L);
            try {
                bulkProcessor.flush(1000L);
            } catch (ConnectException e) {
                Assert.fail(e.getMessage());
            }
        }
    }

    @Test
    public void farmerTaskPropogatesException() {
        BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc = BulkProcessor.BehaviorOnMalformedDoc.DEFAULT;
        this.client.expect(Arrays.asList(42, 43), BulkResponse.failure(false, "an unretriable error"));
        BulkProcessor bulkProcessor = new BulkProcessor(Time.SYSTEM, this.client, 100, 5, 2, 5L, 3, 1L, behaviorOnMalformedDoc);
        bulkProcessor.add(42, 10L);
        bulkProcessor.add(43, 10L);
        Runnable farmerTask = bulkProcessor.farmerTask();
        MatcherAssert.assertThat(Assert.assertThrows(ConnectException.class, () -> {
            farmerTask.run();
            bulkProcessor.throwIfFailed();
        }).getMessage(), Matchers.containsString("an unretriable error"));
    }

    @Test
    public void terminateRetriesWhenInterruptedInSleep() throws Exception {
        BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc = BulkProcessor.BehaviorOnMalformedDoc.DEFAULT;
        Time time = (Time) Mockito.mock(Time.class);
        ((Time) Mockito.doAnswer(invocationOnMock -> {
            Thread.currentThread().interrupt();
            return null;
        }).when(time)).sleep(ArgumentMatchers.anyLong());
        this.client.expect(Arrays.asList(42, 43), BulkResponse.failure(true, "a retriable error"));
        BulkProcessor bulkProcessor = new BulkProcessor(time, this.client, 100, 5, 2, 5L, 3, 1L, behaviorOnMalformedDoc);
        bulkProcessor.add(42, 10L);
        bulkProcessor.add(43, 10L);
        MatcherAssert.assertThat(((ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
        })).getMessage(), Matchers.containsString("a retriable error"));
    }
}
