/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.jmh.tier;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import kafka.log.LogConfig;
import kafka.log.LogSegment;
import kafka.server.KafkaConfig;
import kafka.tier.TopicIdPartition;
import kafka.tier.fetcher.PendingFetch;
import kafka.tier.fetcher.TierFetchMetadata;
import kafka.tier.fetcher.TierFetchResult;
import kafka.tier.fetcher.TierFetcher;
import kafka.tier.fetcher.TierFetcherConfig;
import kafka.tier.store.AzureBlockBlobTierObjectStore;
import kafka.tier.store.AzureBlockBlobTierObjectStoreConfig;
import kafka.tier.store.GcsTierObjectStore;
import kafka.tier.store.GcsTierObjectStoreConfig;
import kafka.tier.store.MockInMemoryTierObjectStore;
import kafka.tier.store.S3TierObjectStore;
import kafka.tier.store.S3TierObjectStoreConfig;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreConfig;
import kafka.utils.KafkaScheduler;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import scala.Option;
import scala.collection.JavaConverters;

/**
 * JMH benchmark that writes one log segment to local disk, uploads it to a
 * {@link TierObjectStore} once per trial, and then times how long a freshly
 * constructed {@link TierFetcher} needs to read {@link #TOTAL_FETCH_SIZE} bytes
 * of record batches back from that store.
 *
 * <p>The backend is chosen with the {@code backend} parameter ({@code Mock},
 * {@code S3}, {@code GCS} or {@code AzureBlockBlob}); the remaining parameters
 * configure the selected backend and the size of the generated batches.
 */
@State(value=Scope.Benchmark)
@Fork(value=4)
@Warmup(iterations=5)
@Measurement(iterations=15)
@BenchmarkMode(value={Mode.AverageTime})
@OutputTimeUnit(value=TimeUnit.MILLISECONDS)
public class TierFetcherBenchmark {
    /** Payload size, in bytes, of the single record placed in every generated batch. */
    @Param(value={"500", "50000", "500000", "1000000", "2000000"})
    public static String approxBatchSize;
    /** Object store implementation to benchmark against. */
    @Param(value={"Mock"})
    public static String backend;
    @Param(value={""})
    public static String s3CredentialsFilePath;
    @Param(value={""})
    public static String s3AssumeRoleArn;
    @Param(value={""})
    public static String s3EndpointOverride;
    @Param(value={"500000"})
    public static String s3AutoAbortSize;
    @Param(value={"tiered-storage-s3-benchmarks"})
    public static String s3Bucket;
    @Param(value={"us-west-2"})
    public static String s3Region;
    @Param(value={""})
    public static String gcsCredentialsFilePath;
    @Param(value={"tiered-storage-gcs-benchmarks"})
    public static String gcsBucket;
    @Param(value={"us-west-2"})
    public static String gcsRegion;
    @Param(value={""})
    public static String azureBlockBlobCredentialsFilePath;
    @Param(value={"testblobcontainer"})
    public static String azureBlockBlobContainer;

    /** The local segment is appended to until its file reaches this many bytes. */
    private static final int TARGET_SEGMENT_SIZE = 100000000;
    private static final int INDEX_INTERVAL_BYTES = 4096;
    private static final int SEGMENT_INDEX_BYTES = 10000000;
    /** Number of batch bytes a single {@link #fetch} invocation reads before returning. */
    private static final int TOTAL_FETCH_SIZE = 10000000;
    /** Max-bytes value handed to every {@link TierFetchMetadata}. */
    private static final int PARTITION_FETCH_MAX_BYTES = 1000000;

    /**
     * Builds a {@link MemoryRecords} holding one uncompressed magic-v2 batch with a
     * single record at {@code baseOffset}.
     *
     * <p>The record has a null key, a value of {@code approxBatchSize} bytes (each
     * set to {@code (byte) -2}) and uses {@code baseOffset} as its timestamp too.
     *
     * @param baseOffset offset (and timestamp) of the generated record
     * @return the built records
     * @throws NumberFormatException if {@code approxBatchSize} is not an integer
     */
    private static MemoryRecords buildWithOffset(long baseOffset) {
        int payloadSize = Integer.parseInt(approxBatchSize);
        // Twice the payload size leaves room for the batch and record framing.
        ByteBuffer buffer = ByteBuffer.allocate(payloadSize * 2);
        MemoryRecordsBuilder builder = MemoryRecords.builder(
                buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset);
        byte[] payload = new byte[payloadSize];
        Arrays.fill(payload, (byte) -2);
        builder.appendWithOffset(baseOffset, baseOffset, null, payload);
        return builder.build();
    }

    /**
     * Creates the object store selected by the {@code backend} parameter.
     *
     * @return a new store instance for the configured backend
     * @throws IllegalArgumentException if {@code backend} names an unsupported store
     */
    private static TierObjectStore getTierObjectStore() {
        switch (backend) {
            case "S3":
                return createS3Store();
            case "GCS":
                return createGcsStore();
            case "AzureBlockBlob":
                return createAzureBlockBlobStore();
            case "Mock":
                return new MockInMemoryTierObjectStore(TierObjectStoreConfig.createEmpty());
            default:
                throw new IllegalArgumentException("Unsupported backend " + backend);
        }
    }

