package org.apache.kafka.connect.integration;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Filter;
import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.class */
public class ConnectorValidationIntegrationTest {
    private static final String WORKER_GROUP_ID = "connect-worker-group-id";
    private static EmbeddedConnectCluster connect;

    /* loaded from: input_file:org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest$AbstractTestConverter.class */
    public static abstract class AbstractTestConverter extends TestConverter {
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest$TestConverter.class */
    public static abstract class TestConverter implements Converter, HeaderConverter {
        public ConfigDef config() {
            return null;
        }

        public void configure(Map<String, ?> map, boolean z) {
        }

        public byte[] fromConnectData(String str, Schema schema, Object obj) {
            return null;
        }

        public SchemaAndValue toConnectData(String str, byte[] bArr) {
            return null;
        }

        public void close() throws IOException {
        }

        public void configure(Map<String, ?> map) {
        }

        public SchemaAndValue toConnectHeader(String str, String str2, byte[] bArr) {
            return null;
        }

        public byte[] fromConnectHeader(String str, String str2, Schema schema, Object obj) {
            return new byte[0];
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest$TestConverterWithConstructorThatThrowsException.class */
    public static class TestConverterWithConstructorThatThrowsException extends TestConverter {
        public TestConverterWithConstructorThatThrowsException() {
            throw new ConnectException("whoops");
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest$TestConverterWithNoConfigDef.class */
    public static class TestConverterWithNoConfigDef extends TestConverter {
        @Override // org.apache.kafka.connect.integration.ConnectorValidationIntegrationTest.TestConverter
        public ConfigDef config() {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest$TestConverterWithPrivateConstructor.class */
    public static class TestConverterWithPrivateConstructor extends TestConverter {
        private TestConverterWithPrivateConstructor() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest$TestConverterWithSinglePropertyConfigDef.class */
    public static class TestConverterWithSinglePropertyConfigDef extends TestConverter {
        public static final String BOOLEAN_PROPERTY_NAME = "prop";

        @Override // org.apache.kafka.connect.integration.ConnectorValidationIntegrationTest.TestConverter
        public ConfigDef config() {
            return new ConfigDef().define(BOOLEAN_PROPERTY_NAME, ConfigDef.Type.BOOLEAN, ConfigDef.Importance.HIGH, "");
        }
    }

    @BeforeClass
    public static void setup() {
        HashMap hashMap = new HashMap();
        hashMap.put("group.id", WORKER_GROUP_ID);
        connect = new EmbeddedConnectCluster.Builder().name("connector-validation-connect-cluster").workerProps(hashMap).numWorkers(1).numBrokers(1).build();
        connect.start();
    }

    @AfterClass
    public static void close() {
        if (connect != null) {
            EmbeddedConnectCluster embeddedConnectCluster = connect;
            embeddedConnectCluster.getClass();
            Utils.closeQuietly(() -> {
                embeddedConnectCluster.stop();
            }, "Embedded Connect cluster");
        }
    }

    @Test
    public void testSinkConnectorHasNeitherTopicsListNorTopicsRegex() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.remove("topics");
        defaultSinkConnectorProps.remove("topics.regex");
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 2, "Sink connector config should fail preflight validation when neither topics list nor topics regex are provided");
    }

    @Test
    public void testSinkConnectorHasBothTopicsListAndTopicsRegex() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("topics", "t1");
        defaultSinkConnectorProps.put("topics.regex", "r.*");
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 2, "Sink connector config should fail preflight validation when both topics list and topics regex are provided");
    }

    @Test
    public void testSinkConnectorDeadLetterQueueTopicInTopicsList() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("topics", "t1");
        defaultSinkConnectorProps.put("errors.deadletterqueue.topic.name", "t1");
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Sink connector config should fail preflight validation when DLQ topic is included in topics list");
    }

    @Test
    public void testSinkConnectorDeadLetterQueueTopicMatchesTopicsRegex() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("topics.regex", "r.*");
        defaultSinkConnectorProps.put("errors.deadletterqueue.topic.name", "ruh.roh");
        defaultSinkConnectorProps.remove("topics");
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Sink connector config should fail preflight validation when DLQ topic matches topics regex");
    }

    @Test
    public void testSinkConnectorDefaultGroupIdConflictsWithWorkerGroupId() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("name", "worker-group-id");
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Sink connector config should fail preflight validation when default consumer group ID conflicts with Connect worker group ID");
    }

    @Test
    public void testSinkConnectorOverriddenGroupIdConflictsWithWorkerGroupId() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("consumer.override.group.id", WORKER_GROUP_ID);
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Sink connector config should fail preflight validation when overridden consumer group ID conflicts with Connect worker group ID");
    }

