package org.apache.kafka.connect.integration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests that start an embedded Connect cluster (one broker, one worker) and
 * check how connector configuration validation behaves for a licensed and an unlicensed
 * test connector, both with the worker's default settings and with
 * {@code confluent.license.inject.into.connectors} set to {@code false}.
 *
 * <p>Each test builds its own cluster; {@link #close()} stops it afterwards.
 */
@Category({IntegrationTest.class})
public class ConfluentLicenseInjectionIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(ConfluentLicenseInjectionIntegrationTest.class);

    private static final int NUM_TASKS = 1;
    private static final int NUM_WORKERS = 1;
    private static final int NUM_BROKERS = 1;

    private static final String LICENSED_CONNECTOR_NAME = "licensedConnector";
    private static final String UNLICENSED_CONNECTOR_NAME = "unlicensedConnector";
    private static final String OVERRIDDEN_LICENSED_CONNECTOR_NAME = "anotherLicensedConnector";

    private static final String LICENSED_CONNECTOR_CLASS =
            ConfluentLicensedMonitorableSourceConnector.class.getSimpleName();
    private static final String UNLICENSED_CONNECTOR_CLASS =
            ConfluentUnlicensedMonitorableSourceConnector.class.getSimpleName();

    private EmbeddedConnectCluster connect;
    private final Map<String, String> workerProps = new HashMap<>();
    private final Properties brokerProps = new Properties();

    /** Disables automatic topic creation on the embedded broker(s). */
    @Before
    public void setup() {
        this.brokerProps.put("auto.create.topics.enable", String.valueOf(false));
    }

    /**
     * Stops the embedded cluster. The cluster may be unset if a test failed before it was
     * built, in which case there is nothing to stop and the original failure should not be
     * accompanied by a NullPointerException from teardown.
     */
    @After
    public void close() {
        if (this.connect != null) {
            this.connect.stop();
        }
    }

    /**
     * With default worker settings: the licensed connector validates with a single error
     * until {@code foo} is supplied, after which all three connectors can be created, run,
     * and come back up after the worker is restarted.
     */
    @Test
    public void testInjectingDefaultLicensePropertiesIntoConnectors() throws InterruptedException {
        startCluster();

        Map<String, String> licensedProps = baseConnectorProps(LICENSED_CONNECTOR_NAME, LICENSED_CONNECTOR_CLASS);
        // Without "foo" exactly one validation error is expected
        // (presumably "foo" is a required setting of the test connector -- confirm against its ConfigDef).
        this.connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(LICENSED_CONNECTOR_CLASS, licensedProps, 1, "Validating licensed connector configuration produced an unexpected number of errors.");
        licensedProps.put("foo", "something");
        this.connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(LICENSED_CONNECTOR_CLASS, licensedProps, 0, "Validating licensed connector configuration produced an unexpected error.");
        this.connect.configureConnector(LICENSED_CONNECTOR_NAME, licensedProps);
        assertConnectorRunning(LICENSED_CONNECTOR_NAME, "Licensed connector tasks did not start in time.");

        validateAndStartUnlicensedConnector();
        validateAndStartOverriddenLicensedConnector();

        restartWorker();
        assertConnectorRunning(LICENSED_CONNECTOR_NAME, "Licensed connector tasks did not restart in time.");
        assertConnectorRunning(UNLICENSED_CONNECTOR_NAME, "Unlicensed connector tasks did not restart in time.");
        assertConnectorRunning(OVERRIDDEN_LICENSED_CONNECTOR_NAME, "Overridden licensed connector tasks did not restart in time.");
    }

    /**
     * With {@code confluent.license.inject.into.connectors=false} on the worker: the licensed
     * connector without explicit license settings validates with four errors and is not
     * created, while the unlicensed connector and the licensed connector carrying its own
     * license settings can still be created, run, and come back up after a worker restart.
     */
    @Test
    public void testNotInjectingDefaultLicensePropertiesIntoConnectorWhenFeatureIsDisabled() throws InterruptedException {
        this.workerProps.put("confluent.license.inject.into.connectors", "false");
        startCluster();

        Map<String, String> licensedProps = baseConnectorProps(LICENSED_CONNECTOR_NAME, LICENSED_CONNECTOR_CLASS);
        licensedProps.put("foo", "something");
        this.connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(LICENSED_CONNECTOR_CLASS, licensedProps, 4, "Validating licensed connector configuration produced an unexpected number of errors.");

        validateAndStartUnlicensedConnector();
        validateAndStartOverriddenLicensedConnector();

        restartWorker();
        assertConnectorRunning(UNLICENSED_CONNECTOR_NAME, "Unlicensed connector tasks did not restart in time.");
        assertConnectorRunning(OVERRIDDEN_LICENSED_CONNECTOR_NAME, "Overridden licensed connector tasks did not restart in time.");
    }

    /**
     * Adds the replication-factor worker settings, builds and starts the embedded cluster
     * from {@link #workerProps} and {@link #brokerProps}, and waits for the broker and
     * worker to be reported as up.
     */
    private void startCluster() throws InterruptedException {
        String replicationFactor = Integer.toString(NUM_BROKERS);
        this.workerProps.put("offset.storage.replication.factor", replicationFactor);
        this.workerProps.put("config.storage.replication.factor", replicationFactor);
        this.workerProps.put("status.storage.replication.factor", replicationFactor);
        this.workerProps.put("confluent.topic.replication.factor", replicationFactor);

        this.connect = new EmbeddedConnectCluster.Builder()
                .name("connect-cluster-1")
                .workerProps(this.workerProps)
                .numWorkers(NUM_WORKERS)
                .numBrokers(NUM_BROKERS)
                .brokerProps(this.brokerProps)
                .build();
        this.connect.start();

        this.connect.assertions().assertExactlyNumBrokersAreUp(NUM_BROKERS, "Brokers did not start in time.");
        this.connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, "Worker did not start in time.");
        log.info("Completed startup of {} Kafka brokers and {} Connect workers", NUM_BROKERS, NUM_WORKERS);
    }

    /** Returns a new mutable config holding the name, connector class and {@code tasks.max}. */
    private Map<String, String> baseConnectorProps(String name, String connectorClass) {
        Map<String, String> props = new HashMap<>();
        props.put("name", name);
        props.put("connector.class", connectorClass);
        props.put("tasks.max", String.valueOf(NUM_TASKS));
        return props;
    }

    /** Validates the unlicensed connector config (expecting no errors), creates it and waits for it to run. */
    private void validateAndStartUnlicensedConnector() throws InterruptedException {
        Map<String, String> props = baseConnectorProps(UNLICENSED_CONNECTOR_NAME, UNLICENSED_CONNECTOR_CLASS);
        props.put("bar", "something else");
        this.connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(UNLICENSED_CONNECTOR_CLASS, props, 0, "Validating unlicensed connector configuration produced an unexpected error.");
        this.connect.configureConnector(UNLICENSED_CONNECTOR_NAME, props);
        assertConnectorRunning(UNLICENSED_CONNECTOR_NAME, "Unlicensed connector tasks did not start in time.");
    }

    /**
     * Validates a licensed connector that carries its own {@code confluent.*} settings:
     * one error is expected until {@code confluent.topic.bootstrap.servers} is supplied,
     * none afterwards. The connector is then created and awaited.
     */
    private void validateAndStartOverriddenLicensedConnector() throws InterruptedException {
        Map<String, String> props = baseConnectorProps(OVERRIDDEN_LICENSED_CONNECTOR_NAME, LICENSED_CONNECTOR_CLASS);
        props.put("foo", "something or other");
        props.put("confluent.license", "magic");
        props.put("confluent.topic", "my-license-topic");
        props.put("confluent.topic.replication.factor", "1");
        this.connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(LICENSED_CONNECTOR_CLASS, props, 1, "Validating licensed connector configuration produced an unexpected number of errors.");

        props.put("confluent.topic.bootstrap.servers", "localhost:8083");
        // Message previously said "unlicensed", although this config is for the licensed connector class.
        this.connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(LICENSED_CONNECTOR_CLASS, props, 0, "Validating overridden licensed connector configuration produced an unexpected error.");
        this.connect.configureConnector(OVERRIDDEN_LICENSED_CONNECTOR_NAME, props);
        assertConnectorRunning(OVERRIDDEN_LICENSED_CONNECTOR_NAME, "Overridden licensed connector tasks did not start in time.");
    }

    /** Waits until the named connector and at least {@link #NUM_TASKS} of its tasks are running. */
    private void assertConnectorRunning(String connectorName, String failureMessage) throws InterruptedException {
        this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorName, NUM_TASKS, failureMessage);
    }

    /** Removes a worker from the cluster and then starts Connect again. */
    private void restartWorker() {
        log.info("Stopping the Connect worker");
        this.connect.removeWorker();
        log.info("Starting the Connect worker");
        this.connect.startConnect();
    }
}
