package org.apache.kafka.connect.integration;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.health.ConnectorHealth;
import org.apache.kafka.connect.health.ConnectorState;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.health.TaskState;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/integration/RestExtensionIntegrationTest.class */
public class RestExtensionIntegrationTest {
    private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
    private static final long CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
    private static final int NUM_WORKERS = 1;
    private EmbeddedConnectCluster connect;

    /* loaded from: input_file:org/apache/kafka/connect/integration/RestExtensionIntegrationTest$IntegrationTestRestExtension.class */
    public static class IntegrationTestRestExtension implements ConnectRestExtension {
        private static IntegrationTestRestExtension instance;
        public ConnectRestExtensionContext restPluginContext;

        @Path("integration-test-rest-extension")
        /* loaded from: input_file:org/apache/kafka/connect/integration/RestExtensionIntegrationTest$IntegrationTestRestExtension$IntegrationTestRestExtensionResource.class */
        public static class IntegrationTestRestExtensionResource {
            @GET
            @Path("/registered")
            public boolean isRegistered() {
                return true;
            }
        }

        public void register(ConnectRestExtensionContext connectRestExtensionContext) {
            instance = this;
            this.restPluginContext = connectRestExtensionContext;
            connectRestExtensionContext.clusterState().connectors();
            connectRestExtensionContext.configurable().register(new IntegrationTestRestExtensionResource());
        }

        public void close() {
        }

        public void configure(Map<String, ?> map) {
        }

        public String version() {
            return "test";
        }
    }

    @Test
    public void testRestExtensionApi() throws InterruptedException {
        HashMap hashMap = new HashMap();
        hashMap.put("rest.extension.classes", IntegrationTestRestExtension.class.getName());
        this.connect = new EmbeddedConnectCluster.Builder().name("connect-cluster").numWorkers(NUM_WORKERS).numBrokers(NUM_WORKERS).workerProps(hashMap).build();
        this.connect.start();
        this.connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Initial group of workers did not start in time.");
        WorkerHandle orElseThrow = this.connect.workers().stream().findFirst().orElseThrow(() -> {
            return new AssertionError("At least one worker handle should be available");
        });
        TestUtils.waitForCondition(this::extensionIsRegistered, REST_EXTENSION_REGISTRATION_TIMEOUT_MS, "REST extension was never registered");
        ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle("test-conn");
        try {
            HashMap hashMap2 = new HashMap();
            hashMap2.put("connector.class", MonitorableSinkConnector.class.getSimpleName());
            hashMap2.put("tasks.max", String.valueOf(NUM_WORKERS));
            hashMap2.put("topics", "test-topic");
            connectorHandle.taskHandle(connectorHandle.name() + "-0");
            StartAndStopLatch expectedStarts = connectorHandle.expectedStarts(NUM_WORKERS);
            this.connect.configureConnector(connectorHandle.name(), hashMap2);
            this.connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorHandle.name(), NUM_WORKERS, "Connector tasks did not start in time.");
            expectedStarts.await(CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            String format = String.format("%s:%d", orElseThrow.url().getHost(), Integer.valueOf(orElseThrow.url().getPort()));
            ConnectorHealth connectorHealth = new ConnectorHealth(connectorHandle.name(), new ConnectorState("RUNNING", format, (String) null), Collections.singletonMap(0, new TaskState(0, "RUNNING", format, (String) null)), ConnectorType.SINK);
            hashMap2.put("name", connectorHandle.name());
            TestUtils.waitForCondition(() -> {
                return verifyConnectorHealth(connectorHandle.name(), connectorHealth);
            }, CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, "Connector health and/or config was never accessible by the REST extension");
            RuntimeHandles.get().deleteConnector(connectorHandle.name());
        } catch (Throwable th) {
            RuntimeHandles.get().deleteConnector(connectorHandle.name());
            throw th;
        }
    }

    @After
    public void close() {
        this.connect.stop();
        IntegrationTestRestExtension unused = IntegrationTestRestExtension.instance = null;
    }

    private boolean extensionIsRegistered() {
        try {
            return this.connect.requestGet(this.connect.endpointForResource("integration-test-rest-extension/registered")).getStatus() < Response.Status.BAD_REQUEST.getStatusCode();
        } catch (ConnectException e) {
            return false;
        }
    }

    private boolean verifyConnectorHealth(String str, ConnectorHealth connectorHealth) {
        ConnectorHealth connectorHealth2 = IntegrationTestRestExtension.instance.restPluginContext.clusterState().connectorHealth(str);
        if (connectorHealth2.tasksState().isEmpty()) {
            return false;
        }
        Assert.assertEquals(connectorHealth, connectorHealth2);
        return true;
    }
}
