package kafka.tier;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import kafka.integration.KafkaServerTestHarness;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.tier.exceptions.TierTopicConsumerRewindException;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.TierPartitionState;
import kafka.tier.state.TierPartitionStatus;
import kafka.tier.store.OpaqueData;
import kafka.tier.topic.TierTopicConsumer;
import kafka.tier.topic.TierTopicConsumerRewindPolicy;
import kafka.tier.topic.TierTopicManager;
import kafka.tier.topic.TierTopicPartitioner;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichLong$;

/* compiled from: TierTopicConsumerRewindIntegrationTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\u00055b\u0001B\u0007\u000f\u0001MAQA\u0007\u0001\u0005\u0002mAqA\b\u0001C\u0002\u0013\u0005q\u0004\u0003\u0004)\u0001\u0001\u0006I\u0001\t\u0005\bS\u0001\u0011\r\u0011\"\u0001+\u0011\u0019\t\u0004\u0001)A\u0005W!)!\u0007\u0001C!g!)!\t\u0001C\u0001\u0007\")q\u000e\u0001C\u0001a\")Q\u000f\u0001C\u0001m\")1\u0010\u0001C!y\"9\u0011\u0011\u0002\u0001\u0005\n\u0005-\u0001BBA\u0016\u0001\u0011%AP\u0001\u0014US\u0016\u0014Hk\u001c9jG\u000e{gn];nKJ\u0014Vm^5oI&sG/Z4sCRLwN\u001c+fgRT!a\u0004\t\u0002\tQLWM\u001d\u0006\u0002#\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u0015!\t)\u0002$D\u0001\u0017\u0015\t9\u0002#A\u0006j]R,wM]1uS>t\u0017BA\r\u0017\u0005YY\u0015MZ6b'\u0016\u0014h/\u001a:UKN$\b*\u0019:oKN\u001c\u0018A\u0002\u001fj]&$h\bF\u0001\u001d!\ti\u0002!D\u0001\u000f\u0003=yg/\u001a:sS\u0012Lgn\u001a)s_B\u001cX#\u0001\u0011\u0011\u0005\u00052S\"\u0001\u0012\u000b\u0005\r\"\u0013\u0001B;uS2T\u0011!J\u0001\u0005U\u00064\u0018-\u0003\u0002(E\tQ\u0001K]8qKJ$\u0018.Z:\u0002!=4XM\u001d:jI&tw\r\u0015:paN\u0004\u0013A\u00027pO\u0012K'/F\u0001,!\tas&D\u0001.\u0015\tqC%\u0001\u0002j_&\u0011\u0001'\f\u0002\u0005\r&dW-A\u0004m_\u001e$\u0015N\u001d\u0011\u0002\u001f\u001d,g.\u001a:bi\u0016\u001cuN\u001c4jON,\u0012\u0001\u000e\t\u0004kibT\"\u0001\u001c\u000b\u0005]B\u0014AC2pY2,7\r^5p]*\t\u0011(A\u0003tG\u0006d\u0017-\u0003\u0002<m\t\u00191+Z9\u0011\u0005u\u0002U\"\u0001 
\u000b\u0005}\u0002\u0012AB:feZ,'/\u0003\u0002B}\tY1*\u00194lC\u000e{gNZ5h\u0003E!Xm\u001d;SK^Lg\u000e\u001a+p'R\f'\u000f\u001e\u000b\u0003\t\"\u0003\"!\u0012$\u000e\u0003aJ!a\u0012\u001d\u0003\tUs\u0017\u000e\u001e\u0005\u0006\u0013\u001e\u0001\rAS\u0001\u0007cV|'/^7\u0011\u0005-\u0013fB\u0001'Q!\ti\u0005(D\u0001O\u0015\ty%#\u0001\u0004=e>|GOP\u0005\u0003#b\na\u0001\u0015:fI\u00164\u0017BA*U\u0005\u0019\u0019FO]5oO*\u0011\u0011\u000b\u000f\u0015\u0003\u000fY\u0003\"a\u00161\u000e\u0003aS!!\u0017.\u0002\rA\f'/Y7t\u0015\tYF,A\u0004kkBLG/\u001a:\u000b\u0005us\u0016!\u00026v]&$(\"A0\u0002\u0007=\u0014x-\u0003\u0002b1\n\t\u0002+\u0019:b[\u0016$XM]5{K\u0012$Vm\u001d;)\t\u001d\u0019\u0017N\u001b\t\u0003I\u001el\u0011!\u001a\u0006\u0003Mb\u000b\u0001\u0002\u001d:pm&$WM]\u0005\u0003Q\u0016\u00141BV1mk\u0016\u001cv.\u001e:dK\u000691\u000f\u001e:j]\u001e\u001cHFA6nC\u0005a\u0017A\u0001>lC\u0005q\u0017!B6sC\u001a$\u0018A\u0005;fgR\u0014Vm^5oIR{wJ\u001a4tKR$\"\u0001R9\t\u000b%C\u0001\u0019\u0001&)\u0005!1\u0006\u0006\u0002\u0005dSRd#a[7\u0002#Q,7\u000f\u001e*fo&tGMR1jYV\u0014X\r\u0006\u0002Eo\")\u0011*\u0003a\u0001\u0015\"\u0012\u0011B\u0016\u0015\u0005\u0013\rL'\u0010\f\u0002l[\u0006AA/Z1s\t><h\u000eF\u0001EQ\tQa\u0010E\u0002��\u0003\u000bi!!!\u0001\u000b\u0007\u0005\r!,A\u0002ba&LA!a\u0002\u0002\u0002\tI\u0011I\u001a;fe\u0016\u000b7\r[\u0001\u0018g\u0016$X\u000f\u001d+jKJ$v\u000e]5d!\u0006\u0014H/\u001b;j_:$\"!!\u0004\u0011\u000f\u0015\u000by!a\u0005\u0002 %\u0019\u0011\u0011\u0003\u001d\u0003\rQ+\b\u000f\\33!\u0011\t)\"a\u0007\u000e\u0005\u0005]!bAA\rI\u0005!A.\u00198h\u0013\u0011\ti\"a\u0006\u0003\u000f%sG/Z4feB!\u0011\u0011EA\u0014\u001b\t\t\u0019CC\u0002\u0002&9\tQ\u0001^8qS\u000eLA!!\u000b\u0002$\t\tB+[3s)>\u0004\u0018nY\"p]N,X.\u001a:\u00023]\f\u0017\u000e\u001e$peRKWM](gMN,Go\u001d+p\r2,8\u000f\u001b")
/* loaded from: input_file:kafka/tier/TierTopicConsumerRewindIntegrationTest.class */
public class TierTopicConsumerRewindIntegrationTest extends KafkaServerTestHarness {
    // Broker property overrides; filled in by the constructor and passed to
    // KafkaConfig.fromProps when the broker configs are generated.
    private final Properties overridingProps = new Properties();
    // Temporary directory obtained from TestUtils.tempDirectory in the constructor.
    private final File logDir;

