package kafka.link;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFetcherManager;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkIbp26Test.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0005\u0005\u0015a\u0001B\n\u0015\u0001eAQA\b\u0001\u0005\u0002}Aq!\t\u0001C\u0002\u0013\u0005#\u0005\u0003\u0004'\u0001\u0001\u0006Ia\t\u0005\bO\u0001\u0011\r\u0011\"\u0011#\u0011\u0019A\u0003\u0001)A\u0005G!9\u0011\u0006\u0001b\u0001\n\u0003R\u0003BB\u0019\u0001A\u0003%1\u0006C\u00053\u0001\u0001\u0007\t\u0011)Q\u0005g!)q\b\u0001C!\u0001\")!\n\u0001C\u0001\u0017\")\u0001\u000b\u0001C\u0001\u0017\")!\u000b\u0001C\u0001\u0017\")A\u000b\u0001C\u0001\u0017\")a\u000b\u0001C\u0001\u0017\")\u0001\f\u0001C\u0001\u0017\")!\f\u0001C\u00057\"9a\u000eAI\u0001\n\u0013y\u0007b\u0002>\u0001#\u0003%Ia\u001c\u0002\u0015\u00072,8\u000f^3s\u0019&t7.\u00132qeY\"Vm\u001d;\u000b\u0005U1\u0012\u0001\u00027j].T\u0011aF\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001!\u0004\u0005\u0002\u001c95\tA#\u0003\u0002\u001e)\t\u0011\u0013IY:ue\u0006\u001cGo\u00117vgR,'\u000fT5oW&sG/Z4sCRLwN\u001c+fgR\fa\u0001P5oSRtD#\u0001\u0011\u0011\u0005m\u0001\u0011!D:pkJ\u001cWm\u00117vgR,'/F\u0001$!\tYB%\u0003\u0002&)\t12\t\\;ti\u0016\u0014H*\u001b8l)\u0016\u001cH\u000fS1s]\u0016\u001c8/\u0001\bt_V\u00148-Z\"mkN$XM\u001d\u0011\u0002\u0017\u0011,7\u000f^\"mkN$XM]\u0001\rI\u0016\u001cHo\u00117vgR,'\u000fI\u0001\u0012e\u0016\u0004H.[2bi&|gNR1di>\u0014X#A\u0016\u0011\u00051zS\"A\u0017\u000b\u00039\nQa]2bY\u0006L!\u0001M\u0017\u0003\u000bMCwN\u001d;\u0002%I,\u0007\u000f\\5dCRLwN\u001c$bGR|'\u000fI\u0001\n?R,7\u000f^%oM>\u0004\"\u0001N\u001f\u000e\u0003UR!AN\u001c\u0002\u0007\u0005\u0004\u0018N\u0003\u00029s\u00059!.\u001e9ji\u0016\u0014(B\u0001\u001e<\u0003\u0015QWO\\5u\u0015\u0005a\u0014aA8sO&\u0011a(\u000e\u0002\t)\u0016\u001cH/\u00138g_\u0006)1/\u001a;VaR\u0011\u0011\t\u0012\t\u0003Y\tK!aQ\u0017\u0003\tUs\u0017\u000e\u001e\u0005\u0006\u000b&\u0001\raM\u0001\ti\u0016\u001cH/\u00138g_\"\u0012\u0011b\u0012\t\u0003i!K!!S\u001b\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007.A\u000euKN$8k\\;sG\u0016\u001c\u0005.\u00198hKB\u0013x\u000e]1hCRLwN\u001c\u000b\u0002\u0003\"\u0012!\"\u0014\t\u
0003i9K!aT\u001b\u0003\tQ+7\u000f^\u00012i\u0016\u001cH/\u00168dY\u0016\fgnU8ve\u000e,G*Z1eKJ,E.Z2uS>tw+\u001b;i\t\u0016\u001cH/\u00129pG\"\f\u0005.Z1eQ\tYQ*\u0001\u0016uKN$X*\u001b:s_J<\u0016\u000e\u001e5T_V\u00148-\u001a$bS2,(/Z:XSRDw\n\u001c3EKN$\u0018\n\u001d2)\u00051i\u0015\u0001\f;fgRl\u0015N\u001d:pe^KG\u000f[*pkJ\u001cWMR1jYV\u0014Xm],ji\"|E\u000eZ*pkJ\u001cW-\u00139cQ\tiQ*A\u0016uKN$8k\\;sG\u0016$v\u000e]5d%\u0016\u001c'/Z1uK\u0012+G/Z2uK\u0012|e\u000e\u0016:v]\u000e\fG/[8oQ\tqQ*\u0001\nuKN$8)\u001b:dk2\f'/T5se>\u0014\bFA\bN\u00035\u0019X\r^+q\u00072,8\u000f^3sgR\u0019\u0011\t\u00187\t\u000fu\u0003\u0002\u0013!a\u0001=\u00069A-Z:u\u0013\n\u0004\bc\u0001\u0017`C&\u0011\u0001-\f\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0005\tLgBA2h!\t!W&D\u0001f\u0015\t1\u0007$\u0001\u0004=e>|GOP\u0005\u0003Q6\na\u0001\u0015:fI\u00164\u0017B\u00016l\u0005\u0019\u0019FO]5oO*\u0011\u0001.\f\u0005\b[B\u0001\n\u00111\u0001_\u0003%\u0019x.\u001e:dK&\u0013\u0007/A\ftKR,\u0006o\u00117vgR,'o\u001d\u0013eK\u001a\fW\u000f\u001c;%cU\t\u0001O\u000b\u0002_c.\n!\u000f\u0005\u0002tq6\tAO\u0003\u0002vm\u0006IQO\\2iK\u000e\\W\r\u001a\u0006\u0003o6\n!\"\u00198o_R\fG/[8o\u0013\tIHOA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\fqc]3u+B\u001cE.^:uKJ\u001cH\u0005Z3gCVdG\u000f\n\u001a)\u000b\u0001ax0!\u0001\u0011\u0005Qj\u0018B\u0001@6\u0005\r!\u0016mZ\u0001\u0006m\u0006dW/Z\u0011\u0003\u0003\u0007\t1\"\u001b8uK\u001e\u0014\u0018\r^5p]\u0002")
/* loaded from: input_file:kafka/link/ClusterLinkIbp26Test.class */
public class ClusterLinkIbp26Test extends AbstractClusterLinkIntegrationTest {
    /** Source-side harness; assigned once in the constructor and returned by {@code sourceCluster()}. */
    private final ClusterLinkTestHarness sourceCluster;
    /** Destination-side harness; assigned once in the constructor and returned by {@code destCluster()}. */
    private final ClusterLinkTestHarness destCluster;
    /** Replication factor returned by {@code replicationFactor()}; the constructor sets it to 3. */
    private final short replicationFactor;
    /** JUnit {@link TestInfo} captured by {@code setUp} and later handed to {@code super.setUp} from {@code setUpClusters}. */
    private TestInfo _testInfo;

