/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.jmh.fetcher;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import kafka.api.ApiVersion$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.DelayedOperations;
import kafka.cluster.IsrChangeListener;
import kafka.cluster.Partition;
import kafka.log.CleanerConfig;
import kafka.log.Defaults;
import kafka.log.LogAppendInfo;
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.log.TierLogComponents;
import kafka.server.AlterIsrManager;
import kafka.server.BrokerTopicStats;
import kafka.server.FailedPartitions;
import kafka.server.InitialFetchState;
import kafka.server.KafkaConfig;
import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache;
import kafka.server.OffsetAndEpoch;
import kafka.server.OffsetTruncationState;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.checkpoints.OffsetCheckpoints;
import kafka.server.metadata.CachedConfigRepository;
import kafka.server.metadata.ConfigRepository;
import kafka.utils.KafkaScheduler;
import kafka.utils.Pool;
import kafka.utils.Scheduler;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.RecordsSend;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import scala.Option;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.HashSet;
import scala.collection.immutable.Set;
import scala.collection.mutable.HashMap;
import scala.compat.java8.OptionConverters;

@State(value=Scope.Benchmark)
@Fork(value=1)
@Warmup(iterations=5)
@Measurement(iterations=15)
@BenchmarkMode(value={Mode.AverageTime})
@OutputTimeUnit(value=TimeUnit.NANOSECONDS)
public class ReplicaFetcherThreadBenchmark {
    // Benchmark parameter: setup() creates this many partitions of topic "topic".
    @Param(value={"100", "500", "1000", "5000"})
    private int partitionCount;
    // Fetcher under test; created in setup() and driven by testFetcher().
    private ReplicaFetcherThread fetcher;
    // Created in setup() on top of logDir and shut down in tearDown().
    private LogManager logManager;
    // Uniquely named directory under java.io.tmpdir; created in setup() and deleted in tearDown().
    private File logDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
    // Scheduler handed to the LogManager; started in setup() and shut down in tearDown().
    private KafkaScheduler scheduler = new KafkaScheduler(1, "scheduler", true);
    // Partition lookup by TopicPartition; filled in setup() and passed to the fetcher thread.
    protected Pool<TopicPartition, Partition> pool = new Pool(Option.empty());

