package kafka.link;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFetcherManager;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.Seq;
import scala.collection.immutable.Set;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ClusterLinkIbp26Test.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0005i4AAE\n\u00011!)Q\u0004\u0001C\u0001=!9\u0001\u0005\u0001b\u0001\n\u0003\n\u0003BB\u0013\u0001A\u0003%!\u0005C\u0004'\u0001\t\u0007I\u0011I\u0011\t\r\u001d\u0002\u0001\u0015!\u0003#\u0011\u001dA\u0003A1A\u0005B%Ba\u0001\r\u0001!\u0002\u0013Q\u0003\"B\u0019\u0001\t\u0003\u0012\u0004\"B\"\u0001\t\u0003\u0011\u0004\"\u0002%\u0001\t\u0003\u0011\u0004\"\u0002&\u0001\t\u0003\u0011\u0004\"\u0002'\u0001\t\u0003\u0011\u0004\"\u0002(\u0001\t\u0003\u0011\u0004\"\u0002)\u0001\t\u0003\u0011\u0004\"\u0002*\u0001\t\u0013\u0019\u0006b\u00024\u0001#\u0003%Ia\u001a\u0005\be\u0002\t\n\u0011\"\u0003h\u0005Q\u0019E.^:uKJd\u0015N\\6JEB\u0014d\u0007V3ti*\u0011A#F\u0001\u0005Y&t7NC\u0001\u0017\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\r\u0011\u0005iYR\"A\n\n\u0005q\u0019\"AI!cgR\u0014\u0018m\u0019;DYV\u001cH/\u001a:MS:\\\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cH/\u0001\u0004=S:LGO\u0010\u000b\u0002?A\u0011!\u0004A\u0001\u000eg>,(oY3DYV\u001cH/\u001a:\u0016\u0003\t\u0002\"AG\u0012\n\u0005\u0011\u001a\"AF\"mkN$XM\u001d'j].$Vm\u001d;ICJtWm]:\u0002\u001dM|WO]2f\u00072,8\u000f^3sA\u0005YA-Z:u\u00072,8\u000f^3s\u00031!Wm\u001d;DYV\u001cH/\u001a:!\u0003E\u0011X\r\u001d7jG\u0006$\u0018n\u001c8GC\u000e$xN]\u000b\u0002UA\u00111FL\u0007\u0002Y)\tQ&A\u0003tG\u0006d\u0017-\u0003\u00020Y\t)1\u000b[8si\u0006\u0011\"/\u001a9mS\u000e\fG/[8o\r\u0006\u001cGo\u001c:!\u0003\u0015\u0019X\r^+q)\u0005\u0019\u0004CA\u00165\u0013\t)DF\u0001\u0003V]&$\bF\u0001\u00058!\tA\u0014)D\u0001:\u0015\tQ4(A\u0002ba&T!\u0001P\u001f\u0002\u000f),\b/\u001b;fe*\u0011ahP\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0002\u0001\u0006\u0019qN]4\n\u0005\tK$A\u0003\"fM>\u0014X-R1dQ\u0006YB/Z:u'>,(oY3DQ\u0006tw-\u001a)s_B\fw-\u0019;j_:D#!C#\u0011\u0005a2\u0015BA$:\u0005\u0011!Vm\u001d;\u0002cQ,7\u000f^+oG2,\u0017M\\*pkJ\u001cW\rT3bI\u0016\u0014X\t\\3di&|gnV5uQ\u0012+7\u000f^#q_\u000eD\u0017\t[3bI\"\u0012!\"R\u0001+i\u0016\u001cH/T5se>\u0014x+\u001b;i'>,(oY3GC&dWO]3t/&$\bn\u00147e\t\u0016\u001cH/\u0013
9cQ\tYQ)\u0001\u0017uKN$X*\u001b:s_J<\u0016\u000e\u001e5T_V\u00148-\u001a$bS2,(/Z:XSRDw\n\u001c3T_V\u00148-Z%qE\"\u0012A\"R\u0001,i\u0016\u001cHoU8ve\u000e,Gk\u001c9jGJ+7M]3bi\u0016$U\r^3di\u0016$wJ\u001c+sk:\u001c\u0017\r^5p]\"\u0012Q\"R\u0001\u0013i\u0016\u001cHoQ5sGVd\u0017M]'jeJ|'\u000f\u000b\u0002\u000f\u000b\u0006i1/\u001a;Va\u000ecWo\u001d;feN$2a\r+e\u0011\u001d)v\u0002%AA\u0002Y\u000bq\u0001Z3ti&\u0013\u0007\u000fE\u0002,/fK!\u0001\u0017\u0017\u0003\r=\u0003H/[8o!\tQ\u0016M\u0004\u0002\\?B\u0011A\fL\u0007\u0002;*\u0011alF\u0001\u0007yI|w\u000e\u001e \n\u0005\u0001d\u0013A\u0002)sK\u0012,g-\u0003\u0002cG\n11\u000b\u001e:j]\u001eT!\u0001\u0019\u0017\t\u000f\u0015|\u0001\u0013!a\u0001-\u0006I1o\\;sG\u0016L%\r]\u0001\u0018g\u0016$X\u000b]\"mkN$XM]:%I\u00164\u0017-\u001e7uIE*\u0012\u0001\u001b\u0016\u0003-&\\\u0013A\u001b\t\u0003WBl\u0011\u0001\u001c\u0006\u0003[:\f\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0005=d\u0013AC1o]>$\u0018\r^5p]&\u0011\u0011\u000f\u001c\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017aF:fiV\u00038\t\\;ti\u0016\u00148\u000f\n3fM\u0006,H\u000e\u001e\u00133Q\u0011\u0001Ao\u001e=\u0011\u0005a*\u0018B\u0001<:\u0005\r!\u0016mZ\u0001\u0006m\u0006dW/Z\u0011\u0002s\u0006Y\u0011N\u001c;fOJ\fG/[8o\u0001")
/* loaded from: input_file:kafka/link/ClusterLinkIbp26Test.class */
/**
 * Cluster-link integration tests that pin the inter-broker protocol version (IBP) of the
 * source or destination cluster before the clusters are started.
 *
 * <p>The JUnit {@link #setUp()} hook is intentionally empty: each test first calls
 * {@code setUpClusters} with the IBP overrides it needs, and that method writes the overrides
 * into the harness server configs before delegating to the superclass {@code setUp()}.
 *
 * <p>This class is decompiler output of a Scala test. Methods whose names contain {@code $}
 * ({@code $anonfun$...}, {@code ...$default$N}, {@code ...$1}) are the compiler-generated
 * lambda bodies, default-argument getters and lifted local functions; their names and
 * signatures are kept unchanged.
 */