    /** Builds an S3-backed store from the {@code s3*} benchmark parameters. */
    private static TierObjectStore createS3Store() {
        Properties props = newBrokerProps();
        props.put(KafkaConfig.TierS3BucketProp(), s3Bucket);
        props.put(KafkaConfig.TierS3RegionProp(), s3Region);
        props.put(KafkaConfig.TierS3AutoAbortThresholdBytesProp(), s3AutoAbortSize);
        putIfNotEmpty(props, KafkaConfig.TierS3CredFilePathProp(), s3CredentialsFilePath);
        putIfNotEmpty(props, KafkaConfig.TierS3AssumeRoleArnProp(), s3AssumeRoleArn);
        putIfNotEmpty(props, KafkaConfig.TierS3EndpointOverrideProp(), s3EndpointOverride);
        KafkaConfig kafkaConfig = new KafkaConfig(props);
        return new S3TierObjectStore(new S3TierObjectStoreConfig(Optional.of("mycluster"), kafkaConfig));
    }

    /** Builds a GCS-backed store from the {@code gcs*} benchmark parameters. */
    private static TierObjectStore createGcsStore() {
        Properties props = newBrokerProps();
        props.put(KafkaConfig.TierGcsBucketProp(), gcsBucket);
        props.put(KafkaConfig.TierGcsRegionProp(), gcsRegion);
        putIfNotEmpty(props, KafkaConfig.TierGcsCredFilePathProp(), gcsCredentialsFilePath);
        KafkaConfig kafkaConfig = new KafkaConfig(props);
        return new GcsTierObjectStore(new GcsTierObjectStoreConfig(Optional.of("mycluster"), kafkaConfig));
    }

    /** Builds an Azure block-blob store from the {@code azureBlockBlob*} benchmark parameters. */
    private static TierObjectStore createAzureBlockBlobStore() {
        Properties props = newBrokerProps();
        props.put(KafkaConfig.TierAzureBlockBlobContainerProp(), azureBlockBlobContainer);
        putIfNotEmpty(props, KafkaConfig.TierAzureBlockBlobCredFilePathProp(), azureBlockBlobCredentialsFilePath);
        KafkaConfig kafkaConfig = new KafkaConfig(props);
        return new AzureBlockBlobTierObjectStore(
                new AzureBlockBlobTierObjectStoreConfig(Optional.of("mycluster"), kafkaConfig));
    }

    /** Returns properties pre-populated with the placeholder ZooKeeper connect string every backend sets. */
    private static Properties newBrokerProps() {
        Properties props = new Properties();
        props.put(KafkaConfig.ZkConnectProp(), "IGNORED");
        return props;
    }

    /** Sets {@code key} only when the benchmark parameter {@code value} was actually supplied. */
    private static void putIfNotEmpty(Properties props, String key, String value) {
        if (!value.isEmpty()) {
            props.put(key, value);
        }
    }

    /**
     * Benchmark body: constructs a {@link TierFetcher} and reads batches from the
     * tiered segment until at least {@link #TOTAL_FETCH_SIZE} bytes were returned.
     *
     * <p>The fetcher and its {@link Metrics} registry are created per invocation and
     * are always closed, including when a fetch fails.
     *
     * @param fetchState per-thread state holding the uploaded segment's metadata
     * @return total number of batch bytes read
     * @throws IllegalStateException if a fetch returns no result or no batches
     */
    @Benchmark
    @OutputTimeUnit(value=TimeUnit.MILLISECONDS)
    public int fetch(FetchState fetchState) {
        TierFetcherConfig fetcherConfig = new TierFetcherConfig();
        LogContext logContext = new LogContext("tierFetcher");
        // Metrics is the outer resource so it is closed after the fetcher that uses it.
        try (Metrics metrics = new Metrics()) {
            TierFetcher tierFetcher = new TierFetcher(Time.SYSTEM, fetcherConfig, fetchState.tierObjectStore,
                    Mockito.mock(KafkaScheduler.class), metrics, logContext);
            try {
                return fetchUntilTotalSize(tierFetcher, fetchState);
            } finally {
                tierFetcher.close();
            }
        }
    }

