package org.apache.kafka.connect.runtime;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.MockConnectMetrics;
import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.FutureCallback;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * Tests how {@link Worker} starts connectors with respect to license-related
 * ({@code confluent.*}) properties.
 *
 * <p>Two stub connectors are used: {@link LicensedTestConnector}, which is expected to
 * receive the license properties in the configuration passed to {@code start}, and
 * {@link UnlicensedTestConnector}, which is expected to receive none.
 */
@RunWith(MockitoJUnitRunner.class)
public class WorkerWithLicensePropertiesTest {
    private static final String CONNECTOR_ID = "test-connector";
    private static final String WORKER_ID = "localhost:8083";
    private static final String PRODUCER_SSL_CONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\"";

    /** Converter configuration the tests expect to be used for both internal converters. */
    private static final Map<String, String> INTERNAL_CONVERTER_CONFIG = Collections.singletonMap("schemas.enable", "false");

    /** Override policy handed to the worker; named after the implementation actually used. */
    private final ConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
    private final Map<String, String> workerProps = new HashMap<>();
    private WorkerConfig config;
    private Worker worker;

    @Mock
    private Plugins plugins;

    @Mock
    private PluginClassLoader pluginLoader;

    @Mock
    private DelegatingClassLoader delegatingLoader;

    @Mock
    private OffsetBackingStore offsetBackingStore;

    @Mock
    private StatusBackingStore statusBackingStore;

    @Mock
    private ConnectorStatus.Listener connectorStatusListener;

    @Mock
    private Converter internalConverter;

    @Mock
    private Herder herder;

    @Mock
    private CloseableConnectorContext ctx;

    /** Static mock of {@link Plugins}; opened in {@link #setup()}, closed in {@link #tearDown()}. */
    private MockedStatic<Plugins> pluginsUtil;

    /** Static mock of {@link WorkerConfig}; opened lazily by {@link #expectClusterId()}. */
    private MockedStatic<WorkerConfig> workerConfigUtil;

    /**
     * Stub connector that declares the {@code confluent.topic*} settings in its config
     * definition and records, on {@code start}, whether all expected license properties
     * were present in the supplied configuration.
     */
    public static class LicensedTestConnector extends SourceConnector {
        private static final ConfigDef CONFIG_DEF = new ConfigDef()
                .define("confluent.topic.bootstrap.servers", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "")
                .define("confluent.topic", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "");
        private boolean licenseCheckPassed = false;

        @Override
        public String version() {
            return "1.0";
        }

        /**
         * Records whether every expected license property key is present in {@code map}.
         *
         * @param map the connector configuration passed in by the runtime
         */
        @Override
        public void start(Map<String, String> map) {
            this.licenseCheckPassed = map.containsKey("confluent.topic.bootstrap.servers")
                    && map.containsKey("confluent.topic")
                    && map.containsKey("confluent.topic.replication.factor")
                    && map.containsKey("confluent.topic.sasl.jaas.config");
        }

        @Override
        public Class<? extends Task> taskClass() {
            return null;
        }

        @Override
        public List<Map<String, String>> taskConfigs(int i) {
            return null;
        }

        @Override
        public void stop() {
        }

        @Override
        public ConfigDef config() {
            return CONFIG_DEF;
        }

        /**
         * @return {@code true} if the last {@code start} call saw all four expected license keys
         */
        public boolean licenseCheckPassed() {
            return this.licenseCheckPassed;
        }
    }

    /**
     * Stub connector whose config definition contains no {@code confluent.*} settings and
     * which records, on {@code start}, whether any such key showed up in its configuration.
     */
    public static class UnlicensedTestConnector extends SourceConnector {
        private static final ConfigDef CONFIG_DEF = new ConfigDef().define("foot", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "");
        private boolean foundLicenseProperties = false;

        @Override
        public String version() {
            return "1.0";
        }