    /** Returns the property overrides applied on top of the generated broker configs. */
    public Properties overridingProps() {
        return overridingProps;
    }

    /** Returns the temporary directory created when this test instance was constructed. */
    public File logDir() {
        return logDir;
    }

    /**
     * Produces the broker configs for this harness: the property sets returned by
     * {@code TestUtils.createBrokerConfigs} are each converted with
     * {@code KafkaConfig.fromProps}, layering {@link #overridingProps()} on top.
     */
    @Override // kafka.integration.KafkaServerTestHarness
    /* renamed from: generateConfigs */
    public Seq<KafkaConfig> mo50generateConfigs() {
        final String zkConnect = zkConnectOrNull();
        final Map<Object, String> emptyMap = (Map) Map$.MODULE$.apply(Nil$.MODULE$);
        return (Seq) TestUtils$.MODULE$
            .createBrokerConfigs(1, zkConnect, false, true, None$.MODULE$, None$.MODULE$, None$.MODULE$, true, false, false, false, emptyMap, 1, false, 1, (short) 1, 0, false)
            .map(brokerProps -> KafkaConfig$.MODULE$.fromProps(brokerProps, overridingProps(), true));
    }

    /**
     * Rewinds the tier topic consumer to the start and verifies that every tracked
     * position ends up at offset 0.
     *
     * <p>If the consumer tracks fewer than two partitions, rewinding with
     * {@code FAIL_ON_MISSING_PARTITIONS} must throw
     * {@link TierTopicConsumerRewindException}; the rewind is then repeated with
     * {@code SKIP_MISSING_PARTITIONS} and its result is checked for each log dir of
     * the last broker.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testRewindToStart(String str) {
        final Tuple2<Integer, TierTopicConsumer> setup = setupTierTopicPartition();
        if (setup == null) {
            throw new MatchError((Object) null);
        }
        final Integer partitionId = (Integer) setup._1();
        final TierTopicConsumer consumer = (TierTopicConsumer) setup._2();
        waitForTierOffsetsToFlush();

        final java.util.Map positionsBefore = consumer.snapshotPositions();
        final OffsetAndEpoch positionBefore = (OffsetAndEpoch) positionsBefore.get(partitionId);
        Assertions.assertTrue(positionBefore.offset() > 0);

        if (positionsBefore.keySet().size() >= 2) {
            consumer.rewindToStart(true, TierTopicConsumerRewindPolicy.FAIL_ON_MISSING_PARTITIONS);
        } else {
            Assertions$.MODULE$.assertThrows(
                () -> consumer.rewindToStart(true, TierTopicConsumerRewindPolicy.FAIL_ON_MISSING_PARTITIONS),
                ClassTag$.MODULE$.apply(TierTopicConsumerRewindException.class),
                new Position("TierTopicConsumerRewindIntegrationTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 65));
            final java.util.Map rewindResult = consumer.rewindToStart(true, TierTopicConsumerRewindPolicy.SKIP_MISSING_PARTITIONS);
            ((KafkaBroker) brokers().last()).config().logDirs().foreach(dir -> {
                $anonfun$testRewindToStart$2(rewindResult, partitionId, dir);
                return BoxedUnit.UNIT;
            });
        }

        // Re-read the positions after the rewind; every one of them must be back at 0.
        final java.util.Map positionsAfter = consumer.snapshotPositions();
        positionsAfter.keySet().forEach(id -> {
            Assertions.assertEquals(0L, ((OffsetAndEpoch) positionsAfter.get(id)).offset());
        });
        consumer.shutdown();
    }

    /**
     * Rewinds a single tier topic partition to offset 1 (keeping its current epoch)
     * and verifies that only that partition's position changed.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testRewindToOffset(String str) {
        final Tuple2<Integer, TierTopicConsumer> setup = setupTierTopicPartition();
        if (setup == null) {
            throw new MatchError((Object) null);
        }
        final Integer partitionId = (Integer) setup._1();
        final TierTopicConsumer consumer = (TierTopicConsumer) setup._2();
        waitForTierOffsetsToFlush();

        final java.util.Map positionsBefore = consumer.snapshotPositions();
        final OffsetAndEpoch positionBefore = (OffsetAndEpoch) positionsBefore.get(partitionId);
        Assertions.assertTrue(positionBefore.offset() > 0);

        final HashMap rewindTargets = new HashMap();
        final OffsetAndEpoch target = new OffsetAndEpoch(1L, positionBefore.epoch());
        rewindTargets.put(partitionId, target);
        consumer.rewind(rewindTargets, true, TierTopicConsumerRewindPolicy.FAIL_ON_MISSING_PARTITIONS);

        final java.util.Map positionsAfter = consumer.snapshotPositions();
        for (Object id : positionsAfter.keySet()) {
            if (java.util.Objects.equals(id, partitionId)) {
                // The rewound partition must sit exactly at the requested offset.
                Assertions.assertEquals(target.offset(), ((OffsetAndEpoch) positionsAfter.get(partitionId)).offset());
            } else {
                // Every other partition must be untouched.
                Assertions.assertEquals(positionsBefore.get(id), positionsAfter.get(id));
            }
        }
        consumer.shutdown();
    }

    /**
     * Verifies that rewinding to an offset one past the partition's current position
     * throws {@link TierTopicConsumerRewindException}: with the boolean argument set
     * to {@code false}, with it set to {@code true}, and again after the
     * {@code tier.offsets} file has been deleted from the last broker's last log dir.
     * The consumer's positions must be unchanged afterwards.
     *
     * @param str value supplied by {@code @ValueSource}; not read by the test body
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testRewindFailure(String str) {
        final Tuple2<Integer, TierTopicConsumer> setup = setupTierTopicPartition();
        if (setup == null) {
            throw new MatchError((Object) null);
        }
        final Integer partitionId = (Integer) setup._1();
        final TierTopicConsumer consumer = (TierTopicConsumer) setup._2();
        waitForTierOffsetsToFlush();

        final java.util.Map positionsBefore = consumer.snapshotPositions();
        final OffsetAndEpoch positionBefore = (OffsetAndEpoch) positionsBefore.get(partitionId);
        Assertions.assertTrue(positionBefore.offset() > 0);

        final HashMap rewindTargets = new HashMap();
        rewindTargets.put(partitionId, new OffsetAndEpoch(positionBefore.offset() + 1, positionBefore.epoch()));

        Assertions$.MODULE$.assertThrows(
            () -> consumer.rewind(rewindTargets, false, TierTopicConsumerRewindPolicy.FAIL_ON_MISSING_PARTITIONS),
            ClassTag$.MODULE$.apply(TierTopicConsumerRewindException.class),
            new Position("TierTopicConsumerRewindIntegrationTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 143));
        Assertions$.MODULE$.assertThrows(
            () -> consumer.rewind(rewindTargets, true, TierTopicConsumerRewindPolicy.FAIL_ON_MISSING_PARTITIONS),
            ClassTag$.MODULE$.apply(TierTopicConsumerRewindException.class),
            new Position("TierTopicConsumerRewindIntegrationTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 145));

        // Remove the on-disk offsets file (it must exist) and confirm the rewind still fails.
        final String lastLogDir = (String) ((KafkaBroker) brokers().last()).config().logDirs().last();
        final Path offsetsFile = new File(lastLogDir, "/tier.offsets").toPath();
        Assertions.assertTrue(Files.deleteIfExists(offsetsFile));
        Assertions$.MODULE$.assertThrows(
            () -> consumer.rewind(rewindTargets, true, TierTopicConsumerRewindPolicy.FAIL_ON_MISSING_PARTITIONS),
            ClassTag$.MODULE$.apply(TierTopicConsumerRewindException.class),
            new Position("TierTopicConsumerRewindIntegrationTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 148));

        Assertions.assertEquals(positionsBefore, consumer.snapshotPositions());
        consumer.shutdown();
    }

    /**
     * Runs the inherited tear-down and then calls
     * {@code TestUtils.verifyNoUnexpectedThreads} with the {@code "@AfterEach"} context.
     */
    @Override // kafka.integration.KafkaServerTestHarness, kafka.server.QuorumTestHarness
    @AfterEach
    public void tearDown() {
        super.tearDown();
        final String context = "@AfterEach";
        TestUtils$.MODULE$.verifyNoUnexpectedThreads(context);
    }

