package kafka.tier;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.api.IntegrationTestHarness;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.$less$colon$less$;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: TierEpochStateRevolvingReplicationTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005=e\u0001B\t\u0013\u0001]AQ\u0001\n\u0001\u0005\u0002\u0015Bq\u0001\u000b\u0001C\u0002\u0013E\u0013\u0006\u0003\u00041\u0001\u0001\u0006IA\u000b\u0005\bc\u0001\u0011\r\u0011\"\u00013\u0011\u0019Y\u0004\u0001)A\u0005g!9A\b\u0001b\u0001\n\u0003i\u0004B\u0002#\u0001A\u0003%a\bC\u0004F\u0001\t\u0007I\u0011\u0001$\t\rE\u0003\u0001\u0015!\u0003H\u0011\u0015\u0011\u0006\u0001\"\u0011T\u0011\u0015A\u0007\u0001\"\u0011j\u0011\u0015q\u0007\u0001\"\u0001p\u0011\u001d\t\t\u0003\u0001C\u0005\u0003GAq!!\u0010\u0001\t\u0013\ty\u0004C\u0004\u0002T\u0001!I!!\u0016\t\u000f\u0005M\u0004\u0001\"\u0003\u0002v\t1C+[3s\u000bB|7\r[*uCR,'+\u001a<pYZLgn\u001a*fa2L7-\u0019;j_:$Vm\u001d;\u000b\u0005M!\u0012\u0001\u0002;jKJT\u0011!F\u0001\u0006W\u000647.Y\u0002\u0001'\r\u0001\u0001D\b\t\u00033qi\u0011A\u0007\u0006\u00037Q\t1!\u00199j\u0013\ti\"D\u0001\fJ]R,wM]1uS>tG+Z:u\u0011\u0006\u0014h.Z:t!\ty\"%D\u0001!\u0015\t\tC#A\u0003vi&d7/\u0003\u0002$A\t9Aj\\4hS:<\u0017A\u0002\u001fj]&$h\bF\u0001'!\t9\u0003!D\u0001\u0013\u0003-\u0011'o\\6fe\u000e{WO\u001c;\u0016\u0003)\u0002\"a\u000b\u0018\u000e\u00031R\u0011!L\u0001\u0006g\u000e\fG.Y\u0005\u0003_1\u00121!\u00138u\u00031\u0011'o\\6fe\u000e{WO\u001c;!\u0003\u0015!x\u000e]5d+\u0005\u0019\u0004C\u0001\u001b:\u001b\u0005)$B\u0001\u001c8\u0003\u0011a\u0017M\\4\u000b\u0003a\nAA[1wC&\u0011!(\u000e\u0002\u0007'R\u0014\u0018N\\4\u0002\rQ|\u0007/[2!\u0003\ri7oZ\u000b\u0002}A\u00191fP!\n\u0005\u0001c#!B!se\u0006L\bCA\u0016C\u0013\t\u0019EF\u0001\u0003CsR,\u0017\u0001B7tO\u0002\na!\u001a=ji\u0016$W#A$\u0011\u0005!{U\"A%\u000b\u0005)[\u0015AB1u_6L7M\u0003\u0002M\u001b\u0006Q1m\u001c8dkJ\u0014XM\u001c;\u000b\u00059;\u0014\u0001B;uS2L!\u0001U%\u0003\u001b\u0005#x.\\5d\u0005>|G.Z1o\u0003\u001d)\u00070\u001b;fI\u0002\nQa]3u+B$\"\u0001V,\u0011\u0005-*\u0016B\u0001,-\u0005\u0011)f.\u001b;\t\u000baS\u0001\u0019A-\u0002\u0011Q,7\u000f^%oM>\u0004\"A\u00172\u000e\u0003mS!a\u0007/\u000b\u0005us\u0016a\u00026va&$XM\u001d\u0006\u0003?\
u0002\fQA[;oSRT\u0011!Y\u0001\u0004_J<\u0017BA2\\\u0005!!Vm\u001d;J]\u001a|\u0007F\u0001\u0006f!\tQf-\u0003\u0002h7\nQ!)\u001a4pe\u0016,\u0015m\u00195\u0002\u0011Q,\u0017M\u001d#po:$\u0012\u0001\u0016\u0015\u0003\u0017-\u0004\"A\u00177\n\u00055\\&!C!gi\u0016\u0014X)Y2i\u0003\u0005\"Xm\u001d;US\u0016\u00148\u000b^1uKJ+7\u000f^8sKR{'+\u001a9mS\u000e\fG/[8o)\t!\u0006\u000fC\u0003r\u0019\u0001\u0007!/\u0001\u0004rk>\u0014X/\u001c\t\u0003gjt!\u0001\u001e=\u0011\u0005UdS\"\u0001<\u000b\u0005]4\u0012A\u0002\u001fs_>$h(\u0003\u0002zY\u00051\u0001K]3eK\u001aL!AO>\u000b\u0005ed\u0003F\u0001\u0007~!\rq\u00181A\u0007\u0002\u007f*\u0019\u0011\u0011\u0001/\u0002\rA\f'/Y7t\u0013\r\t)a \u0002\u0012!\u0006\u0014\u0018-\\3uKJL'0\u001a3UKN$\bf\u0002\u0007\u0002\n\u0005U\u0011q\u0003\t\u0005\u0003\u0017\t\t\"\u0004\u0002\u0002\u000e)\u0019\u0011qB@\u0002\u0011A\u0014xN^5eKJLA!a\u0005\u0002\u000e\tYa+\u00197vKN{WO]2f\u0003\u001d\u0019HO]5oONdC!!\u0007\u0002\u001e\u0005\u0012\u00111D\u0001\u0003u.\f#!a\b\u0002\u000b-\u0014\u0018M\u001a;\u00025]\f\u0017\u000e\u001e$pe2{w-\u00128e\u001f\u001a47/\u001a;U_6\u000bGo\u00195\u0015\u000fQ\u000b)#!\u000e\u0002:!9\u0011qE\u0007A\u0002\u0005%\u0012A\u000122!\u0011\tY#!\r\u000e\u0005\u00055\"bAA\u0018)\u000511/\u001a:wKJLA!a\r\u0002.\tY1*\u00194lC\n\u0013xn[3s\u0011\u001d\t9$\u0004a\u0001\u0003S\t!A\u0019\u001a\t\r\u0005mR\u00021\u0001+\u0003%\u0001\u0018M\u001d;ji&|g.\u0001\u0004hKRdun\u001a\u000b\u0007\u0003\u0003\ni%!\u0015\u0011\t\u0005\r\u0013\u0011J\u0007\u0003\u0003\u000bR1!a\u0012\u0015\u0003\rawnZ\u0005\u0005\u0003\u0017\n)EA\u0006BEN$(/Y2u\u0019><\u0007bBA(\u001d\u0001\u0007\u0011\u0011F\u0001\u0007EJ|7.\u001a:\t\r\u0005mb\u00021\u0001+\u0003))\u0007o\\2i\u0007\u0006\u001c\u0007.\u001a\u000b\u0005\u0003/\n\t\b\u0005\u0003\u0002Z\u00055TBAA.\u0015\u0011\ti&a\u0018\u0002\u000b\u0015\u0004xn\u00195\u000b\t\u0005\u0005\u00141M\u0001\nS:$XM\u001d8bYNTA!!\u001a\u0002h\u000591\u000f^8sC\u001e,'bA\u000b\u0002j)\u0019\u00111\u000e1\u0002\r\u0005\u0
004\u0018m\u00195f\u0013\u0011\ty'a\u0017\u0003)1+\u0017\rZ3s\u000bB|7\r\u001b$jY\u0016\u001c\u0015m\u00195f\u0011\u001d\tye\u0004a\u0001\u0003S\t\u0001\"Y<bSRL5K\u0015\u000b\b)\u0006]\u0014qQAF\u0011\u001d\tI\b\u0005a\u0001\u0003w\n!\u0001\u001e9\u0011\t\u0005u\u00141Q\u0007\u0003\u0003\u007fRA!!!\u0002h\u000511m\\7n_:LA!!\"\u0002��\tqAk\u001c9jGB\u000b'\u000f^5uS>t\u0007BBAE!\u0001\u0007!&A\u0006ok6\u0014V\r\u001d7jG\u0006\u001c\bbBAG!\u0001\u0007\u0011\u0011F\u0001\u0007Y\u0016\fG-\u001a:")
/* loaded from: input_file:kafka/tier/TierEpochStateRevolvingReplicationTest.class */
/*
 * Integration test (decompiled from Scala) that runs three brokers with a tiered, single-partition
 * topic. In each of three rounds it stops one broker, produces while the ISR is reduced, waits for
 * the remaining reference broker's local log start offset to advance, restarts the stopped broker,
 * and finally checks that every broker reports the same log end offset and leader-epoch entries.
 *
 * Any call to Exit while a test is running flips the `exited` flag instead of terminating the JVM;
 * tearDown fails the test if that happened.
 */
