package kafka.link;

import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import kafka.server.KafkaServer;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFactory;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.RichLong$;

/* compiled from: SourceConnectionAuthorizationTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001m2AAB\u0004\u0001\u0019!)\u0011\u0003\u0001C\u0001%!9A\u0003\u0001b\u0001\n\u0003*\u0002B\u0002\u000f\u0001A\u0003%a\u0003C\u0003\u001e\u0001\u0011\u0005c\u0004C\u00030\u0001\u0011\u0005aDA\u0011T_V\u00148-Z\"p]:,7\r^5p]\u0006+H\u000f[8sSj\fG/[8o)\u0016\u001cHO\u0003\u0002\t\u0013\u0005!A.\u001b8l\u0015\u0005Q\u0011!B6bM.\f7\u0001A\n\u0003\u00015\u0001\"AD\b\u000e\u0003\u001dI!\u0001E\u0004\u00039\rcWo\u001d;fe2Kgn[!vi\"|'/\u001b>bi&|g\u000eV3ti\u00061A(\u001b8jiz\"\u0012a\u0005\t\u0003\u001d\u0001\ta$^:f'>,(oY3D_:tWm\u0019;j_:|%/[4j]\u0006$\u0018n\u001c8\u0016\u0003Y\u0001\"a\u0006\u000e\u000e\u0003aQ\u0011!G\u0001\u0006g\u000e\fG.Y\u0005\u00037a\u0011qAQ8pY\u0016\fg.A\u0010vg\u0016\u001cv.\u001e:dK\u000e{gN\\3di&|gn\u0014:jO&t\u0017\r^5p]\u0002\nQa]3u+B$\u0012a\b\t\u0003/\u0001J!!\t\r\u0003\tUs\u0017\u000e\u001e\u0015\u0003\t\r\u0002\"\u0001J\u0017\u000e\u0003\u0015R!AJ\u0014\u0002\u0007\u0005\u0004\u0018N\u0003\u0002)S\u00059!.\u001e9ji\u0016\u0014(B\u0001\u0016,\u0003\u0015QWO\\5u\u0015\u0005a\u0013aA8sO&\u0011a&\n\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017!\u000b;fgR\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\\!vi\"|'/\u001b>bi&|gNR1jYV\u0014X\r\u000b\u0002\u0006cA\u0011AEM\u0005\u0003g\u0015\u0012A\u0001V3ti\"\"\u0001!\u000e\u001d:!\t!c'\u0003\u00028K\t\u0019A+Y4\u0002\u000bY\fG.^3\"\u0003i\n1\"\u001b8uK\u001e\u0014\u0018\r^5p]\u0002")
/* loaded from: input_file:kafka/link/SourceConnectionAuthorizationTest.class */
public class SourceConnectionAuthorizationTest extends ClusterLinkAuthorizationTest {
    private final boolean useSourceConnectionOrigination = true;

    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public boolean useSourceConnectionOrigination() {
        return this.useSourceConnectionOrigination;
    }

