package kafka.link;

import kafka.cluster.Partition;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.link.ClusterLinkConfig$;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;

/* compiled from: ClusterLinkIsrTest.scala */
@Tag("integration")
@ScalaSignature(bytes = "\u0006\u0001Y2A!\u0002\u0004\u0001\u0017!)\u0001\u0003\u0001C\u0001#!)1\u0003\u0001C!)!)\u0001\u0006\u0001C\u0001)!)Q\u0006\u0001C\u0001)\t\u00112\t\\;ti\u0016\u0014H*\u001b8l\u0013N\u0014H+Z:u\u0015\t9\u0001\"\u0001\u0003mS:\\'\"A\u0005\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0004\t\u0003\u001b9i\u0011AB\u0005\u0003\u001f\u0019\u0011!%\u00112tiJ\f7\r^\"mkN$XM\u001d'j].Le\u000e^3he\u0006$\u0018n\u001c8UKN$\u0018A\u0002\u001fj]&$h\bF\u0001\u0013!\ti\u0001!A\u0003tKR,\u0006\u000fF\u0001\u0016!\t1\u0012$D\u0001\u0018\u0015\u0005A\u0012!B:dC2\f\u0017B\u0001\u000e\u0018\u0005\u0011)f.\u001b;)\u0005\ta\u0002CA\u000f'\u001b\u0005q\"BA\u0010!\u0003\r\t\u0007/\u001b\u0006\u0003C\t\nqA[;qSR,'O\u0003\u0002$I\u0005)!.\u001e8ji*\tQ%A\u0002pe\u001eL!a\n\u0010\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007.\u0001\u000euKN$H)Z:uS:\fG/[8o+:$WM]'j]&\u001b(\u000f\u000b\u0002\u0004UA\u0011QdK\u0005\u0003Yy\u0011A\u0001V3ti\u0006)B/Z:u%\u0016\u001cH/\u0019:u!\u0006,8/\u001a3MS:\\\u0007F\u0001\u0003+Q\u0011\u0001\u0001g\r\u001b\u0011\u0005u\t\u0014B\u0001\u001a\u001f\u0005\r!\u0016mZ\u0001\u0006m\u0006dW/Z\u0011\u0002k\u0005Y\u0011N\u001c;fOJ\fG/[8o\u0001")
/* loaded from: input_file:kafka/link/ClusterLinkIsrTest.class */
public class ClusterLinkIsrTest extends AbstractClusterLinkIntegrationTest {
    /**
     * Tunes the destination cluster's broker configuration before the base
     * class performs its own set-up: the replica lag time limit is set to
     * "5000" and the minimum in-sync replica count to "2".
     */
    @Override // kafka.link.AbstractClusterLinkIntegrationTest
    @BeforeEach
    public void setUp() {
        final KafkaConfig$ brokerConfigKeys = KafkaConfig$.MODULE$;
        destCluster().serverConfig().setProperty(brokerConfigKeys.ReplicaLagTimeMaxMsProp(), "5000");
        destCluster().serverConfig().setProperty(brokerConfigKeys.MinInSyncReplicasProp(), "2");
        // Overrides must be in place before the base set-up runs.
        super.setUp();
    }

