package org.apache.kafka.connect.integration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration tests that create a sink connector carrying
 * {@code consumer.override.*} properties against an embedded Connect cluster
 * whose worker is configured with a given
 * {@code connector.client.config.override.policy} value.
 *
 * <p>Every test spins up its own single-worker, single-broker cluster through
 * {@link #connectClusterWithPolicy(String)} and tears it down again inside the
 * assertion helper, so no state is shared between tests.
 */
@Category({IntegrationTest.class})
public class ConnectorClientPolicyIntegrationTest {
    private static final int NUM_TASKS = 1;
    private static final int NUM_WORKERS = 1;
    private static final String CONNECTOR_NAME = "simple-conn";

    /**
     * Intentionally empty: each assertion helper stops the cluster it started,
     * so there is nothing left to release after a test.
     */
    @After
    public void close() {
    }

    /** Expects a {@code sasl.jaas.config} consumer override to be rejected under the "None" policy. */
    @Test
    public void testCreateWithOverridesForNonePolicy() throws Exception {
        Map<String, String> connectorProps = basicConnectorConfig();
        connectorProps.put("consumer.override.sasl.jaas.config", "sasl");
        assertFailCreateConnector("None", connectorProps);
    }

    /**
     * Expects a config carrying {@code sasl.jaas.config} and {@code auto.offset.reset}
     * consumer overrides to be rejected under the "Principal" policy.
     */
    @Test
    public void testCreateWithNotAllowedOverridesForPrincipalPolicy() throws Exception {
        Map<String, String> connectorProps = basicConnectorConfig();
        connectorProps.put("consumer.override.sasl.jaas.config", "sasl");
        connectorProps.put("consumer.override.auto.offset.reset", "latest");
        assertFailCreateConnector("Principal", connectorProps);
    }

    /** Expects a {@code security.protocol} consumer override to be accepted under the "Principal" policy. */
    @Test
    public void testCreateWithAllowedOverridesForPrincipalPolicy() throws Exception {
        Map<String, String> connectorProps = basicConnectorConfig();
        connectorProps.put("consumer.override.security.protocol", "PLAINTEXT");
        assertPassCreateConnector("Principal", connectorProps);
    }

    /** Expects a {@code client.id} consumer override to be accepted under the "All" policy. */
    @Test
    public void testCreateWithAllowedOverridesForAllPolicy() throws Exception {
        Map<String, String> connectorProps = basicConnectorConfig();
        connectorProps.put("consumer.override.client.id", "test");
        assertPassCreateConnector("All", connectorProps);
    }

    /** Expects a config without any overrides to be accepted under the "None" policy. */
    @Test
    public void testCreateWithNoAllowedOverridesForNonePolicy() throws Exception {
        assertPassCreateConnector("None", basicConnectorConfig());
    }

    /**
     * Expects a {@code client.id} consumer override to be accepted when the worker
     * is started without an explicit policy setting.
     */
    @Test
    public void testCreateWithAllowedOverridesForDefaultPolicy() throws Exception {
        Map<String, String> connectorProps = basicConnectorConfig();
        connectorProps.put("consumer.override.client.id", "test");
        assertPassCreateConnector(null, connectorProps);
    }

    /**
     * Builds and starts an embedded Connect cluster, then waits for its worker to come up.
     *
     * @param str value for {@code connector.client.config.override.policy}; when
     *            {@code null} the property is left unset on the worker
     * @return the started cluster; the caller is responsible for stopping it
     * @throws InterruptedException if interrupted while waiting for the worker
     */
    private EmbeddedConnectCluster connectClusterWithPolicy(String str) throws InterruptedException {
        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("offset.flush.interval.ms", String.valueOf(5000));
        if (str != null) {
            workerProps.put("connector.client.config.override.policy", str);
        }

        Properties brokerProps = new Properties();
        brokerProps.put("auto.create.topics.enable", "false");

        EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
                .name("connect-cluster")
                .numWorkers(NUM_WORKERS)
                .numBrokers(1)
                .workerProps(workerProps)
                .brokerProps(brokerProps)
                .build();
        connect.start();

        try {
            connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
        } catch (Throwable startupFailure) {
            // The cluster is already running at this point; without this the caller
            // never receives a handle and the cluster would be left running.
            try {
                connect.stop();
            } catch (RuntimeException stopFailure) {
                // Keep the readiness failure as the primary error.
                startupFailure.addSuppressed(stopFailure);
            }
            throw startupFailure;
        }
        return connect;
    }

    /**
     * Starts a cluster with the given policy and asserts that creating the connector
     * is rejected with HTTP status 400. The cluster is always stopped afterwards.
     *
     * @param str policy value passed to {@link #connectClusterWithPolicy(String)}
     * @param map connector configuration to submit
     * @throws InterruptedException if interrupted while the cluster is starting
     */
    private void assertFailCreateConnector(String str, Map<String, String> map) throws InterruptedException {
        EmbeddedConnectCluster connect = connectClusterWithPolicy(str);
        try {
            connect.configureConnector(CONNECTOR_NAME, map);
            Assert.fail("Shouldn't be able to create connector");
        } catch (ConnectRestException e) {
            Assert.assertEquals(400L, e.statusCode());
        } finally {
            connect.stop();
        }
    }

    /**
     * Starts a cluster with the given policy and asserts that the connector is created
     * and that the connector and its task reach the running state. The cluster is
     * always stopped afterwards.
     *
     * @param str policy value passed to {@link #connectClusterWithPolicy(String)}
     * @param map connector configuration to submit
     * @throws InterruptedException if interrupted while waiting on the cluster
     */
    private void assertPassCreateConnector(String str, Map<String, String> map) throws InterruptedException {
        EmbeddedConnectCluster connect = connectClusterWithPolicy(str);
        try {
            connect.configureConnector(CONNECTOR_NAME, map);
            connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, "Connector tasks did not start in time.");
        } catch (ConnectRestException e) {
            // Same failure type and message as Assert.fail, but keeps the REST error
            // as the cause so the reason for the rejection shows up in the report.
            throw new AssertionError("Should be able to create connector", e);
        } finally {
            connect.stop();
        }
    }

    /**
     * Returns a fresh, mutable baseline configuration for a {@code MonitorableSinkConnector}
     * reading {@code test-topic} with string key and value converters.
     *
     * @return a new map on every call, so callers may add overrides freely
     */
    public Map<String, String> basicConnectorConfig() {
        Map<String, String> connectorProps = new HashMap<>();
        connectorProps.put("connector.class", MonitorableSinkConnector.class.getSimpleName());
        connectorProps.put("tasks.max", String.valueOf(NUM_TASKS));
        connectorProps.put("topics", "test-topic");
        connectorProps.put("key.converter", StringConverter.class.getName());
        connectorProps.put("value.converter", StringConverter.class.getName());
        return connectorProps;
    }
}