public class TierEpochStateRevolvingReplicationTest extends IntegrationTestHarness {
    /** Upper bound, in milliseconds, for the tiering/deletion wait and the log-end-offset wait. */
    private static final long WAIT_TIMEOUT_MS = 60000L;
    /** Upper bound, in milliseconds, for waiting on the ISR to reach an expected size. */
    private static final long ISR_TIMEOUT_MS = 15000L;
    /** Pause, in milliseconds, between two evaluations of a polled condition. */
    private static final long POLL_INTERVAL_MS = 100L;

    private final int brokerCount = 3;
    private final String topic = "topic1";
    // 1000-byte payload; the topic is created with segment.bytes=2000.
    private final byte[] msg = new byte[1000];
    // Set to true by the exit procedure installed in setUp.
    private final AtomicBoolean exited = new AtomicBoolean(false);

    /** @return the number of brokers the harness should start (3) */
    @Override // kafka.api.IntegrationTestHarness
    public int brokerCount() {
        return this.brokerCount;
    }

    /** @return the name of the topic used by this test */
    public String topic() {
        return this.topic;
    }

    /** @return the shared record payload; the same array instance is returned on every call */
    public byte[] msg() {
        return this.msg;
    }

    /** @return the flag recording whether the installed exit procedure was invoked */
    public AtomicBoolean exited() {
        return this.exited;
    }