public class ClusterLinkIbp26Test extends AbstractClusterLinkIntegrationTest {
    /** Pause between two evaluations of a polled condition, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 100L;

    private final ClusterLinkTestHarness sourceCluster;
    private final ClusterLinkTestHarness destCluster;
    private final short replicationFactor;

    /**
     * Creates the two cluster harnesses: the source uses SASL_SSL, the destination SASL_PLAINTEXT.
     */
    public ClusterLinkIbp26Test() {
        // NOTE(review): the trailing numeric arguments (0/100 and 3) are presumably the first
        // broker id and the broker count - confirm against ClusterLinkTestHarness.
        this.sourceCluster = new ClusterLinkTestHarness(SecurityProtocol.SASL_SSL, None$.MODULE$, 0, 3);
        this.destCluster = new ClusterLinkTestHarness(SecurityProtocol.SASL_PLAINTEXT, None$.MODULE$, 100, 3);
        this.replicationFactor = (short) 3;
    }

    /** @return the harness acting as the source cluster */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public ClusterLinkTestHarness sourceCluster() {
        return this.sourceCluster;
    }

    /** @return the harness acting as the destination cluster */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public ClusterLinkTestHarness destCluster() {
        return this.destCluster;
    }

    /** @return the replication factor used by these tests (3) */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    public short replicationFactor() {
        return this.replicationFactor;
    }

