package org.apache.kafka.connect.integration;

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/connect/integration/RestForwardingIntegrationTest.class */
public class RestForwardingIntegrationTest {
    private Map<String, Object> sslConfig;

    @Mock
    private Plugins plugins;
    private RestServer followerServer;

    @Mock
    private Herder followerHerder;
    private RestServer leaderServer;

    @Mock
    private Herder leaderHerder;
    private SslContextFactory factory;
    private CloseableHttpClient httpClient;
    private Collection<CloseableHttpResponse> responses;

    @Before
    public void setUp() throws IOException, GeneralSecurityException {
        this.sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
        this.responses = new ArrayList();
    }

    @After
    public void tearDown() throws IOException {
        AutoCloseable autoCloseable;
        AutoCloseable autoCloseable2;
        AutoCloseable autoCloseable3;
        Iterator<CloseableHttpResponse> it = this.responses.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        AtomicReference atomicReference = new AtomicReference();
        AutoCloseable[] autoCloseableArr = new AutoCloseable[4];
        autoCloseableArr[0] = this.httpClient;
        if (this.followerServer != null) {
            RestServer restServer = this.followerServer;
            restServer.getClass();
            autoCloseable = restServer::stop;
        } else {
            autoCloseable = null;
        }
        autoCloseableArr[1] = autoCloseable;
        if (this.leaderServer != null) {
            RestServer restServer2 = this.leaderServer;
            restServer2.getClass();
            autoCloseable2 = restServer2::stop;
        } else {
            autoCloseable2 = null;
        }
        autoCloseableArr[2] = autoCloseable2;
        if (this.factory != null) {
            SslContextFactory sslContextFactory = this.factory;
            sslContextFactory.getClass();
            autoCloseable3 = sslContextFactory::stop;
        } else {
            autoCloseable3 = null;
        }
        autoCloseableArr[3] = autoCloseable3;
        Utils.closeAllQuietly(atomicReference, "clientsAndServers", autoCloseableArr);
        if (atomicReference.get() != null) {
            throw new RuntimeException("Unable to cleanly close resources", (Throwable) atomicReference.get());
        }
    }

    @Test
    public void testRestForwardNoSsl() throws Exception {
        testRestForwardToLeader(false, false, false);
    }

    @Test
    public void testRestForwardNoSslDualListener() throws Exception {
        testRestForwardToLeader(true, false, false);
    }

    @Test
    public void testRestForwardLeaderSsl() throws Exception {
        testRestForwardToLeader(true, false, true);
    }

    @Test
    public void testRestForwardFollowerSsl() throws Exception {
        testRestForwardToLeader(true, true, false);
    }

    @Test
    public void testRestForwardSslDualListener() throws Exception {
        testRestForwardToLeader(true, true, true);
    }

    @Test
    public void testRestForwardSsl() throws Exception {
        testRestForwardToLeader(false, true, true);
    }

    public void testRestForwardToLeader(boolean z, boolean z2, boolean z3) throws Exception {
        DistributedConfig distributedConfig = new DistributedConfig(baseWorkerProps(z, z2));
        DistributedConfig distributedConfig2 = new DistributedConfig(baseWorkerProps(z, z3));
        this.followerServer = new RestServer(distributedConfig, new RestClient(distributedConfig));
        this.followerServer.initializeServer();
        Mockito.when(this.followerHerder.plugins()).thenReturn(this.plugins);
        this.followerServer.initializeResources(this.followerHerder);
        this.leaderServer = new RestServer(distributedConfig2, new RestClient(distributedConfig2));
        this.leaderServer.initializeServer();
        Mockito.when(this.leaderHerder.plugins()).thenReturn(this.plugins);
        this.leaderServer.initializeResources(this.leaderHerder);
        this.factory = SSLUtils.createClientSideSslContextFactory(distributedConfig);
        this.factory.start();
        this.httpClient = HttpClients.custom().setSSLContext(this.factory.getSslContext()).build();
        NotLeaderException notLeaderException = new NotLeaderException("Not leader", this.leaderServer.advertisedUrl().toString());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Callback.class);
        ((Herder) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) forClass.getValue()).onCompletion(notLeaderException, (Object) null);
            return null;
        }).when(this.followerHerder)).putConnectorConfig((String) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), (Callback) forClass.capture());
        Herder.Created created = new Herder.Created(true, new ConnectorInfo("blah", Collections.emptyMap(), Collections.emptyList(), ConnectorType.SOURCE));
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Callback.class);
        ((Herder) Mockito.doAnswer(invocationOnMock2 -> {
            ((Callback) forClass2.getValue()).onCompletion((Throwable) null, created);
            return null;
        }).when(this.leaderHerder)).putConnectorConfig((String) ArgumentMatchers.any(), (Map) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), (Callback) forClass2.capture());
        URI advertisedUrl = this.followerServer.advertisedUrl();
        HttpPost httpPost = new HttpPost("/connectors");
        StringEntity stringEntity = new StringEntity("{\"name\": \"blah\",\"config\": {}}", StandardCharsets.UTF_8.name());
        stringEntity.setContentType("application/json");
        httpPost.setEntity(stringEntity);
        Assertions.assertEquals(201, executeRequest(advertisedUrl, httpPost).getStatusLine().getStatusCode());
    }

    private Map<String, String> baseWorkerProps(boolean z, boolean z2) {
        HashMap hashMap = new HashMap();
        hashMap.put("status.storage.topic", "status-topic");
        hashMap.put("config.storage.topic", "config-topic");
        hashMap.put("bootstrap.servers", "localhost:9092");
        hashMap.put("group.id", "connect-test-group");
        hashMap.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("offset.storage.topic", "connect-offsets");
        if (z || z2) {
            for (String str : this.sslConfig.keySet()) {
                if (this.sslConfig.get(str) instanceof Password) {
                    hashMap.put(str, ((Password) this.sslConfig.get(str)).value());
                } else if (this.sslConfig.get(str) instanceof List) {
                    hashMap.put(str, String.join(",", (List) this.sslConfig.get(str)));
                } else {
                    hashMap.put(str, this.sslConfig.get(str).toString());
                }
            }
        }
        if (z) {
            hashMap.put("listeners", "http://localhost:0, https://localhost:0");
            hashMap.put("rest.advertised.listener", z2 ? "https" : "http");
        } else {
            hashMap.put("listeners", z2 ? "https://localhost:0" : "http://localhost:0");
        }
        return hashMap;
    }

    private HttpResponse executeRequest(URI uri, HttpRequest httpRequest) throws IOException {
        CloseableHttpResponse execute = this.httpClient.execute(new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme()), httpRequest);
        this.responses.add(execute);
        return execute;
    }
}