    /** Issues consecutive fetches, each resuming after the last batch read, until enough bytes were read. */
    private static int fetchUntilTotalSize(TierFetcher tierFetcher, FetchState fetchState) {
        // toIntExact fails loudly instead of silently truncating an oversized segment length.
        int segmentSize = Math.toIntExact(fetchState.finalSegmentSize);
        int sizeRead = 0;
        // NOTE(review): starting offset 5 is kept from the original; the reason is not visible in this file.
        long nextOffset = 5L;
        while (sizeRead < TOTAL_FETCH_SIZE) {
            // NOTE(review): the 10000000L argument is kept as the original literal; its exact
            // meaning in TierFetchMetadata is not visible in this file.
            TierFetchMetadata fetchMetadata = new TierFetchMetadata(
                    fetchState.topicIdPartition.topicPartition(), nextOffset,
                    Integer.valueOf(PARTITION_FETCH_MAX_BYTES), 10000000L, true,
                    fetchState.objectMetadata, Option.empty(), 0L, segmentSize);
            PendingFetch pending = tierFetcher.buildFetch(
                    Collections.singletonList(fetchMetadata), IsolationLevel.READ_UNCOMMITTED, ignored -> {}, 0);
            // Run the fetch on the calling thread so its whole cost is attributed to the benchmark.
            pending.run();
            Map<?, ?> fetchResults = pending.finish();
            TierFetchResult fetchResult =
                    (TierFetchResult) fetchResults.get(fetchState.topicIdPartition.topicPartition());
            if (fetchResult == null) {
                throw new IllegalStateException(
                        "No fetch result returned for " + fetchState.topicIdPartition.topicPartition());
            }
            RecordBatch finalBatch = null;
            for (RecordBatch batch : fetchResult.records.batches()) {
                sizeRead += batch.sizeInBytes();
                finalBatch = batch;
            }
            if (finalBatch == null) {
                throw new IllegalStateException("Failed to read a full batch. This usually happens due to a timeout, but it will mostly invalidate the benchmark.");
            }
            nextOffset = finalBatch.nextOffset();
        }
        return sizeRead;
    }

    /**
     * Per-thread benchmark state: a locally written log segment and the metadata of
     * its copy in the tier object store.
     */
    @State(value=Scope.Thread)
    public static class FetchState {
        private final TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        private TierObjectStore tierObjectStore;
        private TierObjectStore.ObjectMetadata objectMetadata;
        private LogSegment logSegment;
        private File logSegmentDir;
        private long finalSegmentSize;

        /**
         * Writes a segment of at least {@link #TARGET_SEGMENT_SIZE} bytes into a fresh
         * temporary directory and uploads it, with its offset and time index files, to
         * the configured object store.
         *
         * @throws IllegalStateException if the temporary segment directory cannot be created
         * @throws Exception if writing or uploading the segment fails
         */
        @Setup(value=Level.Trial)
        public void setupState() throws Exception {
            File segmentDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
            if (!segmentDir.mkdir()) {
                throw new IllegalStateException("Failed to create log segment directory " + segmentDir);
            }
            this.logSegmentDir = segmentDir;

            Properties logProps = new Properties();
            logProps.put(LogConfig.IndexIntervalBytesProp(), INDEX_INTERVAL_BYTES);
            logProps.put(LogConfig.SegmentIndexBytesProp(), SEGMENT_INDEX_BYTES);
            Set<String> override = Collections.emptySet();
            LogConfig logConfig = LogConfig.apply(logProps, JavaConverters.asScalaSetConverter(override).asScala().toSet());
            // NOTE(review): the 4096 argument is kept as the original literal; whether it is meant
            // to track INDEX_INTERVAL_BYTES cannot be confirmed from this file.
            this.logSegment = LogSegment.open(segmentDir, 0L, logConfig, Time.SYSTEM, false, 4096, true, "");

            File offsetIndexFile = this.logSegment.offsetIndex().file();
            // Upload the real time index; the previous code passed the offset index file twice.
            File timestampIndexFile = this.logSegment.timeIndex().file();
            File segmentFile = this.logSegment.log().file();

            while (segmentFile.length() < TARGET_SEGMENT_SIZE) {
                long nextOffset = this.logSegment.readNextOffset();
                MemoryRecords records = TierFetcherBenchmark.buildWithOffset(nextOffset);
                MutableRecordBatch firstBatch = records.batches().iterator().next();
                this.logSegment.append(firstBatch.lastOffset(), 1L, 1L, records);
            }
            this.finalSegmentSize = segmentFile.length();

            // Flush and trim everything before uploading the files.
            this.logSegment.flush();
            this.logSegment.offsetIndex().flush();
            this.logSegment.offsetIndex().trimToValidSize();
            this.logSegment.timeIndex().flush();
            this.logSegment.timeIndex().trimToValidSize();

            this.tierObjectStore = TierFetcherBenchmark.getTierObjectStore();
            this.objectMetadata = new TierObjectStore.ObjectMetadata(this.topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false);
            this.tierObjectStore.putSegment(this.objectMetadata, segmentFile, offsetIndexFile, timestampIndexFile, Optional.empty(), Optional.empty(), Optional.empty());
        }

        /**
         * Deletes the local segment and then its temporary directory. Tolerates a
         * setup that failed before either was created.
         */
        @TearDown(value=Level.Trial)
        public void teardown() {
            if (this.logSegment != null) {
                this.logSegment.deleteIfExists();
            }
            if (this.logSegmentDir != null) {
                // Best effort: File.delete only removes the directory if it is empty.
                this.logSegmentDir.delete();
            }
        }
    }
}