    /**
     * Exercises mirroring while the destination partition is under its
     * minimum ISR.
     *
     * <p>Sequence: create a single-partition source topic with replication
     * factor 3, create the cluster link and mirror the topic, then shut down
     * every destination broker except the partition leader. While producing
     * to the source, wait until the leader reports the partition as
     * under-min-ISR. After producing ten more records, assert that the source
     * leader's log end offset is strictly greater than the destination
     * leader's. Finally restart the stopped brokers, wait for the mirror,
     * produce again and verify the mirror.
     */
    @Test
    public void testDestinationUnderMinIsr() {
        numPartitions_$eq(1);
        TopicPartition tp = (TopicPartition) partitions().head();
        sourceCluster().createTopic(topic(), numPartitions(), 3, sourceCluster().createTopic$default$4());
        destCluster().createClusterLink(linkName(), sourceCluster(), destCluster().createClusterLink$default$3(), destCluster().createClusterLink$default$4(), destCluster().createClusterLink$default$5(), destCluster().createClusterLink$default$6());
        destCluster().linkTopic(topic(), (short) 3, linkName(), destCluster().linkTopic$default$4());
        produceToSourceCluster(2);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        // Stop every destination broker other than the partition leader.
        KafkaServer destLeader = destCluster().partitionLeader(tp);
        Buffer followers = (Buffer) destCluster().servers().filterNot(
            server -> BoxesRunTime.boxToBoolean($anonfun$testDestinationUnderMinIsr$1(destLeader, server)));
        followers.foreach(follower -> {
            follower.shutdown();
            return BoxedUnit.UNIT;
        });

        // Inlined TestUtils.waitUntilTrue: poll (the condition itself produces
        // records) until the leader reports under-min-ISR or the wait expires.
        final long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        final long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        final long deadlineMs = System.currentTimeMillis() + maxWaitMs;
        for (;;) {
            if ($anonfun$testDestinationUnderMinIsr$3(this, destLeader, tp)) {
                break;
            }
            if (System.currentTimeMillis() > deadlineMs) {
                Assertions.fail($anonfun$testDestinationUnderMinIsr$4());
            }
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }

        // The source must now be ahead of the destination leader.
        produceToSourceCluster(10);
        long destOffset = BoxesRunTime.unboxToLong(logEndOffset(destLeader, tp).get());
        long sourceOffset = BoxesRunTime.unboxToLong(logEndOffset(sourceCluster().partitionLeader(tp), tp).get());
        Assertions.assertTrue(sourceOffset > destOffset,
            "Records mirrored with under-min-isrs sourceOffset=" + sourceOffset + " destOffset=" + destOffset);

        // Bring the stopped brokers back and check that mirroring completes.
        followers.foreach(follower -> {
            follower.startup();
            return BoxedUnit.UNIT;
        });
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        produceToSourceCluster(10);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    /**
     * Exercises broker restarts on the destination while the cluster link is
     * paused.
     *
     * <p>Sequence:
     * <ol>
     *   <li>Create a single-partition source topic (replication factor 3),
     *       create the link, mirror the topic and wait for the mirror.</li>
     *   <li>Shut down the first non-leader destination broker and wait, while
     *       producing, until the leader reports the partition as
     *       under-replicated.</li>
     *   <li>Record the leader's log end offset, set the link's paused property
     *       to "true", and produce more records to the source.</li>
     *   <li>Shut down the leader and the second non-leader broker, restart the
     *       leader, and wait until its partition is no longer blocked on the
     *       mirror source; the log end offset must equal the recorded one.</li>
     *   <li>Restart the non-leader brokers, wait until both report the
     *       recorded log end offset and the partition is no longer
     *       under-replicated.</li>
     *   <li>Set the paused property to "false", wait for the mirror, produce
     *       and verify the mirror.</li>
     * </ol>
     */
    @Test
    public void testRestartPausedLink() {
        numPartitions_$eq(1);
        TopicPartition tp = (TopicPartition) partitions().head();
        sourceCluster().createTopic(topic(), numPartitions(), 3, sourceCluster().createTopic$default$4());
        destCluster().createClusterLink(linkName(), sourceCluster(), destCluster().createClusterLink$default$3(), destCluster().createClusterLink$default$4(), destCluster().createClusterLink$default$5(), destCluster().createClusterLink$default$6());
        destCluster().linkTopic(topic(), (short) 3, linkName(), destCluster().linkTopic$default$4());
        produceToSourceCluster(2);
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());

        KafkaServer destLeader = destCluster().partitionLeader(tp);
        Buffer followers = (Buffer) destCluster().servers().filterNot(
            server -> BoxesRunTime.boxToBoolean($anonfun$testRestartPausedLink$1(destLeader, server)));
        ((KafkaServer) followers.head()).shutdown();
        destCluster().updateBootstrapServers();

        // Inlined TestUtils.waitUntilTrue: poll (the condition itself produces
        // records) until the leader sees the partition as under-replicated.
        final long underReplicatedWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        final long underReplicatedPauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        final long underReplicatedDeadline = System.currentTimeMillis() + underReplicatedWaitMs;
        for (;;) {
            if ($anonfun$testRestartPausedLink$2(this, destLeader, tp)) {
                break;
            }
            if (System.currentTimeMillis() > underReplicatedDeadline) {
                Assertions.fail($anonfun$testRestartPausedLink$3());
            }
            Thread.sleep(Math.min(underReplicatedWaitMs, underReplicatedPauseMs));
        }

        // Remember the leader's log end offset, then pause the link.
        waitForMirror(new $colon.colon(destLeader, Nil$.MODULE$), waitForMirror$default$2());
        Option<Object> offsetAtPause = logEndOffset(destLeader, tp);
        destCluster().alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "true")})), new $colon.colon(destLeader, Nil$.MODULE$));
        produceToSourceCluster(2);

        // Take down the leader and the second follower, then restart only the leader.
        destLeader.shutdown();
        ((KafkaServer) followers.apply(1)).shutdown();
        destCluster().startBroker(destLeader.config().brokerId());

        // Inlined TestUtils.waitUntilTrue: the restarted leader's partition
        // must stop being blocked on the mirror source.
        final long unblockedWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        final long unblockedPauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        final long unblockedDeadline = System.currentTimeMillis() + unblockedWaitMs;
        for (;;) {
            if ($anonfun$testRestartPausedLink$4(destLeader, tp)) {
                break;
            }
            if (System.currentTimeMillis() > unblockedDeadline) {
                Assertions.fail($anonfun$testRestartPausedLink$5());
            }
            Thread.sleep(Math.min(unblockedWaitMs, unblockedPauseMs));
        }
        Assertions.assertEquals(offsetAtPause, logEndOffset(destLeader, tp));

        // Restart the non-leader brokers.
        followers.foreach(follower -> {
            $anonfun$testRestartPausedLink$6(this, follower);
            return BoxedUnit.UNIT;
        });

        // Inlined TestUtils.computeUntilTrue: recompute the followers' log end
        // offsets until they match the expected pair or the wait expires; the
        // last computed value is asserted either way.
        Seq expectedOffsets = new $colon.colon(offsetAtPause, new $colon.colon(offsetAtPause, Nil$.MODULE$));
        final long computeWaitMs = TestUtils$.MODULE$.computeUntilTrue$default$2();
        final long computePauseMs = TestUtils$.MODULE$.computeUntilTrue$default$3();
        final long computeDeadline = System.currentTimeMillis() + computeWaitMs;
        Buffer followerOffsets = $anonfun$testRestartPausedLink$7(this, followers, tp);
        while (!$anonfun$testRestartPausedLink$9(expectedOffsets, followerOffsets)
                && System.currentTimeMillis() <= computeDeadline) {
            Thread.sleep(Math.min(computeWaitMs, computePauseMs));
            followerOffsets = $anonfun$testRestartPausedLink$7(this, followers, tp);
        }
        Assertions.assertEquals(expectedOffsets, followerOffsets);

        // Inlined TestUtils.waitUntilTrue: the partition must leave the
        // under-replicated state while the link is still paused.
        final long inSyncWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        final long inSyncPauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        final long inSyncDeadline = System.currentTimeMillis() + inSyncWaitMs;
        for (;;) {
            if ($anonfun$testRestartPausedLink$10(destLeader, tp)) {
                break;
            }
            if (System.currentTimeMillis() > inSyncDeadline) {
                Assertions.fail($anonfun$testRestartPausedLink$11());
            }
            Thread.sleep(Math.min(inSyncWaitMs, inSyncPauseMs));
        }

        // Unpause the link and confirm mirroring resumes.
        destCluster().alterClusterLink(linkName(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2(ClusterLinkConfig$.MODULE$.ClusterLinkPausedProp(), "false")})), destCluster().alterClusterLink$default$3());
        waitForMirror(waitForMirror$default$1(), waitForMirror$default$2());
        produceToSourceCluster(2);
        verifyMirror(topic(), verifyMirror$default$2(), verifyMirror$default$3());
    }

    /**
     * Null-safe equality predicate used by {@code testDestinationUnderMinIsr}
     * to filter the partition leader out of the destination broker list.
     *
     * @param kafkaServer  the server to compare against (the leader at the call site)
     * @param kafkaServer2 the candidate server being tested
     * @return {@code true} when both are {@code null} or {@code kafkaServer2.equals(kafkaServer)}
     */
    public static final /* synthetic */ boolean $anonfun$testDestinationUnderMinIsr$1(KafkaServer kafkaServer, KafkaServer kafkaServer2) {
        if (kafkaServer2 == null) {
            return kafkaServer == null;
        }
        return kafkaServer2.equals(kafkaServer);
    }

    /**
     * Polling condition for {@code testDestinationUnderMinIsr}: produces two
     * more records to the source cluster, then reports whether the given
     * server's online partition is under its minimum ISR.
     *
     * @param clusterLinkIsrTest the running test instance, used to produce records
     * @param kafkaServer        the server whose replica manager is queried
     * @param topicPartition     the partition to inspect
     * @return the partition's {@code isUnderMinIsr()} value
     */
    public static final /* synthetic */ boolean $anonfun$testDestinationUnderMinIsr$3(ClusterLinkIsrTest clusterLinkIsrTest, KafkaServer kafkaServer, TopicPartition topicPartition) {
        clusterLinkIsrTest.produceToSourceCluster(2);
        final Partition onlinePartition = (Partition) kafkaServer.replicaManager().onlinePartition(topicPartition).get();
        return onlinePartition.isUnderMinIsr();
    }

    /**
     * Supplies the failure message used when the under-min-ISR wait in
     * {@code testDestinationUnderMinIsr} times out.
     *
     * @return the failure message
     */
    public static final /* synthetic */ String $anonfun$testDestinationUnderMinIsr$4() {
        final String timeoutMessage = "Destination not under-min-isr with two brokers down";
        return timeoutMessage;
    }

    /**
     * Null-safe equality predicate used by {@code testRestartPausedLink} to
     * filter the partition leader out of the destination broker list.
     *
     * @param kafkaServer  the server to compare against (the leader at the call site)
     * @param kafkaServer2 the candidate server being tested
     * @return {@code true} when both are {@code null} or {@code kafkaServer2.equals(kafkaServer)}
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$1(KafkaServer kafkaServer, KafkaServer kafkaServer2) {
        if (kafkaServer2 == null) {
            return kafkaServer == null;
        }
        return kafkaServer2.equals(kafkaServer);
    }

    /**
     * Polling condition for {@code testRestartPausedLink}: produces two more
     * records to the source cluster, then reports whether the given server's
     * online partition is under-replicated.
     *
     * @param clusterLinkIsrTest the running test instance, used to produce records
     * @param kafkaServer        the server whose replica manager is queried
     * @param topicPartition     the partition to inspect
     * @return the partition's {@code isUnderReplicated()} value
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$2(ClusterLinkIsrTest clusterLinkIsrTest, KafkaServer kafkaServer, TopicPartition topicPartition) {
        clusterLinkIsrTest.produceToSourceCluster(2);
        final Partition onlinePartition = (Partition) kafkaServer.replicaManager().onlinePartition(topicPartition).get();
        return onlinePartition.isUnderReplicated();
    }

    /**
     * Supplies the failure message used when the under-replicated wait in
     * {@code testRestartPausedLink} times out.
     *
     * @return the failure message
     */
    public static final /* synthetic */ String $anonfun$testRestartPausedLink$3() {
        final String timeoutMessage = "Destination not under-replicated with a broker down";
        return timeoutMessage;
    }

    /**
     * Polling condition for {@code testRestartPausedLink}: reports whether the
     * partition on the given server is not blocked on its mirror source.
     *
     * @param kafkaServer    the server whose replica manager is queried
     * @param topicPartition the partition to inspect
     * @return {@code true} when the partition was found and
     *         {@code isBlockedOnMirrorSource()} is {@code false}; {@code false}
     *         when it is blocked or when the lookup throws
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$4(KafkaServer kafkaServer, TopicPartition topicPartition) {
        boolean notBlocked;
        try {
            notBlocked = !kafkaServer.replicaManager().getPartitionOrException(topicPartition).isBlockedOnMirrorSource();
        } catch (Exception ignored) {
            // Any lookup failure counts as "not ready yet" so the caller keeps polling.
            notBlocked = false;
        }
        return notBlocked;
    }

    /**
     * Supplies the failure message used when the "not blocked on mirror
     * source" wait in {@code testRestartPausedLink} times out.
     *
     * @return the failure message
     */
    public static final /* synthetic */ String $anonfun$testRestartPausedLink$5() {
        final String timeoutMessage = "Paused partition should not be blocked on source";
        return timeoutMessage;
    }

    /**
     * Per-element action applied to the non-leader broker buffer in
     * {@code testRestartPausedLink}: asks the destination cluster to start the
     * broker identified by the given server's configured broker id.
     *
     * @param clusterLinkIsrTest the running test instance that owns the destination cluster
     * @param kafkaServer        the server whose broker id is passed to {@code startBroker}
     */
    public static final /* synthetic */ void $anonfun$testRestartPausedLink$6(ClusterLinkIsrTest clusterLinkIsrTest, KafkaServer kafkaServer) {
        // Restart goes through the cluster's startBroker, keyed by broker id, not through the server object.
        clusterLinkIsrTest.destCluster().startBroker(kafkaServer.config().brokerId());
    }

    /**
     * Computation step of the inlined {@code computeUntilTrue} loop in
     * {@code testRestartPausedLink}: maps every server in the buffer to the
     * value of {@code logEndOffset(server, topicPartition)}.
     *
     * @param clusterLinkIsrTest the running test instance providing {@code logEndOffset}
     * @param buffer             the servers to query
     * @param topicPartition     the partition whose log end offset is read
     * @return a buffer holding one {@code logEndOffset} result per server, in order
     */
    public static final /* synthetic */ Buffer $anonfun$testRestartPausedLink$7(ClusterLinkIsrTest clusterLinkIsrTest, Buffer buffer, TopicPartition topicPartition) {
        final Object offsets = buffer.map(
            kafkaServer -> clusterLinkIsrTest.logEndOffset(kafkaServer, topicPartition),
            Buffer$.MODULE$.canBuildFrom());
        return (Buffer) offsets;
    }

    /**
     * Predicate of the inlined {@code computeUntilTrue} loop in
     * {@code testRestartPausedLink}: null-safe comparison of the computed
     * buffer with the expected sequence.
     *
     * @param seq    the expected sequence
     * @param buffer the most recently computed buffer
     * @return {@code true} when both are {@code null} or {@code buffer.equals(seq)}
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$9(Seq seq, Buffer buffer) {
        if (buffer == null) {
            return seq == null;
        }
        return buffer.equals(seq);
    }

    /**
     * Polling condition for {@code testRestartPausedLink}: reports whether the
     * given server's online partition is no longer under-replicated.
     *
     * @param kafkaServer    the server whose replica manager is queried
     * @param topicPartition the partition to inspect
     * @return the negation of the partition's {@code isUnderReplicated()} value
     */
    public static final /* synthetic */ boolean $anonfun$testRestartPausedLink$10(KafkaServer kafkaServer, TopicPartition topicPartition) {
        final Partition onlinePartition = (Partition) kafkaServer.replicaManager().onlinePartition(topicPartition).get();
        final boolean underReplicated = onlinePartition.isUnderReplicated();
        return !underReplicated;
    }

    /**
     * Supplies the failure message used when the final "no longer
     * under-replicated" wait in {@code testRestartPausedLink} times out.
     *
     * @return the failure message
     */
    public static final /* synthetic */ String $anonfun$testRestartPausedLink$11() {
        final String timeoutMessage = "Destination follower unable to join ISR with paused link";
        return timeoutMessage;
    }
}