    /**
     * Creates the tier-enabled topic {@code foo}, waits for the tier topic manager and
     * for partition {@code foo-0}'s tier state to be ready, uploads one segment's
     * metadata, and waits for the tier partition state to report {@code ONLINE}.
     *
     * @return the tier topic partition id computed by {@code TierTopicPartitioner(2)}
     *         for {@code foo-0}, paired with the last broker's tier topic consumer
     */
    private Tuple2<Integer, TierTopicConsumer> setupTierTopicPartition() {
        final Properties topicProps = new Properties();
        topicProps.put("confluent.tier.enable", "true");
        topicProps.put("cleanup.policy", "delete");
        final TopicPartition fooPartition = new TopicPartition("foo", 0);
        createTopic("foo", 2, createTopic$default$3(), topicProps, createTopic$default$5(), createTopic$default$6());

        final KafkaBroker broker = (KafkaBroker) brokers().last();
        final TierTopicConsumer consumer = (TierTopicConsumer) broker.tierTopicConsumerOpt().get();
        final TierTopicManager manager = (TierTopicManager) broker.tierTopicManagerOpt().get();
        final LogManager logManager = broker.logManager();

        awaitTierCondition(manager::isReadyForWrites, "timed out waiting for TierTopicManager to be ready", 15000L);
        awaitTierCondition(() -> $anonfun$setupTierTopicPartition$3(logManager, fooPartition), "InitLeader event not materialized.", 15000L);

        final TierPartitionState tierState = ((AbstractLog) logManager.getLog(fooPartition, logManager.getLog$default$2()).get()).tierPartitionState();
        final TopicIdPartition topicIdPartition = (TopicIdPartition) tierState.topicIdPartition().get();
        final int tierTopicPartitionId = new TierTopicPartitioner(2).partitionId(topicIdPartition);
        Assertions.assertEquals(
            TierPartitionState.AppendResult.ACCEPTED,
            TierTestUtils$.MODULE$.uploadWithMetadata(manager, topicIdPartition, 0, UUID.randomUUID(), 0L, 1000L, 15000L, 0L, 100, false, true, false, tierState.lastLocalMaterializedSrcOffsetAndEpoch(), OpaqueData.ZEROED).get());
        tierState.flush();
        Assertions.assertEquals(1000L, tierState.committedEndOffset());

        awaitTierCondition(() -> $anonfun$setupTierTopicPartition$6(tierState), "tierTopicManager consumers catchingUp timed out", 500L);
        return new Tuple2<>(Predef$.MODULE$.int2Integer(tierTopicPartitionId), consumer);
    }

