/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.metadata.ingester;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.ingester.IngesterRecord;
import org.apache.kafka.metadata.ingester.IngestionWorker;
import org.apache.kafka.metadata.ingester.IngestionWorkerManager;
import org.apache.kafka.metadata.ingester.KafkaConsumerIngestionWorker;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class KafkaConsumerIngestionWorkerTest {
    static final String TEST_TOPIC = "test";

    /**
     * Creates the test environment without ever creating a worker and closes it again.
     */
    @Test
    public void testStartupAndShutdown() throws Exception {
        try (KafkaConsumerIngestionWorkerTestEnv ignored = new KafkaConsumerIngestionWorkerTestEnv()) {
            // Nothing to do: the environment is closed as soon as it has been constructed.
        }
    }

    /**
     * Creates a worker with an initial offset of 0 for each of three partitions, then closes
     * the environment, which shuts the worker down.
     */
    @Test
    public void testStartupCreateAndShutdown() throws Exception {
        try (KafkaConsumerIngestionWorkerTestEnv testEnv = new KafkaConsumerIngestionWorkerTestEnv()) {
            List<Long> startOffsets = Collections.nCopies(3, 0L);
            testEnv.createWorker(startOffsets);
        }
    }

    /**
     * Builds an {@link IngesterRecord} on partition {@code partitionId} of {@link #TEST_TOPIC}
     * whose value is {@code "value-for-" + key}.
     *
     * <p>{@code offset} is passed as both the first and the third constructor argument.
     * NOTE(review): the meaning of those two arguments is defined by {@link IngesterRecord},
     * which is not visible here - confirm against its constructor.
     *
     * @param partitionId partition of the test topic the record belongs to
     * @param offset value used for both long constructor arguments
     * @param key record key, also used to derive the value
     * @return the newly built record
     */
    static IngesterRecord rec(int partitionId, long offset, String key) {
        TopicPartition topicPartition = new TopicPartition(TEST_TOPIC, partitionId);
        String value = "value-for-" + key;
        return new IngesterRecord(offset, topicPartition, offset, key, value);
    }

    /**
     * Produces two records to partition 0 of the fake consumer and checks that the worker
     * hands them to the manager in order.
     */
    @Test
    public void testIncomingRecords() throws Exception {
        try (KafkaConsumerIngestionWorkerTestEnv env = new KafkaConsumerIngestionWorkerTestEnv()) {
            env.createWorker(Arrays.asList(0L, 0L, 0L));
            env.consumer().produce(new TopicPartition(TEST_TOPIC, 0),
                Arrays.asList(rec(0, 0L, "foo"), rec(0, 1L, "bar")));
            for (IngesterRecord expected : Arrays.asList(rec(0, 0L, "foo"), rec(0, 1L, "bar"))) {
                IncomingIngesterRecord incoming =
                    env.workerManager().incomingRecords.poll(5L, TimeUnit.SECONDS);
                // poll() returns null on timeout; fail with a readable message rather than an NPE.
                Assertions.assertNotNull(incoming, "Timed out waiting for " + expected);
                Assertions.assertEquals(expected, incoming.record);
            }
        }
    }

    /**
     * Produces as many records as the fake manager's backpressure threshold (3) and checks
     * that all partitions are reported as paused after two records have been received, and
     * that no partition is paused once the last record has been taken.
     */
    @Test
    public void testThrottlingCausesPause() throws Exception {
        try (KafkaConsumerIngestionWorkerTestEnv env = new KafkaConsumerIngestionWorkerTestEnv()) {
            env.createWorker(Arrays.asList(0L, 0L, 0L));
            env.consumer().produce(new TopicPartition(TEST_TOPIC, 0),
                Arrays.asList(rec(0, 0L, "foo"), rec(0, 1L, "bar"), rec(0, 2L, "baz")));
            BlockingDeque<IncomingIngesterRecord> incoming = env.workerManager().incomingRecords;

            // poll() returns null on timeout; assert that explicitly instead of hitting an NPE.
            IncomingIngesterRecord first = incoming.poll(5L, TimeUnit.SECONDS);
            Assertions.assertNotNull(first, "Timed out waiting for record foo");
            Assertions.assertEquals(rec(0, 0L, "foo"), first.record);

            IncomingIngesterRecord second = incoming.poll(5L, TimeUnit.SECONDS);
            Assertions.assertNotNull(second, "Timed out waiting for record bar");
            Assertions.assertEquals(rec(0, 1L, "bar"), second.record);

            // NOTE(review): the paused set is read immediately after the second record arrives,
            // which presumes the worker has already paused by then - confirm against the worker.
            Assertions.assertEquals(
                Set.of(new TopicPartition(TEST_TOPIC, 0), new TopicPartition(TEST_TOPIC, 1), new TopicPartition(TEST_TOPIC, 2)),
                env.consumer().paused());

            IncomingIngesterRecord third = incoming.poll(5L, TimeUnit.SECONDS);
            Assertions.assertNotNull(third, "Timed out waiting for record baz");
            Assertions.assertEquals(rec(0, 2L, "baz"), third.record);

            TestUtils.waitForCondition(() -> env.consumer().paused().isEmpty(), 30000L,
                "The consumer did not unpause partitions within the expected time.");
        }
    }

    /**
     * Test fixture that owns a {@link FakeIngestionWorkerManager} and, once
     * {@link #createWorker(List)} has been called, a {@link FakeConsumer} and the worker built
     * on top of it. Closing the fixture asks the worker to shut down and waits for the
     * shutdown notification delivered to the manager.
     */
    static class KafkaConsumerIngestionWorkerTestEnv implements AutoCloseable {
        private final FakeIngestionWorkerManager workerManager = new FakeIngestionWorkerManager(123L, 3);
        private FakeConsumer consumer;
        private IngestionWorker worker;

        KafkaConsumerIngestionWorkerTestEnv() {
        }

        /** Returns the fake consumer, or {@code null} if no worker currently exists. */
        FakeConsumer consumer() {
            return this.consumer;
        }

        /** Returns the manager that receives records and shutdown notifications. */
        FakeIngestionWorkerManager workerManager() {
            return this.workerManager;
        }

        /**
         * Creates the worker on top of a fresh three-partition {@link FakeConsumer}.
         *
         * @param initialOffsets initial offset per partition; index {@code i} is used for
         *                       partition {@code i} of the test topic
         * @throws IllegalStateException if a worker has already been created
         */
        void createWorker(List<Long> initialOffsets) {
            if (this.worker != null) {
                throw new IllegalStateException("There is already a worker.");
            }
            this.consumer = new FakeConsumer(KafkaConsumerIngestionWorkerTest.TEST_TOPIC, 3);
            KafkaConsumerIngestionWorker.Factory factory = new KafkaConsumerIngestionWorker.Factory(
                KafkaConsumerIngestionWorkerTest.TEST_TOPIC, new HashMap<>());
            // Always hand out the fake consumer instead of building a real one.
            factory.setConsumerFactory(c -> this.consumer);
            Map<TopicPartition, Long> offsetsByPartition = new HashMap<>();
            int partitionId = 0;
            for (Long initialOffset : initialOffsets) {
                offsetsByPartition.put(
                    new TopicPartition(KafkaConsumerIngestionWorkerTest.TEST_TOPIC, partitionId), initialOffset);
                partitionId++;
            }
            this.worker = factory.create(this.workerManager, offsetsByPartition);
        }

        /**
         * Begins worker shutdown and waits up to 15 seconds for the manager to be notified.
         * Does nothing if no worker was created.
         *
         * @throws Exception the shutdown exception reported by the worker if it is an
         *                   {@link Exception}; any other reported throwable, as well as a
         *                   timeout, is raised as a {@link RuntimeException}
         */
        @Override
        public void close() throws Exception {
            if (this.worker == null) {
                return;
            }
            this.worker.beginShutdown();
            this.worker = null;
            this.consumer = null;
            IncomingWorkerShutdown shutdown = this.workerManager.incomingShutdowns.poll(15L, TimeUnit.SECONDS);
            if (shutdown == null) {
                throw new RuntimeException("Timed out waiting for worker shutdown.");
            }
            Throwable failure = shutdown.shutdownException;
            if (failure == null) {
                return;
            }
            if (failure instanceof Exception) {
                throw (Exception) failure;
            }
            throw new RuntimeException(failure);
        }
    }

    /**
     * In-memory stand-in for a Kafka {@link Consumer} over a fixed set of partitions of one topic.
     *
     * <p>Tests append records with {@link #produce(TopicPartition, List)}. {@link #poll(Duration)}
     * hands them out for partitions that are assigned and not paused. Only the operations
     * implemented below are supported; the remaining {@link Consumer} methods either throw
     * {@link UnsupportedOperationException} or do nothing. Partition state is only touched from
     * {@code synchronized} methods, and the same monitor is used to wake a blocked poll.
     */
    static class FakeConsumer implements Consumer<String, String> {
        private final Map<TopicPartition, FakePartitionState> partitions = new HashMap<TopicPartition, FakePartitionState>();
        private boolean closed = false;

        /**
         * @param testTopicName topic all partitions belong to
         * @param testTopicNumParts number of partitions to create, numbered from 0
         */
        FakeConsumer(String testTopicName, int testTopicNumParts) {
            for (int i = 0; i < testTopicNumParts; ++i) {
                this.partitions.put(new TopicPartition(testTopicName, i), new FakePartitionState());
            }
        }

        /** Returns whether {@link #close()} or {@link #close(Duration)} has been called. */
        synchronized boolean closed() {
            return this.closed;
        }

        /**
         * Looks up the state of a known partition. Callers must hold this object's monitor.
         *
         * @throws IllegalStateException if the partition does not belong to this consumer
         */
        private FakePartitionState partitionState(TopicPartition topicPartition) {
            FakePartitionState state = this.partitions.get(topicPartition);
            if (state == null) {
                throw new IllegalStateException("TopicPartition " + topicPartition + " not found.");
            }
            return state;
        }

        /**
         * Appends records to a partition and wakes any thread blocked in {@link #poll(Duration)}.
         *
         * @param topicPartition partition to append to
         * @param records records to append; each offset must be greater than the highest
         *                offset already stored for the partition
         * @throws IllegalStateException if the partition is unknown or an offset is not increasing
         */
        synchronized void produce(TopicPartition topicPartition, List<IngesterRecord> records) {
            FakePartitionState state = this.partitionState(topicPartition);
            for (IngesterRecord record : records) {
                if (record.offset() <= state.highestRecordOffset()) {
                    throw new IllegalStateException("Can't produce record with offset " + record.offset() + " to partition with highest offset " + state.highestRecordOffset());
                }
                state.records.put(record.offset(), record);
            }
            this.notifyAll();
        }

        /**
         * Returns an unmodifiable snapshot of the partitions currently marked as assigned by
         * {@link #assign(Collection)}.
         */
        public synchronized Set<TopicPartition> assignment() {
            Set<TopicPartition> results = new HashSet<TopicPartition>();
            for (Map.Entry<TopicPartition, FakePartitionState> entry : this.partitions.entrySet()) {
                // Only report partitions that assign() actually handed to this consumer.
                if (entry.getValue().assigned) {
                    results.add(entry.getKey());
                }
            }
            return Collections.unmodifiableSet(results);
        }

        public synchronized Set<String> subscription() {
            throw new UnsupportedOperationException();
        }

        public void subscribe(Collection<String> topics) {
            throw new UnsupportedOperationException();
        }

        public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
            throw new UnsupportedOperationException();
        }

        /**
         * Marks exactly the known partitions contained in {@code newPartitions} as assigned,
         * unassigns all others, and wakes any blocked poll.
         */
        public synchronized void assign(Collection<TopicPartition> newPartitions) {
            this.partitions.forEach((topicPartition, state) -> {
                state.assigned = newPartitions.contains(topicPartition);
            });
            this.notifyAll();
        }

        public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
            throw new UnsupportedOperationException();
        }

        public void subscribe(Pattern pattern) {
            throw new UnsupportedOperationException();
        }

        public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
            throw new UnsupportedOperationException();
        }

        public void subscribe(SubscriptionPattern pattern) {
            throw new UnsupportedOperationException();
        }

        /** No-op. */
        public void unsubscribe() {
        }

        /**
         * Returns all records currently available. If there are none, waits up to
         * {@code timeout} for a {@code notifyAll()} (issued by produce, assign and wakeup)
         * and then checks once more.
         *
         * @param timeout maximum time to wait; a timeout that rounds to zero milliseconds
         *                returns immediately
         * @throws RuntimeException if the waiting thread is interrupted (the interrupt flag is restored)
         */
        public synchronized ConsumerRecords<String, String> poll(Duration timeout) {
            ConsumerRecords<String, String> records = this.pollRecords();
            if (!records.isEmpty()) {
                return records;
            }
            long waitMs = timeout.toMillis();
            if (waitMs == 0L) {
                // Object.wait(0) means "wait until notified", which would turn a zero
                // timeout into an unbounded block; return the empty result instead.
                return records;
            }
            try {
                this.wait(waitMs);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return this.pollRecords();
        }

        /**
         * Collects every stored record at or after the current position of each assigned,
         * unpaused partition and advances the position past the last record returned.
         */
        synchronized ConsumerRecords<String, String> pollRecords() {
            // Raw List/ArrayList are kept on purpose: the element type produced by
            // IngesterRecord.toConsumerRecord() is not visible from this file.
            HashMap<TopicPartition, List> found = new HashMap<TopicPartition, List>();
            for (Map.Entry<TopicPartition, FakePartitionState> entry : this.partitions.entrySet()) {
                FakePartitionState state = entry.getValue();
                if (!state.assigned || state.paused) {
                    continue;
                }
                TopicPartition topicPartition = entry.getKey();
                Map.Entry<Long, IngesterRecord> next;
                while ((next = state.records.ceilingEntry(state.offset)) != null) {
                    found.computeIfAbsent(topicPartition, tp -> new ArrayList()).add(next.getValue().toConsumerRecord());
                    state.offset = next.getKey() + 1L;
                }
            }
            return new ConsumerRecords(found);
        }

        public synchronized void commitSync() {
            throw new UnsupportedOperationException();
        }

        public synchronized void commitSync(Duration timeout) {
            throw new UnsupportedOperationException();
        }

        public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
            throw new UnsupportedOperationException();
        }

        public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
            throw new UnsupportedOperationException();
        }

        public synchronized void commitAsync() {
            throw new UnsupportedOperationException();
        }

        public synchronized void commitAsync(OffsetCommitCallback callback) {
            throw new UnsupportedOperationException();
        }

        public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
            throw new UnsupportedOperationException();
        }

        /** No-op. */
        public void registerMetricForSubscription(KafkaMetric metric) {
        }

        /** No-op. */
        public void unregisterMetricFromSubscription(KafkaMetric metric) {
        }

        /**
         * Sets the position of a known partition.
         *
         * @throws IllegalStateException if the partition is unknown
         */
        public synchronized void seek(TopicPartition partition, long offset) {
            this.partitionState(partition).offset = offset;
        }

        public synchronized void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
            throw new UnsupportedOperationException();
        }

        /** Seeks every given partition to offset 0. */
        public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
            partitions.forEach(partition -> this.seek(partition, 0L));
        }

        public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
            throw new UnsupportedOperationException();
        }

        /** Same as {@link #position(TopicPartition, Duration)}; the timeout is not used. */
        public synchronized long position(TopicPartition partition) {
            return this.position(partition, Duration.ZERO);
        }

        /**
         * Returns the current position of a known partition.
         *
         * @throws IllegalStateException if the partition is unknown
         */
        public synchronized long position(TopicPartition partition, Duration timeout) {
            return this.partitionState(partition).offset;
        }

        public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> set, Duration timeout) {
            throw new UnsupportedOperationException();
        }

        public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> set) {
            throw new UnsupportedOperationException();
        }

        public synchronized Uuid clientInstanceId(Duration timeout) {
            throw new UnsupportedOperationException();
        }

        public synchronized Map<MetricName, ? extends Metric> metrics() {
            throw new UnsupportedOperationException();
        }

        public synchronized List<PartitionInfo> partitionsFor(String topic) {
            throw new UnsupportedOperationException();
        }

        public synchronized List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
            throw new UnsupportedOperationException();
        }

        public synchronized Map<String, List<PartitionInfo>> listTopics() {
            throw new UnsupportedOperationException();
        }

        public synchronized Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
            throw new UnsupportedOperationException();
        }

        /** Returns a fresh, mutable set of the partitions that are currently paused. */
        public synchronized Set<TopicPartition> paused() {
            HashSet<TopicPartition> results = new HashSet<TopicPartition>();
            this.partitions.forEach((topicPartition, state) -> {
                if (state.paused) {
                    results.add(topicPartition);
                }
            });
            return results;
        }

        /**
         * Pauses the given partitions. All partitions are validated before any is changed.
         *
         * @throws IllegalStateException if any partition is unknown
         */
        public synchronized void pause(Collection<TopicPartition> toPause) {
            for (TopicPartition topicPartition : toPause) {
                this.partitionState(topicPartition);
            }
            for (TopicPartition topicPartition : toPause) {
                this.partitions.get(topicPartition).paused = true;
            }
        }

        /**
         * Resumes the given partitions. All partitions are validated before any is changed.
         *
         * @throws IllegalStateException if any partition is unknown
         */
        public synchronized void resume(Collection<TopicPartition> toResume) {
            for (TopicPartition topicPartition : toResume) {
                this.partitionState(topicPartition);
            }
            for (TopicPartition topicPartition : toResume) {
                this.partitions.get(topicPartition).paused = false;
            }
        }

        /**
         * Always returns {@code null}.
         * NOTE(review): every other unimplemented lookup throws UnsupportedOperationException;
         * the null return is kept in case a caller depends on it - confirm and align.
         */
        public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
            return null;
        }

        public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
            return this.offsetsForTimes(timestampsToSearch, Duration.ZERO);
        }

        public synchronized Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, Duration timeout) {
            throw new UnsupportedOperationException();
        }

        public synchronized Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection) {
            throw new UnsupportedOperationException();
        }

        public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Duration timeout) {
            throw new UnsupportedOperationException();
        }

        public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection) {
            throw new UnsupportedOperationException();
        }

        public synchronized OptionalLong currentLag(TopicPartition topicPartition) {
            throw new UnsupportedOperationException();
        }

        public synchronized ConsumerGroupMetadata groupMetadata() {
            throw new UnsupportedOperationException();
        }

        public synchronized void enforceRebalance() {
            throw new UnsupportedOperationException();
        }

        public synchronized void enforceRebalance(String reason) {
            throw new UnsupportedOperationException();
        }

        /** Same as {@link #close(Duration)}; the timeout is not used. */
        public synchronized void close() {
            this.close(Duration.ZERO);
        }

        /** Marks this consumer as closed. */
        public synchronized void close(Duration timeout) {
            this.closed = true;
        }

        /** Wakes any thread blocked in {@link #poll(Duration)}. */
        public synchronized void wakeup() {
            this.notifyAll();
        }
    }

    /**
     * {@link IngestionWorkerManager} that queues everything a worker reports so that tests
     * can inspect it: records go to {@link #incomingRecords}, shutdown notifications to
     * {@link #incomingShutdowns}.
     */
    static class FakeIngestionWorkerManager implements IngestionWorkerManager {
        final long epoch;
        final int backpressureThreshold;
        final LogContext logContext = new LogContext();
        BlockingDeque<IncomingIngesterRecord> incomingRecords = new LinkedBlockingDeque<IncomingIngesterRecord>();
        BlockingDeque<IncomingWorkerShutdown> incomingShutdowns = new LinkedBlockingDeque<IncomingWorkerShutdown>();

        /**
         * @param epoch value returned by {@link #epoch()}
         * @param backpressureThreshold queue size at which {@link #handleRecords(List)}
         *                              starts returning {@code false}
         */
        FakeIngestionWorkerManager(long epoch, int backpressureThreshold) {
            this.epoch = epoch;
            this.backpressureThreshold = backpressureThreshold;
        }

        public LogContext logContext() {
            return this.logContext;
        }

        public long epoch() {
            return this.epoch;
        }

        /**
         * Queues every record, in order, for the test to consume.
         *
         * @return {@code true} if, after queueing, fewer than {@code backpressureThreshold}
         *         records are waiting; {@code false} otherwise. NOTE(review): presumably the
         *         worker pauses consumption on {@code false} - confirm against the worker.
         */
        public boolean handleRecords(List<IngesterRecord> records) {
            for (IngesterRecord incoming : records) {
                this.incomingRecords.add(new IncomingIngesterRecord(incoming));
            }
            int queued = this.incomingRecords.size();
            return queued < this.backpressureThreshold;
        }

        /** Queues the shutdown notification, including the (possibly null) failure cause. */
        public void handleWorkerShutdownComplete(Throwable e) {
            IncomingWorkerShutdown notification = new IncomingWorkerShutdown(e);
            this.incomingShutdowns.add(notification);
        }
    }

    /** Queue entry wrapping one record handed to {@link FakeIngestionWorkerManager}. */
    static class IncomingIngesterRecord {
        /** The record exactly as it was received. */
        final IngesterRecord record;

        IncomingIngesterRecord(IngesterRecord received) {
            this.record = received;
        }
    }

    /** Queue entry recording that a worker reported the completion of its shutdown. */
    static class IncomingWorkerShutdown {
        /** The throwable passed along with the notification; may be {@code null}. */
        final Throwable shutdownException;

        IncomingWorkerShutdown(Throwable cause) {
            this.shutdownException = cause;
        }
    }

    /** Mutable per-partition state of {@link FakeConsumer}. */
    static class FakePartitionState {
        /** Stored records, keyed by offset. */
        TreeMap<Long, IngesterRecord> records = new TreeMap<>();
        /** Whether the partition is currently assigned to the consumer. */
        boolean assigned = false;
        /** Whether the partition is currently paused. */
        boolean paused = false;
        /** Current position; starts at -1. */
        long offset = -1L;

        /** Returns the highest stored offset, or -1 if no record has been stored. */
        long highestRecordOffset() {
            return this.records.isEmpty() ? -1L : this.records.lastKey();
        }
    }
}

