package kafka.server.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.utils.Logging;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.ClusterLinkError;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.authenticator.TestJaasConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.StringOps$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.java8.JFunction0;

/* compiled from: ClusterLinkConnectionCheckerTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u00055d\u0001\u0002\u0013&\u00011BQa\r\u0001\u0005\u0002QBqa\u000e\u0001C\u0002\u0013%\u0001\b\u0003\u0004F\u0001\u0001\u0006I!\u000f\u0005\b\r\u0002\u0011\r\u0011\"\u0003H\u0011\u0019\u0001\u0006\u0001)A\u0005\u0011\"9\u0011\u000b\u0001b\u0001\n\u00139\u0005B\u0002*\u0001A\u0003%\u0001\nC\u0004T\u0001\t\u0007I\u0011B$\t\rQ\u0003\u0001\u0015!\u0003I\u0011\u001d)\u0006A1A\u0005\n\u001dCaA\u0016\u0001!\u0002\u0013A\u0005bB,\u0001\u0005\u0004%Ia\u0012\u0005\u00071\u0002\u0001\u000b\u0011\u0002%\t\u000fe\u0003!\u0019!C\u00055\"1a\f\u0001Q\u0001\nmCqa\u0018\u0001C\u0002\u0013%!\f\u0003\u0004a\u0001\u0001\u0006Ia\u0017\u0005\u0006C\u0002!\tA\u0019\u0005\u0006c\u0002!\tA\u0019\u0005\u0006m\u0002!\tA\u0019\u0005\u0006q\u0002!\tA\u0019\u0005\u0006u\u0002!\tA\u0019\u0005\u0006y\u0002!\tA\u0019\u0005\u0006}\u0002!\tA\u0019\u0005\u0007\u0003\u0003\u0001A\u0011\u00012\t\r\u0005\u0015\u0001\u0001\"\u0001c\u0011\u0019\tI\u0001\u0001C\u0001E\"1\u0011Q\u0002\u0001\u0005\u0002\tDa!!\u0005\u0001\t\u0003\u0011\u0007BBA\u000b\u0001\u0011\u0005!\r\u0003\u0004\u0002\u001a\u0001!\tA\u0019\u0005\u0007\u0003;\u0001A\u0011\u00012\t\r\u0005\u0005\u0002\u0001\"\u0001c\u0011\u0019\t)\u0003\u0001C\u0001E\"9\u0011\u0011\u0006\u0001\u0005\n\u0005-\"\u0001I\"mkN$XM\u001d'j].\u001cuN\u001c8fGRLwN\\\"iK\u000e\\WM\u001d+fgRT!AJ\u0014\u0002\t1Lgn\u001b\u0006\u0003Q%\naa]3sm\u0016\u0014(\"\u0001\u0016\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001!\f\t\u0003]Ej\u0011a\f\u0006\u0002a\u0005)1oY1mC&\u0011!g\f\u0002\u0007\u0003:L(+\u001a4\u0002\rqJg.\u001b;?)\u0005)\u0004C\u0001\u001c\u0001\u001b\u0005)\u0013!B1e[&tW#A\u001d\u0011\u0005i\u001aU\"A\u001e\u000b\u0005]b$BA\u001f?\u0003\u001d\u0019G.[3oiNT!AK 
\u000b\u0005\u0001\u000b\u0015AB1qC\u000eDWMC\u0001C\u0003\ry'oZ\u0005\u0003\tn\u0012abQ8oM2,XM\u001c;BI6Lg.\u0001\u0004bI6Lg\u000eI\u0001\u0012Kb\fW\u000e\u001d7f+Jd\u0017I\u001c3Q_J$X#\u0001%\u0011\u0005%sU\"\u0001&\u000b\u0005-c\u0015\u0001\u00027b]\u001eT\u0011!T\u0001\u0005U\u00064\u0018-\u0003\u0002P\u0015\n11\u000b\u001e:j]\u001e\f!#\u001a=b[BdW-\u0016:m\u0003:$\u0007k\u001c:uA\u0005\u00192m\u001c8gYV,g\u000e^+sY\u0006sG\rU8si\u0006!2m\u001c8gYV,g\u000e^+sY\u0006sG\rU8si\u0002\n1\u0003\\8dC2Dwn\u001d;Ve2\fe\u000e\u001a)peR\fA\u0003\\8dC2Dwn\u001d;Ve2\fe\u000e\u001a)peR\u0004\u0013a\u00043og2{wn[+q\u0007>tg-[4\u0002!\u0011t7\u000fT8pWV\u00038i\u001c8gS\u001e\u0004\u0013A\u0002;f]\u0006tG/A\u0004uK:\fg\u000e\u001e\u0011\u0002!\rdw.\u001e3MS:\\W*\u001a;sS\u000e\u001cX#A.\u0011\u0005Yb\u0016BA/&\u0005I\u0019E.^:uKJd\u0015N\\6NKR\u0014\u0018nY:\u0002#\rdw.\u001e3MS:\\W*\u001a;sS\u000e\u001c\b%A\tis\n\u0014\u0018\u000e\u001a'j].lU\r\u001e:jGN\f!\u0003[=ce&$G*\u001b8l\u001b\u0016$(/[2tA\u0005)1/\u001a;vaR\t1\r\u0005\u0002/I&\u0011Qm\f\u0002\u0005+:LG\u000f\u000b\u0002\u0013OB\u0011\u0001n\\\u0007\u0002S*\u0011!n[\u0001\u0004CBL'B\u00017n\u0003\u001dQW\u000f]5uKJT!A\\!\u0002\u000b),h.\u001b;\n\u0005AL'A\u0003\"fM>\u0014X-R1dQ\u0006a1\r[3dW\u0006cG\u000eU1tg\"\u00121c\u001d\t\u0003QRL!!^5\u0003\tQ+7\u000f^\u0001\u001fi\u0016\u001cHo\u00115fG.\fU\u000f\u001e5f]RL7-\u0019;j_:4\u0015-\u001b7ve\u0016D#\u0001F:\u0002;Q,7\u000f^\"iK\u000e\\\u0017)\u001e;i_JL'0\u0019;j_:4\u0015-\u001b7ve\u0016D#!F:\u0002!Q,7\u000f\u001e+j[\u0016|W\u000f^#se>\u0014\bF\u0001\ft\u0003\u0011\"Xm\u001d;E\u001dN\u0013Vm]8mkRLwN\u001c$bS2,(/\u001a$s_6\u001c\u0005.Z2l\u00032d\u0007FA\ft\u0003M\u001a\u0007.Z2l\u0013N\u001c\u0016m\u001d7NK\u000eD\u0017M\\5t[6K7o]5oO\u001a\u0013x.\\\"iK\u000e\\\u0017\t\u001c7G_J\u001cEn\\;e\u0019&t7\u000e\u000b\u0002\u0019g\u0006!4\r[3dW&\u001b8+Y:m\u0015\u0006\f7oQ8oM&<W*[:tS:<gI]8n\u0007\",7m[!mY\u001a{'o\u00117pk\u0012d\u0015N\\6)\u0005e\u0019\u0018aN2iK\u000e\\\u00
17j]*fGV\u0014\u0018\u000e^=Qe>$xnY8m\u001b&\u001c8/\u001b8h\rJ|Wn\u00115fG.\fE\u000e\u001c$pe\"K(M]5e\u0019&t7\u000e\u000b\u0002\u001bg\u0006!B/Z:u\u0013:4\u0018\r\\5e\u0005>|Go\u001d;sCBD#aG:\u0002-Q,7\u000f^\"iK\u000e\\\u0017\t\u001a3sKN\u001cXm\u001d)bgND#\u0001H:\u0002OQ,7\u000f^\"iK\u000e\\\u0017\t\u001a3sKN\u001cXm\u001d$bS2\u001chI]8n\u0013:$XM\u001d8bYB{'\u000f\u001e\u0015\u0003;M\f\u0011\u0007^3ti\u000eCWmY6BI\u0012\u0014Xm]:fgB\u000b7o]3t\rJ|W.\u00138uKJt\u0017\r\u001c)peR4uN](o!J,W\u000e\u000b\u0002\u001fg\u000693\r[3dW&\u001b8+Y:m\u001b\u0016\u001c\u0007.\u00198jg6l\u0015n]:j]\u001e4uN]\"m_V$G*\u001b8lQ\ty2/\u0001\u0015dQ\u0016\u001c7.S:TCNd'*Y:t\u0007>tg-[4NSN\u001c\u0018N\\4G_J\u001cEn\\;e\u0019&t7\u000e\u000b\u0002!g\u0006Y3\r[3dW&\u001b8+Z2ve&$\u0018\u0010\u0015:pi>\u001cw\u000e\\'jgNLgn\u001a$pe\"K(M]5e\u0019&t7\u000e\u000b\u0002\"g\u000612\r[3dWR\u001b\u0007oQ8o]\u0016\u001cG/[8o!\u0006\u001c8\u000f\u000b\u0002#g\u0006A1\r[3dW\u0006cG\u000e\u0006\u0007\u0002.\u0005e\u00121HA#\u0003\u0013\n\u0019\u0006E\u0003/\u0003_\t\u0019$C\u0002\u00022=\u0012aa\u00149uS>t\u0007c\u0001\u001c\u00026%\u0019\u0011qG\u0013\u0003E\rcWo\u001d;fe2Kgn[\"p]:,7\r^5p]\u000eCWmY6feJ+7/\u001e7u\u0011\u001594\u00051\u0001:\u0011\u001d\tid\ta\u0001\u0003\u007f\taaY8oM&<\u0007c\u0001\u001c\u0002B%\u0019\u00111I\u0013\u0003#\rcWo\u001d;fe2Kgn[\"p]\u001aLw\r\u0003\u0004\u0002H\r\u0002\raW\u0001\u0013G2,8\u000f^3s\u0019&t7.T3ue&\u001c7\u000fC\u0004\u0002L\r\u0002\r!!\u0014\u00023\r|gN\\3di&|g\u000eV8SK6|G/Z\"mkN$XM\u001d\t\u0004]\u0005=\u0013bAA)_\t9!i\\8mK\u0006t\u0007bBA+G\u0001\u0007\u0011qK\u0001\ri\u0016t\u0017M\u001c;Qe\u00164\u0017\u000e\u001f\t\u0006]\u0005=\u0012\u0011\f\t\u0005\u00037\nIG\u0004\u0003\u0002^\u0005\u0015\u0004cAA0_5\u0011\u0011\u0011\r\u0006\u0004\u0003GZ\u0013A\u0002\u001fs_>$h(C\u0002\u0002h=\na\u0001\u0015:fI\u00164\u0017bA(\u0002l)\u0019\u0011qM\u0018")
/* loaded from: input_file:kafka/server/link/ClusterLinkConnectionCheckerTest.class */
/**
 * Tests for the connection checks exposed by {@code ClusterLinkConnectionChecker$}.
 *
 * <p>Most tests stub a mocked {@link ConfluentAdmin} so that
 * {@code describeCluster(...).clusterId()} either succeeds or fails with a chosen exception,
 * then assert which unavailable-link reason the private {@code checkAll} helper reports.
 * The remaining tests call {@code checkAddresses}, {@code isSecuritySettingsMissingForCloudLink}
 * and {@code checkTcpConnection} directly.
 *
 * <p>NOTE(review): several tests use public host names (www.example.com, confluent.io) as
 * bootstrap servers; on the failure paths the addresses are parsed and validated, so these
 * tests presumably depend on DNS being available — confirm.
 */