    /**
     * Installs an exit procedure that only records the call, then delegates to the harness set-up.
     *
     * @param testInfo JUnit test metadata forwarded to the parent set-up
     */
    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.server.QuorumTestHarness
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        Exit.setExitProcedure((i, str) -> {
            this.exited().set(true);
        });
        super.setUp(testInfo);
    }

    /**
     * Fails the test if the exit procedure was invoked, then tears the harness down.
     *
     * The parent tear-down and the exit-procedure reset run in {@code finally} blocks so that a
     * failed assertion cannot skip the harness clean-up, and so that the procedure installed in
     * setUp (which captures this instance) does not stay installed for later tests.
     */
    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.server.QuorumTestHarness
    @AfterEach
    public void tearDown() {
        try {
            Assertions.assertFalse(exited().get());
        } finally {
            try {
                super.tearDown();
            } finally {
                // Reset last: an exit triggered during the parent tear-down is still intercepted.
                Exit.resetExitProcedure();
            }
        }
    }

    /**
     * Runs three stop/produce/restart rounds against a tier-enabled topic and verifies that all
     * brokers converge on the same log end offset and leader-epoch entries after each round.
     *
     * @param str the value supplied by the parameterized source ("zk" or "kraft"); it is not read
     *            in this method
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testTierStateRestoreToReplication(String str) {
        Properties topicProps = new Properties();
        topicProps.put("confluent.tier.enable", "true");
        topicProps.put("segment.bytes", "2000");
        topicProps.put("retention.bytes", "-1");
        topicProps.put("confluent.tier.local.hotset.bytes", "0");
        topicProps.put("min.insync.replicas", "2");
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        // Second element of the first entry returned by createTopic; presumably the partition
        // leader's broker id (createTopic is defined outside this file; confirm there).
        int leaderId = ((Tuple2) createTopic(topic(), 1, 3, topicProps, createTopic$default$5(), createTopic$default$6()).head())._2$mcI$sp();
        KafkaBroker leader = (KafkaBroker) brokers().find(candidate -> {
            return BoxesRunTime.boxToBoolean($anonfun$testTierStateRestoreToReplication$1(leaderId, candidate));
        }).get();
        awaitISR(topicPartition, 3, leader);
        KafkaProducer producer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        brokers().foreach(broker -> {
            $anonfun$testTierStateRestoreToReplication$2(broker);
            return BoxedUnit.UNIT;
        });
        for (int round = 0; round < 3; round++) {
            // Stop the first broker (in brokers() order) whose id differs from leaderId.
            this.killBroker(BoxesRunTime.unboxToInt(((IterableOps) ((IterableOps) this.brokers().map(broker -> {
                return BoxesRunTime.boxToInteger($anonfun$testTierStateRestoreToReplication$4(broker));
            })).filter(brokerId -> {
                return BoxesRunTime.boxToBoolean(BoxesRunTime.unboxToInt(brokerId) != leaderId);
            })).head()));
            this.awaitISR(topicPartition, 2, leader);
            producer.send(new ProducerRecord(topicPartition.topic(), Predef$.MODULE$.int2Integer(topicPartition.partition()), (Object) null, this.msg())).get();
            LogManager logManager = leader.replicaManager().logManager();
            // Local end offset right after the first record of this round.
            long localLogEndOffset = ((AbstractLog) logManager.getLog(topicPartition, logManager.getLog$default$2()).get()).localLogEndOffset();
            for (int n = 0; n < 4; n++) {
                $anonfun$testTierStateRestoreToReplication$6(this, producer, topicPartition, n);
            }
            // Poll until the local log start offset has moved past the offset captured above.
            long tieringWaitStart = System.currentTimeMillis();
            while (!$anonfun$testTierStateRestoreToReplication$7(leader, topicPartition, localLogEndOffset)) {
                if (System.currentTimeMillis() > tieringWaitStart + WAIT_TIMEOUT_MS) {
                    Assertions.fail("timed out waiting for segment tiering and deletion");
                }
                Thread.sleep(Math.min(WAIT_TIMEOUT_MS, POLL_INTERVAL_MS));
            }
            this.restartDeadBrokers(this.restartDeadBrokers$default$1());
            this.awaitISR(topicPartition, 3, leader);
            producer.send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), (Object) null, this.msg())).get();
            this.brokers().foreach(broker -> {
                this.waitForLogEndOffsetToMatch(leader, broker, 0);
                return BoxedUnit.UNIT;
            });
            this.brokers().foreach(broker -> {
                $anonfun$testTierStateRestoreToReplication$10(this, leader, broker);
                return BoxedUnit.UNIT;
            });
        }
    }

    /**
     * Polls until both brokers report the same log end offset for the given partition of
     * {@link #topic()}, failing the test after {@code WAIT_TIMEOUT_MS}.
     *
     * @param kafkaBroker  first broker to compare
     * @param kafkaBroker2 second broker to compare
     * @param i            partition index within {@link #topic()}
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void waitForLogEndOffsetToMatch(KafkaBroker kafkaBroker, KafkaBroker kafkaBroker2, int i) {
        long waitStart = System.currentTimeMillis();
        while (!$anonfun$waitForLogEndOffsetToMatch$1(this, kafkaBroker, i, kafkaBroker2)) {
            if (System.currentTimeMillis() > waitStart + WAIT_TIMEOUT_MS) {
                Assertions.fail($anonfun$waitForLogEndOffsetToMatch$2(this, kafkaBroker, i, kafkaBroker2));
            }
            Thread.sleep(Math.min(WAIT_TIMEOUT_MS, POLL_INTERVAL_MS));
        }
    }

    /**
     * Looks up the log of partition {@code i} of {@link #topic()} on the given broker.
     *
     * @return the log, or {@code null} when the broker's log manager has none for that partition
     */
    private AbstractLog getLog(KafkaBroker kafkaBroker, int i) {
        LogManager logManager = kafkaBroker.logManager();
        return (AbstractLog) logManager.getLog(new TopicPartition(topic(), i), logManager.getLog$default$2()).orNull($less$colon$less$.MODULE$.refl());
    }

    /** Returns the leader-epoch cache of partition 0 of {@link #topic()} on the given broker. */
    private LeaderEpochFileCache epochCache(KafkaBroker kafkaBroker) {
        return (LeaderEpochFileCache) getLog(kafkaBroker, 0).leaderEpochCache().get();
    }

    /**
     * Polls the given broker until its online partition for {@code topicPartition} reports exactly
     * {@code i} in-sync replica ids, failing the test after {@code ISR_TIMEOUT_MS}.
     *
     * @param topicPartition partition whose ISR is inspected
     * @param i              expected ISR size
     * @param kafkaBroker    broker whose replica manager is queried
     */
    private void awaitISR(TopicPartition topicPartition, int i, KafkaBroker kafkaBroker) {
        long waitStart = System.currentTimeMillis();
        while (!$anonfun$awaitISR$1(kafkaBroker, topicPartition, i)) {
            if (System.currentTimeMillis() > waitStart + ISR_TIMEOUT_MS) {
                Assertions.fail("Timed out waiting for replicas to join ISR");
            }
            Thread.sleep(Math.min(ISR_TIMEOUT_MS, POLL_INTERVAL_MS));
        }
    }

    // The static methods below are compiler-generated lambda bodies; their names and signatures
    // are kept unchanged.

    /** True when the broker's configured id equals {@code i}. */
    public static final /* synthetic */ boolean $anonfun$testTierStateRestoreToReplication$1(int i, KafkaBroker kafkaBroker) {
        return kafkaBroker.config().brokerId() == i;
    }

    /** Delegates to {@code TierTestUtils.awaitTierTopicPartition} for partition 0 on the broker. */
    public static final /* synthetic */ void $anonfun$testTierStateRestoreToReplication$2(KafkaBroker kafkaBroker) {
        TierTestUtils$.MODULE$.awaitTierTopicPartition(kafkaBroker, Predef$.MODULE$.int2Integer(0));
    }

    /** Returns the broker's configured id. */
    public static final /* synthetic */ int $anonfun$testTierStateRestoreToReplication$4(KafkaBroker kafkaBroker) {
        return kafkaBroker.config().brokerId();
    }

    /** Sends one payload record to the partition and blocks for its metadata; {@code i} is unused. */
    public static final /* synthetic */ RecordMetadata $anonfun$testTierStateRestoreToReplication$6(TierEpochStateRevolvingReplicationTest tierEpochStateRevolvingReplicationTest, KafkaProducer kafkaProducer, TopicPartition topicPartition, int i) {
        return (RecordMetadata) kafkaProducer.send(new ProducerRecord(topicPartition.topic(), Predef$.MODULE$.int2Integer(topicPartition.partition()), (Object) null, tierEpochStateRevolvingReplicationTest.msg())).get();
    }

    /** True once the broker's local log start offset for the partition is greater than {@code j}. */
    public static final /* synthetic */ boolean $anonfun$testTierStateRestoreToReplication$7(KafkaBroker kafkaBroker, TopicPartition topicPartition, long j) {
        LogManager logManager = kafkaBroker.replicaManager().logManager();
        return ((AbstractLog) logManager.getLog(topicPartition, logManager.getLog$default$2()).get()).localLogStartOffset() > j;
    }

    /** Timeout message for the tiering wait; not referenced elsewhere in this class. */
    public static final /* synthetic */ String $anonfun$testTierStateRestoreToReplication$8() {
        return "timed out waiting for segment tiering and deletion";
    }

    /** Asserts that both brokers hold equal leader-epoch entries for partition 0. */
    public static final /* synthetic */ void $anonfun$testTierStateRestoreToReplication$10(TierEpochStateRevolvingReplicationTest tierEpochStateRevolvingReplicationTest, KafkaBroker kafkaBroker, KafkaBroker kafkaBroker2) {
        Assertions.assertEquals(tierEpochStateRevolvingReplicationTest.epochCache(kafkaBroker2).epochEntries(), tierEpochStateRevolvingReplicationTest.epochCache(kafkaBroker).epochEntries());
    }

    /** True when both brokers report the same log end offset for partition {@code i}. */
    public static final /* synthetic */ boolean $anonfun$waitForLogEndOffsetToMatch$1(TierEpochStateRevolvingReplicationTest tierEpochStateRevolvingReplicationTest, KafkaBroker kafkaBroker, int i, KafkaBroker kafkaBroker2) {
        return tierEpochStateRevolvingReplicationTest.getLog(kafkaBroker, i).logEndOffset() == tierEpochStateRevolvingReplicationTest.getLog(kafkaBroker2, i).logEndOffset();
    }

    /** Builds the failure message listing both log end offsets and both broker ids. */
    public static final /* synthetic */ String $anonfun$waitForLogEndOffsetToMatch$2(TierEpochStateRevolvingReplicationTest tierEpochStateRevolvingReplicationTest, KafkaBroker kafkaBroker, int i, KafkaBroker kafkaBroker2) {
        return new StringBuilder(27).append("Logs didn't match ").append(tierEpochStateRevolvingReplicationTest.getLog(kafkaBroker, i).logEndOffset()).append(" vs ").append(tierEpochStateRevolvingReplicationTest.getLog(kafkaBroker2, i).logEndOffset()).append(". ").append(kafkaBroker.config().brokerId()).append(" v ").append(kafkaBroker2.config().brokerId()).toString();
    }

    /** True when the partition reports exactly {@code i} in-sync replica ids. */
    public static final /* synthetic */ boolean $anonfun$awaitISR$2(int i, Partition partition) {
        return partition.inSyncReplicaIds().size() == i;
    }

    /** True when the broker has an online partition for {@code topicPartition} with ISR size {@code i}. */
    public static final /* synthetic */ boolean $anonfun$awaitISR$1(KafkaBroker kafkaBroker, TopicPartition topicPartition, int i) {
        return kafkaBroker.replicaManager().onlinePartition(topicPartition).exists(partition -> {
            return BoxesRunTime.boxToBoolean($anonfun$awaitISR$2(i, partition));
        });
    }

    /** Timeout message for the ISR wait; not referenced elsewhere in this class. */
    public static final /* synthetic */ String $anonfun$awaitISR$3() {
        return "Timed out waiting for replicas to join ISR";
    }

    /**
     * Populates the broker configuration used by the harness: the broker-level tier-enable
     * property is "false" (the topic turns tiering on through its own config), the tier feature is
     * on, the backend is "mock", and the commit and retention-check intervals are set to "5".
     */
    public TierEpochStateRevolvingReplicationTest() {
        serverConfig().put(KafkaConfig$.MODULE$.TierEnableProp(), "false");
        serverConfig().put(KafkaConfig$.MODULE$.TierFeatureProp(), "true");
        serverConfig().put(KafkaConfig$.MODULE$.TierBackendProp(), "mock");
        serverConfig().put(KafkaConfig$.MODULE$.TierS3BucketProp(), "mybucket");
        serverConfig().put(KafkaConfig$.MODULE$.TierPartitionStateCommitIntervalProp(), "5");
        serverConfig().put(KafkaConfig$.MODULE$.TierMetadataNumPartitionsProp(), "1");
        serverConfig().put("log.retention.check.interval.ms", "5");
    }
}
