package kafka.tier;

import java.util.concurrent.atomic.AtomicBoolean;
import kafka.api.IntegrationTestHarness;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.state.TierPartitionState;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Array$;
import scala.Function1;
import scala.Predef$;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.TraversableOnce;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Buffer$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;

/* compiled from: TierRetentionIntegrationTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ea\u0001\u0002\t\u0012\u0001YAQ!\b\u0001\u0005\u0002yAq!\t\u0001C\u0002\u0013%!\u0005\u0003\u0004,\u0001\u0001\u0006Ia\t\u0005\bY\u0001\u0011\r\u0011\"\u0003.\u0011\u0019!\u0004\u0001)A\u0005]!9Q\u0007\u0001b\u0001\n\u0013i\u0003B\u0002\u001c\u0001A\u0003%a\u0006C\u00038\u0001\u0011ES\u0006C\u00049\u0001\t\u0007I\u0011A\u001d\t\r\u0011\u0003\u0001\u0015!\u0003;\u0011\u0015)\u0005\u0001\"\u0011G\u0011\u00151\u0006\u0001\"\u0011G\u0011\u0015Y\u0006\u0001\"\u0001G\u0011\u0015\u0001\u0007\u0001\"\u0003b\u0011\u0015a\b\u0001\"\u0003~\u0005q!\u0016.\u001a:SKR,g\u000e^5p]&sG/Z4sCRLwN\u001c+fgRT!AE\n\u0002\tQLWM\u001d\u0006\u0002)\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u0018!\tA2$D\u0001\u001a\u0015\tQ2#A\u0002ba&L!\u0001H\r\u0003-%sG/Z4sCRLwN\u001c+fgRD\u0015M\u001d8fgN\fa\u0001P5oSRtD#A\u0010\u0011\u0005\u0001\u0002Q\"A\t\u0002\u000bQ|\u0007/[2\u0016\u0003\r\u0002\"\u0001J\u0015\u000e\u0003\u0015R!AJ\u0014\u0002\t1\fgn\u001a\u0006\u0002Q\u0005!!.\u0019<b\u0013\tQSE\u0001\u0004TiJLgnZ\u0001\u0007i>\u0004\u0018n\u0019\u0011\u0002\u001b9,X\u000eU1si&$\u0018n\u001c8t+\u0005q\u0003CA\u00183\u001b\u0005\u0001$\"A\u0019\u0002\u000bM\u001c\u0017\r\\1\n\u0005M\u0002$aA%oi\u0006qa.^7QCJ$\u0018\u000e^5p]N\u0004\u0013a\u00038v[J+\u0007\u000f\\5dCN\fAB\\;n%\u0016\u0004H.[2bg\u0002\n1B\u0019:pW\u0016\u00148i\\;oi\u00061Q\r_5uK\u0012,\u0012A\u000f\t\u0003w\tk\u0011\u0001\u0010\u0006\u0003{y\na!\u0019;p[&\u001c'BA A\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003\u0003\u001e\nA!\u001e;jY&\u00111\t\u0010\u0002\u000e\u0003R|W.[2C_>dW-\u00198\u0002\u000f\u0015D\u0018\u000e^3eA\u0005)1/\u001a;VaR\tq\t\u0005\u00020\u0011&\u0011\u0011\n\r\u0002\u0005+:LG\u000f\u000b\u0002\f\u0017B\u0011A\nV\u0007\u0002\u001b*\u0011!D\u0014\u0006\u0003\u001fB\u000bqA[;qSR,'O\u0003\u0002R%\u0006)!.\u001e8ji*\t1+A\u0002pe\u001eL!!V'\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007.\u0001\u0005uK\u0006\u0014Hi\\<oQ\ta\u0001\f\u0005\u0002M3&\u0011!,\u0014\u0002\n\u0003\u001a$XM]#bG\"\f!\u0003^3ti\n\u000b7/[2SKR,g\u000e^5p]\"\u0012Q\"\u0018\t\u0003\u0019zK!aX'\u0003\tQ+7\u000f^\u0001\u001bo\u0006LG/\u00168uS2,\u0015/^1m\u001f:\fE\u000e\u001c\"s_.,'o\u001d\u000b\u0004\u000f\n\u0004\b\"B2\u000f\u0001\u0004!\u0017!C2p[B,H/\u001a$o!\u0011ySmZ7\n\u0005\u0019\u0004$!\u0003$v]\u000e$\u0018n\u001c82!\tA7.D\u0001j\u0015\tQ7#\u0001\u0004tKJ4XM]\u0005\u0003Y&\u00141bS1gW\u0006\u001cVM\u001d<feB\u0011AE\\\u0005\u0003_\u0016\u0012aa\u00142kK\u000e$\b\"B9\u000f\u0001\u0004\u0011\u0018aA7tOB\u00111O\u001f\b\u0003ib\u0004\"!\u001e\u0019\u000e\u0003YT!a^\u000b\u0002\rq\u0012xn\u001c;?\u0013\tI\b'\u0001\u0004Qe\u0016$WMZ\u0005\u0003UmT!!\u001f\u0019\u0002\u0011\u0005<\u0018-\u001b;J'J#ba\u0012@\u0002\u0014\u0005U\u0001BB@\u0010\u0001\u0004\t\t!\u0001\u0002uaB!\u00111AA\b\u001b\t\t)A\u0003\u0003\u0002\b\u0005%\u0011AB2p[6|gNC\u0002\u0015\u0003\u0017Q1!!\u0004S\u0003\u0019\t\u0007/Y2iK&!\u0011\u0011CA\u0003\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:DQ!N\bA\u00029Ba!a\u0006\u0010\u0001\u00049\u0017A\u00027fC\u0012,'\u000f")
/* loaded from: input_file:kafka/tier/TierRetentionIntegrationTest.class */
public class TierRetentionIntegrationTest extends IntegrationTestHarness {
    private final String topic;
    private final int numPartitions;
    private final int numReplicas;
    private final AtomicBoolean exited;