    /**
     * Polls {@code condition} until it holds, sleeping {@code min(timeoutMs, 100)} ms
     * between checks, and fails the test with {@code failureMessage} once more than
     * {@code timeoutMs} ms have elapsed since polling started.
     */
    private static void awaitTierCondition(java.util.function.BooleanSupplier condition, String failureMessage, long timeoutMs) {
        final long startMs = System.currentTimeMillis();
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > startMs + timeoutMs) {
                Assertions.fail(failureMessage);
            }
            Thread.sleep(Math.min(timeoutMs, 100L));
        }
    }

    /**
     * Blocks until the flush check passes for every log dir of the last broker,
     * polling every 100 ms and failing the test after 15 seconds.
     */
    private void waitForTierOffsetsToFlush() {
        final long timeoutMs = 15000L;
        final long pauseMs = 100L;
        final long deadlineMs = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if ($anonfun$waitForTierOffsetsToFlush$1(this)) {
                return;
            }
            if (System.currentTimeMillis() > deadlineMs) {
                Assertions.fail("timed out while waiting for tier.offsets to be flushed to disk in all logDirs");
            }
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }
    }

    /**
     * Checks the entry stored under {@code str} in {@code map}: the set must not
     * contain {@code num}, must not be empty, and must hold exactly one element.
     */
    public static final /* synthetic */ void $anonfun$testRewindToStart$2(java.util.Map map, Integer num, String str) {
        final Set entries = (Set) map.get(str);
        final boolean containsPartition = entries.contains(num);
        Assertions.assertFalse(containsPartition);
        final boolean empty = entries.isEmpty();
        Assertions.assertFalse(empty);
        final boolean singleElement = entries.size() == 1;
        Assertions.assertTrue(singleElement);
    }

    /** Supplies the failure message for the TierTopicManager readiness wait. */
    public static final /* synthetic */ String $anonfun$setupTierTopicPartition$2() {
        final String message = "timed out waiting for TierTopicManager to be ready";
        return message;
    }

    /**
     * Reports whether the log for {@code topicPartition} exists, its tier partition
     * state has a topic-id partition assigned, and its tier epoch equals 0.
     */
    public static final /* synthetic */ boolean $anonfun$setupTierTopicPartition$3(LogManager logManager, TopicPartition topicPartition) {
        final Option tierStateOpt = logManager.getLog(topicPartition, logManager.getLog$default$2()).map(log -> log.tierPartitionState());
        if (!tierStateOpt.isDefined()) {
            return false;
        }
        if (!((TierPartitionState) tierStateOpt.get()).topicIdPartition().isPresent()) {
            return false;
        }
        return ((TierPartitionState) tierStateOpt.get()).tierEpoch() == 0;
    }

    /** Supplies the failure message for the InitLeader materialization wait. */
    public static final /* synthetic */ String $anonfun$setupTierTopicPartition$5() {
        final String message = "InitLeader event not materialized.";
        return message;
    }

    /** Reports whether the given tier partition state's status equals {@code ONLINE}. */
    public static final /* synthetic */ boolean $anonfun$setupTierTopicPartition$6(TierPartitionState tierPartitionState) {
        final TierPartitionStatus current = tierPartitionState.status();
        final TierPartitionStatus expected = TierPartitionStatus.ONLINE;
        if (current == null) {
            return expected == null;
        }
        return current.equals(expected);
    }

    /** Supplies the failure message for the wait on the tier partition status. */
    public static final /* synthetic */ String $anonfun$setupTierTopicPartition$7() {
        final String message = "tierTopicManager consumers catchingUp timed out";
        return message;
    }

    /**
     * Reports whether the {@code tier.offsets} file inside the given log directory
     * exists and contains more than one line.
     *
     * @param str path of the log directory to inspect
     * @return {@code true} if the file exists and has at least two lines,
     *         {@code false} otherwise
     * @throws java.io.UncheckedIOException if the file exists but cannot be read
     */
    public static final /* synthetic */ boolean $anonfun$waitForTierOffsetsToFlush$2(String str) {
        final Path offsetsFile = new File(str, "/tier.offsets").toPath();
        if (!Files.exists(offsetsFile, new LinkOption[0])) {
            return false;
        }
        // Files.lines holds the file open until the stream is closed. This predicate is
        // polled repeatedly, so the stream must be closed on every call to avoid
        // leaking one file handle per poll.
        try (java.util.stream.Stream<String> lines = Files.lines(offsetsFile)) {
            return lines.count() > 1;
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException(e);
        }
    }

    /**
     * Reports whether the {@code tier.offsets} flush check passes for every log dir
     * configured on the last broker of the given test instance.
     */
    public static final /* synthetic */ boolean $anonfun$waitForTierOffsetsToFlush$1(TierTopicConsumerRewindIntegrationTest tierTopicConsumerRewindIntegrationTest) {
        final KafkaBroker lastBroker = (KafkaBroker) tierTopicConsumerRewindIntegrationTest.brokers().last();
        return lastBroker.config().logDirs().forall(dir -> BoxesRunTime.boxToBoolean($anonfun$waitForTierOffsetsToFlush$2(dir)));
    }

    /** Supplies the failure message for the tier.offsets flush wait. */
    public static final /* synthetic */ String $anonfun$waitForTierOffsetsToFlush$3() {
        final String message = "timed out while waiting for tier.offsets to be flushed to disk in all logDirs";
        return message;
    }

    /**
     * Populates the broker overrides (tier feature on, 2 tier metadata partitions,
     * replication factor 1, "mock" tier backend) and creates the temporary log
     * directory.
     */
    public TierTopicConsumerRewindIntegrationTest() {
        final Properties overrides = overridingProps();
        final KafkaConfig$ configKeys = KafkaConfig$.MODULE$;
        overrides.setProperty(configKeys.TierFeatureProp(), "true");
        overrides.setProperty(configKeys.TierMetadataNumPartitionsProp(), "2");
        overrides.setProperty(configKeys.TierMetadataReplicationFactorProp(), "1");
        overrides.setProperty(configKeys.TierBackendProp(), "mock");
        // Kept from the compiled original: reads the Scala TestUtils singleton before
        // the temp directory is created.
        final TestUtils$ scalaTestUtils = TestUtils$.MODULE$;
        this.logDir = TestUtils.tempDirectory((Path) null, (String) null);
    }
}