    @Override // kafka.link.ClusterLinkAuthorizationTest, kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp() {
        super.setUp();
        destCluster().addAcls(destReverseConnectionAcls());
        sourceCluster().addAcls(sourceReverseConnectionAcls());
    }

    @Test
    public void testReverseConnectionAuthorizationFailure() {
        addAcls();
        sourceCluster().deleteAcls(new $colon.colon(sourceLinkUserClusterAlterAcl(), Nil$.MODULE$));
        destCluster().deleteAcls(new $colon.colon(destLinkUserClusterAlterAcl(), Nil$.MODULE$));
        prepareSourceTopic();
        UUID createClusterLink = destCluster().createClusterLink(linkName(), destLinkProps(destLinkProps$default$1()), new Some(((KafkaServer) sourceCluster().servers().head()).clusterId()));
        Assertions.assertThrows(ClusterAuthorizationException.class, () -> {
            this.sourceCluster().createClusterLink(this.linkName(), (Properties) this.sourceLinkProps(this.sourceLinkProps$default$1()).get(), new Some(((KafkaServer) this.destCluster().servers().head()).clusterId()));
        });
        ClusterLinkData clusterLinkData = new ClusterLinkData(linkName(), createClusterLink, new Some(((KafkaServer) destCluster().servers().head()).clusterId()), None$.MODULE$, false);
        Properties properties = new Properties();
        ConfigDef.convertToStringMapWithPasswordValues((Map) sourceLinkProps(sourceLinkProps$default$1()).get()).forEach((str, str2) -> {
            properties.setProperty(str, str2);
        });
        ClusterLinkConfig create = ClusterLinkConfig$.MODULE$.create(properties);
        ClusterLinkFactory.LinkManager clusterLinkManager = sourceCluster().controller().clusterLinkManager();
        clusterLinkManager.createClusterLink(clusterLinkData, create, clusterLinkManager.configEncoder().encode(properties));
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testReverseConnectionAuthorizationFailure$3(this)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testReverseConnectionAuthorizationFailure$4());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        Assertions.assertEquals(0.0d, kafkaMetricMaxValue(sourceCluster().servers(), "reverse-connection-count", "cluster-link-metrics", new Some(linkName()), (scala.collection.Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), "source")})), kafkaMetricMaxValue$default$6()), 0.001d);
        sourceCluster().addAcls(new $colon.colon(sourceLinkUserClusterAlterAcl(), Nil$.MODULE$));
        destCluster().addAcls(new $colon.colon(destLinkUserClusterAlterAcl(), Nil$.MODULE$));
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long waitUntilTrue$default$32 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$2 == null) {
            throw null;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testReverseConnectionAuthorizationFailure$5(this)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$32) {
                Assertions.fail($anonfun$testReverseConnectionAuthorizationFailure$6());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$32), waitUntilTrue$default$42));
        }
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long waitUntilTrue$default$33 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$43 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$3 == null) {
            throw null;
        }
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$testReverseConnectionAuthorizationFailure$7(this)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + waitUntilTrue$default$33) {
                Assertions.fail($anonfun$testReverseConnectionAuthorizationFailure$8());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$33), waitUntilTrue$default$43));
        }
        destCluster().linkTopic(topic(), (short) 2, linkName(), destCluster().linkTopic$default$4());
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    public static final /* synthetic */ boolean $anonfun$testReverseConnectionAuthorizationFailure$3(SourceConnectionAuthorizationTest sourceConnectionAuthorizationTest) {
        return sourceConnectionAuthorizationTest.kafkaMetricMaxValue(sourceConnectionAuthorizationTest.sourceCluster().servers(), "reverse-connection-failed-total", "cluster-link-metrics", new Some(sourceConnectionAuthorizationTest.linkName()), (scala.collection.Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), "source")})), sourceConnectionAuthorizationTest.kafkaMetricMaxValue$default$6()) > ((double) 0);
    }

    public static final /* synthetic */ String $anonfun$testReverseConnectionAuthorizationFailure$4() {
        return "Connections not failed";
    }

    public static final /* synthetic */ boolean $anonfun$testReverseConnectionAuthorizationFailure$5(SourceConnectionAuthorizationTest sourceConnectionAuthorizationTest) {
        return sourceConnectionAuthorizationTest.kafkaMetricMaxValue(sourceConnectionAuthorizationTest.sourceCluster().servers(), "reverse-connection-count", "cluster-link-metrics", new Some(sourceConnectionAuthorizationTest.linkName()), (scala.collection.Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), "source")})), sourceConnectionAuthorizationTest.kafkaMetricMaxValue$default$6()) > ((double) 0);
    }

    public static final /* synthetic */ String $anonfun$testReverseConnectionAuthorizationFailure$6() {
        return "Connections not created on source";
    }

    public static final /* synthetic */ boolean $anonfun$testReverseConnectionAuthorizationFailure$7(SourceConnectionAuthorizationTest sourceConnectionAuthorizationTest) {
        return sourceConnectionAuthorizationTest.kafkaMetricMaxValue(sourceConnectionAuthorizationTest.destCluster().servers(), "reverse-connection-count", "cluster-link-metrics", new Some(sourceConnectionAuthorizationTest.linkName()), (scala.collection.Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), "destination")})), sourceConnectionAuthorizationTest.kafkaMetricMaxValue$default$6()) > ((double) 0);
    }

    public static final /* synthetic */ String $anonfun$testReverseConnectionAuthorizationFailure$8() {
        return "Connections not created on destination";
    }
}
