package org.apache.kafka.connect.integration;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration tests that run against a standalone Connect worker started through
 * {@link EmbeddedConnectStandalone}.
 *
 * <p>A fresh worker is built and started before every test and stopped afterwards.
 */
@Category({IntegrationTest.class})
public class StandaloneWorkerIntegrationTest {
    private static final String CONNECTOR_NAME = "test-connector";
    private static final int NUM_TASKS = 4;
    private static final String TOPIC_NAME = "test-topic";
    private EmbeddedConnectStandalone connect;

    /** Builds and starts the embedded standalone worker used by each test. */
    @Before
    public void setup() {
        this.connect = new EmbeddedConnectStandalone.Builder().build();
        this.connect.start();
    }

    /** Stops the embedded worker, if one was successfully built. */
    @After
    public void cleanup() {
        // Guard against a failed setup() so teardown does not add a secondary NullPointerException.
        if (this.connect != null) {
            this.connect.stop();
        }
    }

    /**
     * Exercises the logger-level endpoints with no scope, with scope {@code worker} and with
     * scope {@code cluster}, and verifies that repeating an identical request leaves the logger's
     * level and last-modified timestamp untouched.
     */
    @Test
    public void testDynamicLogging() {
        Map<String, LoggerLevel> initialLevels = this.connect.allLogLevels();
        Assert.assertFalse("Connect REST API did not list any known loggers", initialLevels.isEmpty());
        Map<String, LoggerLevel> modifiedLoggers =
                Utils.filterMap(initialLevels, StandaloneWorkerIntegrationTest::isModified);
        Assert.assertEquals(
                "No loggers should have a non-null last-modified timestamp",
                Collections.emptyMap(),
                modifiedLoggers);

        // Each namespace is first set to a different level than the one under test. Identical
        // consecutive requests are expected to leave the timestamp unchanged (asserted below), so
        // this makes sure the request under test is a real change.

        // No scope.
        final String connectNamespace = "org.apache.kafka.connect";
        this.connect.setLogLevel(connectNamespace, "ERROR", null);
        Map<String, LoggerLevel> currentLevels =
                testSetLoggingLevel(connectNamespace, "DEBUG", null, initialLevels);

        // scope=worker
        final String clientsNamespace = "org.apache.kafka.clients";
        this.connect.setLogLevel(clientsNamespace, "WARN", "worker");
        currentLevels = testSetLoggingLevel(clientsNamespace, "INFO", "worker", currentLevels);

        LoggerLevel priorLoggerLevel = this.connect.getLogLevel(clientsNamespace);
        this.connect.setLogLevel(clientsNamespace, "INFO", "worker");
        LoggerLevel currentLoggerLevel = this.connect.getLogLevel(clientsNamespace);
        Assert.assertEquals(
                "Log level and last-modified timestamp should not be affected by consecutive identical requests",
                priorLoggerLevel,
                currentLoggerLevel);

        // scope=cluster
        final String streamsNamespace = "org.apache.kafka.streams";
        this.connect.setLogLevel(streamsNamespace, "DEBUG", "cluster");
        testSetLoggingLevel(streamsNamespace, "TRACE", "cluster", currentLevels);
    }