    /**
     * Deliberately does nothing. The superclass set-up is triggered from
     * {@code setUpClusters} once the per-test IBP overrides have been applied.
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp() {
    }

    /**
     * With the destination on IBP 2.6, verifies mirroring across source epoch changes and then
     * waits (up to 20 s) until the destination's topic link state stops reporting
     * {@code shouldSync()} after the source topic is deleted.
     */
    @Test
    public void testSourceChangePropagation() {
        setUpClusters(new Some<>("2.6"), None$.MODULE$);
        destCluster().createDestClusterLink(linkName(), sourceCluster(), destCluster().createDestClusterLink$default$3(), 10000L, destCluster().createDestClusterLink$default$5(), destCluster().createDestClusterLink$default$6());
        verifyMirrorWithSourceEpochChanges(false);
        Assertions.assertTrue(destCluster().topicLinkState(topic()).state().shouldSync());
        sourceCluster().deleteTopic(topic(), false);
        awaitCondition(() -> $anonfun$testSourceChangePropagation$1(this), 20000L, $anonfun$testSourceChangePropagation$2());
    }

    /**
     * With the destination on IBP 2.6 and a link created with default settings, runs the
     * source-epoch-change verification in its {@code true} mode, then checks the linked leader
     * change metrics and the mirror itself.
     */
    @Test
    public void testUncleanSourceLeaderElectionWithDestEpochAhead() {
        setUpClusters(new Some<>("2.6"), None$.MODULE$);
        destCluster().createDestClusterLink(linkName(), sourceCluster(), destCluster().createDestClusterLink$default$3(), destCluster().createDestClusterLink$default$4(), destCluster().createDestClusterLink$default$5(), destCluster().createDestClusterLink$default$6());
        verifyMirrorWithSourceEpochChanges(true);
        verifyLinkedLeaderChangeMetrics();
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    /**
     * With the destination on IBP 2.6, mirrors with partial replication and a source shutdown,
     * waits for the destination partition leader to catch up, asserts that every other
     * destination broker has the expected log end offset, and finally unlinks the topic.
     */
    @Test
    public void testMirrorWithSourceFailuresWithOldDestIpb() {
        setUpClusters(new Some<>("2.6"), None$.MODULE$);
        // Single source of truth for the value passed to the helper and asserted as log end offset.
        final int expectedRecords = 10;
        createLinkAndMirrorWithPartialReplicationAndShutdownSource(createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$1(), expectedRecords, createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$3());
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        KafkaServer partitionLeader = destCluster().partitionLeader(topicPartition);
        waitForMirror((Seq) package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapRefArray(new KafkaServer[]{partitionLeader})), waitForMirror$default$2());
        // Check all destination brokers except the leader.
        ((IterableOnceOps) destCluster().servers().filterNot(kafkaServer -> {
            return BoxesRunTime.boxToBoolean($anonfun$testMirrorWithSourceFailuresWithOldDestIpb$1(partitionLeader, kafkaServer));
        })).foreach(kafkaServer2 -> {
            $anonfun$testMirrorWithSourceFailuresWithOldDestIpb$2(this, expectedRecords, topicPartition, kafkaServer2);
            return BoxedUnit.UNIT;
        });
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), false);
    }

    /**
     * With the source on IBP 2.4, mirrors with partial replication and a source shutdown using
     * empty link properties, then verifies the mirror with the third argument set to {@code false}.
     */
    @Test
    public void testMirrorWithSourceFailuresWithOldSourceIpb() {
        setUpClusters(None$.MODULE$, new Some<>("2.4"));
        createLinkAndMirrorWithPartialReplicationAndShutdownSource(new Properties(), createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$2(), createLinkAndMirrorWithPartialReplicationAndShutdownSource$default$3());
        verifyMirror(topic(), verifyMirror$default$2(), false);
    }

    /**
     * With the destination on IBP 2.6 and a single partition: mirrors 20 records, deletes and
     * re-creates the source topic, produces 5 records, truncates, and expects the
     * {@code SourceTopicDelete} failure to be reported - both before and after the destination
     * brokers are killed and restarted.
     */
    @Test
    public void testSourceTopicRecreateDetectedOnTruncation() {
        numPartitions_$eq(1);
        setUpClusters(new Some<>("2.6"), None$.MODULE$);
        ConfluentAdmin destAdmin = destCluster().createAdminClient(destCluster().createAdminClient$default$1());
        setupLinkAndMirrorForFailureTest(20000L, 60000L, "testGroup");
        produceToSourceCluster(20);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        sourceCluster().deleteTopic(topic(), sourceCluster().deleteTopic$default$2());
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        produceToSourceCluster(5);
        truncate(5);
        waitForFailure(destAdmin, FailureType$SourceTopicDelete$.MODULE$);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        destCluster().killAllBrokers();
        waitForFailure((ConfluentAdmin) restartCluster(destCluster(), !useSourceInitiatedLink()).get(), FailureType$SourceTopicDelete$.MODULE$);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    /**
     * With the source on IBP 2.6 and a single partition: links the topic in both directions,
     * expects both clusters to report the {@code CircularMirror} replica states and the partition
     * to appear in a fetcher manager's waiting set, then unlinks the destination topic, produces
     * to it, and expects the source side to mirror the records and report {@code ACTIVE}.
     */
    @Test
    public void testCircularMirror() {
        setUpClusters(None$.MODULE$, new Some<>("2.6"));
        numPartitions_$eq(1);
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        Properties linkProps = new Properties();
        linkProps.setProperty(ClusterLinkConfig$.MODULE$.AvailabilityCheckMsProp(), "100");
        UUID destLinkId = destCluster().createDestClusterLink(linkName(), sourceCluster(), destCluster().createDestClusterLink$default$3(), destCluster().createDestClusterLink$default$4(), destCluster().createDestClusterLink$default$5(), linkProps);
        // NOTE(review): these properties are populated but never passed to any call below; the
        // reverse link is created with linkProps. They are retained because
        // createLinkCredentials(...) is still invoked here and may have side effects the test
        // relies on - confirm before removing or wiring them in.
        Properties destClientProps = new Properties();
        destClientProps.put("bootstrap.servers", destCluster().brokerList());
        destClientProps.putAll(destCluster().clientSecurityProps(linkName()));
        destClientProps.put("sasl.jaas.config", createLinkCredentials(linkName(), destCluster(), createLinkCredentials$default$3()));
        UUID sourceLinkId = sourceCluster().createDestClusterLink(linkName(), destCluster(), sourceCluster().createDestClusterLink$default$3(), sourceCluster().createDestClusterLink$default$4(), sourceCluster().createDestClusterLink$default$5(), linkProps);
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), sourceCluster().createTopic$default$4());
        destCluster().linkTopic(topic(), replicationFactor(), linkName(), destCluster().linkTopic$default$4());
        sourceCluster().deleteTopic(topic(), sourceCluster().deleteTopic$default$2());
        sourceCluster().linkTopic(topic(), replicationFactor(), linkName(), sourceCluster().linkTopic$default$4());
        ConfluentAdmin sourceAdmin = sourceCluster().createAdminClient(sourceCluster().createAdminClient$default$1());
        ConfluentAdmin destAdmin = destCluster().createAdminClient(destCluster().createAdminClient$default$1());
        waitForReplicaState$1(sourceAdmin, FailureType$CircularMirror$.MODULE$.replicaStatusStates());
        waitForReplicaState$1(destAdmin, FailureType$CircularMirror$.MODULE$.replicaStatusStates());
        waitForBlockedPartition$1(sourceLinkId, destLinkId, topicPartition);
        destCluster().unlinkTopic(topic(), linkName(), destCluster().unlinkTopic$default$3(), false);
        destCluster().verifyTopicWritable(topic(), numPartitions());
        // try-with-resources so the producer is closed even if producing throws.
        try (KafkaProducer<byte[], byte[]> producer = destCluster().createProducer(destCluster().createProducer$default$1(), destCluster().createProducer$default$2(), destCluster().createProducer$default$3())) {
            produceRecords(producer, topic(), 20, produceRecords$default$4());
        }
        waitForMirror(sourceCluster().servers(), waitForMirror$default$2());
        waitForReplicaState$1(sourceAdmin, (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new ReplicaStatus.MirrorInfo.State[]{ReplicaStatus.MirrorInfo.State.ACTIVE})));
    }

    /**
     * Applies optional IBP overrides to the harness server configs and then runs the superclass
     * set-up.
     *
     * @param destIbp   if defined, written to the destination cluster's server config as the
     *                  inter-broker protocol version
     * @param sourceIbp if defined, written to the source cluster's server config as the
     *                  inter-broker protocol version
     */
    private void setUpClusters(Option<String> destIbp, Option<String> sourceIbp) {
        destIbp.foreach(version -> {
            return this.destCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), version);
        });
        sourceIbp.foreach(version -> {
            return this.sourceCluster().serverConfig().setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), version);
        });
        super.setUp();
    }

    /** Compiler-generated default for the first {@code setUpClusters} argument: no override. */
    private Option<String> setUpClusters$default$1() {
        return None$.MODULE$;
    }

    /** Compiler-generated default for the second {@code setUpClusters} argument: no override. */
    private Option<String> setUpClusters$default$2() {
        return None$.MODULE$;
    }

    /** Condition polled by {@code testSourceChangePropagation}: the destination topic link state no longer reports {@code shouldSync()}. */
    public static final /* synthetic */ boolean $anonfun$testSourceChangePropagation$1(ClusterLinkIbp26Test clusterLinkIbp26Test) {
        return !clusterLinkIbp26Test.destCluster().topicLinkState(clusterLinkIbp26Test.topic()).state().shouldSync();
    }

    /** Failure message used when the condition above is not met in time. */
    public static final /* synthetic */ String $anonfun$testSourceChangePropagation$2() {
        return "Source topic deletion not propagated";
    }

    /** Null-safe equality of two servers; used to filter the partition leader out of the broker list. */
    public static final /* synthetic */ boolean $anonfun$testMirrorWithSourceFailuresWithOldDestIpb$1(KafkaServer kafkaServer, KafkaServer kafkaServer2) {
        return kafkaServer2 == null ? kafkaServer == null : kafkaServer2.equals(kafkaServer);
    }

    /** Asserts that the given server's log end offset for the partition equals {@code Some(i)}. */
    public static final /* synthetic */ void $anonfun$testMirrorWithSourceFailuresWithOldDestIpb$2(ClusterLinkIbp26Test clusterLinkIbp26Test, int i, TopicPartition topicPartition, KafkaServer kafkaServer) {
        Assertions.assertEquals(new Some<>(BoxesRunTime.boxToInteger(i)), clusterLinkIbp26Test.logEndOffset(kafkaServer, topicPartition));
    }

    /** @return whether {@code set2} is a subset of {@code set} */
    public static final /* synthetic */ boolean $anonfun$testCircularMirror$2(Set set, Set set2) {
        return set2.subsetOf(set);
    }

    /**
     * Polls the mirror partition states reported through {@code confluentAdmin} for up to 15 s
     * and asserts that the last observed states are a subset of {@code set}.
     *
     * @param confluentAdmin admin client passed to {@code mirrorPartitionStates}
     * @param set            the acceptable states
     */
    private void waitForReplicaState$1(ConfluentAdmin confluentAdmin, Set set) {
        final long timeoutMs = 15000L;
        final long startMs = System.currentTimeMillis();
        Set<ReplicaStatus.MirrorInfo.State> observed = mirrorPartitionStates(confluentAdmin);
        // Keep re-reading until the states are acceptable or the deadline has passed; the
        // assertion below then reports the last observation either way.
        while (!observed.subsetOf(set) && System.currentTimeMillis() <= startMs + timeoutMs) {
            pause(Math.min(timeoutMs, POLL_INTERVAL_MS));
            observed = mirrorPartitionStates(confluentAdmin);
        }
        Assertions.assertTrue(observed.subsetOf(set), "Expected subset of " + set + ", got " + observed);
    }

    /**
     * Reads the {@code waitingPartitions} field of the {@link ClusterLinkFetcherManager} that the
     * partition leader in the given cluster holds for the given link id.
     */
    private static final ConcurrentHashMap waitingPartitions$1(ClusterLinkTestHarness clusterLinkTestHarness, UUID uuid, TopicPartition topicPartition) {
        return (ConcurrentHashMap) TestUtils.fieldValue(clusterLinkTestHarness.partitionLeader(topicPartition).clusterLinkManager().fetcherManager(uuid).get(), ClusterLinkFetcherManager.class, "waitingPartitions");
    }

    /** @return whether either map contains the partition as a key */
    public static final /* synthetic */ boolean $anonfun$testCircularMirror$3(ConcurrentHashMap concurrentHashMap, TopicPartition topicPartition, ConcurrentHashMap concurrentHashMap2) {
        return concurrentHashMap.containsKey(topicPartition) || concurrentHashMap2.containsKey(topicPartition);
    }

    /** Failure message used when the partition never shows up in a waiting set. */
    public static final /* synthetic */ String $anonfun$testCircularMirror$4() {
        return "Partition not blocked after consecutive epoch bumps";
    }

    /**
     * Waits up to 15 s until the partition appears in the waiting-partitions map of the source
     * cluster's fetcher manager (for link {@code uuid}) or the destination cluster's (for link
     * {@code uuid2}). Both maps are looked up once, before polling starts.
     */
    private void waitForBlockedPartition$1(UUID uuid, UUID uuid2, TopicPartition topicPartition) {
        ConcurrentHashMap sourceWaiting = waitingPartitions$1(sourceCluster(), uuid, topicPartition);
        ConcurrentHashMap destWaiting = waitingPartitions$1(destCluster(), uuid2, topicPartition);
        awaitCondition(() -> $anonfun$testCircularMirror$3(sourceWaiting, topicPartition, destWaiting), 15000L, $anonfun$testCircularMirror$4());
    }

    /**
     * Re-evaluates {@code condition} until it returns {@code true}, failing the test with
     * {@code failureMessage} if it is still false once more than {@code timeoutMs} milliseconds
     * have elapsed since the call started.
     *
     * @param condition      the condition to poll; evaluated at least once
     * @param timeoutMs      maximum time to keep polling, in milliseconds
     * @param failureMessage message passed to {@link Assertions#fail(String)} on timeout
     */
    private static void awaitCondition(BooleanSupplier condition, long timeoutMs, String failureMessage) {
        final long startMs = System.currentTimeMillis();
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail(failureMessage);
            }
            pause(Math.min(timeoutMs, POLL_INTERVAL_MS));
        }
    }

    /**
     * Sleeps for the given number of milliseconds.
     *
     * @throws IllegalStateException if the thread is interrupted while sleeping; the interrupt
     *                               flag is restored before throwing
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a test condition", e);
        }
    }
}