    /**
     * Builds the per-trial fixture.
     *
     * <p>Creates the temporary log directory, starts the scheduler and builds a
     * {@link LogManager} on that directory. It then creates {@code partitionCount} partitions
     * of topic {@code "topic"}, turns each into a follower, stores it in {@link #pool} and
     * records an initial fetch state plus a stub fetch result for it. Finally the fetcher
     * thread is created, the partitions are added, one {@code doWork()} pass is run, and a
     * {@link FetchResponse} containing the stub results is passed to the fetch session handler.
     *
     * @throws IOException if the temporary log directory cannot be created
     */
    @Setup(value=Level.Trial)
    public void setup() throws IOException {
        boolean dirCreated = this.logDir.mkdir();
        if (!dirCreated) {
            throw new IOException("error creating test directory");
        }
        this.scheduler.startup();

        Properties brokerProps = new Properties();
        brokerProps.put("zookeeper.connect", "127.0.0.1:9999");
        KafkaConfig config = new KafkaConfig(brokerProps);
        LogConfig logConfig = ReplicaFetcherThreadBenchmark.createLogConfig();

        List<File> logDirs = Collections.singletonList(this.logDir);
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        LogDirFailureChannel failureChannel = Mockito.mock(LogDirFailureChannel.class);

        // Scala views of the single log directory and of an empty directory list.
        Seq<File> dirSeq = JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq();
        Seq<File> emptyDirSeq = JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq();
        this.logManager = new LogManager(
                dirSeq,
                emptyDirSeq,
                new CachedConfigRepository(),
                logConfig,
                new CleanerConfig(0, 0L, 0.0, 0, 0, 0.0, 0L, false, Integer.MAX_VALUE, "MD5"),
                1,
                1000L,
                10000L,
                10000L,
                1000,
                1000L,
                Integer.MAX_VALUE,
                60000,
                this.scheduler,
                brokerTopicStats,
                new Metrics(),
                failureChannel,
                TierLogComponents.EMPTY(),
                Time.SYSTEM,
                true);

        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
        HashMap<TopicPartition, InitialFetchState> fetchStates = new HashMap<>();
        for (int partitionId = 0; partitionId < this.partitionCount; partitionId++) {
            TopicPartition tp = new TopicPartition("topic", partitionId);
            List<Integer> replicaIds = Arrays.asList(0, 1, 2);
            LeaderAndIsrRequestData.LeaderAndIsrPartitionState followerState =
                    new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
                            .setControllerEpoch(0)
                            .setLeader(0)
                            .setLeaderEpoch(0)
                            .setIsr(replicaIds)
                            .setZkVersion(1)
                            .setReplicas(replicaIds)
                            .setIsNew(true);

            IsrChangeListener isrListener = Mockito.mock(IsrChangeListener.class);
            OffsetCheckpoints checkpoints = Mockito.mock(OffsetCheckpoints.class);
            // The mocked checkpoint lookup for this partition in logDir answers with offset 0.
            Mockito.when(checkpoints.fetch(this.logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
            AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);

            Partition follower = new Partition(
                    tp,
                    100L,
                    ApiVersion$.MODULE$.latestVersion(),
                    0,
                    Time.SYSTEM,
                    isrListener,
                    new DelayedOperationsMock(tp),
                    Mockito.mock(MetadataCache.class),
                    this.logManager,
                    Option.empty(),
                    Option.empty(),
                    Option.empty(),
                    alterIsrManager);
            follower.makeFollower(followerState, checkpoints);
            this.pool.put(tp, follower);

            fetchStates.put(tp, new InitialFetchState(new BrokerEndPoint(3, "host", 3000), 0, 0L));

            // Stub record set: reports zero bytes and has nothing to send.
            BaseRecords stubRecords = new BaseRecords(){

                public int sizeInBytes() {
                    return 0;
                }

                public RecordsSend<? extends BaseRecords> toSend() {
                    return null;
                }

                public void release() {
                }
            };
            responseData.put(tp, new FetchResponse.PartitionData(Errors.NONE, 0L, 0L, 0L, new LinkedList(), stubRecords));
        }

        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(brokerTopicStats);

        this.fetcher = this.createFetcherThread(config, replicaManager, this.pool);
        this.fetcher.addPartitions(fetchStates);
        // One pass is executed here, before measurement starts (presumably so the first-pass
        // work is not part of the measured steady state).
        this.fetcher.doWork();
        this.fetcher.fetchSessionHandler().handleResponse(new FetchResponse(Errors.NONE, responseData, 0, 999));
    }

    /**
     * Factory for the fetcher thread exercised by the benchmark.
     *
     * @param config         broker configuration passed to the thread
     * @param replicaManager replica manager passed to the thread
     * @param pool           partition lookup the thread reads from
     * @return a new {@link ReplicaFetcherBenchThread} built from the three arguments
     */
    protected ReplicaFetcherThread createFetcherThread(KafkaConfig config, ReplicaManager replicaManager, Pool<TopicPartition, Partition> pool) {
        ReplicaFetcherThread benchThread = new ReplicaFetcherBenchThread(config, replicaManager, pool);
        return benchThread;
    }

    /**
     * Releases what {@link #setup()} created: shuts down the log manager, shuts down the
     * scheduler, and deletes the temporary log directory.
     *
     * <p>The steps are chained with {@code try/finally} so that a failure in an earlier step
     * does not skip the later ones; in particular the temporary directory is still removed
     * when a shutdown call throws. If a later step throws as well, its exception is the one
     * that propagates.
     *
     * @throws IOException if deleting the log directory fails
     */
    @TearDown(value=Level.Trial)
    public void tearDown() throws IOException {
        try {
            this.logManager.shutdown();
        } finally {
            try {
                this.scheduler.shutdown();
            } finally {
                Utils.delete((File)this.logDir);
            }
        }
    }

    /**
     * Measured operation: runs one {@code doWork()} pass on the fetcher.
     *
     * @return the count reported by the fetcher's request-rate statistic after the pass
     */
    @Benchmark
    public long testFetcher() {
        ReplicaFetcherThread thread = this.fetcher;
        thread.doWork();
        long requestCount = thread.fetcherStats().requestRate().count();
        return requestCount;
    }

    /**
     * Builds the {@link LogConfig} given to the log manager: each property listed below is
     * set to the matching value from {@link Defaults}, and an empty set is passed as the
     * second argument of {@code LogConfig.apply}.
     *
     * @return the resulting log configuration
     */
    private static LogConfig createLogConfig() {
        Properties settings = new Properties();

        // Segment properties.
        settings.put(LogConfig.SegmentMsProp(), Defaults.SegmentMs());
        settings.put(LogConfig.SegmentBytesProp(), Defaults.SegmentSize());

        // Retention properties.
        settings.put(LogConfig.RetentionMsProp(), Defaults.RetentionMs());
        settings.put(LogConfig.RetentionBytesProp(), Defaults.RetentionSize());

        // Remaining properties, in the same order as before.
        settings.put(LogConfig.SegmentJitterMsProp(), Defaults.SegmentJitterMs());
        settings.put(LogConfig.CleanupPolicyProp(), Defaults.CleanupPolicy());
        settings.put(LogConfig.MaxMessageBytesProp(), Defaults.MaxMessageSize());
        settings.put(LogConfig.IndexIntervalBytesProp(), Defaults.IndexInterval());
        settings.put(LogConfig.SegmentIndexBytesProp(), Defaults.MaxIndexSize());
        settings.put(LogConfig.MessageFormatVersionProp(), Defaults.MessageFormatVersion());
        settings.put(LogConfig.FileDeleteDelayMsProp(), Defaults.FileDeleteDelayMs());

        return LogConfig.apply(settings, new HashSet<>());
    }

    /**
     * {@link ReplicaFetcherThread} subclass used by the benchmark. Every method defined here
     * returns a fixed value or does nothing, except {@link #logStartOffset(TopicPartition)},
     * which reads the log start offset from the partition pool supplied at construction.
     */
    static class ReplicaFetcherBenchThread
    extends ReplicaFetcherThread {
        // Partition lookup shared with the enclosing benchmark.
        private final Pool<TopicPartition, Partition> pool;

        /**
         * @param config         broker configuration forwarded to the superclass
         * @param replicaManager replica manager forwarded to the superclass
         * @param partitions     partition lookup consulted by {@link #logStartOffset(TopicPartition)}
         */
        ReplicaFetcherBenchThread(KafkaConfig config, ReplicaManager replicaManager, Pool<TopicPartition, Partition> partitions) {
            // The quota passed to the superclass never reports exceeded/throttled and ignores records.
            super("name", 3, new BrokerEndPoint(3, "host", 3000), config, new FailedPartitions(), replicaManager, new Metrics(), Time.SYSTEM, new ReplicaQuota(){

                public boolean isQuotaExceeded() {
                    return false;
                }

                public void record(long value) {
                }

                public boolean isThrottled(TopicPartition topicPartition) {
                    return false;
                }
            }, Option.empty(), Option.empty(), (Map)new HashMap());
            this.pool = partitions;
        }

        /** Always reports epoch 0. */
        public Option<Object> latestEpoch(TopicPartition topicPartition) {
            return Option.apply((Object)0);
        }

        /** Returns the log start offset of the pooled partition's local log. */
        public long logStartOffset(TopicPartition topicPartition) {
            Partition partition = this.pool.get(topicPartition);
            return partition.localLogOrException().logStartOffset();
        }

        /** Always reports 0. */
        public long logEndOffset(TopicPartition topicPartition) {
            return 0L;
        }

        /** Does nothing. */
        public void truncate(TopicPartition tp, OffsetTruncationState offsetTruncationState) {
        }

        /** Always answers with offset 0 at epoch 0. */
        public Option<OffsetAndEpoch> endOffsetForEpoch(TopicPartition topicPartition, int epoch) {
            return OptionConverters.toScala(Optional.of(new OffsetAndEpoch(0L, 0)));
        }

        /** Ignores the fetched data and returns an empty option. */
        public Option<LogAppendInfo> processPartitionData(TopicPartition topicPartition, long fetchOffset, FetchResponse.PartitionData partitionData) {
            return Option.empty();
        }

        /** Always reports 0. */
        public long fetchEarliestOffsetFromLeader(TopicPartition topicPartition, int currentLeaderEpoch) {
            return 0L;
        }

        /**
         * Answers every requested partition with error code NONE, leader epoch 0 and end
         * offset 100.
         *
         * @param partitions the partitions being asked about; only the keys are used
         * @return one {@code EpochEndOffset} per key of {@code partitions}
         */
        public Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> fetchEpochEndOffsets(Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> partitions) {
            HashMap<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> endOffsets = new HashMap<>();
            // keys() is a scala.collection.Iterable, which is not a java.lang.Iterable, so a
            // Java for-each over it does not compile; walk the Scala iterator explicitly.
            Iterator<TopicPartition> keys = partitions.keys().iterator();
            while (keys.hasNext()) {
                TopicPartition tp = keys.next();
                OffsetForLeaderEpochResponseData.EpochEndOffset endOffset = new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(tp.partition())
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(0)
                        .setEndOffset(100L);
                endOffsets.put(tp, endOffset);
            }
            return endOffsets;
        }

        /** Returns an empty map regardless of the request. */
        public Map<TopicPartition, FetchResponse.PartitionData<Records>> fetchFromLeader(FetchRequest.Builder fetchRequest) {
            return new HashMap<>();
        }
    }

    /**
     * Minimal {@link DelayedOperations} subclass for the benchmark: it passes {@code null} for
     * the three superclass constructor arguments after the topic partition and reports zero
     * delayed deletes.
     */
    private static class DelayedOperationsMock
    extends DelayedOperations {
        /**
         * @param tp the topic partition forwarded to the superclass
         */
        DelayedOperationsMock(TopicPartition tp) {
            super(tp, null, null, null);
        }

        /** @return always {@code 0} */
        public int numDelayedDelete() {
            return 0;
        }
    }
}

