package io.confluent.connect.elasticsearch.integration;

import io.confluent.connect.elasticsearch.ElasticsearchSinkConnector;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer;
import io.confluent.connect.elasticsearch.helper.ElasticsearchHelperClient;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.test.TestUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.security.user.User;
import org.elasticsearch.client.security.user.privileges.IndicesPrivileges;
import org.elasticsearch.client.security.user.privileges.Role;
import org.elasticsearch.search.SearchHit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;

/* loaded from: input_file:io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorBaseIT.class */
public class ElasticsearchConnectorBaseIT extends BaseConnectorIT {
    protected static final int NUM_RECORDS = 5;
    protected static final int TASKS_MAX = 1;
    protected static final String CONNECTOR_NAME = "es-connector";
    protected static final String TOPIC = "test";
    public static final String ELASTIC_MINIMAL_PRIVILEGES_NAME = "frank";
    public static final String ELASTIC_MINIMAL_PRIVILEGES_PASSWORD = "WatermelonInEasterHay";
    public static final String ELASTIC_DATA_STREAM_MINIMAL_PRIVILEGES_NAME = "bob";
    public static final String ELASTIC_DS_MINIMAL_PRIVILEGES_PASSWORD = "PeachesInGeorgia";
    private static final String ES_SINK_CONNECTOR_ROLE = "es_sink_connector_role";
    private static final String ES_SINK_CONNECTOR_DS_ROLE = "es_sink_connector_ds_role";
    protected static ElasticsearchContainer container;
    protected boolean isDataStream;
    protected ElasticsearchHelperClient helperClient;
    protected Map<String, String> props;
    protected String index;

    @AfterClass
    public static void cleanupAfterAll() {
        container.close();
    }

    @Before
    public void setup() {
        this.index = TOPIC;
        this.isDataStream = false;
        startConnect();
        this.connect.kafka().createTopic(TOPIC);
        this.props = createProps();
        this.helperClient = container.getHelperClient(this.props);
    }

    @After
    public void cleanup() throws IOException {
        stopConnect();
        if (!container.isRunning() || this.helperClient == null) {
            return;
        }
        try {
            this.helperClient.deleteIndex(this.index, this.isDataStream);
            this.helperClient.close();
        } catch (ConnectException e) {
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<String, String> createProps() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", ElasticsearchSinkConnector.class.getName());
        hashMap.put("topics", TOPIC);
        hashMap.put("tasks.max", Integer.toString(TASKS_MAX));
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", JsonConverter.class.getName());
        hashMap.put("value.converter.schemas.enable", "false");
        hashMap.put("connection.url", container.getConnectionUrl());
        hashMap.put("key.ignore", "true");
        hashMap.put("schema.ignore", "true");
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void runSimpleTest(Map<String, String> map) throws Exception {
        this.connect.configureConnector(CONNECTOR_NAME, map);
        waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
        writeRecords(NUM_RECORDS);
        verifySearchResults(NUM_RECORDS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setDataStream() {
        this.isDataStream = true;
        this.props.put("data.stream.type", "logs");
        this.props.put("data.stream.dataset", "dataset");
        this.index = "logs-dataset-test";
        this.props.put("connection.username", ELASTIC_DATA_STREAM_MINIMAL_PRIVILEGES_NAME);
        this.props.put("connection.password", ELASTIC_DS_MINIMAL_PRIVILEGES_PASSWORD);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setupFromContainer() {
        this.props.put("connection.url", container.getConnectionUrl());
        this.helperClient = new ElasticsearchHelperClient(this.props.get("connection.url"), new ElasticsearchSinkConnectorConfig(this.props));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void verifySearchResults(int i) throws Exception {
        waitForRecords(i);
        Iterator it = this.helperClient.search(this.index).iterator();
        while (it.hasNext()) {
            SearchHit searchHit = (SearchHit) it.next();
            int intValue = ((Integer) searchHit.getSourceAsMap().get("doc_num")).intValue();
            Assert.assertNotNull(Integer.valueOf(intValue));
            Assert.assertTrue(intValue < i);
            if (this.isDataStream) {
                Assert.assertTrue(searchHit.getIndex().contains(this.index));
            } else {
                Assert.assertEquals(this.index, searchHit.getIndex());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForRecords(int i) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            try {
                return this.helperClient.getDocCount(this.index) == ((long) i);
            } catch (ElasticsearchStatusException e) {
                if (e.getMessage().contains("index_not_found_exception")) {
                    return false;
                }
                throw e;
            }
        }, CONSUME_MAX_DURATION_MS, "Sufficient amount of document were not found in ES on time.");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeRecords(int i) {
        writeRecordsFromStartIndex(0, i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeRecordsFromStartIndex(int i, int i2) {
        for (int i3 = i; i3 < i + i2; i3 += TASKS_MAX) {
            this.connect.kafka().produce(TOPIC, String.valueOf(i3), String.format("{\"doc_num\":%d,\"@timestamp\":\"2021-04-28T11:11:22.%03dZ\"}", Integer.valueOf(i3), Integer.valueOf(i3)));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static List<Role> getRoles() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(getMinimalPrivilegesRole(false));
        arrayList.add(getMinimalPrivilegesRole(true));
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static Map<User, String> getUsers() {
        HashMap hashMap = new HashMap();
        hashMap.put(getMinimalPrivilegesUser(true), getMinimalPrivilegesPassword(true));
        hashMap.put(getMinimalPrivilegesUser(false), getMinimalPrivilegesPassword(false));
        return hashMap;
    }

    private static Role getMinimalPrivilegesRole(boolean z) {
        IndicesPrivileges build = IndicesPrivileges.builder().indices(new String[]{"*"}).privileges(new String[]{"create_index", "read", "write", "view_index_metadata"}).build();
        Role.Builder builder = Role.builder();
        return (z ? builder.clusterPrivileges(new String[]{"monitor"}) : builder).name(z ? ES_SINK_CONNECTOR_DS_ROLE : ES_SINK_CONNECTOR_ROLE).indicesPrivileges(new IndicesPrivileges[]{build}).build();
    }

    private static User getMinimalPrivilegesUser(boolean z) {
        return new User(z ? ELASTIC_DATA_STREAM_MINIMAL_PRIVILEGES_NAME : ELASTIC_MINIMAL_PRIVILEGES_NAME, Collections.singletonList(z ? ES_SINK_CONNECTOR_DS_ROLE : ES_SINK_CONNECTOR_ROLE));
    }

    private static String getMinimalPrivilegesPassword(boolean z) {
        return z ? ELASTIC_DS_MINIMAL_PRIVILEGES_PASSWORD : ELASTIC_MINIMAL_PRIVILEGES_PASSWORD;
    }
}
