package io.confluent.connect.elasticsearch.integration;

import io.confluent.common.utils.IntegrationTest;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnector;
import io.confluent.connect.elasticsearch.jest.JestElasticsearchClient;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.core.Search;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.MountableFile;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/connect/elasticsearch/integration/SecurityIT.class */
public class SecurityIT {
    protected static ElasticsearchContainer container;
    private EmbeddedConnectCluster connect;
    private static final String MESSAGE_KEY = "message-key";
    private static final String MESSAGE_VAL = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { \"key1\": 12, \"key2\": 15} }";
    private static final String CONNECTOR_NAME = "elastic-sink";
    private static final String KAFKA_TOPIC = "test-elasticsearch-sink";
    private static final String TYPE_NAME = "kafka-connect";
    private static final int TASKS_MAX = 1;
    private static final int NUM_MSG = 200;
    private static final Logger log = LoggerFactory.getLogger(SecurityIT.class);
    private static final long VERIFY_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(2);

    /* renamed from: io.confluent.connect.elasticsearch.integration.SecurityIT$1, reason: invalid class name */
    /* loaded from: input_file:io/confluent/connect/elasticsearch/integration/SecurityIT$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$testcontainers$containers$output$OutputFrame$OutputType = new int[OutputFrame.OutputType.values().length];

        static {
            try {
                $SwitchMap$org$testcontainers$containers$output$OutputFrame$OutputType[OutputFrame.OutputType.STDOUT.ordinal()] = SecurityIT.TASKS_MAX;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$testcontainers$containers$output$OutputFrame$OutputType[OutputFrame.OutputType.STDERR.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$testcontainers$containers$output$OutputFrame$OutputType[OutputFrame.OutputType.END.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    @BeforeClass
    public static void setupBeforeAll() {
        container = new ElasticsearchContainer(ElasticsearchIntegrationTestBase.getElasticsearchDockerImageName() + ":" + ElasticsearchIntegrationTestBase.getElasticsearchContainerVersion());
        container.withSharedMemorySize(2147483648L);
        container.withCopyFileToContainer(MountableFile.forHostPath("./src/test/resources/elasticsearch.yml"), "/usr/share/elasticsearch/config/elasticsearch.yml");
        container.withCopyFileToContainer(MountableFile.forHostPath("./src/test/resources/certs"), "/usr/share/elasticsearch/config/certs");
        container.withLogConsumer(SecurityIT::containerLog);
        container.waitingFor(Wait.forLogMessage(".*(Security is enabled|license .* valid).*", TASKS_MAX).withStartupTimeout(Duration.ofMinutes(20L)));
    }

    @AfterClass
    public static void teardownAfterAll() {
        container.close();
    }

    @Before
    public void setup() throws IOException {
        this.connect = new EmbeddedConnectCluster.Builder().name("elastic-sink-cluster").build();
        this.connect.start();
    }

    @After
    public void close() {
        this.connect.stop();
    }

    private Map<String, String> getProps() {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", ElasticsearchSinkConnector.class.getName());
        hashMap.put("topics", KAFKA_TOPIC);
        hashMap.put("tasks.max", Integer.toString(TASKS_MAX));
        hashMap.put("value.converter", JsonConverter.class.getName());
        hashMap.put("type.name", TYPE_NAME);
        hashMap.put("batch.size", "1");
        return hashMap;
    }

    @Test
    @Ignore("Enable this once a fix is found for Jenkins; see CC-4886")
    public void testSecureConnection() throws Throwable {
        String format = String.format("https://%s:%d", container.getContainerIpAddress(), container.getMappedPort(9200));
        log.info("Creating connector for {}", format);
        this.connect.kafka().createTopic(KAFKA_TOPIC, TASKS_MAX);
        Map<String, String> props = getProps();
        props.put("connection.url", format);
        props.put("elastic.security.protocol", "SSL");
        props.put("elastic.https.ssl.keystore.location", "./src/test/resources/certs/keystore.jks");
        props.put("elastic.https.ssl.keystore.password", "asdfasdf");
        props.put("elastic.https.ssl.key.password", "asdfasdf");
        props.put("elastic.https.ssl.truststore.location", "./src/test/resources/certs/truststore.jks");
        props.put("elastic.https.ssl.truststore.password", "asdfasdf");
        this.connect.configureConnector(CONNECTOR_NAME, props);
        TestUtils.waitForCondition(() -> {
            ConnectorStateInfo connectorStatus = this.connect.connectorStatus(CONNECTOR_NAME);
            return (connectorStatus == null || connectorStatus.tasks() == null || connectorStatus.tasks().size() != TASKS_MAX) ? false : true;
        }, "Timed out waiting for connector task to start");
        for (int i = 0; i < NUM_MSG; i += TASKS_MAX) {
            this.connect.kafka().produce(KAFKA_TOPIC, MESSAGE_KEY + i, MESSAGE_VAL);
        }
        verify(getClient(props));
        verify(getClient(props));
    }

    private JestClient getClient(Map<String, String> map) {
        JestClientFactory jestClientFactory = new JestClientFactory();
        jestClientFactory.setHttpClientConfig(JestElasticsearchClient.getClientConfig(map));
        return jestClientFactory.getObject();
    }

    private void verify(JestClient jestClient) throws Throwable {
        Search build = ((Search.Builder) ((Search.Builder) new Search.Builder("{}").addIndex(KAFKA_TOPIC)).addType(TYPE_NAME)).build();
        TestUtils.waitForCondition(() -> {
            try {
                int asInt = jestClient.execute(build).getJsonObject().get("hits").getAsJsonObject().get("total").getAsJsonObject().get("value").getAsInt();
                log.debug("Found {} documents", Integer.valueOf(asInt));
                return asInt == NUM_MSG;
            } catch (Exception e) {
                log.error("Retrying after failing to read data from Elastic: {}", e.getMessage(), e);
                return false;
            }
        }, VERIFY_TIMEOUT_MS, "Could not read data from Elastic");
    }

    protected static void containerLog(OutputFrame outputFrame) {
        switch (AnonymousClass1.$SwitchMap$org$testcontainers$containers$output$OutputFrame$OutputType[outputFrame.getType().ordinal()]) {
            case TASKS_MAX /* 1 */:
                System.out.print("\u001b[33m" + outputFrame.getUtf8String());
                System.out.print("\u001b[0m");
                return;
            case 2:
                System.err.print("\u001b[31m" + outputFrame.getUtf8String());
                System.out.print("\u001b[0m");
                return;
            case 3:
            default:
                return;
        }
    }
}
