package io.confluent.connect.elasticsearch;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.connect.elasticsearch.DataConverter;
import io.confluent.connect.elasticsearch.ElasticsearchWriter;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;

@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
/* loaded from: input_file:io/confluent/connect/elasticsearch/ElasticsearchWriterTest.class */
public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
    private final String key = "key";
    private final Schema schema = createSchema();
    private final Struct record = createRecord(this.schema);
    private final Schema otherSchema = createOtherSchema();
    private final Struct otherRecord = createOtherRecord(this.otherSchema);
    private boolean ignoreKey;
    private boolean ignoreSchema;

    @Override // io.confluent.connect.elasticsearch.ElasticsearchSinkTestBase
    @Before
    public void setUp() throws Exception {
        this.ignoreKey = false;
        this.ignoreSchema = false;
        super.setUp();
    }

    @Test
    public void testWriter() throws Exception {
        writeDataAndRefresh(initWriter(this.client), prepareData(2));
        verifySearchResults(Collections.singletonList(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, this.record, 1L)));
    }

    @Test
    public void testWriterIgnoreKey() throws Exception {
        this.ignoreKey = true;
        Collection<SinkRecord> prepareData = prepareData(2);
        writeDataAndRefresh(initWriter(this.client), prepareData);
        verifySearchResults(prepareData);
    }

    @Test
    public void testWriterIgnoreSchema() throws Exception {
        this.ignoreKey = true;
        this.ignoreSchema = true;
        Collection<SinkRecord> prepareData = prepareData(2);
        writeDataAndRefresh(initWriter(this.client), prepareData);
        verifySearchResults(prepareData);
    }

    @Test
    public void testTopicIndexOverride() throws Exception {
        this.ignoreKey = true;
        this.ignoreSchema = true;
        Collection<SinkRecord> prepareData = prepareData(2);
        writeDataAndRefresh(initWriter(this.client, Collections.emptySet(), Collections.emptySet(), Collections.singletonMap("topic", "index"), false, DataConverter.BehaviorOnNullValues.IGNORE), prepareData);
        verifySearchResults(prepareData, "index");
    }

    @Test
    public void testIncompatible() throws Exception {
        this.ignoreKey = true;
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.otherSchema, this.otherRecord, 0L));
        ElasticsearchWriter initWriter = initWriter(this.client);
        initWriter.write(arrayList);
        Thread.sleep(5000L);
        arrayList.clear();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, this.record, 1L));
        initWriter.write(arrayList);
        try {
            initWriter.flush();
            fail("should fail because of mapper_parsing_exception");
        } catch (ConnectException e) {
        }
    }

    @Test
    public void testCompatible() throws Exception {
        this.ignoreKey = true;
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        SinkRecord sinkRecord = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, this.record, 0L);
        arrayList.add(sinkRecord);
        arrayList2.add(sinkRecord);
        SinkRecord sinkRecord2 = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, this.record, 1L);
        arrayList.add(sinkRecord2);
        arrayList2.add(sinkRecord2);
        ElasticsearchWriter initWriter = initWriter(this.client);
        initWriter.write(arrayList);
        arrayList.clear();
        SinkRecord sinkRecord3 = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.otherSchema, this.otherRecord, 2L);
        arrayList.add(sinkRecord3);
        arrayList2.add(sinkRecord3);
        SinkRecord sinkRecord4 = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.otherSchema, this.otherRecord, 3L);
        arrayList.add(sinkRecord4);
        arrayList2.add(sinkRecord4);
        writeDataAndRefresh(initWriter, arrayList);
        verifySearchResults(arrayList2);
    }

    @Test
    public void testSafeRedeliveryRegularKey() throws Exception {
        Struct struct = new Struct(this.schema);
        struct.put("user", "foo");
        struct.put("message", "hi");
        SinkRecord sinkRecord = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, struct, 0L);
        Struct struct2 = new Struct(this.schema);
        struct2.put("user", "foo");
        struct2.put("message", "bye");
        SinkRecord sinkRecord2 = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, struct2, 1L);
        ElasticsearchWriter initWriter = initWriter(this.client);
        initWriter.write(Arrays.asList(sinkRecord, sinkRecord2));
        initWriter.flush();
        writeDataAndRefresh(initWriter, Collections.singleton(sinkRecord));
        verifySearchResults(Collections.singleton(sinkRecord2));
    }

    @Test
    public void testSafeRedeliveryOffsetInKey() throws Exception {
        this.ignoreKey = true;
        Struct struct = new Struct(this.schema);
        struct.put("user", "foo");
        struct.put("message", "hi");
        SinkRecord sinkRecord = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, struct, 0L);
        Struct struct2 = new Struct(this.schema);
        struct2.put("user", "foo");
        struct2.put("message", "bye");
        List asList = Arrays.asList(sinkRecord, new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, struct2, 1L));
        ElasticsearchWriter initWriter = initWriter(this.client);
        initWriter.write(asList);
        initWriter.flush();
        writeDataAndRefresh(initWriter, asList);
        verifySearchResults(asList);
    }

    @Test
    public void testMap() throws Exception {
        Schema build = SchemaBuilder.struct().name("struct").field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build()).build();
        HashMap hashMap = new HashMap();
        hashMap.put(1, "One");
        hashMap.put(2, "Two");
        Struct struct = new Struct(build);
        struct.put("map", hashMap);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", build, struct, 0L));
        writeDataAndRefresh(initWriter(this.client), arrayList);
        verifySearchResults(arrayList);
    }

    @Test
    public void testStringKeyedMap() throws Exception {
        Schema build = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build();
        HashMap hashMap = new HashMap();
        hashMap.put("One", 1);
        hashMap.put("Two", 2);
        writeDataAndRefresh(initWriter(this.client), Collections.singletonList(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", build, hashMap, 0L)));
        verifySearchResults(Collections.singletonList(new ObjectMapper().writeValueAsString(hashMap)), "topic");
    }

    @Test
    public void testDecimal() throws Exception {
        BigDecimal bigDecimal = new BigDecimal(new BigInteger(ByteBuffer.allocate(4).putInt(2).array()), 2);
        Schema build = SchemaBuilder.struct().name("struct").field("decimal", Decimal.schema(2)).build();
        Struct struct = new Struct(build);
        struct.put("decimal", bigDecimal);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", build, struct, 0L));
        writeDataAndRefresh(initWriter(this.client), arrayList);
        verifySearchResults(arrayList);
    }

    @Test
    public void testBytes() throws Exception {
        Schema build = SchemaBuilder.struct().name("struct").field("bytes", SchemaBuilder.BYTES_SCHEMA).build();
        Struct struct = new Struct(build);
        struct.put("bytes", new byte[]{42});
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", build, struct, 0L));
        writeDataAndRefresh(initWriter(this.client), arrayList);
        verifySearchResults(arrayList);
    }

    @Test
    public void testIgnoreNullValue() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, (Object) null, 0L));
        writeDataAndRefresh(initWriter(this.client, DataConverter.BehaviorOnNullValues.IGNORE), arrayList);
        verifySearchResults(new ArrayList());
    }

    @Test
    public void testDeleteOnNullValue() throws Exception {
        ElasticsearchWriter initWriter = initWriter(this.client, DataConverter.BehaviorOnNullValues.DELETE);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key1", this.schema, this.record, 0L));
        SinkRecord sinkRecord = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key2", this.otherSchema, this.otherRecord, 1L);
        arrayList.add(sinkRecord);
        initWriter.write(arrayList);
        initWriter.flush();
        refresh();
        verifySearchResults(arrayList);
        SinkRecord sinkRecord2 = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key1", this.schema, (Object) null, 2L);
        arrayList.clear();
        arrayList.add(sinkRecord2);
        writeDataAndRefresh(initWriter, arrayList);
        arrayList.clear();
        arrayList.add(sinkRecord);
        verifySearchResults(arrayList);
    }

    @Test
    public void testIneffectiveDelete() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, (Object) null, 0L));
        writeDataAndRefresh(initWriter(this.client, DataConverter.BehaviorOnNullValues.DELETE), arrayList);
        verifySearchResults(new ArrayList());
    }

    @Test
    public void testDeleteWithNullKey() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, (Object) null, this.schema, (Object) null, 0L));
        writeDataAndRefresh(initWriter(this.client, DataConverter.BehaviorOnNullValues.DELETE), arrayList);
        verifySearchResults(new ArrayList());
    }

    @Test
    public void testFailOnNullValue() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, (Object) null, 0L));
        try {
            writeDataAndRefresh(initWriter(this.client, DataConverter.BehaviorOnNullValues.FAIL), arrayList);
            fail("should fail because of behavior.on.null.values=fail");
        } catch (DataException e) {
        }
    }

    @Test
    public void testInvalidRecordException() throws Exception {
        this.ignoreSchema = true;
        ArrayList arrayList = new ArrayList();
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, (Object) null, (Schema) null, new byte[]{42}, 0L));
        ElasticsearchWriter initWriter = initWriter(this.client);
        assertEquals("Key is used as document id and can not be null.", ((Exception) assertThrows(ConnectException.class, () -> {
            initWriter.write(arrayList);
        })).getMessage());
    }

    @Test
    public void testDropInvalidRecord() throws Exception {
        this.ignoreSchema = true;
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        Schema build = SchemaBuilder.struct().name("struct").field("bytes", SchemaBuilder.BYTES_SCHEMA).build();
        Struct struct = new Struct(build);
        struct.put("bytes", new byte[]{42});
        SinkRecord sinkRecord = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, (Object) null, build, struct, 0L);
        SinkRecord sinkRecord2 = new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", build, struct, 1L);
        arrayList.add(sinkRecord2);
        arrayList.add(sinkRecord);
        arrayList2.add(sinkRecord2);
        writeDataAndRefresh(initWriter(this.client, true), arrayList);
        verifySearchResults(arrayList2, this.ignoreKey, this.ignoreSchema);
    }

    @Test
    public void testDropInvalidRecordThrowsOnOtherErrors() throws Exception {
        this.ignoreSchema = true;
        ArrayList arrayList = new ArrayList();
        Schema build = SchemaBuilder.struct().name("struct").field("bytes", SchemaBuilder.BYTES_SCHEMA).build();
        Struct struct = new Struct(build);
        struct.put("bytes", new byte[]{42});
        arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", build, struct, 1L));
        ElasticsearchWriter initWriter = initWriter(this.client, true);
        initWriter.stop();
        MatcherAssert.assertThat(assertThrows(ConnectException.class, () -> {
            initWriter.write(arrayList);
        }).getMessage(), Matchers.containsString("Stopping"));
    }

    private Collection<SinkRecord> prepareData(int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(new SinkRecord("topic", 12, Schema.STRING_SCHEMA, "key", this.schema, this.record, i2));
        }
        return arrayList;
    }

    private ElasticsearchWriter initWriter(ElasticsearchClient elasticsearchClient) {
        return initWriter(elasticsearchClient, false, DataConverter.BehaviorOnNullValues.IGNORE);
    }

    private ElasticsearchWriter initWriter(ElasticsearchClient elasticsearchClient, boolean z) {
        return initWriter(elasticsearchClient, z, DataConverter.BehaviorOnNullValues.IGNORE);
    }

    private ElasticsearchWriter initWriter(ElasticsearchClient elasticsearchClient, DataConverter.BehaviorOnNullValues behaviorOnNullValues) {
        return initWriter(elasticsearchClient, false, behaviorOnNullValues);
    }

    private ElasticsearchWriter initWriter(ElasticsearchClient elasticsearchClient, boolean z, DataConverter.BehaviorOnNullValues behaviorOnNullValues) {
        return initWriter(elasticsearchClient, Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), z, behaviorOnNullValues);
    }

    private ElasticsearchWriter initWriter(ElasticsearchClient elasticsearchClient, Set<String> set, Set<String> set2, Map<String, String> map, boolean z, DataConverter.BehaviorOnNullValues behaviorOnNullValues) {
        ElasticsearchWriter build = new ElasticsearchWriter.Builder(elasticsearchClient).setType("kafka-connect").setIgnoreKey(this.ignoreKey, set).setIgnoreSchema(this.ignoreSchema, set2).setTopicToIndexMap(map).setFlushTimoutMs(10000L).setMaxBufferedRecords(10000).setMaxInFlightRequests(1).setBatchSize(2).setLingerMs(1000L).setRetryBackoffMs(1000L).setMaxRetry(3).setDropInvalidMessage(z).setBehaviorOnNullValues(behaviorOnNullValues).build();
        build.start();
        build.createIndicesForTopics(Collections.singleton("topic"));
        return build;
    }

    private void writeDataAndRefresh(ElasticsearchWriter elasticsearchWriter, Collection<SinkRecord> collection) throws Exception {
        elasticsearchWriter.write(collection);
        elasticsearchWriter.flush();
        elasticsearchWriter.stop();
        refresh();
    }

    private void verifySearchResults(Collection<SinkRecord> collection) throws Exception {
        verifySearchResults(collection, this.ignoreKey, this.ignoreSchema);
    }

    private void verifySearchResults(Collection<?> collection, String str) throws Exception {
        verifySearchResults(collection, str, this.ignoreKey, this.ignoreSchema);
    }
}