    /**
     * Sets the level of a logging namespace and verifies the outcome through the worker's
     * logger-level endpoints.
     *
     * @param namespace     the logging namespace to adjust
     * @param level         the level to apply to the namespace
     * @param scope         the scope passed along with the request; may be {@code null}
     * @param initialLevels the levels of all known loggers before this request was issued
     * @return the levels of all known loggers after the request was issued
     */
    private Map<String, LoggerLevel> testSetLoggingLevel(
            String namespace, String level, String scope, Map<String, LoggerLevel> initialLevels) {
        // Captured before the request so last-modified timestamps can be bounded from below.
        final long requestTime = System.currentTimeMillis();
        List<String> affectedLoggers = this.connect.setLogLevel(namespace, level, scope);
        if ("cluster".equals(scope)) {
            Assert.assertNull(
                    "Modifying log levels with scope=cluster should result in an empty response",
                    affectedLoggers);
        } else {
            Assert.assertTrue(affectedLoggers.contains(namespace));
            List<String> loggersOutsideNamespace = affectedLoggers.stream()
                    .filter(logger -> !logger.startsWith(namespace))
                    .collect(Collectors.toList());
            Assert.assertEquals(
                    "No loggers outside the namespace '" + namespace + "' should have been included in the response for a request to modify that namespace",
                    Collections.emptyList(),
                    loggersOutsideNamespace);
        }

        // The namespace itself must report the new level and a fresh timestamp.
        LoggerLevel namespaceLevel = this.connect.getLogLevel(namespace);
        Assert.assertNotNull(namespaceLevel);
        Assert.assertEquals(level, namespaceLevel.level());
        Assert.assertNotNull(namespaceLevel.lastModified());
        Assert.assertTrue(
                "Last-modified timestamp for logger level is " + namespaceLevel.lastModified() + ", which is before " + requestTime + ", the most-recent time the level was adjusted",
                namespaceLevel.lastModified().longValue() >= requestTime);

        // Every logger inside the namespace must have been updated as well.
        Map<String, LoggerLevel> newLevels = this.connect.allLogLevels();
        Map<String, LoggerLevel> staleLoggersInNamespace = Utils.filterMap(
                newLevels,
                entry -> hasNamespace(entry, namespace)
                        && !(level(entry).equals(level)
                                && isModified(entry)
                                && lastModified(entry).longValue() >= requestTime));
        Assert.assertEquals(
                "At least one logger in the affected namespace '" + namespace + "' does not have the expected level of '" + level + "', has a null last-modified timestamp, or has a last-modified timestamp that is less recent than " + requestTime + ", which is when the namespace was last adjusted",
                Collections.emptyMap(),
                staleLoggersInNamespace);

        // No previously listed logger may have disappeared from the listing.
        Assert.assertEquals(
                "At least one logger was present in the listing of all loggers before the logging level for namespace '" + namespace + "' was set to '" + level + "' that is no longer present",
                Collections.emptySet(),
                Utils.diff(HashSet::new, initialLevels.keySet(), newLevels.keySet()));

        // Loggers outside the namespace must be exactly as they were before the request.
        Map<String, LoggerLevel> changedLoggersOutsideNamespace = Utils.filterMap(
                newLevels,
                entry -> !hasNamespace(entry, namespace)
                        && !entry.getValue().equals(initialLevels.get(entry.getKey())));
        Assert.assertEquals(
                "At least one logger outside of the affected namespace '" + namespace + "' has a different logging level or last-modified timestamp than it did before the namespace was set to level '" + level + "'; none of these loggers should have been affected",
                Collections.emptyMap(),
                changedLoggersOutsideNamespace);

        return newLevels;
    }

    /** Returns whether the entry's key (a logger name) starts with the given namespace. */
    private static boolean hasNamespace(Map.Entry<String, ?> entry, String namespace) {
        return entry.getKey().startsWith(namespace);
    }

    /** Returns whether the entry's logger level carries a non-null last-modified timestamp. */
    private static boolean isModified(Map.Entry<?, LoggerLevel> entry) {
        return lastModified(entry) != null;
    }

    /** Returns the last-modified timestamp of the entry's logger level; may be {@code null}. */
    private static Long lastModified(Map.Entry<?, LoggerLevel> entry) {
        return entry.getValue().lastModified();
    }

    /** Returns the level name of the entry's logger level. */
    private static String level(Map.Entry<?, LoggerLevel> entry) {
        return entry.getValue().level();
    }

    /**
     * Creates a connector with an initial state of {@code STOPPED}, checks that it has no tasks
     * and no task configs, then resumes it and waits for the connector and all of its tasks to
     * be running.
     */
    @Test
    public void testCreateConnectorWithStoppedInitialState() throws Exception {
        CreateConnectorRequest createRequest = new CreateConnectorRequest(
                CONNECTOR_NAME,
                defaultSourceConnectorProps(TOPIC_NAME),
                CreateConnectorRequest.InitialState.STOPPED);
        this.connect.configureConnector(createRequest);
        this.connect.assertions().assertConnectorIsStopped(
                CONNECTOR_NAME, "Connector was not created in a stopped state");
        Assert.assertEquals(Collections.emptyList(), this.connect.connectorInfo(CONNECTOR_NAME).tasks());
        Assert.assertEquals(Collections.emptyList(), this.connect.taskConfigs(CONNECTOR_NAME));

        this.connect.resumeConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
                CONNECTOR_NAME, NUM_TASKS, "Connector or tasks did not start running healthily in time");
    }

    /**
     * Builds the configuration for a {@link MonitorableSourceConnector} with {@link #NUM_TASKS}
     * tasks and string key/value converters.
     *
     * @param topic the value to use for the connector's topic setting
     * @return a new, mutable map of connector properties
     */
    private Map<String, String> defaultSourceConnectorProps(String topic) {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        props.put("tasks.max", String.valueOf(NUM_TASKS));
        props.put(MonitorableSourceConnector.TOPIC_CONFIG, topic);
        props.put("key.converter", StringConverter.class.getName());
        props.put("value.converter", StringConverter.class.getName());
        props.put("topic.creation.default.replication.factor", String.valueOf(1));
        props.put("topic.creation.default.partitions", String.valueOf(1));
        return props;
    }
}