public class ClusterLinkConnectionCheckerTest {
    private static final String EXAMPLE_URL_AND_PORT = "www.example.com:443";
    private static final String CONFLUENT_URL_AND_PORT = "confluent.io:443";
    private static final String LOCALHOST_URL_AND_PORT = "localhost:9092";
    /** Both public endpoints as a comma-separated bootstrap list. */
    private static final String TWO_BOOTSTRAP_SERVERS = EXAMPLE_URL_AND_PORT + "," + CONFLUENT_URL_AND_PORT;
    private static final String DNS_LOOKUP_CONFIG = "use_all_dns_ips";
    private static final String TENANT = "test-tenant";
    /** Value wrapped in {@code Some} and passed as the last argument of checkAll/checkAddresses. */
    private static final String LKC_ID = "lkc-123";

    private final ConfluentAdmin admin = Mockito.mock(ConfluentAdmin.class);

    // The two metrics instances differ only in the sixth constructor argument (true vs. false).
    private final ClusterLinkMetrics cloudLinkMetrics = new ClusterLinkMetrics("test-tenant_test-link-1", Uuid.randomUuid(), ClusterLinkConfig.LinkMode.DESTINATION, ConnectionMode$Outbound$.MODULE$, ConnectionMode$Inbound$.MODULE$, true, Mockito.mock(ClusterLinkManager.class), None$.MODULE$, new Metrics(), new Some<>(TENANT), false);
    private final ClusterLinkMetrics hybridLinkMetrics = new ClusterLinkMetrics("test-tenant_test-link-1", Uuid.randomUuid(), ClusterLinkConfig.LinkMode.DESTINATION, ConnectionMode$Outbound$.MODULE$, ConnectionMode$Inbound$.MODULE$, false, Mockito.mock(ClusterLinkManager.class), None$.MODULE$, new Metrics(), new Some<>(TENANT), false);