    @Test
    public void testSourceConnectorHasDuplicateTopicCreationGroups() throws InterruptedException {
        Map<String, String> defaultSourceConnectorProps = defaultSourceConnectorProps();
        defaultSourceConnectorProps.put("topic.creation.groups", "g1, g2, g1");
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSourceConnectorProps.get("connector.class"), defaultSourceConnectorProps, 1, "Source connector config should fail preflight validation when the same topic creation group is specified multiple times");
    }

    @Test
    public void testConnectorHasDuplicateTransformations() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("transforms", "t, t");
        defaultSinkConnectorProps.put("transforms.t.type", Filter.class.getName());
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when the same transformation is specified multiple times");
    }

    @Test
    public void testConnectorHasMissingTransformClass() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("transforms", "t");
        defaultSinkConnectorProps.put("transforms.t.type", "WheresTheFruit");
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when a transformation with a class not found on the worker is specified");
    }

    @Test
    public void testConnectorHasInvalidTransformClass() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("transforms", "t");
        defaultSinkConnectorProps.put("transforms.t.type", MonitorableSinkConnector.class.getName());
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when a transformation with a class of the wrong type is specified");
    }

    @Test
    public void testConnectorHasNegationForUndefinedPredicate() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("transforms", "t");
        defaultSinkConnectorProps.put("transforms.t.type", Filter.class.getName());
        defaultSinkConnectorProps.put("transforms.t.negate", "true");
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when an undefined predicate is negated");
    }

    @Test
    public void testConnectorHasDuplicatePredicates() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("predicates", "p, p");
        defaultSinkConnectorProps.put("predicates.p.type", RecordIsTombstone.class.getName());
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when the same predicate is specified multiple times");
    }

    @Test
    public void testConnectorHasMissingPredicateClass() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("predicates", "p");
        defaultSinkConnectorProps.put("predicates.p.type", "WheresTheFruit");
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when a predicate with a class not found on the worker is specified");
    }

    @Test
    public void testConnectorHasInvalidPredicateClass() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("predicates", "p");
        defaultSinkConnectorProps.put("predicates.p.type", MonitorableSinkConnector.class.getName());
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when a predicate with a class of the wrong type is specified");
    }

    @Test
    public void testConnectorHasMissingConverterClass() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("key.converter", "WheresTheFruit");
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when a converter with a class not found on the worker is specified");
    }

    @Test
    public void testConnectorHasInvalidConverterClassType() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("key.converter", MonitorableSinkConnector.class.getName());
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when a converter with a class of the wrong type is specified", 0L);
    }

    @Test
    public void testConnectorHasAbstractConverter() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("key.converter", AbstractTestConverter.class.getName());
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when an abstract converter class is specified");
    }

    @Test
    public void testConnectorHasConverterWithNoSuitableConstructor() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("key.converter", TestConverterWithPrivateConstructor.class.getName());
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when a converter class with no suitable constructor is specified");
    }

    @Test
    public void testConnectorHasConverterThatThrowsExceptionOnInstantiation() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("key.converter", TestConverterWithConstructorThatThrowsException.class.getName());
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when a converter class that throws an exception on instantiation is specified");
    }

    @Test
    public void testConnectorHasMissingHeaderConverterClass() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("header.converter", "WheresTheFruit");
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when a header converter with a class not found on the worker is specified");
    }

    @Test
    public void testConnectorHasInvalidHeaderConverterClassType() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("header.converter", MonitorableSinkConnector.class.getName());
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when a header converter with a class of the wrong type is specified");
    }

    @Test
    public void testConnectorHasAbstractHeaderConverter() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("header.converter", AbstractTestConverter.class.getName());
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when an abstract header converter class is specified");
    }

    @Test
    public void testConnectorHasHeaderConverterWithNoSuitableConstructor() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("header.converter", TestConverterWithPrivateConstructor.class.getName());
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when a header converter class with no suitable constructor is specified");
    }

    @Test
    public void testConnectorHasHeaderConverterThatThrowsExceptionOnInstantiation() throws InterruptedException {
        Map<String, String> defaultSinkConnectorProps = defaultSinkConnectorProps();
        defaultSinkConnectorProps.put("header.converter", TestConverterWithConstructorThatThrowsException.class.getName());
        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(defaultSinkConnectorProps.get("connector.class"), defaultSinkConnectorProps, 1, "Connector config should fail preflight validation when a header converter class that throws an exception on instantiation is specified");
    }

    private Map<String, String> defaultSourceConnectorProps() {
        HashMap hashMap = new HashMap();
        hashMap.put("name", "source-connector");
        hashMap.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        hashMap.put("tasks.max", "1");
        hashMap.put(MonitorableSourceConnector.TOPIC_CONFIG, "t1");
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        return hashMap;
    }

    private Map<String, String> defaultSinkConnectorProps() {
        HashMap hashMap = new HashMap();
        hashMap.put("name", "sink-connector");
        hashMap.put("connector.class", MonitorableSinkConnector.class.getSimpleName());
        hashMap.put("tasks.max", "1");
        hashMap.put("topics", "t1");
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        return hashMap;
    }
}
