package kafka.tier;

import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import kafka.api.IntegrationTestHarness;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.tier.fetcher.TierFetcher;
import kafka.tier.state.TierPartitionState;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.MatchError;
import scala.Predef$;
import scala.Some;
import scala.collection.LinearSeqOptimized;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer$;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: TierIntegrationTransactionTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005mb\u0001B\u000b\u0017\u0001mAQA\t\u0001\u0005\u0002\rBQA\n\u0001\u0005R\u001dBQA\f\u0001\u0005\n=Bq\u0001\u000f\u0001C\u0002\u0013%\u0011\b\u0003\u0004>\u0001\u0001\u0006IA\u000f\u0005\b}\u0001\u0011\r\u0011\"\u0003(\u0011\u0019y\u0004\u0001)A\u0005Q!9\u0001\t\u0001a\u0001\n\u0013\t\u0005bB'\u0001\u0001\u0004%IA\u0014\u0005\u0007)\u0002\u0001\u000b\u0015\u0002\"\t\u000bU\u0003A\u0011\u0002,\t\u000f-\u0004!\u0019!C\u0001Y\"1q\u000f\u0001Q\u0001\n5DQ\u0001\u001f\u0001\u0005BeDa!!\u0003\u0001\t\u0003J\bbBA\n\u0001\u0011%\u0011Q\u0003\u0005\b\u0003?\u0001A\u0011BA\u0011\u0011\u001d\t9\u0003\u0001C\u0005\u0003SAa!a\f\u0001\t\u0013I\bBBA\u0019\u0001\u0011\u0005\u0011P\u0001\u0010US\u0016\u0014\u0018J\u001c;fOJ\fG/[8o)J\fgn]1di&|g\u000eV3ti*\u0011q\u0003G\u0001\u0005i&,'OC\u0001\u001a\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\u000f\u0011\u0005u\u0001S\"\u0001\u0010\u000b\u0005}A\u0012aA1qS&\u0011\u0011E\b\u0002\u0017\u0013:$Xm\u001a:bi&|g\u000eV3ti\"\u000b'O\\3tg\u00061A(\u001b8jiz\"\u0012\u0001\n\t\u0003K\u0001i\u0011AF\u0001\fEJ|7.\u001a:D_VtG/F\u0001)!\tIC&D\u0001+\u0015\u0005Y\u0013!B:dC2\f\u0017BA\u0017+\u0005\rIe\u000e^\u0001\u000eG>tg-[4ve\u0016lunY6\u0016\u0003A\u0002\"!\r\u001c\u000e\u0003IR!a\r\u001b\u0002\t1\fgn\u001a\u0006\u0002k\u0005!!.\u0019<b\u0013\t9$G\u0001\u0004PE*,7\r^\u0001\u0006i>\u0004\u0018nY\u000b\u0002uA\u0011\u0011gO\u0005\u0003yI\u0012aa\u0015;sS:<\u0017A\u0002;pa&\u001c\u0007%\u0001\u0006qCJ$\u0018\u000e^5p]N\f1\u0002]1si&$\u0018n\u001c8tA\u0005\t\u0002/\u0019:uSRLwN\u001c+p\u0019\u0016\fG-\u001a:\u0016\u0003\t\u0003Ba\u0011&)Q9\u0011A\t\u0013\t\u0003\u000b*j\u0011A\u0012\u0006\u0003\u000fj\ta\u0001\u0010:p_Rt\u0014BA%+\u0003\u0019\u0001&/\u001a3fM&\u00111\n\u0014\u0002\u0004\u001b\u0006\u0004(BA%+\u0003U\u0001\u0018M\u001d;ji&|g\u000eV8MK\u0006$WM]0%KF$\"a\u0014*\u0011\u0005%\u0002\u0016BA)+\u0005\u0011)f.\u001b;\t\u000fMK\u0011\u0011!a\u0001\u0005\u0006\u0019\u0001\u0010J\u0019\u0002%A\f
'\u000f^5uS>tGk\u001c'fC\u0012,'\u000fI\u0001\u0010i>\u0004\u0018n\u0019)beRLG/[8ogV\tq\u000bE\u0002Y;\u0002t!!W.\u000f\u0005\u0015S\u0016\"A\u0016\n\u0005qS\u0013a\u00029bG.\fw-Z\u0005\u0003=~\u00131aU3r\u0015\ta&\u0006\u0005\u0002bS6\t!M\u0003\u0002dI\u000611m\\7n_:T!!G3\u000b\u0005\u0019<\u0017AB1qC\u000eDWMC\u0001i\u0003\ry'oZ\u0005\u0003U\n\u0014a\u0002V8qS\u000e\u0004\u0016M\u001d;ji&|g.\u0001\u0004fq&$X\rZ\u000b\u0002[B\u0011a.^\u0007\u0002_*\u0011\u0001/]\u0001\u0007CR|W.[2\u000b\u0005I\u001c\u0018AC2p]\u000e,(O]3oi*\u0011A\u000fN\u0001\u0005kRLG.\u0003\u0002w_\ni\u0011\t^8nS\u000e\u0014un\u001c7fC:\fq!\u001a=ji\u0016$\u0007%A\u0003tKR,\u0006\u000fF\u0001PQ\tq1\u0010E\u0002}\u0003\u000bi\u0011! \u0006\u0003?yT1a`A\u0001\u0003\u001dQW\u000f]5uKJT1!a\u0001h\u0003\u0015QWO\\5u\u0013\r\t9! \u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017\u0001\u0003;fCJ$un\u001e8)\u0007=\ti\u0001E\u0002}\u0003\u001fI1!!\u0005~\u0005%\te\r^3s\u000b\u0006\u001c\u0007.\u0001\bqe>$WoY3SK\u000e|'\u000fZ:\u0015\u000b=\u000b9\"a\u0007\t\r\u0005e\u0001\u00031\u0001)\u0003!q')\u0019;dQ\u0016\u001c\bBBA\u000f!\u0001\u0007\u0001&A\bsK\u000e|'\u000fZ:QKJ\u0014\u0015\r^2i\u0003i9W\r\u001e'fC\u0012,'OR8s)>\u0004\u0018n\u0019)beRLG/[8o)\rA\u00131\u0005\u0005\u0007\u0003K\t\u0002\u0019\u00011\u0002)1,\u0017\rZ3s)>\u0004\u0018n\u0019)beRLG/[8o\u0003]9\u0018-\u001b;V]RLGnU3h[\u0016tGo\u001d+jKJ,G\rF\u0002P\u0003WAa!!\f\u0013\u0001\u0004A\u0013AD7j]:+XnU3h[\u0016tGo]\u0001\u0012g&lW\u000f\\1uKJ+G/\u001a8uS>t\u0017a\n;fgR\f%o\u00195jm\u0016\fe\u000e\u001a$fi\u000eD7+\u001b8hY\u0016$v\u000e]5d!\u0006\u0014H/\u001b;j_:D3\u0001FA\u001b!\ra\u0018qG\u0005\u0004\u0003si(\u0001\u0002+fgR\u0004")
/* loaded from: input_file:kafka/tier/TierIntegrationTransactionTest.class */
/*
 * Integration test that writes transactional batches (alternately aborted and committed) to a
 * topic created with "confluent.tier.enable" = "true" on a single broker using the "mock" tier
 * backend, waits for segments to be tiered, deletes old local segments, and then checks that a
 * read_committed consumer sees no record keyed "aborted" and that the tier fetcher / archiver
 * metrics are non-zero.
 */
public class TierIntegrationTransactionTest extends IntegrationTestHarness {
    /** Upper bound, in milliseconds, on how long {@link #waitUntilSegmentsTiered(int)} keeps polling. */
    private static final long TIER_WAIT_TIMEOUT_MS = 60000L;

    /** Randomly named topic used by this test instance. */
    private final String topic;
    /** Number of partitions the topic is created with. */
    private final int partitions;
    /** Partition id to leader broker id, as returned by {@code createTopic} in {@link #setUp()}. */
    private Map<Object, Object> partitionToLeader;
    /** Set to {@code true} by the exit procedure installed in {@link #setUp()}. */
    private final AtomicBoolean exited;

    /**
     * Populates the broker configuration used by the harness and initialises the test fields.
     * The statement order matches the original initialisation order.
     */
    public TierIntegrationTransactionTest() {
        serverConfig().put(KafkaConfig$.MODULE$.TierPartitionStateCommitIntervalProp(), "5");
        // Broker-level tier enable is "false"; the topic created in setUp() sets
        // "confluent.tier.enable" to "true" in its own topic config.
        serverConfig().put(KafkaConfig$.MODULE$.TierEnableProp(), "false");
        serverConfig().put(KafkaConfig$.MODULE$.TierFeatureProp(), "true");
        serverConfig().put(KafkaConfig$.MODULE$.TierMetadataNumPartitionsProp(), "1");
        serverConfig().put(KafkaConfig$.MODULE$.TierMetadataReplicationFactorProp(), "1");
        // The test triggers segment deletion itself through simulateRetention().
        serverConfig().put(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), Integer.toString(Integer.MAX_VALUE));
        serverConfig().put(KafkaConfig$.MODULE$.TierLocalHotsetBytesProp(), "0");
        serverConfig().put(KafkaConfig$.MODULE$.TransactionsTopicReplicationFactorProp(), "1");
        serverConfig().put(KafkaConfig$.MODULE$.TransactionsTopicMinISRProp(), "1");
        configureMock();
        this.topic = UUID.randomUUID().toString();
        this.partitions = 1;
        this.partitionToLeader = Predef$.MODULE$.Map().apply(Nil$.MODULE$);
        this.exited = new AtomicBoolean(false);
    }

    /** @return the number of brokers the harness should start; always 1 for this test. */
    @Override // kafka.api.IntegrationTestHarness
    public int brokerCount() {
        return 1;
    }

    /**
     * Selects the "mock" tier backend and sets the S3 bucket property to "mybucket".
     *
     * @return whatever the second {@code serverConfig().put} call returns
     */
    private Object configureMock() {
        serverConfig().put(KafkaConfig$.MODULE$.TierBackendProp(), "mock");
        return serverConfig().put(KafkaConfig$.MODULE$.TierS3BucketProp(), "mybucket");
    }

    private String topic() {
        return this.topic;
    }

    private int partitions() {
        return this.partitions;
    }

    private Map<Object, Object> partitionToLeader() {
        return this.partitionToLeader;
    }

    private void partitionToLeader_$eq(Map<Object, Object> map) {
        this.partitionToLeader = map;
    }

    /** @return one {@link TopicPartition} of {@link #topic()} for each index in {@code [0, partitions())}. */
    private Seq<TopicPartition> topicPartitions() {
        return (Seq) package$.MODULE$.Range().apply(0, partitions()).map(obj -> {
            return $anonfun$topicPartitions$1(this, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom());
    }

    /** @return the flag that records whether the installed exit procedure was invoked. */
    public AtomicBoolean exited() {
        return this.exited;
    }

    /**
     * Installs an exit procedure that only records the call, starts the harness, and creates the
     * test topic with tiering enabled and retention disabled.
     */
    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @BeforeEach
    public void setUp() {
        Exit.setExitProcedure((statusCode, message) -> {
            this.exited().set(true);
        });
        super.setUp();
        Properties topicConfig = new Properties();
        topicConfig.put("confluent.tier.enable", "true");
        topicConfig.put("segment.bytes", "10000");
        topicConfig.put("confluent.tier.local.hotset.bytes", "5000");
        topicConfig.put("retention.bytes", "-1");
        topicConfig.put("retention.ms", "-1");
        partitionToLeader_$eq(createTopic(topic(), partitions(), 1, topicConfig));
    }

    /**
     * Shuts the harness down, restores the default exit procedure, and fails the test if the
     * exit procedure installed in {@link #setUp()} was invoked.
     */
    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @AfterEach
    public void tearDown() {
        try {
            super.tearDown();
        } finally {
            // Restore only after shutdown so an exit during shutdown is still recorded, and so the
            // procedure (which captures this instance) does not stay installed for later tests.
            Exit.resetExitProcedure();
        }
        Assertions.assertFalse(exited().get(), "the exit procedure was invoked during the test");
    }

    /**
     * Produces {@code nBatches} transactions of {@code recordsPerBatch} records each with
     * transactional id "1". Batches with an even index are aborted and keyed "aborted"; the
     * others are committed and keyed "committed". Each send is awaited for up to 10 seconds.
     *
     * @param nBatches        number of transactions to run
     * @param recordsPerBatch number of records sent inside each transaction
     * @throws Exception if a send fails, times out, or the thread is interrupted while waiting
     */
    private void produceRecords(int nBatches, int recordsPerBatch) throws Exception {
        Properties producerProps = new Properties();
        producerProps.put("transactional.id", "1");
        // try-with-resources also closes the producer when initTransactions() itself fails.
        try (KafkaProducer<byte[], byte[]> producer =
                 createProducer(createProducer$default$1(), createProducer$default$2(), producerProps)) {
            producer.initTransactions();
            for (int batch = 0; batch < nBatches; batch++) {
                boolean abort = batch % 2 == 0;
                String key = abort ? "aborted" : "committed";
                // All records of the batch are built before the transaction begins.
                ArrayList<ProducerRecord<byte[], byte[]>> records = new ArrayList<>(Math.max(recordsPerBatch, 0));
                for (int index = 0; index < recordsPerBatch; index++) {
                    records.add($anonfun$produceRecords$2(this, recordsPerBatch, batch, key, index));
                }
                producer.beginTransaction();
                for (ProducerRecord<byte[], byte[]> record : records) {
                    producer.send(record).get(10L, TimeUnit.SECONDS);
                }
                if (abort) {
                    producer.abortTransaction();
                } else {
                    producer.commitTransaction();
                }
            }
        }
    }

    /** @return the leader broker id recorded in {@link #partitionToLeader()} for the given partition. */
    private int getLeaderForTopicPartition(TopicPartition topicPartition) {
        return BoxesRunTime.unboxToInt(partitionToLeader().apply(BoxesRunTime.boxToInteger(topicPartition.partition())));
    }

    /**
     * Polls until every partition of the topic reports more than {@code minNumSegments} tiered
     * segments with matching, positive end and committed-end offsets; fails the test once
     * {@link #TIER_WAIT_TIMEOUT_MS} has elapsed.
     *
     * @param minNumSegments segment count that must be exceeded on every partition
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private void waitUntilSegmentsTiered(int minNumSegments) throws InterruptedException {
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long startMs = System.currentTimeMillis();
        while (!$anonfun$waitUntilSegmentsTiered$1(this, minNumSegments)) {
            if (System.currentTimeMillis() > startMs + TIER_WAIT_TIMEOUT_MS) {
                Assertions.fail($anonfun$waitUntilSegmentsTiered$3(minNumSegments));
            }
            Thread.sleep(Math.min(TIER_WAIT_TIMEOUT_MS, pauseMs));
        }
    }

    /** Calls {@code deleteOldSegments()} on each partition's log and asserts something was deleted. */
    private void simulateRetention() {
        topicPartitions().foreach(topicPartition -> {
            $anonfun$simulateRetention$1(this, topicPartition);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Reads exactly one attribute from a platform MBean.
     *
     * @param server        MBean server to query
     * @param objectName    string form of the MBean's object name
     * @param attributeName attribute to read
     * @return the single attribute returned by the server
     * @throws Exception if the object name is malformed or the MBean lookup fails
     */
    private static Attribute singleAttribute(MBeanServer server, String objectName, String attributeName) throws Exception {
        ArrayList<Attribute> attributes =
            new ArrayList<>(server.getAttributes(new ObjectName(objectName), new String[]{attributeName}).asList());
        Assertions.assertEquals(1, attributes.size(),
            "expected exactly one value for attribute " + attributeName + " of " + objectName + " but got " + attributes);
        return attributes.get(0);
    }

    /**
     * Produces 100 x 100 transactional records, waits for tiering, deletes old local segments,
     * then consumes with isolation level read_committed and verifies that no record keyed
     * "aborted" is returned, that the TierFetcher and TierArchiver metrics are positive, and
     * that no TierFetcher memory is still leased on any broker.
     */
    @Test
    public void testArchiveAndFetchSingleTopicPartition() throws Exception {
        final int numBatches = 100;
        final int recordsPerBatch = 100;
        produceRecords(numBatches, recordsPerBatch);
        waitUntilSegmentsTiered(10);
        simulateRetention();

        TopicPartition topicPartition = (TopicPartition) topicPartitions().head();
        String bootstrapServers = TestUtils$.MODULE$.bootstrapServers(servers(), listenerName());
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("max.poll.records", "50000");
        consumerProps.put("isolation.level", "read_committed");
        // NOTE(review): these two settings go to the harness-level consumerConfig(), not to
        // consumerProps used to build the consumer below, so they do not affect it. Kept as in
        // the original; confirm the intent.
        consumerConfig().setProperty("group.id", "foo");
        consumerConfig().setProperty("client.id", "foo");

        // Half of the batches are aborted. NOTE(review): the reason for the additional "- 100"
        // margin is not visible in this file; kept as in the original.
        final int expectedMinRecords = ((numBatches * recordsPerBatch) / 2) - 100;
        final ArrayList<Integer> valuesRead = new ArrayList<>();
        // The consumer is closed exactly once, before the metric checks below.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            ArrayList<TopicPartition> assignment = new ArrayList<>();
            assignment.add(topicPartition);
            // NOTE(review): partition 1 is assigned although the topic is created with
            // partitions() == 1; kept as in the original, confirm the intent.
            assignment.add(new TopicPartition(topic(), 1));
            consumer.assign(assignment);
            consumer.seekToBeginning(assignment);
            while (valuesRead.size() < expectedMinRecords) {
                consumer.poll(Duration.ofMillis(1000L)).forEach(record -> {
                    Assertions.assertNotEquals("aborted", record.key(), "did not expect to find any aborted records");
                    valuesRead.add(Integer.parseInt(record.value()));
                });
            }
        }

        MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
        double bytesFetched = $anonfun$testArchiveAndFetchSingleTopicPartition$1(
            singleAttribute(mBeanServer, "kafka.server:type=TierFetcher", "BytesFetchedTotal"));
        Assertions.assertTrue(bytesFetched > 0.0, "tier fetch metric shows no data fetched from tiered storage");

        double archiverMeanRate = $anonfun$testArchiveAndFetchSingleTopicPartition$2(
            singleAttribute(mBeanServer, "kafka.tier.tasks.archive:type=TierArchiver,name=BytesPerSec", "MeanRate"));
        Assertions.assertTrue(archiverMeanRate > 0.0, "tier archiver mean rate shows no data uploaded to tiered storage");

        servers().foreach(kafkaServer -> {
            $anonfun$testArchiveAndFetchSingleTopicPartition$3(kafkaServer);
            return BoxedUnit.UNIT;
        });
    }

    /** Builds the {@link TopicPartition} of the test topic with the given partition index. */
    public static final /* synthetic */ TopicPartition $anonfun$topicPartitions$1(TierIntegrationTransactionTest test, int partition) {
        return new TopicPartition(test.topic(), partition);
    }

    /**
     * Builds one record for {@link #produceRecords(int, int)}: no explicit partition, timestamp
     * {@code batch + index}, the UTF-8 bytes of {@code key} as key, and the UTF-8 bytes of the
     * decimal string {@code recordsPerBatch * batch + index} as value.
     */
    public static final /* synthetic */ ProducerRecord<byte[], byte[]> $anonfun$produceRecords$2(TierIntegrationTransactionTest test, int recordsPerBatch, int batch, String key, int index) {
        Long timestamp = Long.valueOf(batch + index);
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        byte[] valueBytes = String.valueOf((recordsPerBatch * batch) + index).getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(test.topic(), (Integer) null, timestamp, keyBytes, valueBytes);
    }

    /**
     * @return {@code true} when the partition's tier state has positive, equal end and
     *         committed-end offsets and more than {@code minNumSegments} segments
     */
    public static final /* synthetic */ boolean $anonfun$waitUntilSegmentsTiered$2(TierIntegrationTransactionTest test, int minNumSegments, TopicPartition topicPartition) {
        LogManager logManager = ((KafkaServer) test.serverForId(test.getLeaderForTopicPartition(topicPartition)).get()).logManager();
        TierPartitionState tierPartitionState = ((AbstractLog) logManager.getLog(topicPartition, logManager.getLog$default$2()).get()).tierPartitionState();
        long endOffset = tierPartitionState.endOffset();
        long committedEndOffset = tierPartitionState.committedEndOffset();
        return endOffset > 0 && committedEndOffset > 0 && endOffset == committedEndOffset && tierPartitionState.numSegments() > minNumSegments;
    }

    /** @return {@code true} when {@link #$anonfun$waitUntilSegmentsTiered$2} holds for every partition. */
    public static final /* synthetic */ boolean $anonfun$waitUntilSegmentsTiered$1(TierIntegrationTransactionTest test, int minNumSegments) {
        return test.topicPartitions().forall(topicPartition -> {
            return BoxesRunTime.boxToBoolean($anonfun$waitUntilSegmentsTiered$2(test, minNumSegments, topicPartition));
        });
    }

    /** @return the failure message used when {@link #waitUntilSegmentsTiered(int)} times out. */
    public static final /* synthetic */ String $anonfun$waitUntilSegmentsTiered$3(int minNumSegments) {
        return "timeout waiting for at least " + minNumSegments + " to be archived and materialized";
    }

    /** Deletes old segments of the partition's log on its leader and asserts at least one was deleted. */
    public static final /* synthetic */ void $anonfun$simulateRetention$1(TierIntegrationTransactionTest test, TopicPartition topicPartition) {
        LogManager logManager = ((KafkaServer) test.serverForId(test.getLeaderForTopicPartition(topicPartition)).get()).replicaManager().logManager();
        Assertions.assertTrue(((AbstractLog) logManager.getLog(topicPartition, logManager.getLog$default$2()).get()).deleteOldSegments() > 0, "tiered segments should have been deleted");
    }

    /** @return the attribute's value unboxed as a {@code double}. */
    public static final /* synthetic */ double $anonfun$testArchiveAndFetchSingleTopicPartition$1(Attribute attribute) {
        return BoxesRunTime.unboxToDouble(attribute.getValue());
    }

    /** @return the attribute's value unboxed as a {@code double}. */
    public static final /* synthetic */ double $anonfun$testArchiveAndFetchSingleTopicPartition$2(Attribute attribute) {
        return BoxesRunTime.unboxToDouble(attribute.getValue());
    }

    /** Asserts that the broker's TierFetcher memory tracker reports zero leased bytes. */
    public static final /* synthetic */ void $anonfun$testArchiveAndFetchSingleTopicPartition$3(KafkaServer kafkaServer) {
        String message = "expected leased TierFetcher memory for broker " + kafkaServer.config().brokerId() + " to be 0";
        Assertions.assertEquals(0L, ((TierFetcher) kafkaServer.tierFetcherOpt().get()).memoryTracker().leased(), message);
    }
}