    /** Clears all stubbing on the shared admin mock before every test. */
    @BeforeEach
    public void setup() {
        Mockito.reset(admin);
    }

    /**
     * A successful {@code clusterId().get()} makes checkAll return an empty Option.
     *
     * @throws Exception declared because stubbing {@link KafkaFuture#get()} references a method
     *                   with checked exceptions; nothing is expected to be thrown at runtime
     */
    @Test
    public void checkAllPass() throws Exception {
        DescribeClusterResult describeResult = Mockito.mock(DescribeClusterResult.class);
        @SuppressWarnings("unchecked") // a mock of a generic type can only be created from the raw class
        KafkaFuture<String> clusterId = Mockito.mock(KafkaFuture.class);
        Mockito.when(admin.describeCluster((DescribeClusterOptions) ArgumentMatchers.any())).thenReturn(describeResult);
        Mockito.when(describeResult.clusterId()).thenReturn(clusterId);
        Mockito.when(clusterId.get()).thenReturn("123");

        Properties props = saslProps(TWO_BOOTSTRAP_SERVERS);
        Assertions.assertEquals(None$.MODULE$, checkAll(admin, createConfig(props), cloudLinkMetrics, true, new Some<>(LKC_ID)));
    }

    /** A SASL authentication failure is reported as an authentication-error reason. */
    @Test
    public void testCheckAuthenticationFailure() {
        stubDescribeClusterFailure(new SaslAuthenticationException("Not authenticated"));
        ClusterLinkConfig config = createConfig(saslProps(LOCALHOST_URL_AND_PORT));
        Assertions.assertEquals(new AuthenticationErrorUnavailableLinkReason(config), reasonOf(checkAll(admin, config, cloudLinkMetrics, true, new Some<>(LKC_ID))));
    }

