package io.confluent.connect.elasticsearch.integration;

import io.confluent.connect.elasticsearch.ElasticsearchSinkConnector;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer;
import io.confluent.connect.elasticsearch.helper.ElasticsearchHelperClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.test.TestUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.search.SearchHit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;

/* loaded from: input_file:io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorBaseIT.class */
public class ElasticsearchConnectorBaseIT extends BaseConnectorIT {
    protected static final int NUM_RECORDS = 5;
    protected static final int TASKS_MAX = 1;
    protected static final String CONNECTOR_NAME = "es-connector";
    protected static final String TOPIC = "test";
    protected static ElasticsearchContainer container;
    protected ElasticsearchHelperClient helperClient;
    protected Map<String, String> props;

    @AfterClass
    public static void cleanupAfterAll() {
        container.close();
    }

    @Before
    public void setup() {
        startConnect();
        this.connect.kafka().createTopic(TOPIC);
        this.props = createProps();
        this.helperClient = new ElasticsearchHelperClient(container.getConnectionUrl(), new ElasticsearchSinkConnectorConfig(this.props));
    }

    @After
    public void cleanup() throws IOException {
        stopConnect();
        if (this.helperClient != null) {
            this.helperClient.deleteIndex(TOPIC);
            this.helperClient.close();
        }
    }

    protected Map<String, String> createProps() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", ElasticsearchSinkConnector.class.getName());
        hashMap.put("topics", TOPIC);
        hashMap.put("tasks.max", Integer.toString(TASKS_MAX));
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", JsonConverter.class.getName());
        hashMap.put("value.converter.schemas.enable", "false");
        hashMap.put("connection.url", container.getConnectionUrl());
        hashMap.put("key.ignore", "true");
        hashMap.put("schema.ignore", "true");
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void runSimpleTest(Map<String, String> map) throws Exception {
        this.connect.configureConnector(CONNECTOR_NAME, map);
        waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
        writeRecords(NUM_RECORDS);
        verifySearchResults(NUM_RECORDS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void verifySearchResults(int i) throws Exception {
        waitForRecords(i);
        Iterator it = this.helperClient.search(TOPIC).iterator();
        while (it.hasNext()) {
            SearchHit searchHit = (SearchHit) it.next();
            int intValue = ((Integer) searchHit.getSourceAsMap().get("doc_num")).intValue();
            Assert.assertNotNull(Integer.valueOf(intValue));
            Assert.assertTrue(intValue < i);
            Assert.assertEquals(TOPIC, searchHit.getIndex());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForRecords(int i) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            try {
                return this.helperClient.getDocCount(TOPIC) == ((long) i);
            } catch (ElasticsearchStatusException e) {
                if (e.getMessage().contains("index_not_found_exception")) {
                    return false;
                }
                throw e;
            }
        }, CONSUME_MAX_DURATION_MS, "Sufficient amount of document were not found in ES on time.");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeRecords(int i) {
        for (int i2 = 0; i2 < i; i2 += TASKS_MAX) {
            this.connect.kafka().produce(TOPIC, String.valueOf(i2), String.format("{\"doc_num\":%d}", Integer.valueOf(i2)));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeRecordsFromStartIndex(int i, int i2) {
        for (int i3 = i; i3 < i + i2; i3 += TASKS_MAX) {
            this.connect.kafka().produce(TOPIC, String.valueOf(i3), String.format("{\"doc_num\":%d}", Integer.valueOf(i3)));
        }
    }
}