    private String topic() {
        return this.topic;
    }

    private int numPartitions() {
        return this.numPartitions;
    }

    private int numReplicas() {
        return this.numReplicas;
    }

    @Override // kafka.api.IntegrationTestHarness
    public int brokerCount() {
        return 3;
    }

    public AtomicBoolean exited() {
        return this.exited;
    }

    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @BeforeEach
    public void setUp() {
        Exit.setExitProcedure((i, str) -> {
            this.exited().set(true);
        });
        super.setUp();
        createTopic(topic(), numPartitions(), numReplicas(), createTopic$default$4());
    }

    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @AfterEach
    public void tearDown() {
        super.tearDown();
        Assertions.assertFalse(exited().get());
    }

    @Test
    public void testBasicRetention() {
        int i;
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        ProducerRecord producerRecord = new ProducerRecord(topic(), Predef$.MODULE$.int2Integer(0), "key".getBytes(), "value".getBytes());
        int size = new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps((byte[]) producerRecord.key())).size() + new ArrayOps.ofByte(Predef$.MODULE$.byteArrayOps((byte[]) producerRecord.value())).size();
        KafkaProducer createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        PartitionInfo partitionInfo = (PartitionInfo) ((IterableLike) CollectionConverters$.MODULE$.asScalaBufferConverter(createProducer.partitionsFor(topic())).asScala()).find(partitionInfo2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testBasicRetention$1(partitionInfo2));
        }).get();
        int id = partitionInfo.leader().id();
        int[] iArr = (int[]) new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps((int[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(partitionInfo.replicas())).map(node -> {
            return BoxesRunTime.boxToInteger(node.id());
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.Int())))).filter(i2 -> {
            return i2 != id;
        });
        KafkaServer kafkaServer = (KafkaServer) servers().find(kafkaServer2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$testBasicRetention$4(id, kafkaServer2));
        }).get();
        int i3 = 0;
        while (true) {
            i = i3;
            if (i >= 1100) {
                break;
            }
            createProducer.send(producerRecord).get();
            i3 = i + size;
        }
        LogManager logManager = kafkaServer.logManager();
        AbstractLog abstractLog = (AbstractLog) logManager.getLog(topicPartition, logManager.getLog$default$2()).get();
        TierPartitionState tierPartitionState = abstractLog.tierPartitionState();
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testBasicRetention$5(tierPartitionState)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$testBasicRetention$6());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long waitUntilTrue$default$32 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$2 == null) {
            throw null;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$testBasicRetention$7(tierPartitionState)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$32) {
                Assertions.fail($anonfun$testBasicRetention$8());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$32), waitUntilTrue$default$42));
        }
        Assertions.assertTrue(((TraversableOnce) CollectionConverters$.MODULE$.asScalaIteratorConverter(tierPartitionState.segments()).asScala()).nonEmpty());
        Assertions.assertEquals(0L, abstractLog.logStartOffset());
        Assertions.assertEquals(0L, ((TierObjectMetadata) ((Iterator) CollectionConverters$.MODULE$.asScalaIteratorConverter(tierPartitionState.segments()).asScala()).next()).baseOffset());
        awaitISR(topicPartition, numReplicas(), kafkaServer);
        killBroker(iArr[0]);
        while (i < 20000) {
            createProducer.send(producerRecord).get();
            i += size;
        }
        servers().foreach(kafkaServer3 -> {
            $anonfun$testBasicRetention$9(this, topicPartition, kafkaServer3);
            return BoxedUnit.UNIT;
        });
        killBroker(iArr[1]);
        while (i < 40000) {
            createProducer.send(producerRecord).get();
            i += size;
        }
        restartDeadBrokers();
        awaitISR(topicPartition, numReplicas(), kafkaServer);
        Function1 function1 = kafkaServer4 -> {
            LogManager logManager2 = kafkaServer4.logManager();
            return Long.toString(((AbstractLog) logManager2.getLog(topicPartition, logManager2.getLog$default$2()).get()).logStartOffset());
        };
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long waitUntilTrue$default$33 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$43 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$3 == null) {
            throw null;
        }
        long currentTimeMillis3 = System.currentTimeMillis();
        while (!$anonfun$waitUntilEqualOnAllBrokers$1(this, function1)) {
            if (System.currentTimeMillis() > currentTimeMillis3 + waitUntilTrue$default$33) {
                Assertions.fail($anonfun$waitUntilEqualOnAllBrokers$3());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$33), waitUntilTrue$default$43));
        }
        Function1 function12 = kafkaServer5 -> {
            LogManager logManager2 = kafkaServer5.logManager();
            return Long.toString(((AbstractLog) logManager2.getLog(topicPartition, logManager2.getLog$default$2()).get()).logEndOffset());
        };
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        long waitUntilTrue$default$34 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$44 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$4 == null) {
            throw null;
        }
        long currentTimeMillis4 = System.currentTimeMillis();
        while (!$anonfun$waitUntilEqualOnAllBrokers$1(this, function12)) {
            if (System.currentTimeMillis() > currentTimeMillis4 + waitUntilTrue$default$34) {
                Assertions.fail($anonfun$waitUntilEqualOnAllBrokers$3());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$34), waitUntilTrue$default$44));
        }
    }

    private void waitUntilEqualOnAllBrokers(Function1<KafkaServer, Object> function1, String str) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$waitUntilEqualOnAllBrokers$1(this, function1)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$waitUntilEqualOnAllBrokers$3());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
    }

    private void awaitISR(TopicPartition topicPartition, int i, KafkaServer kafkaServer) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$awaitISR$1(kafkaServer, topicPartition, i)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$awaitISR$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
    }

    public static final /* synthetic */ boolean $anonfun$testBasicRetention$1(PartitionInfo partitionInfo) {
        return partitionInfo.partition() == 0;
    }

    public static final /* synthetic */ boolean $anonfun$testBasicRetention$4(int i, KafkaServer kafkaServer) {
        return kafkaServer.config().brokerId() == i;
    }

    public static final /* synthetic */ boolean $anonfun$testBasicRetention$5(TierPartitionState tierPartitionState) {
        return tierPartitionState.totalSize() > 0;
    }

    public static final /* synthetic */ String $anonfun$testBasicRetention$6() {
        return "Timed out waiting for segments to be tiered";
    }

    public static final /* synthetic */ boolean $anonfun$testBasicRetention$7(TierPartitionState tierPartitionState) {
        return tierPartitionState.committedEndOffset() > 0;
    }

    public static final /* synthetic */ String $anonfun$testBasicRetention$8() {
        return "Timed out waiting for tier partition state to be flushed";
    }

    public static final /* synthetic */ boolean $anonfun$testBasicRetention$10(AbstractLog abstractLog) {
        return abstractLog.logStartOffset() > 0;
    }

    public static final /* synthetic */ String $anonfun$testBasicRetention$11() {
        return "Timed out waiting for retention to kick in";
    }

    public static final /* synthetic */ boolean $anonfun$testBasicRetention$12(TierPartitionState tierPartitionState) {
        return BoxesRunTime.unboxToLong(((TraversableOnce) CollectionConverters$.MODULE$.asScalaIteratorConverter(tierPartitionState.segments()).asScala()).toList().headOption().map(tierObjectMetadata -> {
            return BoxesRunTime.boxToLong(tierObjectMetadata.baseOffset());
        }).getOrElse(() -> {
            return -1L;
        })) > 0;
    }

    public static final /* synthetic */ String $anonfun$testBasicRetention$15() {
        return "Timed out waiting for tiered segment to be deleted";
    }

    public static final /* synthetic */ void $anonfun$testBasicRetention$9(TierRetentionIntegrationTest tierRetentionIntegrationTest, TopicPartition topicPartition, KafkaServer kafkaServer) {
        if (tierRetentionIntegrationTest.alive()[tierRetentionIntegrationTest.servers().indexOf(kafkaServer)]) {
            LogManager logManager = kafkaServer.logManager();
            AbstractLog abstractLog = (AbstractLog) logManager.getLog(topicPartition, logManager.getLog$default$2()).get();
            TierPartitionState tierPartitionState = abstractLog.tierPartitionState();
            TestUtils$ testUtils$ = TestUtils$.MODULE$;
            long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
            long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
            if (testUtils$ == null) {
                throw null;
            }
            long currentTimeMillis = System.currentTimeMillis();
            while (!$anonfun$testBasicRetention$10(abstractLog)) {
                if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                    Assertions.fail($anonfun$testBasicRetention$11());
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
            }
            TestUtils$ testUtils$2 = TestUtils$.MODULE$;
            long waitUntilTrue$default$32 = TestUtils$.MODULE$.waitUntilTrue$default$3();
            long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
            if (testUtils$2 == null) {
                throw null;
            }
            long currentTimeMillis2 = System.currentTimeMillis();
            while (!$anonfun$testBasicRetention$12(tierPartitionState)) {
                if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$32) {
                    Assertions.fail($anonfun$testBasicRetention$15());
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$32), waitUntilTrue$default$42));
            }
        }
    }

    public static final /* synthetic */ boolean $anonfun$waitUntilEqualOnAllBrokers$1(TierRetentionIntegrationTest tierRetentionIntegrationTest, Function1 function1) {
        return ((TraversableOnce) tierRetentionIntegrationTest.servers().map(kafkaServer -> {
            return function1.apply(kafkaServer);
        }, Buffer$.MODULE$.canBuildFrom())).toSet().size() == 1;
    }

    public static final /* synthetic */ String $anonfun$waitUntilEqualOnAllBrokers$3() {
        return "";
    }

    public static final /* synthetic */ boolean $anonfun$awaitISR$1(KafkaServer kafkaServer, TopicPartition topicPartition, int i) {
        return ((Partition) kafkaServer.replicaManager().onlinePartition(topicPartition).get()).inSyncReplicaIds().size() == i;
    }

    public static final /* synthetic */ String $anonfun$awaitISR$2() {
        return "Timed out waiting for replicas to join ISR";
    }

    public TierRetentionIntegrationTest() {
        serverConfig().setProperty(KafkaConfig$.MODULE$.TierFeatureProp(), "true");
        serverConfig().setProperty(KafkaConfig$.MODULE$.TierEnableProp(), "true");
        serverConfig().setProperty(KafkaConfig$.MODULE$.TierMetadataNumPartitionsProp(), "2");
        serverConfig().setProperty(KafkaConfig$.MODULE$.TierBackendProp(), "mock");
        serverConfig().setProperty(KafkaConfig$.MODULE$.TierFencedSegmentDeleteDelayMsProp(), "5");
        serverConfig().setProperty(KafkaConfig$.MODULE$.LogRetentionBytesProp(), "10000");
        serverConfig().setProperty(KafkaConfig$.MODULE$.LogSegmentBytesProp(), "1000");
        serverConfig().setProperty(KafkaConfig$.MODULE$.LogFlushSchedulerIntervalMsProp(), "10");
        serverConfig().setProperty(KafkaConfig$.MODULE$.TierPartitionStateCommitIntervalProp(), "10");
        serverConfig().setProperty(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), "10");
        this.topic = "foo";
        this.numPartitions = 1;
        this.numReplicas = 3;
        this.exited = new AtomicBoolean(false);
    }
}