    /**
     * Returns the harness for the source side of the link.
     *
     * @return the instance assigned to the {@code sourceCluster} field by the constructor
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public ClusterLinkTestHarness sourceCluster() {
        return sourceCluster;
    }

    /**
     * Returns the harness for the destination side of the link.
     *
     * @return the instance assigned to the {@code destCluster} field by the constructor
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public ClusterLinkTestHarness destCluster() {
        return destCluster;
    }

    /**
     * Returns the replication factor used by the tests in this class.
     *
     * @return the value stored in the {@code replicationFactor} field (set to 3 by the constructor)
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public short replicationFactor() {
        return replicationFactor;
    }

    /**
     * Per-test hook that only remembers the supplied {@link TestInfo}.
     *
     * <p>This override does not call {@code super.setUp}; that call is made from
     * {@code setUpClusters}, after the per-test broker properties have been applied.
     *
     * @param testInfo the JUnit test info for the test about to run
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        _testInfo = testInfo;
    }

    /**
     * Sets the destination inter-broker protocol version to "2.6", creates the destination link
     * (passing 10000L as the fourth argument), verifies mirroring across source epoch changes,
     * then deletes the source topic and waits for the mirror description to move from ACTIVE to FAILED.
     */
    @Test
    public void testSourceChangePropagation() {
        setUpClusters(new Some("2.6"), None$.MODULE$);

        ClusterLinkTestHarness dest = destCluster();
        dest.createDestClusterLink(
                linkName(),
                sourceCluster(),
                dest.createDestClusterLink$default$3(),
                10000L,
                dest.createDestClusterLink$default$5(),
                dest.createDestClusterLink$default$6());

        verifyMirrorWithSourceEpochChanges(false);
        waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.ACTIVE,
                waitUntilMirrorDescriptionState$default$2());

