package kafka.link;

import java.util.Map;
import java.util.Properties;
import kafka.server.KafkaBroker;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ConnectionMode$Inbound$;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.resource.ResourceType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.RichLong$;

/* compiled from: SourceInitiatedLinkAuthorizationTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001y4A\u0001C\u0005\u0001\u001d!)1\u0003\u0001C\u0001)!)a\u0003\u0001C!/!)\u0001\u0007\u0001C!c!)Q\f\u0001C!=\")A\r\u0001C!K\")1\u000e\u0001C!Y\")!\u000f\u0001C\u0001g\n!3k\\;sG\u0016Le.\u001b;jCR,G\rT5oW\u0006+H\u000f[8sSj\fG/[8o)\u0016\u001cHO\u0003\u0002\u000b\u0017\u0005!A.\u001b8l\u0015\u0005a\u0011!B6bM.\f7\u0001A\n\u0003\u0001=\u0001\"\u0001E\t\u000e\u0003%I!AE\u0005\u00039\rcWo\u001d;fe2Kgn[!vi\"|'/\u001b>bi&|g\u000eV3ti\u00061A(\u001b8jiz\"\u0012!\u0006\t\u0003!\u0001\tQa]3u+B$\"\u0001\u0007\u0010\u0011\u0005eaR\"\u0001\u000e\u000b\u0003m\tQa]2bY\u0006L!!\b\u000e\u0003\tUs\u0017\u000e\u001e\u0005\u0006?\t\u0001\r\u0001I\u0001\ti\u0016\u001cH/\u00138g_B\u0011\u0011EK\u0007\u0002E)\u00111\u0005J\u0001\u0004CBL'BA\u0013'\u0003\u001dQW\u000f]5uKJT!a\n\u0015\u0002\u000b),h.\u001b;\u000b\u0003%\n1a\u001c:h\u0013\tY#E\u0001\u0005UKN$\u0018J\u001c4pQ\t\u0011Q\u0006\u0005\u0002\"]%\u0011qF\t\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017a\f;fgR\u0014\u0015\u000eR5sK\u000e$\u0018n\u001c8bY2Kgn\u001b#fg\u000e\u0014\u0018NY3D_:4\u0017nZ:QKJl\u0017n]:j_:\u001cHC\u0001\r3\u0011\u0015\u00194\u00011\u00015\u0003\u0019\tXo\u001c:v[B\u0011Q\u0007\u0010\b\u0003mi\u0002\"a\u000e\u000e\u000e\u0003aR!!O\u0007\u0002\rq\u0012xn\u001c;?\u0013\tY$$\u0001\u0004Qe\u0016$WMZ\u0005\u0003{y\u0012aa\u0015;sS:<'BA\u001e\u001bQ\u0011\u0019\u0001\tS%\u0011\u0005\u00053U\"\u0001\"\u000b\u0005\r#\u0015\u0001\u00039s_ZLG-\u001a:\u000b\u0005\u0015#\u0013A\u00029be\u0006l7/\u0003\u0002H\u0005\nYa+\u00197vKN{WO]2f\u0003\u001d\u0019HO]5oONd#A\u0013'\"\u0003-\u000b!A_6\"\u00035\u000bQa\u001b:bMRDCaA(T)B\u0011\u0001+U\u0007\u0002\t&\u0011!\u000b\u0012\u0002\u0012!\u0006\u0014\u0018-\\3uKJL'0\u001a3UKN$\u0018\u0001\u00028b[\u0016\f\u0013!V\u0001\u0019w\u0012L7\u000f\u001d7bs:\u000bW.Z?/cV|'/^7>wBj\b\u0006B\u0002X5n\u0003\"!\t-\n\u0005e\u0013#\u0001\u0003#jg\u0006\u0014G.\u001a3\u0002\u000bY\fG.^3\"\u0003q\u000b1\u0005R5tC\ndW\r\u001a\u0011g_J\u00043o\\;sG\u0016\u0004\u0013N\\5uS\u0006$X\r\u001a\u0011mS:\\7/\u0001\u0010uKN$\u0018i\u00197Ts:\u001cG+Y:l'R\fG/Z'b]\u0006<W-\\3oiR\u0011\u0001d\u0018\u0005\u0006g\u0011\u0001\r\u0001\u000e\u0015\u0005\t\u0001C\u0015\r\f\u0002K\u0019\"\"AaT*UQ\u0011!qKW.\u0002qQ,7\u000f^!dYNKhn\u0019+bg.\u001cF/\u0019;f\u001b\u0006t\u0017mZ3nK:$\b*\u00198eY\u0016\u001c(I]8lKJ\fU\u000f\u001e5pe&T\u0018\r^5p]R\u0011\u0001D\u001a\u0005\u0006g\u0015\u0001\r\u0001\u000e\u0015\u0005\u000b\u0001C\u0005\u000eL\u0001KQ\u0011)qj\u0015+)\t\u00159&lW\u0001!i\u0016\u001cH/Q;uQ>\u0014\u0018N_1uS>tgi\u001c:BG2l\u0015n\u001a:bi&|g\u000e\u0006\u0002\u0019[\")1G\u0002a\u0001i!\"a\u0001\u0011%pY\tQE\n\u000b\u0003\u0007\u001fN#\u0006\u0006\u0002\u0004X5n\u000b\u0011\u0006^3tiJ+g/\u001a:tK\u000e{gN\\3di&|g.Q;uQ>\u0014\u0018N_1uS>tg)Y5mkJ,GC\u0001\ru\u0011\u0015\u0019t\u00011\u00015Q\u00119\u0001\t\u0013<-\u0003)CCaB(T)\"\"\u0001!\u001f.}!\t\t#0\u0003\u0002|E\t\u0019A+Y4\"\u0003u\f1\"\u001b8uK\u001e\u0014\u0018\r^5p]\u0002")
/* loaded from: input_file:kafka/link/SourceInitiatedLinkAuthorizationTest.class */
public class SourceInitiatedLinkAuthorizationTest extends ClusterLinkAuthorizationTest {
    @Override // kafka.link.ClusterLinkAuthorizationTest, kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        super.setUp(testInfo);
        AclBinding aclBinding = aclBinding(brokerUser(), ResourceType.TOPIC, "_confluent-link-metadata", AclOperation.CREATE);
        destCluster().addAcls((Seq) destReverseConnectionAcls().$plus$plus(new $colon.colon(aclBinding, Nil$.MODULE$), Seq$.MODULE$.canBuildFrom()));
        sourceCluster().addAcls((Seq) sourceReverseConnectionAcls().$plus$plus(new $colon.colon(aclBinding, Nil$.MODULE$), Seq$.MODULE$.canBuildFrom()));
    }

    @Override // kafka.link.ClusterLinkAuthorizationTest
    @Disabled("Disabled for source initiated links")
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testBiDirectionalLinkDescribeConfigsPermissions(String str) {
    }

    @Override // kafka.link.ClusterLinkAuthorizationTest
    @Disabled("Disabled for source initiated links")
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAclSyncTaskStateManagement(String str) {
    }

    @Override // kafka.link.ClusterLinkAuthorizationTest
    @Disabled("Disabled for source initiated links")
    @ValueSource(strings = {"zk"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAclSyncTaskStateManagementHandlesBrokerAuthorization(String str) {
    }

    @Override // kafka.link.ClusterLinkAuthorizationTest
    @Disabled("Disabled for source initiated links")
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testAuthorizationForAclMigration(String str) {
    }

    @ValueSource(strings = {"zk"})
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    public void testReverseConnectionAuthorizationFailure(String str) {
        addAcls();
        sourceCluster().deleteAcls(new $colon.colon(sourceLinkUserClusterAlterAcl(), Nil$.MODULE$));
        destCluster().deleteAcls(new $colon.colon(destLinkUserClusterAlterAcl(), Nil$.MODULE$));
        prepareSourceTopic();
        ClusterLinkTestHarness destCluster = destCluster();
        Uuid createClusterLink = destCluster.createClusterLink(linkName(), destLinkProps(destLinkProps$default$1()), new Some(((KafkaBroker) sourceCluster().brokers().head()).clusterId()), destCluster.createClusterLink$default$4());
        Assertions.assertThrows(ClusterAuthorizationException.class, () -> {
            ClusterLinkTestHarness sourceCluster = this.sourceCluster();
            sourceCluster.createClusterLink(this.linkName(), (Properties) this.sourceLinkProps(this.sourceLinkProps$default$1()).get(), new Some(((KafkaBroker) this.destCluster().brokers().head()).clusterId()), sourceCluster.createClusterLink$default$4());
        });
        ClusterLinkData clusterLinkData = new ClusterLinkData(linkName(), createClusterLink, new Some(((KafkaBroker) destCluster().brokers().head()).clusterId()), None$.MODULE$, false);
        Properties properties = new Properties();
        ConfigDef.convertToStringMapWithPasswordValues((Map) sourceLinkProps(sourceLinkProps$default$1()).get()).forEach((str2, str3) -> {
            properties.setProperty(str2, str3);
        });
        if (useBidirectionalLink()) {
            properties.setProperty(ClusterLinkConfig$.MODULE$.RemoteLinkConnectionModeProp(), ConnectionMode$Inbound$.MODULE$.name());
        }
        ClusterLinkConfig create = ClusterLinkConfig$.MODULE$.create(properties, None$.MODULE$, ClusterLinkConfig$.MODULE$.create$default$3());
        ClusterLinkFactory.LinkManager clusterLinkManager = sourceCluster().controller().clusterLinkManager();
        clusterLinkManager.createClusterLink(clusterLinkData, create, clusterLinkManager.configEncoder().encode(properties));
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testReverseConnectionAuthorizationFailure$3(this)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testReverseConnectionAuthorizationFailure$4());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        Assertions.assertEquals(0.0d, kafkaMetricMaxValue(sourceCluster().brokers(), "reverse-connection-count", "cluster-link-metrics", new Some(linkName()), (scala.collection.Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), sourceLinkMode().lowerCaseName())})), kafkaMetricMaxValue$default$6(), kafkaMetricMaxValue$default$7(), kafkaMetricMaxValue$default$8()), 0.001d);
        sourceCluster().addAcls(new $colon.colon(sourceLinkUserClusterAlterAcl(), Nil$.MODULE$));
        destCluster().addAcls(new $colon.colon(destLinkUserClusterAlterAcl(), Nil$.MODULE$));
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long waitUntilTrue$default$32 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$2 == null) {
            throw null;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testReverseConnectionAuthorizationFailure$5(this)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$32) {
                Assertions.fail($anonfun$testReverseConnectionAuthorizationFailure$6());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$32), waitUntilTrue$default$42));
        }
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long waitUntilTrue$default$33 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$43 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$3 == null) {
            throw null;
        }
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$testReverseConnectionAuthorizationFailure$7(this)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + waitUntilTrue$default$33) {
                Assertions.fail($anonfun$testReverseConnectionAuthorizationFailure$8());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$33), waitUntilTrue$default$43));
        }
        ClusterLinkTestHarness destCluster2 = destCluster();
        destCluster2.linkTopic(topic(), replicationFactor(), linkName(), destCluster2.linkTopic$default$4(), destCluster2.linkTopic$default$5());
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), verifyMirror$default$4());
    }

    public static final /* synthetic */ boolean $anonfun$testReverseConnectionAuthorizationFailure$3(SourceInitiatedLinkAuthorizationTest sourceInitiatedLinkAuthorizationTest) {
        return sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue(sourceInitiatedLinkAuthorizationTest.sourceCluster().brokers(), "reverse-connection-failed-total", "cluster-link-metrics", new Some(sourceInitiatedLinkAuthorizationTest.linkName()), (scala.collection.Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), sourceInitiatedLinkAuthorizationTest.sourceLinkMode().lowerCaseName())})), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$6(), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$7(), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$8()) > ((double) 0);
    }

    public static final /* synthetic */ String $anonfun$testReverseConnectionAuthorizationFailure$4() {
        return "Connections not failed";
    }

    public static final /* synthetic */ boolean $anonfun$testReverseConnectionAuthorizationFailure$5(SourceInitiatedLinkAuthorizationTest sourceInitiatedLinkAuthorizationTest) {
        return sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue(sourceInitiatedLinkAuthorizationTest.sourceCluster().brokers(), "reverse-connection-count", "cluster-link-metrics", new Some(sourceInitiatedLinkAuthorizationTest.linkName()), (scala.collection.Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), sourceInitiatedLinkAuthorizationTest.sourceLinkMode().lowerCaseName())})), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$6(), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$7(), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$8()) > ((double) 0);
    }

    public static final /* synthetic */ String $anonfun$testReverseConnectionAuthorizationFailure$6() {
        return "Connections not created on source";
    }

    public static final /* synthetic */ boolean $anonfun$testReverseConnectionAuthorizationFailure$7(SourceInitiatedLinkAuthorizationTest sourceInitiatedLinkAuthorizationTest) {
        return sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue(sourceInitiatedLinkAuthorizationTest.destCluster().brokers(), "reverse-connection-count", "cluster-link-metrics", new Some(sourceInitiatedLinkAuthorizationTest.linkName()), (scala.collection.Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("mode"), sourceInitiatedLinkAuthorizationTest.destinationLinkMode().lowerCaseName())})), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$6(), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$7(), sourceInitiatedLinkAuthorizationTest.kafkaMetricMaxValue$default$8()) > ((double) 0);
    }

    public static final /* synthetic */ String $anonfun$testReverseConnectionAuthorizationFailure$8() {
        return "Connections not created on destination";
    }

    public SourceInitiatedLinkAuthorizationTest() {
        useSourceInitiatedLink_$eq(true);
    }
}