    /** A cluster authorization failure is reported as an invalid-bootstrap-internal-endpoint reason. */
    @Test
    public void testCheckAuthorizationFailure() {
        stubDescribeClusterFailure(new ClusterAuthorizationException("Not authorized"));
        ClusterLinkConfig config = createConfig(saslProps(LOCALHOST_URL_AND_PORT));
        Assertions.assertEquals(new InvalidBootstrapInternalEndpointUnavailableLinkReason(config), reasonOf(checkAll(admin, config, cloudLinkMetrics, true, new Some<>(LKC_ID))));
    }

    /** A timeout with complete SASL settings is reported as a timeout reason. */
    @Test
    public void testTimeoutError() {
        stubDescribeClusterFailure(new TimeoutException("Timed out"));
        ClusterLinkConfig config = createConfig(saslProps(CONFLUENT_URL_AND_PORT));
        Assertions.assertEquals(new TimeoutUnavailableLinkReason(config), reasonOf(checkAll(admin, config, cloudLinkMetrics, true, new Some<>(LKC_ID))));
    }

    /** A timeout against a bogus host name is reported as an unresolvable-bootstrap reason. */
    @Test
    public void testDNSResolutionFailureFromCheckAll() {
        stubDescribeClusterFailure(new TimeoutException());
        ClusterLinkConfig config = createConfig(saslProps("www.www.askjdbasdkjsad.com:9"));
        Assertions.assertEquals(new UnresolvableBootstrapUnavailableLinkReason(config), reasonOf(checkAll(admin, config, cloudLinkMetrics, true, new Some<>(LKC_ID))));
    }