        // Deleting the topic on the source is expected to surface as a FAILED mirror description.
        sourceCluster().deleteTopic(topic(), false);
        waitUntilMirrorDescriptionState(
                MirrorTopicDescription.State.FAILED,
                waitUntilMirrorDescriptionState$default$2());
    }

    /**
     * Sets the destination inter-broker protocol version to "2.6", creates the destination link with
     * every optional argument left at its default, runs the source-epoch-change verification with
     * its flag set to {@code true}, then checks the linked-leader-change metrics and the mirror.
     */
    @Test
    public void testUncleanSourceLeaderElectionWithDestEpochAhead() {
        setUpClusters(new Some("2.6"), None$.MODULE$);

        ClusterLinkTestHarness dest = destCluster();
        dest.createDestClusterLink(
                linkName(),
                sourceCluster(),
                dest.createDestClusterLink$default$3(),
                dest.createDestClusterLink$default$4(),
                dest.createDestClusterLink$default$5(),
                dest.createDestClusterLink$default$6());

        verifyMirrorWithSourceEpochChanges(true);
        verifyLinkedLeaderChangeMetrics();
        verifyMirror(
                topic(),
                verifyMirror$default$2(),
                verifyMirror$default$3(),
                verifyMirror$default$4());
    }

    /**
     * Sets the destination inter-broker protocol version to "2.6", runs
     * {@code createLinkAndMirrorWithPartialReplicationAndShutdownSource} with 10 as its second
     * argument, waits for the mirror on the destination leader of partition 0, asserts that every
     * other destination broker reports a log end offset of {@code Some(10)}, and finally unlinks
     * the topic.
     */
    @Test
    public void testMirrorWithSourceFailuresWithOldDestIpb() {
        setUpClusters(new Some("2.6"), None$.MODULE$);

        // The same value is handed to the helper and later used as the expected log end offset.
        int expectedEndOffset = 10;
        createLinkAndMirrorWithPartialReplicationAndShutdownSource(
                createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$1(),
                expectedEndOffset,
                createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$3());

        TopicPartition partition = new TopicPartition(topic(), 0);
        KafkaServer leader = destCluster().partitionLeader(partition);
        waitForMirror(new $colon.colon(leader, Nil$.MODULE$), waitForMirror$default$2(), waitForMirror$default$3());

        // Check every destination broker except the leader.
        ((IterableOnceOps) destCluster().servers().filterNot(candidate ->
                BoxesRunTime.boxToBoolean($anonfun$testMirrorWithSourceFailuresWithOldDestIpb$1(leader, candidate))
        )).foreach(follower -> {
            $anonfun$testMirrorWithSourceFailuresWithOldDestIpb$2(this, expectedEndOffset, partition, follower);
            return BoxedUnit.UNIT;
        });

        ClusterLinkTestHarness dest = destCluster();
        dest.unlinkTopic(
                topic(),
                linkName(),
                dest.unlinkTopic$default$3(),
                false,
                dest.unlinkTopic$default$5(),
                dest.unlinkTopic$default$6());
    }

    /**
     * Sets the source inter-broker protocol version to "2.4" (destination left untouched), runs
     * {@code createLinkAndMirrorWithPartialReplicationAndShutdownSource} with an empty
     * {@link Properties} as its first argument, then verifies the mirror with the third
     * {@code verifyMirror} argument forced to {@code false}.
     */
    @Test
    public void testMirrorWithSourceFailuresWithOldSourceIpb() {
        setUpClusters(None$.MODULE$, new Some("2.4"));

        Properties emptyOverrides = new Properties();
        createLinkAndMirrorWithPartialReplicationAndShutdownSource(
                emptyOverrides,
                createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$2(),
                createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$3());

        verifyMirror(topic(), verifyMirror$default$2(), false, verifyMirror$default$4());
    }

    /**
     * With a single partition and the destination inter-broker protocol version set to "2.6":
     * mirrors 20 records, deletes and recreates the source topic, produces 5 records, calls
     * {@code truncate(5)}, and waits for the {@code SourceTopicDelete} failure. The destination
     * brokers are then killed and restarted, and the same failure is awaited again through the
     * admin client returned by {@code restartCluster}, before a final {@code verifyMirror}.
     */
    @Test
    public void testSourceTopicRecreateDetectedOnTruncation() {
        numPartitions_$eq(1);
        setUpClusters(new Some("2.6"), None$.MODULE$);

        ConfluentAdmin destAdmin = destCluster().createAdminClient(destCluster().createAdminClient$default$1());
        setupLinkAndMirrorForFailureTest(20000L, 60000L, "testGroup");

        produceToSourceCluster(20);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2(), waitForMirror$default$3());

        // Replace the source topic with a fresh one of the same name.
        sourceCluster().deleteTopic(topic());
        sourceCluster().createTopic(
                topic(),
                numPartitions(),
                replicationFactor(),
                sourceCluster().createTopic$default$4(),
                sourceCluster().createTopic$default$5());
        produceToSourceCluster(5);
        truncate(5);

        waitForFailure(destAdmin, FailureType$SourceTopicDelete$.MODULE$);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2(), waitForMirror$default$3());

        // The failure must still be reported after the destination cluster is restarted.
        destCluster().killAllBrokers();
        ConfluentAdmin restartedAdmin =
                (ConfluentAdmin) restartCluster(destCluster(), !useSourceInitiatedLink()).get();
        waitForFailure(restartedAdmin, FailureType$SourceTopicDelete$.MODULE$);

        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3(), verifyMirror$default$4());
    }

    /**
     * Builds links in both directions between the two clusters and mirrors the same topic both
     * ways, then checks that the circular configuration is detected and can be recovered from.
     *
     * <p>Sequence, as performed below:
     * <ol>
     *   <li>Source inter-broker protocol version is set to "2.6"; a single partition is used.</li>
     *   <li>A link is created on the destination (pointing at the source) and another with the
     *       same name on the source (pointing at the destination), both with the availability
     *       check interval overridden to "100".</li>
     *   <li>The topic is created on the source and linked on the destination; it is then deleted
     *       on the source and linked there as well.</li>
     *   <li>Both clusters must report replica states within
     *       {@code FailureType.CircularMirror.replicaStatusStates()}, and partition 0 must appear
     *       in the {@code waitingPartitions} map of at least one of the two fetcher managers.</li>
     *   <li>The destination topic is unlinked, verified writable, 20 records are produced to it,
     *       and the source is expected to mirror them and report only ACTIVE replica state.</li>
     * </ol>
     */
    @Test
    public void testCircularMirror() {
        setUpClusters(None$.MODULE$, new Some("2.6"));
        numPartitions_$eq(1);
        TopicPartition partition = new TopicPartition(topic(), 0);

        Properties linkOverrides = new Properties();
        linkOverrides.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        UUID destLinkId = destCluster().createDestClusterLink(linkName(), sourceCluster(), destCluster().createDestClusterLink$default$3(), destCluster().createDestClusterLink$default$4(), destCluster().createDestClusterLink$default$5(), linkOverrides);

        // NOTE(review): reverseLinkProps is populated but never passed to any call in this method;
        // the reverse link below is created with linkOverrides. The statements are kept because
        // createLinkCredentials(...) may have side effects on the destination cluster - confirm
        // whether these properties were meant to be used.
        Properties reverseLinkProps = new Properties();
        reverseLinkProps.put("bootstrap.servers", destCluster().brokerList());
        reverseLinkProps.putAll(destCluster().clientSecurityProps(linkName()));
        reverseLinkProps.put("sasl.jaas.config", createLinkCredentials(linkName(), destCluster(), createLinkCredentials$default$3()));
        UUID sourceLinkId = sourceCluster().createDestClusterLink(linkName(), destCluster(), sourceCluster().createDestClusterLink$default$3(), sourceCluster().createDestClusterLink$default$4(), sourceCluster().createDestClusterLink$default$5(), linkOverrides);

        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4(), sourceCluster().createTopic$default$5());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4(), destCluster().linkTopic$default$5());
        // Replace the source topic with a mirror of the destination topic, closing the circle.
        sourceCluster().deleteTopic(topic());
        sourceCluster().linkTopic(topic(), replicationFactor(), linkName(), sourceCluster().linkTopic$default$4(), sourceCluster().linkTopic$default$5());

        ConfluentAdmin sourceAdmin = sourceCluster().createAdminClient(sourceCluster().createAdminClient$default$1());
        ConfluentAdmin destAdmin = destCluster().createAdminClient(destCluster().createAdminClient$default$1());
        waitForReplicaState$1(sourceAdmin, FailureType$CircularMirror$.MODULE$.replicaStatusStates());
        waitForReplicaState$1(destAdmin, FailureType$CircularMirror$.MODULE$.replicaStatusStates());
        waitForBlockedPartition$1(sourceLinkId, destLinkId, partition);

        // Break the circle on the destination side and make that topic the writable origin.
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), false, destCluster().unlinkTopic$default$5(), destCluster().unlinkTopic$default$6());
        destCluster().verifyTopicWritable(topic(), numPartitions());

        // try-with-resources so the producer is closed even when produceRecords throws.
        try (KafkaProducer<byte[], byte[]> producer = destCluster().createProducer(destCluster().createProducer$default$1(), destCluster().createProducer$default$2(), destCluster().createProducer$default$3())) {
            produceRecords(producer, topic(), 20, produceRecords$default$4());
        }

        waitForMirror(sourceCluster().servers(), waitForMirror$default$2(), waitForMirror$default$3());
        waitForReplicaState$1(sourceAdmin, (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new ReplicaStatus.MirrorInfo.State[]{ReplicaStatus.MirrorInfo.State.ACTIVE})));
    }

    /**
     * Applies optional inter-broker protocol version overrides to the two harnesses' server
     * configs and then runs the superclass set-up with the {@link TestInfo} stored by {@code setUp}.
     *
     * @param option  when defined, written to the destination cluster's server config under
     *                {@code KafkaConfig.InterBrokerProtocolVersionProp}
     * @param option2 when defined, written to the source cluster's server config under the same key
     */
    private void setUpClusters(Option<String> option, Option<String> option2) {
        if (option.isDefined()) {
            destCluster().serverConfig().setProperty(
                    KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), option.get());
        }
        if (option2.isDefined()) {
            sourceCluster().serverConfig().setProperty(
                    KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), option2.get());
        }
        // The properties above are set before the superclass set-up runs.
        super.setUp(_testInfo);
    }

    /**
     * Default value for the first {@code setUpClusters} parameter.
     *
     * @return the empty {@link Option}
     */
    private Option<String> setUpClusters$default$1() {
        return Option.empty();
    }

    /**
     * Default value for the second {@code setUpClusters} parameter.
     *
     * @return the empty {@link Option}
     */
    private Option<String> setUpClusters$default$2() {
        return Option.empty();
    }

    /**
     * Null-safe equality check between two brokers; used as the {@code filterNot} predicate that
     * excludes the partition leader.
     *
     * @param kafkaServer  the broker to compare against (the leader at the call site)
     * @param kafkaServer2 the candidate broker
     * @return {@code true} when both are {@code null} or {@code kafkaServer2.equals(kafkaServer)}
     */
    public static final /* synthetic */ boolean $anonfun$testMirrorWithSourceFailuresWithOldDestIpb$1(KafkaServer kafkaServer, KafkaServer kafkaServer2) {
        if (kafkaServer2 == null) {
            return kafkaServer == null;
        }
        return kafkaServer2.equals(kafkaServer);
    }

    /**
     * Asserts that the given broker's log end offset for the partition equals {@code Some(i)}.
     *
     * @param clusterLinkIbp26Test the test instance whose {@code logEndOffset} is queried
     * @param i                    the expected log end offset
     * @param topicPartition       the partition to inspect
     * @param kafkaServer          the broker to inspect
     */
    public static final /* synthetic */ void $anonfun$testMirrorWithSourceFailuresWithOldDestIpb$2(ClusterLinkIbp26Test clusterLinkIbp26Test, int i, TopicPartition topicPartition, KafkaServer kafkaServer) {
        Object expectedEndOffset = new Some(BoxesRunTime.boxToInteger(i));
        Object actualEndOffset = clusterLinkIbp26Test.logEndOffset(kafkaServer, topicPartition);
        Assertions.assertEquals(expectedEndOffset, actualEndOffset);
    }

    /**
     * Tests whether the second set is contained in the first.
     *
     * @param set  the allowed set
     * @param set2 the set being checked
     * @return {@code set2.subsetOf(set)}
     */
    public static final /* synthetic */ boolean $anonfun$testCircularMirror$2(Set set, Set set2) {
        boolean contained = set2.subsetOf(set);
        return contained;
    }

    /**
     * Polls {@code mirrorPartitionStates(confluentAdmin)} until the returned states are a subset
     * of {@code set} or 15 seconds have elapsed, then asserts that the last observed states are a
     * subset of {@code set}.
     *
     * @param confluentAdmin admin client passed to {@code mirrorPartitionStates}
     * @param set            the allowed mirror states
     * @throws AssertionError if the states are still outside {@code set} after the timeout, or if
     *                        the waiting thread is interrupted (the interrupt flag is restored first)
     */
    private final void waitForReplicaState$1(ConfluentAdmin confluentAdmin, Set set) {
        final long timeoutMs = 15000L;
        // Never sleep longer than the overall timeout.
        final long pollIntervalMs = Math.min(timeoutMs, 100L);
        final long startMs = System.currentTimeMillis();

        Set<ReplicaStatus.MirrorInfo.State> observed = mirrorPartitionStates(confluentAdmin);
        while (!observed.subsetOf(set) && System.currentTimeMillis() <= startMs + timeoutMs) {
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack, then fail the test.
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for mirror partition states within " + set, e);
            }
            observed = mirrorPartitionStates(confluentAdmin);
        }

        Assertions.assertTrue(observed.subsetOf(set), "Expected subset of " + set + ", got " + observed);
    }

    /**
     * Reads, via {@code TestUtils.fieldValue}, the {@code waitingPartitions} field declared on
     * {@link ClusterLinkFetcherManager} from the fetcher manager that the partition's leader
     * broker holds for the given link.
     *
     * @param clusterLinkTestHarness the cluster whose partition leader is inspected
     * @param uuid                   the link id passed to {@code fetcherManager}
     * @param topicPartition         the partition whose leader is looked up
     * @return the field value, cast to {@link ConcurrentHashMap}
     */
    private static final ConcurrentHashMap waitingPartitions$1(ClusterLinkTestHarness clusterLinkTestHarness, UUID uuid, TopicPartition topicPartition) {
        KafkaServer leader = clusterLinkTestHarness.partitionLeader(topicPartition);
        Object waiting = TestUtils.fieldValue(
                leader.clusterLinkManager().fetcherManager(uuid).get(),
                ClusterLinkFetcherManager.class,
                "waitingPartitions");
        return (ConcurrentHashMap) waiting;
    }

    /**
     * Tests whether the partition is a key in either of the two maps.
     *
     * @param concurrentHashMap  first map to check
     * @param topicPartition     the key to look for
     * @param concurrentHashMap2 second map, consulted only when the first does not contain the key
     * @return {@code true} if at least one map contains {@code topicPartition} as a key
     */
    public static final /* synthetic */ boolean $anonfun$testCircularMirror$3(ConcurrentHashMap concurrentHashMap, TopicPartition topicPartition, ConcurrentHashMap concurrentHashMap2) {
        if (concurrentHashMap.containsKey(topicPartition)) {
            return true;
        }
        return concurrentHashMap2.containsKey(topicPartition);
    }

    /**
     * Supplies the failure message for the blocked-partition wait.
     *
     * @return the fixed message text
     */
    public static final /* synthetic */ String $anonfun$testCircularMirror$4() {
        final String message = "Partition not blocked after consecutive epoch bumps";
        return message;
    }

    /**
     * Waits up to 15 seconds for the partition to appear in the {@code waitingPartitions} map of
     * either cluster's fetcher manager, failing the test if it does not.
     *
     * <p>The two maps are fetched once, before polling starts; the loop re-checks those same map
     * instances.
     *
     * @param uuid           link id used to look up the fetcher manager on the source cluster
     * @param uuid2          link id used to look up the fetcher manager on the destination cluster
     * @param topicPartition the partition expected to become blocked
     * @throws AssertionError if the waiting thread is interrupted (the interrupt flag is restored first)
     */
    private final void waitForBlockedPartition$1(UUID uuid, UUID uuid2, TopicPartition topicPartition) {
        ConcurrentHashMap sourceWaiting = waitingPartitions$1(sourceCluster(), uuid, topicPartition);
        ConcurrentHashMap destWaiting = waitingPartitions$1(destCluster(), uuid2, topicPartition);

        final long timeoutMs = 15000L;
        // Never sleep longer than the overall timeout.
        final long pollIntervalMs = Math.min(timeoutMs, 100L);
        final long startMs = System.currentTimeMillis();

        while (!$anonfun$testCircularMirror$3(sourceWaiting, topicPartition, destWaiting)) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail("Partition not blocked after consecutive epoch bumps");
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack, then fail the test.
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for partition " + topicPartition + " to become blocked", e);
            }
        }
    }

    /**
     * Creates the two cluster harnesses and fixes the replication factor at 3.
     *
     * <p>The source harness uses {@code SASL_SSL} and the destination harness uses
     * {@code SASL_PLAINTEXT}. The two trailing integer arguments are (0, 3) for the source and
     * (100, 3) for the destination.
     * NOTE(review): the meaning of those integers is not visible in this file - presumably an id
     * or port offset and a broker count; confirm against {@code ClusterLinkTestHarness}.
     */
    public ClusterLinkIbp26Test() {
        this.sourceCluster = new ClusterLinkTestHarness(SecurityProtocol.SASL_SSL, None$.MODULE$, 0, 3);
        this.destCluster = new ClusterLinkTestHarness(SecurityProtocol.SASL_PLAINTEXT, None$.MODULE$, 100, 3);
        this.replicationFactor = (short) 3;
    }
}
