package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.SbkTopicUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.UnknownVersionException;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import kafka.log.LogConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaSampleStore.class */
public class KafkaSampleStore implements SampleStore {
    // NOTE(review): appears to correspond to the literal factor 2 applied to "windows * window length" in the
    // getMin*RetentionTimeMs helpers of this class — confirm.
    private static final int ADDITIONAL_WINDOW_TO_RETAIN_FACTOR = 2;
    // Topic names used when the matching *_METRIC_SAMPLE_STORE_TOPIC_CONFIG entry is missing or empty.
    private static final String DEFAULT_PARTITION_SAMPLE_STORE_TOPIC = "_confluent_balancer_partition_samples";
    private static final String DEFAULT_BROKER_SAMPLE_STORE_TOPIC = "_confluent_balancer_broker_samples";
    // Fallback values for the optional settings whose keys are declared further down.
    protected static final int DEFAULT_NUM_SAMPLE_LOADING_THREADS = 2;
    protected static final int DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT = 32;
    protected static final int DEFAULT_BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT = 32;
    protected static final long DEFAULT_MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS = 3600000;
    protected static final long DEFAULT_MIN_BROKER_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS = 3600000;
    protected static final boolean DEFAULT_SKIP_SAMPLE_STORE_TOPIC_RACK_AWARENESS_CHECK = true;
    // Client identifiers. The producer id is used as-is; createConsumer() appends a random long to the
    // consumer client id and to the consumer group.
    protected static final String PRODUCER_CLIENT_ID = "ConfluentBalancerSampleStoreProducer";
    protected static final String CONSUMER_CLIENT_ID = "ConfluentBalancerSampleStoreConsumer";
    protected static final String CONSUMER_GROUP = "ConfluentBalancerSampleStoreGroup";
    // One consumer per sample-loading thread; created in configure() and closed at the end of loadSamples().
    protected List<KafkaConsumer<byte[], byte[]>> _consumers;
    // Fixed-size pool that runs the MetricLoader tasks submitted by loadSamples().
    protected ExecutorService _metricProcessorExecutor;
    // Resolved topic names (configured value or the defaults above).
    protected String _partitionMetricSampleStoreTopic;
    protected String _brokerMetricSampleStoreTopic;
    // Written by the MetricLoader tasks and returned by sampleLoadingProgress(); configure() resets it to -1.0.
    protected volatile double _loadingProgress;
    // Producer used by storeSamples(); closed by close().
    protected Producer<byte[], byte[]> _producer;
    // Set by close(); a MetricLoader that hits an exception while this is true exits without logging an error.
    protected volatile boolean _shutdown = false;
    // Parsed in configure(); not read anywhere else in this class.
    protected boolean _skipSampleStoreTopicRackAwarenessCheck;
    // Keys looked up in the configuration map handed to configure() / checkTopicsCreated().
    public static final String PARTITION_METRIC_SAMPLE_STORE_TOPIC_CONFIG = "partition.metric.sample.store.topic";
    public static final String BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG = "broker.metric.sample.store.topic";
    public static final String NUM_SAMPLE_LOADING_THREADS_CONFIG = "num.sample.loading.threads";
    public static final String PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG = "partition.sample.store.topic.partition.count";
    public static final String BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG = "broker.sample.store.topic.partition.count";
    public static final String MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG = "min.partition.sample.store.topic.retention.time.ms";
    public static final String MIN_BROKER_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG = "min.broker.sample.store.topic.retention.time.ms";
    public static final String SKIP_SAMPLE_STORE_TOPIC_RACK_AWARENESS_CHECK_CONFIG = "skip.sample.store.topic.rack.awareness.check";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSampleStore.class);
    // Cleanup policy handed to the topic config builder in getTopicConfig().
    private static final String DEFAULT_CLEANUP_POLICY = LogConfig.Delete();
    // Sentinel compared by identity in MetricLoader.run(); a poll that returns this exact instance ends the loader.
    private static final ConsumerRecords<byte[], byte[]> SHUTDOWN_RECORDS = new ConsumerRecords<>(Collections.emptyMap());
    // Timeout of each consumer poll issued while loading samples.
    private static final Duration SAMPLE_POLL_TIMEOUT = Duration.ofMillis(1000);
    // Source of the random suffix appended to the consumer group and client id.
    private static final Random RANDOM = new Random();

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaSampleStore$MetricLoader.class */
    private class MetricLoader implements Runnable {
        /** Receives every decoded batch and supplies the monitoring periods used to pick the start offsets. */
        private final SampleStore.SampleLoader _sampleLoader;
        /** Number of records consumed so far; advanced by the size of each processed poll. */
        private final AtomicLong _numLoadedSamples;
        /** Number of partition metric samples handed to the sample loader. */
        private final AtomicLong _numPartitionMetricSamples;
        /** Number of broker metric samples handed to the sample loader. */
        private final AtomicLong _numBrokerMetricSamples;
        /** Sum of (end offset - beginning offset) over the partitions inspected so far. */
        private final AtomicLong _totalSamples;
        /** Consumer whose partition assignment this loader replays. */
        private final KafkaConsumer<byte[], byte[]> _consumer;

        /**
         * Creates a loader bound to one consumer.
         *
         * @param consumer consumer to read from
         * @param sampleLoader sink for the decoded samples
         * @param numLoadedSamples counter of consumed records
         * @param numPartitionMetricSamples counter of loaded partition samples
         * @param numBrokerMetricSamples counter of loaded broker samples
         * @param totalSamples counter of records expected in total
         */
        MetricLoader(KafkaConsumer<byte[], byte[]> consumer,
                     SampleStore.SampleLoader sampleLoader,
                     AtomicLong numLoadedSamples,
                     AtomicLong numPartitionMetricSamples,
                     AtomicLong numBrokerMetricSamples,
                     AtomicLong totalSamples) {
            this._sampleLoader = sampleLoader;
            this._consumer = consumer;
            this._totalSamples = totalSamples;
            this._numLoadedSamples = numLoadedSamples;
            this._numBrokerMetricSamples = numBrokerMetricSamples;
            this._numPartitionMetricSamples = numPartitionMetricSamples;
        }

        /**
         * Replays the partitions assigned to this loader's consumer into the sample loader.
         *
         * <p>The consumer is first positioned by {@code prepareConsumerOffset()}. The beginning and end offsets of
         * the assignment are then captured, their difference is added to the total-sample counter, and records
         * are polled until {@code sampleLoadingFinished} reports that every end offset has been reached.
         *
         * <p>Failure handling:
         * <ul>
         *   <li>a {@link CorruptRecordException} from {@code poll} advances every lagging partition by one offset
         *       and retries;</li>
         *   <li>any other exception from {@code poll} (or from the skip-forward step) ends the loader silently
         *       when the store is shutting down, otherwise it is logged and the poll is retried;</li>
         *   <li>samples with an unknown serialization version are logged and skipped;</li>
         *   <li>anything else is logged by the outer handler and ends the loader.</li>
         * </ul>
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                prepareConsumerOffset();
                Map<TopicPartition, Long> beginningOffsets = this._consumer.beginningOffsets(this._consumer.assignment());
                Map<TopicPartition, Long> endOffsets = this._consumer.endOffsets(this._consumer.assignment());
                KafkaSampleStore.LOG.debug("Loading beginning offsets: {}, loading end offsets: {}", beginningOffsets, endOffsets);
                for (Map.Entry<TopicPartition, Long> entry : beginningOffsets.entrySet()) {
                    this._totalSamples.addAndGet(endOffsets.get(entry.getKey()) - entry.getValue());
                    publishLoadingProgress(this._numLoadedSamples.get());
                }
                while (!sampleLoadingFinished(endOffsets)) {
                    ConsumerRecords<byte[], byte[]> records;
                    try {
                        try {
                            records = this._consumer.poll(KafkaSampleStore.SAMPLE_POLL_TIMEOUT);
                        } catch (CorruptRecordException e) {
                            KafkaSampleStore.LOG.warn("Corrupt message detected, skipping forward");
                            for (TopicPartition topicPartition : this._consumer.assignment()) {
                                long position = this._consumer.position(topicPartition);
                                if (position < endOffsets.get(topicPartition)) {
                                    this._consumer.seek(topicPartition, position + 1);
                                }
                            }
                            // Nothing was polled; re-check the lag and poll again.
                            continue;
                        }
                    } catch (Exception e) {
                        if (KafkaSampleStore.this._shutdown) {
                            return;
                        }
                        KafkaSampleStore.LOG.error("Metric loader received exception:", e);
                        // Nothing was polled; re-check the lag and poll again.
                        continue;
                    }
                    // Identity comparison on purpose: SHUTDOWN_RECORDS is a sentinel instance.
                    if (records == KafkaSampleStore.SHUTDOWN_RECORDS) {
                        KafkaSampleStore.LOG.trace("Metric loader received empty records");
                        return;
                    }
                    HashSet<PartitionMetricSample> partitionSamples = new HashSet<>();
                    HashSet<BrokerMetricSample> brokerSamples = new HashSet<>();
                    for (ConsumerRecord<byte[], byte[]> sampleRecord : records) {
                        try {
                            if (sampleRecord.topic().equals(KafkaSampleStore.this._partitionMetricSampleStoreTopic)) {
                                PartitionMetricSample partitionSample = PartitionMetricSample.fromBytes(sampleRecord.value());
                                partitionSamples.add(partitionSample);
                                KafkaSampleStore.LOG.trace("Loaded partition metric sample {}", partitionSample);
                            } else if (sampleRecord.topic().equals(KafkaSampleStore.this._brokerMetricSampleStoreTopic)) {
                                BrokerMetricSample brokerSample = BrokerMetricSample.fromBytes(sampleRecord.value());
                                // Broker samples are closed with the timestamp of the record that carried them.
                                brokerSample.close(sampleRecord.timestamp());
                                brokerSamples.add(brokerSample);
                                KafkaSampleStore.LOG.trace("Loaded broker metric sample {}", brokerSample);
                            }
                        } catch (UnknownVersionException e) {
                            KafkaSampleStore.LOG.warn("Ignoring sample due to", e);
                        }
                    }
                    if (!partitionSamples.isEmpty() || !brokerSamples.isEmpty()) {
                        this._sampleLoader.loadSamples(new MetricSampler.Samples(partitionSamples, brokerSamples));
                        this._numPartitionMetricSamples.getAndAdd(partitionSamples.size());
                        this._numBrokerMetricSamples.getAndAdd(brokerSamples.size());
                        publishLoadingProgress(this._numLoadedSamples.addAndGet(records.count()));
                    }
                }
                KafkaSampleStore.LOG.info("Metric loader finished loading samples.");
            } catch (Throwable th) {
                KafkaSampleStore.LOG.warn("Encountered error when loading sample from Kafka.", th);
            }
        }

        /**
         * Publishes {@code loadedSamples / totalSamples} as a floating-point fraction.
         *
         * <p>The division is done in {@code double} so the result is a real fraction rather than a truncated
         * 0 or 1. While the total is still zero there is no meaningful ratio, so the previously published
         * value is left untouched instead of dividing by zero.
         *
         * @param loadedSamples number of records loaded so far
         */
        private void publishLoadingProgress(long loadedSamples) {
            long totalSamples = this._totalSamples.get();
            if (totalSamples > 0) {
                KafkaSampleStore.this._loadingProgress = (double) loadedSamples / totalSamples;
            }
        }

        /**
         * Checks whether the consumer has caught up with every captured end offset.
         *
         * @param map end offset per partition, as captured before loading started
         * @return {@code false} as soon as one partition's position is below its end offset (the lag is logged
         *         at debug level), {@code true} when no partition lags
         */
        private boolean sampleLoadingFinished(Map<TopicPartition, Long> map) {
            for (Map.Entry<TopicPartition, Long> target : map.entrySet()) {
                TopicPartition partition = target.getKey();
                long consumedUpTo = this._consumer.position(partition);
                Long logEndOffset = target.getValue();
                if (consumedUpTo >= logEndOffset.longValue()) {
                    continue;
                }
                KafkaSampleStore.LOG.debug("Partition {} is still lagging. Current position: {}, LEO: {}", partition, consumedUpTo, logEndOffset);
                return false;
            }
            return true;
        }

        /**
         * Positions the consumer at the start of the monitoring period of each assigned partition.
         *
         * <p>For every partition a target timestamp of "now minus the broker or partition monitoring period"
         * is looked up via {@code offsetsForTimes}. Partitions with a match are sought to the returned offset;
         * partitions for which the lookup yields {@code null} are rewound to their beginning.
         */
        private void prepareConsumerOffset() {
            HashMap<TopicPartition, Long> startTimestamps = new HashMap<>(this._consumer.assignment().size());
            final long now = System.currentTimeMillis();
            for (TopicPartition assigned : this._consumer.assignment()) {
                boolean brokerTopic = assigned.topic().equals(KafkaSampleStore.this._brokerMetricSampleStoreTopic);
                long monitoringPeriodMs = brokerTopic
                    ? this._sampleLoader.brokerMonitoringPeriodMs()
                    : this._sampleLoader.partitionMonitoringPeriodMs();
                startTimestamps.put(assigned, now - monitoringPeriodMs);
            }

            HashSet<TopicPartition> withoutMatch = new HashSet<>();
            Map<TopicPartition, OffsetAndTimestamp> startOffsets = this._consumer.offsetsForTimes(startTimestamps);
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> lookup : startOffsets.entrySet()) {
                OffsetAndTimestamp match = lookup.getValue();
                if (match != null) {
                    this._consumer.seek(lookup.getKey(), match.offset());
                } else {
                    withoutMatch.add(lookup.getKey());
                }
            }
            if (!withoutMatch.isEmpty()) {
                this._consumer.seekToBeginning(withoutMatch);
            }
        }
    }

    /**
     * Initialises the store from the given configuration.
     *
     * <p>Resolves both sample topic names, reads the optional loading-thread count and rack-awareness flag
     * (falling back to the class defaults when the entry is missing or empty), creates the loader thread
     * pool with one consumer per thread, creates the producer, resets the loading progress to -1.0 and
     * finally runs the check-or-create step for both sample topics.
     *
     * @param map configuration entries; the optional settings are read as strings
     */
    @Override // com.linkedin.cruisecontrol.common.CruiseControlConfigurable
    public void configure(Map<String, ?> map) {
        this._partitionMetricSampleStoreTopic = getPartitionMetricSampleStoreTopic(map);
        this._brokerMetricSampleStoreTopic = getBrokerMetricSampleStoreTopic(map);

        String threadsSetting = (String) map.get(NUM_SAMPLE_LOADING_THREADS_CONFIG);
        int numLoadingThreads = DEFAULT_NUM_SAMPLE_LOADING_THREADS;
        if (threadsSetting != null && !threadsSetting.isEmpty()) {
            numLoadingThreads = Integer.parseInt(threadsSetting);
        }

        String skipRackCheckSetting = (String) map.get(SKIP_SAMPLE_STORE_TOPIC_RACK_AWARENESS_CHECK_CONFIG);
        if (skipRackCheckSetting != null && !skipRackCheckSetting.isEmpty()) {
            this._skipSampleStoreTopicRackAwarenessCheck = Boolean.parseBoolean(skipRackCheckSetting);
        } else {
            this._skipSampleStoreTopicRackAwarenessCheck = DEFAULT_SKIP_SAMPLE_STORE_TOPIC_RACK_AWARENESS_CHECK;
        }

        this._metricProcessorExecutor =
            Executors.newFixedThreadPool(numLoadingThreads, new KafkaCruiseControlThreadFactory("SampleStore", true, LOG));
        this._consumers = new ArrayList<>(numLoadingThreads);
        for (int created = 0; created < numLoadingThreads; created++) {
            this._consumers.add(createConsumer(map));
        }
        this._producer = createProducer(map);
        this._loadingProgress = -1.0d;

        SbkTopicUtils.checkTopicPropertiesMaybeCreate(
            getTopicConfig(this._partitionMetricSampleStoreTopic, map)
                .setMinRetentionTimeMs(getMinPartitionRetentionTimeMs(map))
                .setPartitionCount(map, PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG, DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT)
                .build(),
            map);
        SbkTopicUtils.checkTopicPropertiesMaybeCreate(
            getTopicConfig(this._brokerMetricSampleStoreTopic, map)
                .setMinRetentionTimeMs(getMinBrokerRetentionTimeMs(map))
                .setPartitionCount(map, BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG, DEFAULT_BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT)
                .build(),
            map);
    }

    /**
     * Builds the producer used to write samples to the sample store topics.
     *
     * <p>Starts from the producer-relevant entries of {@code map} and then pins the settings this store
     * relies on (bootstrap servers, client id, batching, retries, compression and byte-array serializers).
     * The reconnect backoff is set exactly once: the configured value wins, and 500 ms is used only when
     * the configuration does not contain the key.
     *
     * @param map configuration entries; must contain the bootstrap server list
     * @return a new producer; the caller owns it and must close it
     */
    protected KafkaProducer<byte[], byte[]> createProducer(Map<String, ?> map) {
        Properties properties = new Properties();
        properties.putAll(KafkaCruiseControlUtils.filterProducerConfigs(map));

        @SuppressWarnings("unchecked") // the bootstrap server entry is stored as a list of host:port strings
        List<String> bootstrapServers = (List<String>) map.get(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG);
        properties.setProperty(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", bootstrapServers));
        properties.setProperty(KafkaCruiseControlConfig.CLIENT_ID_CONFIG, PRODUCER_CLIENT_ID);
        properties.setProperty("linger.ms", "30000");
        properties.setProperty("retry.backoff.ms", String.valueOf(500L));

        // Previously this key was set to 500 and then unconditionally overwritten with
        // map.get(...).toString(), which made the first assignment dead and threw a
        // NullPointerException when the key was absent. The configured value still takes
        // precedence; 500 is now a real fallback.
        Object configuredReconnectBackoff = map.get(KafkaCruiseControlConfig.RECONNECT_BACKOFF_MS_CONFIG);
        properties.setProperty(KafkaCruiseControlConfig.RECONNECT_BACKOFF_MS_CONFIG,
            configuredReconnectBackoff != null ? configuredReconnectBackoff.toString() : String.valueOf(500L));

        properties.setProperty("reconnect.backoff.max.ms", String.valueOf(KafkaCruiseControlUtils.RECONNECT_BACKOFF_MAX_MS_CONFIG));
        properties.setProperty("batch.size", "500000");
        properties.setProperty("buffer.memory", "67108864");
        properties.setProperty("retries", "5");
        properties.setProperty("compression.type", "gzip");
        properties.setProperty("key.serializer", ByteArraySerializer.class.getName());
        properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
        return new KafkaProducer<>(properties);
    }

    /**
     * Builds a consumer for reading the sample store topics.
     *
     * <p>Starts from the consumer-relevant entries of {@code map}. The group id and client id get a random
     * numeric suffix, auto-commit is disabled, the offset reset policy is {@code earliest}, and keys and
     * values are read as raw byte arrays.
     *
     * @param map configuration entries; must contain the bootstrap server list and the reconnect backoff
     * @return a new consumer; the caller owns it and must close it
     */
    public static KafkaConsumer<byte[], byte[]> createConsumer(Map<String, ?> map) {
        Properties consumerProps = new Properties();
        consumerProps.putAll(KafkaCruiseControlUtils.filterConsumerConfigs(map));

        // One random suffix shared by the group id and the client id of this consumer.
        final long uniqueSuffix = RANDOM.nextLong();
        String bootstrapServers = String.join(",", (List) map.get(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG));
        String byteArrayDeserializer = ByteArrayDeserializer.class.getName();

        consumerProps.setProperty(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.setProperty("group.id", CONSUMER_GROUP + uniqueSuffix);
        consumerProps.setProperty(KafkaCruiseControlConfig.CLIENT_ID_CONFIG, CONSUMER_CLIENT_ID + uniqueSuffix);
        consumerProps.setProperty("auto.offset.reset", "earliest");
        consumerProps.setProperty("enable.auto.commit", "false");
        consumerProps.setProperty("max.poll.records", String.valueOf(Integer.MAX_VALUE));
        consumerProps.setProperty("key.deserializer", byteArrayDeserializer);
        consumerProps.setProperty("value.deserializer", byteArrayDeserializer);
        consumerProps.setProperty(
            KafkaCruiseControlConfig.RECONNECT_BACKOFF_MS_CONFIG,
            map.get(KafkaCruiseControlConfig.RECONNECT_BACKOFF_MS_CONFIG).toString());
        return new KafkaConsumer<>(consumerProps);
    }

    /**
     * Blocks until both sample store topics exist with the expected properties.
     *
     * <p>{@link #checkTopicsCreated(Map)} is retried with an exponentially growing wait (1 s, 2 s, 4 s, ...
     * capped at 60 s). Each wait is a timed {@code tryAcquire} on {@code abortStartupCheck}, so releasing a
     * permit on that semaphore aborts the check promptly.
     *
     * @param config configuration whose merged values describe the sample store topics
     * @param abortStartupCheck semaphore used as an abort signal; acquiring a permit aborts the check
     * @throws io.confluent.databalancer.startup.StartupCheckInterruptedException if a permit was acquired
     *         from {@code abortStartupCheck} or the waiting thread was interrupted
     */
    public static void checkStartupCondition(KafkaCruiseControlConfig config, java.util.concurrent.Semaphore abortStartupCheck) {
        Map<String, ?> configValues = config.mergedConfigValues();
        final long maxWaitSec = 60L;
        long waitSec = 1L;
        while (!checkTopicsCreated(configValues)) {
            LOG.info("Waiting for {} seconds to ensure that sample store topics are created/exists.", Long.valueOf(waitSec));
            try {
                // The timed acquire doubles as the sleep between attempts.
                if (abortStartupCheck.tryAcquire(waitSec, TimeUnit.SECONDS)) {
                    throw new io.confluent.databalancer.startup.StartupCheckInterruptedException();
                }
            } catch (InterruptedException e) {
                throw new io.confluent.databalancer.startup.StartupCheckInterruptedException(e);
            }
            waitSec = Math.min(2L * waitSec, maxWaitSec);
        }
    }

    /**
     * Starts a topic config builder with the settings shared by both sample store topics: the topic name,
     * the replication factor (read from {@code map} under "topic.replication.factor" with the balancer
     * default as the builder's third argument) and the default cleanup policy.
     *
     * @param str topic name
     * @param map configuration entries passed through to the builder
     * @return the partially populated builder
     */
    private static SbkTopicUtils.SbkTopicConfigBuilder getTopicConfig(String str, Map<String, ?> map) {
        return new SbkTopicUtils.SbkTopicConfigBuilder()
            .setTopic(str)
            .setReplicationFactor(
                map,
                "topic.replication.factor",
                ConfluentConfigs.BALANCER_TOPICS_REPLICATION_FACTOR_DEFAULT.shortValue())
            .setCleanupPolicy(DEFAULT_CLEANUP_POLICY);
    }

    /**
     * Resolves the broker sample topic name.
     *
     * @param map configuration entries
     * @return the configured name, or the default broker sample topic when the entry is missing or empty
     */
    private static String getBrokerMetricSampleStoreTopic(Map<String, ?> map) {
        String configuredTopic = (String) map.get(BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG);
        if (configuredTopic != null && !configuredTopic.isEmpty()) {
            return configuredTopic;
        }
        return DEFAULT_BROKER_SAMPLE_STORE_TOPIC;
    }

    /**
     * Resolves the partition sample topic name.
     *
     * @param map configuration entries
     * @return the configured name, or the default partition sample topic when the entry is missing or empty
     */
    private static String getPartitionMetricSampleStoreTopic(Map<String, ?> map) {
        String configuredTopic = (String) map.get(PARTITION_METRIC_SAMPLE_STORE_TOPIC_CONFIG);
        if (configuredTopic != null && !configuredTopic.isEmpty()) {
            return configuredTopic;
        }
        return DEFAULT_PARTITION_SAMPLE_STORE_TOPIC;
    }

    /**
     * Computes the minimum retention required for the partition sample topic.
     *
     * @param map configuration entries; must contain the partition window count (Integer) and the partition
     *            window length in milliseconds (Long)
     * @return the larger of the requested minimum retention (configured value, or the default when missing
     *         or empty) and twice the time span covered by all partition metric windows
     */
    private static long getMinPartitionRetentionTimeMs(Map<String, ?> map) {
        String configured = (String) map.get(MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG);
        long requestedMinRetentionMs = DEFAULT_MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS;
        if (configured != null && !configured.isEmpty()) {
            requestedMinRetentionMs = Long.parseLong(configured);
        }
        int numWindows = (Integer) map.get(KafkaCruiseControlConfig.NUM_PARTITION_METRICS_WINDOWS_CONFIG);
        long windowMs = (Long) map.get(KafkaCruiseControlConfig.PARTITION_METRICS_WINDOW_MS_CONFIG);
        long windowCoverageMs = numWindows * ADDITIONAL_WINDOW_TO_RETAIN_FACTOR * windowMs;
        return Math.max(requestedMinRetentionMs, windowCoverageMs);
    }

    /**
     * Computes the minimum retention required for the broker sample topic.
     *
     * @param map configuration entries; must contain the broker window count (Integer) and the broker
     *            window length in milliseconds (Long)
     * @return the larger of the requested minimum retention (configured value, or the default when missing
     *         or empty) and twice the time span covered by all broker metric windows
     */
    private static long getMinBrokerRetentionTimeMs(Map<String, ?> map) {
        String configured = (String) map.get(MIN_BROKER_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG);
        long requestedMinRetentionMs = DEFAULT_MIN_BROKER_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS;
        if (configured != null && !configured.isEmpty()) {
            requestedMinRetentionMs = Long.parseLong(configured);
        }
        int numWindows = (Integer) map.get(KafkaCruiseControlConfig.NUM_BROKER_METRICS_WINDOWS_CONFIG);
        long windowMs = (Long) map.get(KafkaCruiseControlConfig.BROKER_METRICS_WINDOW_MS_CONFIG);
        long windowCoverageMs = numWindows * ADDITIONAL_WINDOW_TO_RETAIN_FACTOR * windowMs;
        return Math.max(requestedMinRetentionMs, windowCoverageMs);
    }

    /**
     * Runs the check-or-create step for the partition sample topic and, only if that succeeds, for the
     * broker sample topic.
     *
     * @param map configuration entries describing the topics
     * @return {@code true} when both checks return {@code true}; {@code false} when either returns
     *         {@code false} or any exception is thrown (the exception is logged, not propagated)
     */
    static boolean checkTopicsCreated(Map<String, ?> map) {
        try {
            boolean partitionTopicReady = SbkTopicUtils.checkTopicPropertiesMaybeCreate(
                getTopicConfig(getPartitionMetricSampleStoreTopic(map), map)
                    .setMinRetentionTimeMs(getMinPartitionRetentionTimeMs(map))
                    .setPartitionCount(map, PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG, DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT)
                    .build(),
                map);
            if (!partitionTopicReady) {
                // Same short-circuit as "a && b": the broker topic is not touched in this case.
                return false;
            }
            return SbkTopicUtils.checkTopicPropertiesMaybeCreate(
                getTopicConfig(getBrokerMetricSampleStoreTopic(map), map)
                    .setMinRetentionTimeMs(getMinBrokerRetentionTimeMs(map))
                    .setPartitionCount(map, BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG, DEFAULT_BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT)
                    .build(),
                map);
        } catch (Exception e) {
            LOG.error("Error when checking for sample store topics.", e);
            return false;
        }
    }

    /**
     * Writes the given samples to the sample store topics and waits for the sends to complete.
     *
     * <p>Partition samples are produced with the sample time as the record timestamp and no key or
     * explicit partition; broker samples are produced as value-only records. Send failures are logged by
     * the callbacks and do not abort the remaining sends. The producer is flushed before returning.
     *
     * @param samples partition and broker metric samples to persist
     */
    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore
    public void storeSamples(MetricSampler.Samples samples) {
        final AtomicInteger storedPartitionSamples = new AtomicInteger(0);
        final AtomicInteger storedBrokerSamples = new AtomicInteger(0);
        final Integer unassignedPartition = null;
        final byte[] noKey = null;

        for (PartitionMetricSample partitionSample : samples.partitionMetricSamples()) {
            ProducerRecord<byte[], byte[]> outgoing = new ProducerRecord<byte[], byte[]>(
                this._partitionMetricSampleStoreTopic,
                unassignedPartition,
                Long.valueOf(partitionSample.sampleTime()),
                noKey,
                partitionSample.toBytes());
            this._producer.send(outgoing, (metadata, failure) -> {
                if (failure != null) {
                    LOG.error("Failed to produce partition metric sample for {} of timestamp {} due to exception",
                        new Object[]{partitionSample.entity().tp(), Long.valueOf(partitionSample.sampleTime()), failure});
                    return;
                }
                storedPartitionSamples.incrementAndGet();
            });
        }

        for (BrokerMetricSample brokerSample : samples.brokerMetricSamples()) {
            ProducerRecord<byte[], byte[]> outgoing =
                new ProducerRecord<byte[], byte[]>(this._brokerMetricSampleStoreTopic, brokerSample.toBytes());
            this._producer.send(outgoing, (metadata, failure) -> {
                if (failure != null) {
                    LOG.error("Failed to produce model training sample due to exception", failure);
                    return;
                }
                storedBrokerSamples.incrementAndGet();
            });
        }

        this._producer.flush();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Stored {} partition metric samples and {} broker metric samples to Kafka",
                Integer.valueOf(storedPartitionSamples.get()), Integer.valueOf(storedBrokerSamples.get()));
        }
    }

    /**
     * Replays the stored samples into {@code sampleLoader} and blocks until loading has finished.
     *
     * <p>The sample topic partitions are spread over the consumers, one {@code MetricLoader} per consumer
     * is submitted to the executor, and the executor is shut down and awaited. Whatever the outcome, the
     * consumers are then closed once and the executor is given a further 30 seconds to terminate. A summary
     * with the loaded and discarded sample counts is logged at the end.
     *
     * <p>Exceptions raised while loading are logged and not propagated. If the calling thread is
     * interrupted while waiting, its interrupt status is restored once cleanup has run.
     *
     * @param sampleLoader sink that receives the replayed samples and reports how many it kept
     */
    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore
    public void loadSamples(SampleStore.SampleLoader sampleLoader) {
        LOG.info("Starting loading samples.");
        long startMs = System.currentTimeMillis();
        AtomicLong numPartitionMetricSamples = new AtomicLong(0L);
        AtomicLong numBrokerMetricSamples = new AtomicLong(0L);
        AtomicLong totalSamples = new AtomicLong(0L);
        AtomicLong numLoadedSamples = new AtomicLong(0L);
        boolean interrupted = false;
        try {
            prepareConsumers();
            for (KafkaConsumer<byte[], byte[]> consumer : this._consumers) {
                this._metricProcessorExecutor.submit(new MetricLoader(consumer, sampleLoader, numLoadedSamples,
                    numPartitionMetricSamples, numBrokerMetricSamples, totalSamples));
            }
            this._metricProcessorExecutor.shutdown();
            this._metricProcessorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Remember the interrupt; it is restored after cleanup so the bounded wait below still happens.
            interrupted = true;
            LOG.error("Received exception when loading samples", e);
        } catch (Exception e) {
            LOG.error("Received exception when loading samples", e);
        } finally {
            // Single cleanup path: runs exactly once on success, on exceptions and on errors.
            this._consumers.forEach(consumer -> consumer.close(Duration.ofSeconds(0L)));
            try {
                this._metricProcessorExecutor.awaitTermination(30000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.warn("Interrupted during waiting for metrics processor to shutdown.");
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        long endMs = System.currentTimeMillis();
        long partitionSampleCount = sampleLoader.partitionSampleCount();
        long brokerSampleCount = sampleLoader.brokerSampleCount();
        // Samples read from Kafka that the sample loader did not keep.
        long discardedPartitionSamples = numPartitionMetricSamples.get() - partitionSampleCount;
        long discardedBrokerSamples = numBrokerMetricSamples.get() - brokerSampleCount;
        LOG.info("Sample loading finished. Loaded {}{} partition metrics samples and {}{} broker metric samples in {} ms.",
            new Object[]{
                Long.valueOf(partitionSampleCount),
                discardedPartitionSamples > 0 ? String.format("(%d discarded)", Long.valueOf(discardedPartitionSamples)) : "",
                Long.valueOf(brokerSampleCount),
                discardedBrokerSamples > 0 ? String.format("(%d discarded)", Long.valueOf(discardedBrokerSamples)) : "",
                Long.valueOf(endMs - startMs)});
    }

    /**
     * Returns the most recently published loading progress.
     *
     * @return the current value of {@code _loadingProgress}; {@code configure} initialises it to -1.0 and
     *         the loader tasks overwrite it while samples are being replayed
     */
    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore
    public double sampleLoadingProgress() {
        return this._loadingProgress;
    }

    /**
     * No-op in this implementation: the body is empty and the argument is ignored.
     * NOTE(review): presumably old samples are expected to age out through the sample topics' retention
     * settings — confirm.
     *
     * @param j cut-off supplied by the caller; unused here
     */
    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore
    public void evictSamplesBefore(long j) {
    }

    /**
     * Marks the store as shutting down and closes the producer without waiting for pending sends.
     *
     * <p>The shutdown flag is raised first so that loader tasks hitting an exception afterwards exit
     * quietly. Consumers are not closed here; {@code loadSamples} closes them when loading ends.
     */
    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore
    public void close() {
        this._shutdown = true;
        final Duration noWait = Duration.ZERO;
        this._producer.close(noWait);
    }

    /**
     * Distributes the partitions of both sample topics over the consumers in round-robin order and
     * assigns each consumer its share. Partition metadata is fetched through the first consumer.
     */
    private void prepareConsumers() {
        final int numConsumers = this._consumers.size();
        List<List<TopicPartition>> assignments = new ArrayList<>();
        for (int slot = 0; slot < numConsumers; slot++) {
            assignments.add(new ArrayList<>());
        }

        int nextPartitionIndex = 0;
        for (String topic : Arrays.asList(this._partitionMetricSampleStoreTopic, this._brokerMetricSampleStoreTopic)) {
            for (PartitionInfo info : this._consumers.get(0).partitionsFor(topic)) {
                List<TopicPartition> target = assignments.get(nextPartitionIndex % numConsumers);
                nextPartitionIndex++;
                target.add(new TopicPartition(info.topic(), info.partition()));
            }
        }

        for (int slot = 0; slot < numConsumers; slot++) {
            this._consumers.get(slot).assign(assignments.get(slot));
        }
    }
}