    /** With {@code sasl.mechanism} set to the string "null", a timeout is reported as an authentication error. */
    @Test
    public void checkIsSaslMechanismMissingFromCheckAllForCloudLink() {
        stubDescribeClusterFailure(new TimeoutException("Timed out"));
        Properties props = saslProps(TWO_BOOTSTRAP_SERVERS);
        props.put("sasl.mechanism", "null");
        ClusterLinkConfig config = createConfig(props);
        Assertions.assertEquals(new AuthenticationErrorUnavailableLinkReason(config), reasonOf(checkAll(admin, config, cloudLinkMetrics, true, new Some<>(LKC_ID))));
    }

    /** With {@code sasl.jaas.config} set to the string "null", a timeout is reported as an authentication error. */
    @Test
    public void checkIsSaslJaasConfigMissingFromCheckAllForCloudLink() {
        stubDescribeClusterFailure(new TimeoutException("Timed out"));
        Properties props = baseProps(TWO_BOOTSTRAP_SERVERS);
        props.put("sasl.jaas.config", "null");
        ClusterLinkConfig config = createConfig(props);
        Assertions.assertEquals(new AuthenticationErrorUnavailableLinkReason(config), reasonOf(checkAll(admin, config, cloudLinkMetrics, true, new Some<>(LKC_ID))));
    }

    /** With the hybrid metrics and the boolean flag false, the same timeout stays a timeout reason. */
    @Test
    public void checkIsSecurityProtocolMissingFromCheckAllForHybridLink() {
        stubDescribeClusterFailure(new TimeoutException("Timed out"));
        Properties props = baseProps(TWO_BOOTSTRAP_SERVERS);
        props.put("sasl.mechanism", "null");
        ClusterLinkConfig config = createConfig(props);
        Assertions.assertEquals(new TimeoutUnavailableLinkReason(config), reasonOf(checkAll(admin, config, hybridLinkMetrics, false, new Some<>(LKC_ID))));
    }

    /** A syntactically invalid IP/port bootstrap is reported as an unresolvable-bootstrap reason. */
    @Test
    public void testInvalidBootstrap() {
        stubDescribeClusterFailure(new TimeoutException());
        // Deliberately sets only bootstrap.servers (no client.dns.lookup), as the original test did.
        Properties props = new Properties();
        props.put("bootstrap.servers", "999.999.999.999:0000");
        ClusterLinkConfig config = createConfig(props);
        Assertions.assertEquals(new UnresolvableBootstrapUnavailableLinkReason(config), reasonOf(checkAll(admin, config, cloudLinkMetrics, true, new Some<>(LKC_ID))));
    }

    /** checkAddresses reports no reason (first tuple element is None) for a public endpoint. */
    @Test
    public void testCheckAddressesPass() {
        ClusterLinkConfig config = createConfig(baseProps(EXAMPLE_URL_AND_PORT));
        Assertions.assertEquals(None$.MODULE$, ClusterLinkConnectionChecker$.MODULE$.checkAddresses(config, () -> false, new Some<>(LKC_ID))._1());
    }

    /** checkAddresses rejects {@code localhost:9071} when an "lkc-123" value is supplied. */
    @Test
    public void testCheckAddressesFailsFromInternalPort() {
        ClusterLinkConfig config = createConfig(baseProps("localhost:9071"));
        Assertions.assertEquals(new Some<>(new InvalidBootstrapInternalEndpointUnavailableLinkReason(config)), ClusterLinkConnectionChecker$.MODULE$.checkAddresses(config, () -> false, new Some<>(LKC_ID))._1());
    }