        /**
         * Records whether any key of {@code map} starts with {@code "confluent."}.
         *
         * @param map the connector configuration passed in by the runtime
         */
        @Override
        public void start(Map<String, String> map) {
            this.foundLicenseProperties = map.keySet().stream().anyMatch(key -> key.startsWith("confluent."));
        }

        @Override
        public Class<? extends Task> taskClass() {
            return null;
        }

        @Override
        public List<Map<String, String>> taskConfigs(int i) {
            return null;
        }

        @Override
        public void stop() {
        }

        @Override
        public ConfigDef config() {
            return CONFIG_DEF;
        }

        /**
         * @return {@code true} if the last {@code start} call saw at least one {@code confluent.*} key
         */
        public boolean foundLicenseProperties() {
            return this.foundLicenseProperties;
        }
    }

    /** Builds the standalone worker configuration and opens the static mock of {@link Plugins}. */
    @Before
    public void setup() {
        this.workerProps.put("bootstrap.servers", WORKER_ID);
        this.workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        this.workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        this.workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
        this.workerProps.put("sasl.jaas.config", PRODUCER_SSL_CONFIG);
        this.workerProps.put("metric.reporters", MockConnectMetrics.MockMetricsReporter.class.getName());
        this.config = new StandaloneConfig(this.workerProps);
        this.pluginsUtil = Mockito.mockStatic(Plugins.class);
    }

    /**
     * Closes whichever static mocks were opened.
     *
     * <p>Either field may still be {@code null} here: {@code workerConfigUtil} is only
     * assigned by {@link #expectClusterId()}, and {@code pluginsUtil} only at the end of
     * {@link #setup()}. Guarding both avoids a secondary {@code NullPointerException}
     * that would obscure the real failure, and the {@code finally} ensures the second
     * mock is released even if closing the first one throws.
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (this.pluginsUtil != null) {
                this.pluginsUtil.close();
            }
        } finally {
            if (this.workerConfigUtil != null) {
                this.workerConfigUtil.close();
            }
        }
    }

    /** A connector that declares license settings must be started with all license properties. */
    @Test
    public void testWorkerInjectsLicensePropertiesIntoLicensedConnector() throws Exception {
        LicensedTestConnector connector = new LicensedTestConnector();
        String connectorClass = connector.getClass().getName();
        expectStartStorage();
        expectClusterId();
        expectPluginLoading(connectorClass, connector);

        startWorkerAndConnector(connectorProps(connectorClass));

        Assert.assertTrue(connector.licenseCheckPassed());
        verifyPluginInteractions(connectorClass);
    }

    /** A connector that declares no license settings must not see any {@code confluent.*} property. */
    @Test
    public void testStartUnlicensedConnectorWithNoInjectedProperties() throws Exception {
        UnlicensedTestConnector connector = new UnlicensedTestConnector();
        String connectorClass = connector.getClass().getName();
        expectStartStorage();
        expectClusterId();
        expectPluginLoading(connectorClass, connector);

        startWorkerAndConnector(connectorProps(connectorClass));

        Assert.assertFalse(connector.foundLicenseProperties());
        verifyPluginInteractions(connectorClass);
    }

    /**
     * Stubs the {@link Plugins} mock (instance and static methods) so that the given
     * connector instance is returned for {@code connectorClass}.
     *
     * @param connectorClass fully-qualified class name used as the {@code connector.class} value
     * @param connector      the instance {@code plugins.newConnector} should hand back
     */
    private void expectPluginLoading(String connectorClass, SourceConnector connector) {
        Mockito.when(this.plugins.connectorLoader(connectorClass)).thenReturn(this.pluginLoader);
        Mockito.when(this.plugins.newConnector(connectorClass)).thenReturn(connector);
        Mockito.when(this.plugins.withClassLoader(Mockito.any(ClassLoader.class), Mockito.any(Runnable.class))).thenCallRealMethod();
        Mockito.when(this.plugins.withClassLoader(Mockito.any(ClassLoader.class))).thenCallRealMethod();
        Mockito.when(this.plugins.newInternalConverter(false, JsonConverter.class.getName(), INTERNAL_CONVERTER_CONFIG)).thenReturn(this.internalConverter);
        Mockito.when(this.plugins.newInternalConverter(true, JsonConverter.class.getName(), INTERNAL_CONVERTER_CONFIG)).thenReturn(this.internalConverter);
        // Static stubs: swapping to the plugin loader yields the delegating loader and vice versa.
        Mockito.when(Plugins.compareAndSwapLoaders(this.pluginLoader)).thenReturn(this.delegatingLoader);
        Mockito.when(Plugins.compareAndSwapLoaders(this.delegatingLoader)).thenReturn(this.pluginLoader);
    }

