package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.SbkTopicUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.UnknownVersionException;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import kafka.log.LogConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaSampleStore.class */
public class KafkaSampleStore implements SampleStore {
    // Multiplier applied to the metrics-window span when computing minimum topic retention.
    // NOTE(review): not referenced by name in this decompiled source; the retention helpers
    // multiply by a literal 2, presumably this constant inlined by the compiler - confirm.
    private static final int ADDITIONAL_WINDOW_TO_RETAIN_FACTOR = 2;
    // client.id used by the sample store producer.
    protected static final String PRODUCER_CLIENT_ID = "ConfluentBalancerSampleStoreProducer";
    // Prefix of the client.id of each sample-loading consumer (a random suffix is appended).
    protected static final String CONSUMER_CLIENT_ID = "ConfluentBalancerSampleStoreConsumer";
    // Prefix of the group.id of each sample-loading consumer (a random suffix is appended).
    protected static final String CONSUMER_GROUP = "ConfluentBalancerSampleStoreGroup";
    // One consumer per sample loading thread; created in configure(), closed at the end of loadSamples().
    protected List<KafkaConsumer<byte[], byte[]>> consumers;
    // Fixed-size pool that runs the MetricLoader tasks.
    protected ExecutorService metricProcessorExecutor;
    // Topic that partition metric samples are written to and loaded from.
    protected String partitionMetricSampleStoreTopic;
    // Topic that broker metric samples are written to and loaded from.
    protected String brokerMetricSampleStoreTopic;
    // Loaded-to-total sample ratio; set to -1.0 by configure() and updated by the MetricLoader tasks.
    protected volatile double loadingProgress;
    // Producer used by storeSamples().
    protected Producer<byte[], byte[]> producer;
    // Set by close(); MetricLoader checks it to exit quietly when polling fails during shutdown.
    protected volatile boolean shutdown = false;
    // Upper bound on the time a MetricLoader may spend loading samples.
    protected Duration maxSampleLoadDuration;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSampleStore.class);
    // Cleanup policy applied to both sample store topics.
    private static final String DEFAULT_CLEANUP_POLICY = LogConfig.Delete();
    // Sentinel compared by identity in MetricLoader.run(); receiving it stops the loader.
    private static final ConsumerRecords<byte[], byte[]> SHUTDOWN_RECORDS = new ConsumerRecords<>(Collections.emptyMap());
    // Timeout of each consumer poll while loading samples.
    private static final Duration SAMPLE_POLL_TIMEOUT = Duration.ofMillis(1000);
    // Source of the random suffix appended to consumer group and client ids.
    private static final Random RANDOM = new Random();

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaSampleStore$MetricLoader.class */
    private class MetricLoader implements Runnable {
        // Receiver of the samples read back from the sample store topics.
        private final SampleStore.SampleLoader sampleLoader;
        // Number of records processed so far; numerator of the loading progress.
        private final AtomicLong numLoadedSamples;
        // Number of partition metric samples handed to the sample loader.
        private final AtomicLong numPartitionMetricSamples;
        // Number of broker metric samples handed to the sample loader.
        private final AtomicLong numBrokerMetricSamples;
        // Sum of (end offset - beginning offset) over the assigned partitions; denominator of the progress.
        private final AtomicLong totalSamples;
        // Consumer this loader reads from; its partitions are assigned before the loader is submitted.
        private final KafkaConsumer<byte[], byte[]> consumer;
        // Maximum time this loader may spend in its load loop.
        private final Duration maxLoadTime;

        /**
         * Creates a loader bound to one consumer.
         *
         * @param consumer consumer to read stored samples from
         * @param sampleLoader receiver of the loaded samples
         * @param numLoadedSamples counter of records processed so far
         * @param numPartitionMetricSamples counter of loaded partition metric samples
         * @param numBrokerMetricSamples counter of loaded broker metric samples
         * @param totalSamples counter of the total number of records expected to be loaded
         * @param maxLoadTime maximum time the loader may spend loading
         */
        MetricLoader(KafkaConsumer<byte[], byte[]> consumer,
                     SampleStore.SampleLoader sampleLoader,
                     AtomicLong numLoadedSamples,
                     AtomicLong numPartitionMetricSamples,
                     AtomicLong numBrokerMetricSamples,
                     AtomicLong totalSamples,
                     Duration maxLoadTime) {
            this.sampleLoader = sampleLoader;
            this.consumer = consumer;
            this.maxLoadTime = maxLoadTime;
            this.totalSamples = totalSamples;
            this.numLoadedSamples = numLoadedSamples;
            this.numPartitionMetricSamples = numPartitionMetricSamples;
            this.numBrokerMetricSamples = numBrokerMetricSamples;
        }

        /**
         * Reads the stored samples from the partitions assigned to this loader's consumer and hands
         * them to the sample loader until every partition has reached the end offset captured at
         * start, the load deadline is exceeded, or the shutdown sentinel is received.
         *
         * <p>Any {@link Throwable} escaping the load loop is logged and ends this loader.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                prepareConsumerOffset();
                Map<TopicPartition, Long> beginningOffsets = this.consumer.beginningOffsets(this.consumer.assignment());
                Map<TopicPartition, Long> endOffsets = this.consumer.endOffsets(this.consumer.assignment());
                KafkaSampleStore.LOG.debug("Loading beginning offsets: {}, loading end offsets: {}", beginningOffsets, endOffsets);
                for (Map.Entry<TopicPartition, Long> entry : beginningOffsets.entrySet()) {
                    this.totalSamples.addAndGet(endOffsets.get(entry.getKey()) - entry.getValue());
                    // Floating-point division: long/long would truncate the ratio and throw
                    // ArithmeticException while the total is still 0 (e.g. empty topics).
                    KafkaSampleStore.this.loadingProgress = (double) this.numLoadedSamples.get() / this.totalSamples.get();
                }
                long startNanos = System.nanoTime();
                while (!sampleLoadingFinished(endOffsets)) {
                    if (sampleLoadingDeadlineExceeded(startNanos)) {
                        KafkaSampleStore.LOG.debug("Metric load time exceeded, exiting before completion.");
                        return;
                    }
                    ConsumerRecords<byte[], byte[]> records;
                    try {
                        try {
                            records = this.consumer.poll(KafkaSampleStore.SAMPLE_POLL_TIMEOUT);
                        } catch (CorruptRecordException e) {
                            KafkaSampleStore.LOG.warn("Corrupt message detected, skipping forward");
                            // Step past the current position on every partition that has not reached its end offset.
                            for (TopicPartition topicPartition : this.consumer.assignment()) {
                                long position = this.consumer.position(topicPartition);
                                if (position < endOffsets.get(topicPartition)) {
                                    this.consumer.seek(topicPartition, position + 1);
                                }
                            }
                            // Nothing was polled in this round; re-check completion and poll again.
                            continue;
                        }
                    } catch (Exception e) {
                        if (KafkaSampleStore.this.shutdown) {
                            return;
                        }
                        KafkaSampleStore.LOG.error("Metric loader received exception:", e);
                        // No records to process after a failed poll; retry on the next iteration.
                        continue;
                    }
                    // Identity comparison on purpose: SHUTDOWN_RECORDS is a sentinel instance.
                    if (records == KafkaSampleStore.SHUTDOWN_RECORDS) {
                        KafkaSampleStore.LOG.trace("Metric loader received empty records");
                        return;
                    }
                    HashSet<PartitionMetricSample> partitionMetricSamples = new HashSet<>();
                    HashSet<BrokerMetricSample> brokerMetricSamples = new HashSet<>();
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        try {
                            if (record.topic().equals(KafkaSampleStore.this.partitionMetricSampleStoreTopic)) {
                                PartitionMetricSample sample = PartitionMetricSample.fromBytes(record.value());
                                partitionMetricSamples.add(sample);
                                KafkaSampleStore.LOG.trace("Loaded partition metric sample {}", sample);
                            } else if (record.topic().equals(KafkaSampleStore.this.brokerMetricSampleStoreTopic)) {
                                BrokerMetricSample sample = BrokerMetricSample.fromBytes(record.value());
                                sample.close(record.timestamp());
                                brokerMetricSamples.add(sample);
                                KafkaSampleStore.LOG.trace("Loaded broker metric sample {}", sample);
                            }
                        } catch (UnknownVersionException e) {
                            // A sample of an unknown version is skipped; the rest of the batch is still loaded.
                            KafkaSampleStore.LOG.warn("Ignoring sample due to", e);
                        }
                    }
                    if (!partitionMetricSamples.isEmpty() || !brokerMetricSamples.isEmpty()) {
                        this.sampleLoader.loadSamples(new MetricSampler.Samples(partitionMetricSamples, brokerMetricSamples));
                        this.numPartitionMetricSamples.getAndAdd(partitionMetricSamples.size());
                        this.numBrokerMetricSamples.getAndAdd(brokerMetricSamples.size());
                        KafkaSampleStore.this.loadingProgress =
                            (double) this.numLoadedSamples.addAndGet(records.count()) / this.totalSamples.get();
                    }
                }
                KafkaSampleStore.LOG.info("Metric loader finished loading samples.");
            } catch (Throwable th) {
                KafkaSampleStore.LOG.warn("Encountered error when loading sample from Kafka.", th);
            }
        }

        /**
         * Checks whether the consumer has caught up with the given end offsets.
         *
         * @param map end offset per partition, captured when loading started
         * @return {@code false} as soon as one partition's consumer position is below its end
         *         offset, {@code true} when none is
         */
        private boolean sampleLoadingFinished(Map<TopicPartition, Long> map) {
            for (Map.Entry<TopicPartition, Long> endOffset : map.entrySet()) {
                TopicPartition partition = endOffset.getKey();
                long current = this.consumer.position(partition);
                if (current >= endOffset.getValue()) {
                    continue;
                }
                KafkaSampleStore.LOG.debug("Partition {} is still lagging. Current position: {}, LEO: {}",
                                           partition, current, endOffset.getValue());
                return false;
            }
            return true;
        }

        /**
         * Tells whether loading has been running for longer than the configured maximum load time.
         *
         * @param startNanos the {@link System#nanoTime()} reading taken when loading started
         * @return {@code true} if the elapsed time since {@code startNanos} exceeds the maximum load time
         */
        private boolean sampleLoadingDeadlineExceeded(long startNanos) {
            // Elapsed time is "now - start"; the reversed subtraction is always negative,
            // which meant the deadline could never be reported as exceeded.
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            return elapsedMs > this.maxLoadTime.toMillis();
        }

        /**
         * Positions the consumer on each assigned partition at the first offset whose timestamp
         * falls inside the monitoring period of that partition's topic. Partitions for which the
         * timestamp lookup returns no offset are rewound to their beginning.
         */
        private void prepareConsumerOffset() {
            Collection<TopicPartition> assigned = this.consumer.assignment();
            Map<TopicPartition, Long> lookupTimestamps = new HashMap<>(assigned.size());
            long nowMs = System.currentTimeMillis();
            for (TopicPartition partition : assigned) {
                boolean isBrokerTopic = partition.topic().equals(KafkaSampleStore.this.brokerMetricSampleStoreTopic);
                long monitoringPeriodMs = isBrokerTopic
                    ? this.sampleLoader.brokerMonitoringPeriodMs()
                    : this.sampleLoader.partitionMonitoringPeriodMs();
                lookupTimestamps.put(partition, nowMs - monitoringPeriodMs);
            }

            HashSet<TopicPartition> withoutOffset = new HashSet<>();
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> lookup : this.consumer.offsetsForTimes(lookupTimestamps).entrySet()) {
                OffsetAndTimestamp found = lookup.getValue();
                if (found == null) {
                    withoutOffset.add(lookup.getKey());
                    continue;
                }
                KafkaSampleStore.LOG.debug("Seeking to offset {} for sample partition {}", found.offset(), lookup.getKey());
                this.consumer.seek(lookup.getKey(), found.offset());
            }

            if (!withoutOffset.isEmpty()) {
                KafkaSampleStore.LOG.debug("These sampling partitions had no recent messages, seeking to beginning: {}", withoutOffset);
                this.consumer.seekToBeginning(withoutOffset);
            }
        }
    }

    /**
     * Reads the sample store settings from the given configuration, creates the loading thread
     * pool, one consumer per loading thread and the producer, and finally checks (and possibly
     * creates) the two sample store topics.
     *
     * @param map configuration values keyed by config name
     */
    @Override // com.linkedin.cruisecontrol.common.CruiseControlConfigurable
    public void configure(Map<String, ?> map) {
        partitionMetricSampleStoreTopic = getPartitionMetricSampleStoreTopic(map);
        brokerMetricSampleStoreTopic = getBrokerMetricSampleStoreTopic(map);

        int numLoadingThreads = (Integer) map.get(KafkaCruiseControlConfig.NUM_SAMPLE_LOADING_THREADS_CONFIG);
        metricProcessorExecutor = Executors.newFixedThreadPool(
            numLoadingThreads, new KafkaCruiseControlThreadFactory("SampleStore", true, LOG));

        // One consumer per loading thread.
        consumers = new ArrayList<>(numLoadingThreads);
        for (int created = 0; created < numLoadingThreads; created++) {
            consumers.add(createConsumer(map));
        }
        producer = createProducer(map);

        // Negative progress until a loader computes the first ratio.
        loadingProgress = -1.0d;
        long maxLoadMs = (Long) map.get(KafkaCruiseControlConfig.MAX_SAMPLE_LOAD_TIME_MS_CONFIG);
        maxSampleLoadDuration = Duration.ofMillis(maxLoadMs);

        SbkTopicUtils.checkTopicPropertiesMaybeCreate(
            getTopicConfig(partitionMetricSampleStoreTopic, map)
                .setMinRetentionTimeMs(getMinPartitionRetentionTimeMs(map))
                .setPartitionCount((Integer) map.get(KafkaCruiseControlConfig.PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG))
                .build(),
            map);
        SbkTopicUtils.checkTopicPropertiesMaybeCreate(
            getTopicConfig(brokerMetricSampleStoreTopic, map)
                .setMinRetentionTimeMs(getMinBrokerRetentionTimeMs(map))
                .setPartitionCount((Integer) map.get(KafkaCruiseControlConfig.BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG))
                .build(),
            map);
    }

    /**
     * Builds the producer used to write samples to the sample store topics.
     *
     * <p>Starts from the producer settings filtered out of the given configuration and then
     * overrides connection, batching, retry, compression and serializer settings.
     *
     * @param map configuration values keyed by config name
     * @return a new byte-array producer
     */
    protected KafkaProducer<byte[], byte[]> createProducer(Map<String, ?> map) {
        Properties producerProps = new Properties();
        producerProps.putAll(KafkaCruiseControlUtils.filterProducerConfigs(map));

        @SuppressWarnings("unchecked")
        List<String> bootstrapServers = (List<String>) map.get(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG);
        producerProps.setProperty(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", bootstrapServers));
        producerProps.setProperty(KafkaCruiseControlConfig.CLIENT_ID_CONFIG, PRODUCER_CLIENT_ID);

        // Batching and buffering.
        producerProps.setProperty("linger.ms", "30000");

        // Back-off behaviour. The reconnect back-off set here is replaced by the configured value at the end.
        producerProps.setProperty("retry.backoff.ms", "500");
        producerProps.setProperty(KafkaCruiseControlConfig.RECONNECT_BACKOFF_MS_CONFIG, "500");
        producerProps.setProperty("reconnect.backoff.max.ms",
                                  String.valueOf(KafkaCruiseControlUtils.RECONNECT_BACKOFF_MAX_MS_CONFIG));

        producerProps.setProperty("batch.size", "500000");
        producerProps.setProperty("buffer.memory", "67108864");
        producerProps.setProperty("retries", "5");
        producerProps.setProperty("compression.type", "gzip");
        producerProps.setProperty("enable.idempotence", "false");

        String serializer = ByteArraySerializer.class.getName();
        producerProps.setProperty("key.serializer", serializer);
        producerProps.setProperty("value.serializer", serializer);

        Object configuredReconnectBackoff = map.get(KafkaCruiseControlConfig.RECONNECT_BACKOFF_MS_CONFIG);
        producerProps.setProperty(KafkaCruiseControlConfig.RECONNECT_BACKOFF_MS_CONFIG, configuredReconnectBackoff.toString());
        return new KafkaProducer<>(producerProps);
    }

    /**
     * Builds a consumer for reading the sample store topics.
     *
     * <p>Starts from the consumer settings filtered out of the given configuration. Group id and
     * client id share one random numeric suffix; auto commit is disabled and the offset reset
     * policy is {@code earliest}.
     *
     * @param map configuration values keyed by config name
     * @return a new byte-array consumer
     */
    public static KafkaConsumer<byte[], byte[]> createConsumer(Map<String, ?> map) {
        Properties consumerProps = new Properties();
        consumerProps.putAll(KafkaCruiseControlUtils.filterConsumerConfigs(map));

        // Same random suffix for both the group id and the client id of this consumer.
        long idSuffix = RANDOM.nextLong();

        @SuppressWarnings("unchecked")
        List<String> bootstrapServers = (List<String>) map.get(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG);
        consumerProps.setProperty(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", bootstrapServers));
        consumerProps.setProperty("group.id", CONSUMER_GROUP + idSuffix);
        consumerProps.setProperty(KafkaCruiseControlConfig.CLIENT_ID_CONFIG, CONSUMER_CLIENT_ID + idSuffix);
        consumerProps.setProperty("auto.offset.reset", "earliest");
        consumerProps.setProperty("enable.auto.commit", "false");
        consumerProps.setProperty("max.poll.records", Integer.toString(Integer.MAX_VALUE));

        String deserializer = ByteArrayDeserializer.class.getName();
        consumerProps.setProperty("key.deserializer", deserializer);
        consumerProps.setProperty("value.deserializer", deserializer);

        Object configuredReconnectBackoff = map.get(KafkaCruiseControlConfig.RECONNECT_BACKOFF_MS_CONFIG);
        consumerProps.setProperty(KafkaCruiseControlConfig.RECONNECT_BACKOFF_MS_CONFIG, configuredReconnectBackoff.toString());
        return new KafkaConsumer<>(consumerProps);
    }

    /**
     * Blocks until both sample store topics pass {@link #checkTopicsCreated(Map)}.
     *
     * <p>Between attempts it waits on the given semaphore, starting at 1 second and doubling the
     * wait after each failed attempt up to a cap of 60 seconds. Acquiring a permit during the
     * wait, or being interrupted, aborts the check.
     *
     * <p>Reconstructed from the bytecode listing the decompiler emitted for this method; the
     * previous body unconditionally threw {@link UnsupportedOperationException}.
     *
     * @param config configuration whose merged values describe the sample store topics
     * @param abortStartupCheck semaphore that is given a permit to abort the check
     * @throws io.confluent.databalancer.startup.StartupCheckInterruptedException if a permit is
     *         acquired from {@code abortStartupCheck} or the waiting thread is interrupted
     */
    public static void checkStartupCondition(com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig config,
                                             java.util.concurrent.Semaphore abortStartupCheck) {
        // NOTE(review): assumes mergedConfigValues() is declared with String keys - confirm.
        Map<String, ?> configs = config.mergedConfigValues();
        long maxTimeoutSec = 60L;
        long currentTimeoutSec = 1L;
        while (!checkTopicsCreated(configs)) {
            LOG.info("Waiting for {} seconds to ensure that sample store topics are created/exists.", currentTimeoutSec);
            try {
                // The semaphore doubles as the sleep: a permit showing up means "stop waiting".
                if (abortStartupCheck.tryAcquire(currentTimeoutSec, TimeUnit.SECONDS)) {
                    throw new io.confluent.databalancer.startup.StartupCheckInterruptedException();
                }
            } catch (InterruptedException e) {
                throw new io.confluent.databalancer.startup.StartupCheckInterruptedException(e);
            }
            currentTimeoutSec = Math.min(2L * currentTimeoutSec, maxTimeoutSec);
        }
    }

    /**
     * Starts a topic config builder for a sample store topic with the settings both topics
     * share: the topic name, the replication factor (looked up under
     * {@code topic.replication.factor} with the balancer default passed alongside) and the
     * default cleanup policy.
     *
     * @param str name of the topic
     * @param map configuration values keyed by config name
     * @return the partially populated builder
     */
    private static SbkTopicUtils.SbkTopicConfigBuilder getTopicConfig(String str, Map<String, ?> map) {
        short defaultReplicationFactor = ConfluentConfigs.BALANCER_TOPICS_REPLICATION_FACTOR_DEFAULT.shortValue();
        return new SbkTopicUtils.SbkTopicConfigBuilder()
            .setTopic(str)
            .setReplicationFactor(map, "topic.replication.factor", defaultReplicationFactor)
            .setCleanupPolicy(DEFAULT_CLEANUP_POLICY);
    }

    /**
     * Looks up the name of the broker metric sample store topic.
     *
     * @param map configuration values keyed by config name
     * @return the configured topic name, or {@code null} if the key is absent
     */
    private static String getBrokerMetricSampleStoreTopic(Map<String, ?> map) {
        Object configuredTopic = map.get(KafkaCruiseControlConfig.BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG);
        return (String) configuredTopic;
    }

    /**
     * Looks up the name of the partition metric sample store topic.
     *
     * @param map configuration values keyed by config name
     * @return the configured topic name, or {@code null} if the key is absent
     */
    private static String getPartitionMetricSampleStoreTopic(Map<String, ?> map) {
        Object configuredTopic = map.get(KafkaCruiseControlConfig.PARTITION_METRIC_SAMPLE_STORE_TOPIC_CONFIG);
        return (String) configuredTopic;
    }

    /**
     * Computes the minimum retention time of the partition metric sample store topic: the larger
     * of the configured minimum and twice the time span covered by the configured number of
     * partition metrics windows.
     *
     * @param map configuration values keyed by config name
     * @return the minimum retention time in milliseconds
     */
    private static long getMinPartitionRetentionTimeMs(Map<String, ?> map) {
        long configuredMinRetentionMs =
            (Long) map.get(KafkaCruiseControlConfig.MIN_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG);
        int numWindows = (Integer) map.get(KafkaCruiseControlConfig.NUM_PARTITION_METRICS_WINDOWS_CONFIG);
        long windowMs = (Long) map.get(KafkaCruiseControlConfig.PARTITION_METRICS_WINDOW_MS_CONFIG);
        long retainedWindowsMs = numWindows * ADDITIONAL_WINDOW_TO_RETAIN_FACTOR * windowMs;
        return Math.max(configuredMinRetentionMs, retainedWindowsMs);
    }

    /**
     * Computes the minimum retention time of the broker metric sample store topic: the larger of
     * the configured minimum and twice the time span covered by the configured number of broker
     * metrics windows.
     *
     * @param map configuration values keyed by config name
     * @return the minimum retention time in milliseconds
     */
    private static long getMinBrokerRetentionTimeMs(Map<String, ?> map) {
        long configuredMinRetentionMs =
            (Long) map.get(KafkaCruiseControlConfig.MIN_BROKER_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS_CONFIG);
        int numWindows = (Integer) map.get(KafkaCruiseControlConfig.NUM_BROKER_METRICS_WINDOWS_CONFIG);
        long windowMs = (Long) map.get(KafkaCruiseControlConfig.BROKER_METRICS_WINDOW_MS_CONFIG);
        long retainedWindowsMs = numWindows * ADDITIONAL_WINDOW_TO_RETAIN_FACTOR * windowMs;
        return Math.max(configuredMinRetentionMs, retainedWindowsMs);
    }

    /**
     * Runs the topic property check (which may create the topic) for the partition metric sample
     * store topic and, only if that one succeeds, for the broker metric sample store topic.
     *
     * @param map configuration values keyed by config name
     * @return {@code true} if both checks return {@code true}; {@code false} if either returns
     *         {@code false} or any exception is thrown (the exception is logged)
     */
    static boolean checkTopicsCreated(Map<String, ?> map) {
        try {
            boolean partitionTopicReady = SbkTopicUtils.checkTopicPropertiesMaybeCreate(
                getTopicConfig(getPartitionMetricSampleStoreTopic(map), map)
                    .setMinRetentionTimeMs(getMinPartitionRetentionTimeMs(map))
                    .setPartitionCount((Integer) map.get(KafkaCruiseControlConfig.PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG))
                    .build(),
                map);
            if (!partitionTopicReady) {
                // Do not touch the broker topic until the partition topic is in place.
                return false;
            }
            return SbkTopicUtils.checkTopicPropertiesMaybeCreate(
                getTopicConfig(getBrokerMetricSampleStoreTopic(map), map)
                    .setMinRetentionTimeMs(getMinBrokerRetentionTimeMs(map))
                    .setPartitionCount((Integer) map.get(KafkaCruiseControlConfig.BROKER_SAMPLE_STORE_TOPIC_PARTITION_COUNT_CONFIG))
                    .build(),
                map);
        } catch (Exception e) {
            LOG.error("Error when checking for sample store topics.", e);
            return false;
        }
    }

    /**
     * Writes the given samples to the sample store topics and flushes the producer before
     * returning. Partition metric samples are sent with their sample time as the record
     * timestamp. Send failures are logged and do not fail the call.
     *
     * @param samples partition and broker metric samples to store
     */
    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore
    public void storeSamples(MetricSampler.Samples samples) {
        AtomicInteger storedPartitionSamples = new AtomicInteger(0);
        for (PartitionMetricSample sample : samples.partitionMetricSamples()) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(
                this.partitionMetricSampleStoreTopic, null, Long.valueOf(sample.sampleTime()), null, sample.toBytes());
            this.producer.send(record, (metadata, failure) -> {
                if (failure != null) {
                    LOG.error("Failed to produce partition metric sample for {} of timestamp {} due to exception",
                              sample.entity().tp(), Long.valueOf(sample.sampleTime()), failure);
                    return;
                }
                storedPartitionSamples.incrementAndGet();
            });
        }

        AtomicInteger storedBrokerSamples = new AtomicInteger(0);
        for (BrokerMetricSample sample : samples.brokerMetricSamples()) {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<byte[], byte[]>(this.brokerMetricSampleStoreTopic, sample.toBytes());
            this.producer.send(record, (metadata, failure) -> {
                if (failure != null) {
                    LOG.error("Failed to produce model training sample due to exception", failure);
                    return;
                }
                storedBrokerSamples.incrementAndGet();
            });
        }

        // Flush so the send callbacks have run before the counters are read.
        this.producer.flush();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Stored {} partition metric samples and {} broker metric samples to Kafka",
                      storedPartitionSamples.get(), storedBrokerSamples.get());
        }
    }

    /**
     * Loads the stored samples back from Kafka into the given sample loader.
     *
     * <p>Spreads the partitions of both sample store topics over the consumers, submits one
     * {@code MetricLoader} per consumer, shuts the executor down with a timeout of the maximum
     * load time plus 2 seconds, and then logs how many samples were loaded. Exceptions raised
     * while loading are logged, not rethrown. The consumers are closed exactly once, whatever
     * the outcome.
     *
     * @param sampleLoader receiver of the loaded samples
     */
    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore
    public void loadSamples(SampleStore.SampleLoader sampleLoader) {
        LOG.info("Starting loading samples.");
        long startMs = System.currentTimeMillis();
        AtomicLong numPartitionMetricSamples = new AtomicLong(0L);
        AtomicLong numBrokerMetricSamples = new AtomicLong(0L);
        AtomicLong totalSamples = new AtomicLong(0L);
        AtomicLong numLoadedSamples = new AtomicLong(0L);
        try {
            prepareConsumers();
            for (KafkaConsumer<byte[], byte[]> consumer : this.consumers) {
                this.metricProcessorExecutor.submit(new MetricLoader(consumer, sampleLoader, numLoadedSamples,
                                                                     numPartitionMetricSamples, numBrokerMetricSamples,
                                                                     totalSamples, this.maxSampleLoadDuration));
            }
            KafkaCruiseControlUtils.executeSilently(
                this.metricProcessorExecutor,
                KafkaCruiseControlUtils.getExecutorShutdownConsumerWithTimeout(this.maxSampleLoadDuration.toMillis() + 2000));
        } catch (Exception e) {
            LOG.error("Received exception when loading samples", e);
        } finally {
            // Single close site: runs on success, on a logged exception and on any other throwable.
            this.consumers.forEach(consumer -> consumer.close(Duration.ofSeconds(0L)));
        }

        long elapsedMs = System.currentTimeMillis() - startMs;
        long partitionSampleCount = sampleLoader.partitionSampleCount();
        long brokerSampleCount = sampleLoader.brokerSampleCount();
        // Samples read from Kafka but not reported by the loader are shown as discarded.
        long discardedPartitionSamples = numPartitionMetricSamples.get() - partitionSampleCount;
        long discardedBrokerSamples = numBrokerMetricSamples.get() - brokerSampleCount;
        LOG.info("Sample loading finished. Loaded {}{} partition metrics samples and {}{} broker metric samples in {} ms.",
                 partitionSampleCount,
                 discardedPartitionSamples > 0 ? String.format("(%d discarded)", discardedPartitionSamples) : "",
                 brokerSampleCount,
                 discardedBrokerSamples > 0 ? String.format("(%d discarded)", discardedBrokerSamples) : "",
                 elapsedMs);
    }

    /**
     * Reports the current sample loading progress.
     *
     * @return the most recently computed loaded-to-total sample ratio; {@code -1.0} after
     *         {@code configure} until a loader has computed a ratio
     */
    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore
    public double sampleLoadingProgress() {
        return loadingProgress;
    }

    /**
     * No-op in this implementation: nothing is evicted when this is called.
     * NOTE(review): old samples are presumably removed by the retention of the sample store
     * topics instead - confirm.
     *
     * @param j eviction cut-off; ignored here
     */
    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore
    public void evictSamplesBefore(long j) {
    }

    /**
     * Marks the store as shut down and closes the producer with a zero timeout via
     * {@code KafkaCruiseControlUtils.executeSilently}.
     */
    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.SampleStore, java.lang.AutoCloseable
    public void close() {
        // Raise the flag first so loaders that fail while polling exit without logging an error.
        shutdown = true;
        KafkaCruiseControlUtils.executeSilently(producer, toClose -> toClose.close(Duration.ofSeconds(0L)));
    }

    /**
     * Distributes the partitions of both sample store topics round-robin over the consumers and
     * assigns each consumer its share. Partition metadata is fetched through the first consumer.
     */
    private void prepareConsumers() {
        int numConsumers = this.consumers.size();
        List<List<TopicPartition>> assignments = new ArrayList<>();
        for (int consumerIndex = 0; consumerIndex < numConsumers; consumerIndex++) {
            assignments.add(new ArrayList<>());
        }

        // Running counter across both topics so the load is spread evenly over the consumers.
        int nextSlot = 0;
        for (String topic : Arrays.asList(this.partitionMetricSampleStoreTopic, this.brokerMetricSampleStoreTopic)) {
            for (PartitionInfo partition : this.consumers.get(0).partitionsFor(topic)) {
                List<TopicPartition> target = assignments.get(nextSlot % numConsumers);
                nextSlot++;
                target.add(new TopicPartition(partition.topic(), partition.partition()));
            }
        }

        for (int consumerIndex = 0; consumerIndex < numConsumers; consumerIndex++) {
            this.consumers.get(consumerIndex).assign(assignments.get(consumerIndex));
        }
    }
}