    /** checkAddresses accepts {@code localhost:9071} when the last argument is None. */
    @Test
    public void testCheckAddressesPassesFromInternalPortForOnPrem() {
        ClusterLinkConfig config = createConfig(baseProps("localhost:9071"));
        Assertions.assertEquals(None$.MODULE$, ClusterLinkConnectionChecker$.MODULE$.checkAddresses(config, () -> false, None$.MODULE$)._1());
    }

    /** Security settings count as missing when {@code sasl.mechanism} is the string "null". */
    @Test
    public void checkIsSaslMechanismMissingForCloudLink() {
        Properties props = saslProps(TWO_BOOTSTRAP_SERVERS);
        props.put("sasl.mechanism", "null");
        Assertions.assertTrue(ClusterLinkConnectionChecker$.MODULE$.isSecuritySettingsMissingForCloudLink(createConfig(props), Option$.MODULE$.apply(cloudLinkMetrics), true));
    }

    /** Security settings count as missing when {@code sasl.jaas.config} is the string "null". */
    @Test
    public void checkIsSaslJassConfigMissingForCloudLink() {
        Properties props = baseProps(TWO_BOOTSTRAP_SERVERS);
        props.put("sasl.jaas.config", "null");
        // NOTE(review): this is the only test that passes false as the third argument of create(); intent unclear — confirm.
        ClusterLinkConfig config = ClusterLinkConfig$.MODULE$.create(props, None$.MODULE$, false);
        Assertions.assertTrue(ClusterLinkConnectionChecker$.MODULE$.isSecuritySettingsMissingForCloudLink(config, Option$.MODULE$.apply(cloudLinkMetrics), true));
    }

    /** With the hybrid metrics and the flag false, the same settings are not reported as missing. */
    @Test
    public void checkIsSecurityProtocolMissingForHybridLink() {
        Properties props = baseProps(TWO_BOOTSTRAP_SERVERS);
        props.put("sasl.jaas.config", "null");
        Assertions.assertFalse(ClusterLinkConnectionChecker$.MODULE$.isSecuritySettingsMissingForCloudLink(createConfig(props), Option$.MODULE$.apply(hybridLinkMetrics), false));
    }

    /** checkTcpConnection returns None for the example endpoint. */
    @Test
    public void checkTcpConnectionPass() {
        String[] hostAndPort = EXAMPLE_URL_AND_PORT.split(":");
        String host = hostAndPort[0];
        int port = Integer.parseInt(hostAndPort[1]);
        ArrayList<InetSocketAddress> addresses = new ArrayList<>();
        addresses.add(new InetSocketAddress(host, port));

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", host + ":" + port);
        Assertions.assertEquals(None$.MODULE$, ClusterLinkConnectionChecker$.MODULE$.checkTcpConnection(addresses, createConfig(props)));
    }

    /**
     * Stubs the admin mock so that {@code describeCluster(any).clusterId()} returns a future
     * that has already completed exceptionally with {@code cause}.
     */
    private void stubDescribeClusterFailure(Throwable cause) {
        KafkaFutureImpl<String> failedClusterId = new KafkaFutureImpl<>();
        failedClusterId.completeExceptionally(cause);
        DescribeClusterResult describeResult = Mockito.mock(DescribeClusterResult.class);
        Mockito.when(describeResult.clusterId()).thenReturn(failedClusterId);
        Mockito.when(admin.describeCluster((DescribeClusterOptions) ArgumentMatchers.any())).thenReturn(describeResult);
    }