    /**
     * Builds the connector configuration submitted to the worker.
     *
     * @param connectorClass fully-qualified connector class name
     * @return a mutable map with {@code topics}, {@code tasks.max}, {@code name} and {@code connector.class}
     */
    private Map<String, String> connectorProps(String connectorClass) {
        Map<String, String> props = new HashMap<>();
        props.put("topics", "foo,bar");
        props.put("tasks.max", "1");
        props.put("name", CONNECTOR_ID);
        props.put("connector.class", connectorClass);
        return props;
    }

    /**
     * Creates and starts the worker, starts the connector with the given properties and
     * waits up to five seconds for the start callback to complete.
     *
     * @param connectorProps configuration passed to {@code Worker.startConnector}
     * @throws Exception if waiting on the start callback fails or times out
     */
    private void startWorkerAndConnector(Map<String, String> connectorProps) throws Exception {
        // NOTE(review): this calls the mock directly rather than stubbing or verifying it;
        // it looks like a leftover record-style expectation -- confirm whether a verify was intended.
        this.connectorStatusListener.onStartup(CONNECTOR_ID);
        this.worker = new Worker(WORKER_ID, new MockTime(), this.plugins, this.config, this.offsetBackingStore, this.allConnectorClientConfigOverridePolicy);
        this.worker.herder = this.herder;
        this.worker.start();
        Assert.assertEquals(Collections.emptySet(), this.worker.connectorNames());

        FutureCallback<TargetState> onStart = new FutureCallback<>();
        this.worker.startConnector(CONNECTOR_ID, connectorProps, this.ctx, this.connectorStatusListener, TargetState.STARTED, onStart);
        Assert.assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), this.worker.connectorNames());
        onStart.get(5L, TimeUnit.SECONDS);
    }

    /**
     * Verifies that the connector class loader was looked up once and that each internal
     * converter (key and value) was created exactly once.
     *
     * @param connectorClass fully-qualified connector class name
     */
    private void verifyPluginInteractions(String connectorClass) {
        Mockito.verify(this.plugins, Mockito.times(1)).connectorLoader(connectorClass);
        Mockito.verify(this.plugins, Mockito.times(1)).newInternalConverter(false, JsonConverter.class.getName(), INTERNAL_CONVERTER_CONFIG);
        Mockito.verify(this.plugins, Mockito.times(1)).newInternalConverter(true, JsonConverter.class.getName(), INTERNAL_CONVERTER_CONFIG);
    }

    /**
     * Opens the static mock of {@link WorkerConfig} and makes
     * {@code lookupKafkaClusterId} return {@code "test-cluster"}.
     */
    private void expectClusterId() {
        // NOTE(review): in Mockito's mockStatic(Class, String) the string is the mock's name,
        // not a method selector -- confirm a partial mock of one method was not intended.
        this.workerConfigUtil = Mockito.mockStatic(WorkerConfig.class, "lookupKafkaClusterId");
        Mockito.when(WorkerConfig.lookupKafkaClusterId(Mockito.any(WorkerConfig.class))).thenReturn("test-cluster");
    }

    /**
     * Invokes {@code start()} on the offset backing store mock.
     */
    private void expectStartStorage() {
        // NOTE(review): a direct call on a Mockito mock sets no expectation; presumably a
        // leftover from a record/replay style mock -- confirm whether a verify was intended.
        this.offsetBackingStore.start();
    }
}