    /** Returns properties holding the given bootstrap servers and the DNS lookup setting. */
    private static Properties baseProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("client.dns.lookup", DNS_LOOKUP_CONFIG);
        return props;
    }

    /** Returns {@link #baseProps(String)} plus a PLAIN JAAS config for user/pass. */
    private static Properties saslProps(String bootstrapServers) {
        Properties props = baseProps(bootstrapServers);
        props.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "user", "pass"));
        return props;
    }

    /** Builds a link config from {@code props} with None and {@code true} as the remaining arguments. */
    private static ClusterLinkConfig createConfig(Properties props) {
        return ClusterLinkConfig$.MODULE$.create(props, None$.MODULE$, true);
    }

    /** Unwraps a checker result that is expected to be defined and returns its unavailable-link reason. */
    private static Object reasonOf(Option<ClusterLinkConnectionCheckerResult> result) {
        return result.get().unavailableLinkReason();
    }

    /**
     * Issues {@code describeCluster()} through a {@code ClusterLinkNonBatchingAdmin} wrapping
     * {@code confluentAdmin} and converts the outcome into an optional checker result.
     *
     * <p>Returns an empty Option when {@code clusterId().get()} succeeds. On failure the
     * throwable is mapped to a reason: authentication and authorization errors are mapped
     * directly; otherwise the bootstrap addresses are parsed and validated, and a
     * {@link TimeoutException} with an UNKNOWN link error is further split into an
     * authentication error or a timeout depending on {@code isSecuritySettingsMissingForCloudLink}.
     *
     * <p>NOTE(review): the body looks like the decompiler's inlined copy of logic from
     * {@code ClusterLinkConnectionChecker$} (it references its {@code $anonfun$...} methods).
     * The {@code r1}/{@code r2} lambda arguments are decompiler placeholders for captured
     * variables and are not declared in this method, so it does not compile as written.
     * {@code option} is not referenced in the visible body; presumably it is one of the lost
     * captures — confirm against the Scala source.
     *
     * @param confluentAdmin     admin client supplied to the non-batching admin wrapper
     * @param clusterLinkConfig  link configuration used for address parsing and reason construction
     * @param clusterLinkMetrics metrics, wrapped in an Option and passed to the security-settings check
     * @param z                  boolean forwarded as the third argument of the security-settings check
     * @param option             optional string supplied by the tests as {@code Some("lkc-123")}
     * @return empty on success, otherwise the failure together with its unavailable-link reason
     */
    private Option<ClusterLinkConnectionCheckerResult> checkAll(ConfluentAdmin confluentAdmin, ClusterLinkConfig clusterLinkConfig, ClusterLinkMetrics clusterLinkMetrics, boolean z, Option<String> option) {
        Option apply;
        List list;
        ClusterLinkConnectionChecker$ clusterLinkConnectionChecker$ = ClusterLinkConnectionChecker$.MODULE$;
        DescribeClusterResult describeCluster = new ClusterLinkNonBatchingAdmin(() -> {
            return confluentAdmin;
        }).describeCluster();
        ClusterLinkConnectionChecker$ clusterLinkConnectionChecker$2 = ClusterLinkConnectionChecker$.MODULE$;
        Option apply2 = Option$.MODULE$.apply(clusterLinkMetrics);
        // Boolean supplier (always false) handed to bootstrapServersToConnect below.
        JFunction0.mcZ.sp spVar = () -> {
            return false;
        };
        try {
            // Success path: the cluster id could be fetched, so nothing is wrong.
            describeCluster.clusterId().get();
            return Option$.MODULE$.empty();
        } catch (Throwable th) {
            ApiException apiException = ClusterLinkUtils$.MODULE$.apiException(th, "Basic describeCluster check failed");
            if (clusterLinkConnectionChecker$2.logger().underlying().isWarnEnabled()) {
                clusterLinkConnectionChecker$2.logger().underlying().warn(Logging.msgWithLogIdent$(clusterLinkConnectionChecker$2, ClusterLinkConnectionChecker$.$anonfun$check$1(th, apiException)));
            }
            // Authentication/authorization failures map straight to a reason without address checks.
            if (apiException instanceof AuthenticationException) {
                return Option$.MODULE$.apply(new ClusterLinkConnectionCheckerResult(th, new AuthenticationErrorUnavailableLinkReason(clusterLinkConfig)));
            }
            if (apiException instanceof AuthorizationException) {
                return Option$.MODULE$.apply(new ClusterLinkConnectionCheckerResult(th, new InvalidBootstrapInternalEndpointUnavailableLinkReason(clusterLinkConfig)));
            }
            // Any other failure: parse and validate the bootstrap addresses.
            List bootstrapServersToConnect = clusterLinkConfig.bootstrapServersToConnect(spVar);
            String string = clusterLinkConfig.getString("client.dns.lookup");
            ClientDnsLookup forConfig = string == null ? ClientDnsLookup.USE_ALL_DNS_IPS : ClientDnsLookup.forConfig(string);
            // Filled (presumably via the lost r1/r2 captures) by the per-address callback below;
            // a non-empty list leads to the invalid-internal-endpoint reason.
            ArrayList arrayList = new ArrayList();
            try {
                List parseAndValidateAddresses = ClientUtils.parseAndValidateAddresses(bootstrapServersToConnect, forConfig);
                CollectionConverters$.MODULE$.ListHasAsScala(parseAndValidateAddresses).asScala().foreach((v2) -> {
                    return ClusterLinkConnectionChecker$.$anonfun$checkAddresses$3(r1, r2, v2);
                });
                if (arrayList.isEmpty()) {
                    apply = None$.MODULE$;
                    list = parseAndValidateAddresses;
                } else {
                    if (clusterLinkConnectionChecker$2.logger().underlying().isWarnEnabled()) {
                        clusterLinkConnectionChecker$2.logger().underlying().warn(Logging.msgWithLogIdent$(clusterLinkConnectionChecker$2, ClusterLinkConnectionChecker$.$anonfun$checkAddresses$4(arrayList)));
                    }
                    apply = Option$.MODULE$.apply(new InvalidBootstrapInternalEndpointUnavailableLinkReason(clusterLinkConfig));
                    list = null;
                }
            } catch (ConfigException e) {
                // Address parsing/validation rejected the bootstrap servers.
                if (clusterLinkConnectionChecker$2.logger().underlying().isWarnEnabled()) {
                    clusterLinkConnectionChecker$2.logger().underlying().warn(Logging.msgWithLogIdent$(clusterLinkConnectionChecker$2, ClusterLinkConnectionChecker$.$anonfun$checkAddresses$1(e)));
                }
                apply = Option$.MODULE$.apply(new UnresolvableBootstrapUnavailableLinkReason(clusterLinkConfig));
                list = null;
            } catch (Throwable th2) {
                if (clusterLinkConnectionChecker$2.logger().underlying().isWarnEnabled()) {
                    clusterLinkConnectionChecker$2.logger().underlying().warn(Logging.msgWithLogIdent$(clusterLinkConnectionChecker$2, ClusterLinkConnectionChecker$.$anonfun$checkAddresses$2(th2)));
                }
                apply = Option$.MODULE$.apply(new UnknownUnavailableLinkReason(clusterLinkConfig));
                list = null;
            }
            Option option2 = apply;
            List list2 = list;
            // When the address check found no reason, fall back to the basic-networking lambda
            // (its captured arguments were lost by the decompiler).
            UnavailableLinkReason unavailableLinkReason = (UnavailableLinkReason) option2.getOrElse(() -> {
                return ClusterLinkConnectionChecker$.$anonfun$checkBasicNetworking$1(r1, r2);
            });
            ClusterLinkError clusterLinkError = unavailableLinkReason.clusterLinkError();
            ClusterLinkError clusterLinkError2 = ClusterLinkError.UNKNOWN;
            // Null-safe equality: only an UNKNOWN error combined with a timeout is refined further.
            if (clusterLinkError != null ? clusterLinkError.equals(clusterLinkError2) : clusterLinkError2 == null) {
                if (apiException instanceof TimeoutException) {
                    return clusterLinkConnectionChecker$2.isSecuritySettingsMissingForCloudLink(clusterLinkConfig, apply2, z) ? Option$.MODULE$.apply(new ClusterLinkConnectionCheckerResult(th, new AuthenticationErrorUnavailableLinkReason(clusterLinkConfig))) : Option$.MODULE$.apply(new ClusterLinkConnectionCheckerResult(th, new TimeoutUnavailableLinkReason(clusterLinkConfig)));
                }
            }
            return Option$.MODULE$.apply(new ClusterLinkConnectionCheckerResult(th, unavailableLinkReason));
        }
    }
}
