/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.log.remote.metadata.storage;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataTopicPartitioner;
import org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public class ConsumerTaskTest {
    private final int numMetadataTopicPartitions = 5;
    private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(5);
    private final DummyEventHandler handler = new DummyEventHandler();
    private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, 5).boxed().map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
    private final Uuid topicId = Uuid.randomUuid();
    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
    private ConsumerTask consumerTask;
    private MockConsumer<byte[], byte[]> consumer;
    private Thread thread;

    @BeforeEach
    public void beforeEach() {
        Map offsets = this.remoteLogPartitions.stream().collect(Collectors.toMap(Function.identity(), e -> 0L));
        this.consumer = (MockConsumer)Mockito.spy((Object)new MockConsumer(OffsetResetStrategy.EARLIEST));
        this.consumer.updateBeginningOffsets(offsets);
        this.consumerTask = new ConsumerTask((RemotePartitionMetadataEventHandler)this.handler, this.partitioner, this.consumer, 10L, 300000L, (Time)new SystemTime());
        this.thread = new Thread((Runnable)this.consumerTask);
    }

    @AfterEach
    public void afterEach() throws InterruptedException {
        if (this.thread != null) {
            Assertions.assertDoesNotThrow(() -> this.consumerTask.close(), (String)"Close method threw exception");
            this.thread.join(10000L);
            Assertions.assertFalse((boolean)this.thread.isAlive(), (String)"Consumer task thread is still alive");
        }
    }

    @Test
    public void testCloseOnNoAssignment() throws InterruptedException {
        this.thread.start();
        Thread.sleep(10L);
        Assertions.assertDoesNotThrow(() -> this.consumerTask.close(), (String)"Close method threw exception");
    }

    @Test
    public void testIdempotentClose() {
        this.thread.start();
        this.consumerTask.close();
        this.consumerTask.close();
    }

    @Test
    public void testUserTopicIdPartitionEquals() {
        TopicIdPartition tpId = new TopicIdPartition(this.topicId, new TopicPartition("sample", 0));
        ConsumerTask.UserTopicIdPartition utp1 = new ConsumerTask.UserTopicIdPartition(tpId, Integer.valueOf(this.partitioner.metadataPartition(tpId)));
        ConsumerTask.UserTopicIdPartition utp2 = new ConsumerTask.UserTopicIdPartition(tpId, Integer.valueOf(this.partitioner.metadataPartition(tpId)));
        utp1.isInitialized = true;
        utp1.isAssigned = true;
        Assertions.assertFalse((boolean)utp2.isInitialized);
        Assertions.assertFalse((boolean)utp2.isAssigned);
        Assertions.assertEquals((Object)utp1, (Object)utp2);
    }

    @Test
    public void testAddAssignmentsForPartitions() throws InterruptedException {
        List<TopicIdPartition> idPartitions = this.getIdPartitions("sample", 3);
        Map endOffsets = idPartitions.stream().map(idp -> ConsumerTask.toRemoteLogPartition((int)this.partitioner.metadataPartition(idp))).collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
        this.consumer.updateEndOffsets(endOffsets);
        this.consumerTask.addAssignmentsForPartitions(new HashSet<TopicIdPartition>(idPartitions));
        this.thread.start();
        for (TopicIdPartition idPartition : idPartitions) {
            TestUtils.waitForCondition(() -> this.consumerTask.isUserPartitionAssigned(idPartition), (String)("Timed out waiting for " + idPartition + " to be assigned"));
            Assertions.assertTrue((boolean)this.consumerTask.isMetadataPartitionAssigned(this.partitioner.metadataPartition(idPartition)));
            Assertions.assertTrue((boolean)((Boolean)this.handler.isPartitionLoaded.get(idPartition)));
        }
    }

    @Test
    public void testRemoveAssignmentsForPartitions() throws InterruptedException {
        List<TopicIdPartition> allPartitions = this.getIdPartitions("sample", 3);
        Map endOffsets = allPartitions.stream().map(idp -> ConsumerTask.toRemoteLogPartition((int)this.partitioner.metadataPartition(idp))).collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
        this.consumer.updateEndOffsets(endOffsets);
        this.consumerTask.addAssignmentsForPartitions(new HashSet<TopicIdPartition>(allPartitions));
        this.thread.start();
        TopicIdPartition tpId = allPartitions.get(0);
        TestUtils.waitForCondition(() -> this.consumerTask.isUserPartitionAssigned(tpId), (String)("Timed out waiting for " + tpId + " to be assigned"));
        this.addRecord(this.consumer, this.partitioner.metadataPartition(tpId), tpId, 0L);
        TestUtils.waitForCondition(() -> this.consumerTask.readOffsetForMetadataPartition(this.partitioner.metadataPartition(tpId)).isPresent(), (String)"Couldn't read record");
        Set<TopicIdPartition> removePartitions = Collections.singleton(tpId);
        this.consumerTask.removeAssignmentsForPartitions(removePartitions);
        for (TopicIdPartition idPartition : allPartitions) {
            TestCondition condition = () -> removePartitions.contains(idPartition) == !this.consumerTask.isUserPartitionAssigned(idPartition);
            TestUtils.waitForCondition((TestCondition)condition, (String)("Timed out waiting for " + idPartition + " to be removed"));
        }
        for (TopicIdPartition removePartition : removePartitions) {
            TestUtils.waitForCondition(() -> this.handler.isPartitionCleared.containsKey(removePartition), (String)("Timed out waiting for " + removePartition + " to be cleared"));
        }
    }

    @Test
    public void testConcurrentPartitionAssignments() throws InterruptedException, ExecutionException {
        List<TopicIdPartition> allPartitions = this.getIdPartitions("sample", 100);
        Map endOffsets = allPartitions.stream().map(idp -> ConsumerTask.toRemoteLogPartition((int)this.partitioner.metadataPartition(idp))).collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
        this.consumer.updateEndOffsets(endOffsets);
        AtomicBoolean isAllPartitionsAssigned = new AtomicBoolean(false);
        CountDownLatch latch = new CountDownLatch(1);
        Thread assignor = new Thread(() -> {
            int partitionsAssigned = 0;
            for (TopicIdPartition partition : allPartitions) {
                if (partitionsAssigned == 50) {
                    try {
                        latch.await(1L, TimeUnit.MINUTES);
                    }
                    catch (InterruptedException e) {
                        Assertions.fail((String)e.getMessage());
                    }
                }
                this.consumerTask.addAssignmentsForPartitions(Collections.singleton(partition));
                ++partitionsAssigned;
            }
            isAllPartitionsAssigned.set(true);
        });
        Runnable consumerRunnable = () -> {
            try {
                while (!isAllPartitionsAssigned.get()) {
                    this.consumerTask.maybeWaitForPartitionAssignments();
                    latch.countDown();
                }
            }
            catch (Exception e) {
                Assertions.fail((String)e.getMessage());
            }
        };
        ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
        Future<?> future = consumerExecutor.submit(consumerRunnable);
        assignor.start();
        assignor.join();
        future.get();
    }

    @Test
    public void testCanProcessRecord() throws InterruptedException {
        Uuid topicId = Uuid.fromString((String)"Bp9TDduJRGa9Q5rlvCJOxg");
        TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0));
        TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1));
        TopicIdPartition tpId2 = new TopicIdPartition(topicId, new TopicPartition("sample", 2));
        Assertions.assertEquals((int)this.partitioner.metadataPartition(tpId0), (int)this.partitioner.metadataPartition(tpId1));
        Assertions.assertEquals((int)this.partitioner.metadataPartition(tpId0), (int)this.partitioner.metadataPartition(tpId2));
        int metadataPartition = this.partitioner.metadataPartition(tpId0);
        this.consumer.updateEndOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition((int)metadataPartition), 0L));
        Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
        this.consumerTask.addAssignmentsForPartitions(assignments);
        this.thread.start();
        TestUtils.waitForCondition(() -> this.consumerTask.isUserPartitionAssigned(tpId0), (String)("Timed out waiting for " + tpId0 + " to be assigned"));
        this.addRecord(this.consumer, metadataPartition, tpId0, 0L);
        this.addRecord(this.consumer, metadataPartition, tpId0, 1L);
        TestUtils.waitForCondition(() -> this.consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), (String)"Couldn't read record");
        Assertions.assertEquals((int)2, (int)this.handler.metadataCounter);
        this.consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
        TestUtils.waitForCondition(() -> this.consumerTask.isUserPartitionAssigned(tpId1), (String)("Timed out waiting for " + tpId1 + " to be assigned"));
        this.addRecord(this.consumer, metadataPartition, tpId1, 2L);
        TestUtils.waitForCondition(() -> this.consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(2L)), (String)"Couldn't read record");
        Assertions.assertEquals((int)3, (int)this.handler.metadataCounter);
        this.addRecord(this.consumer, metadataPartition, tpId2, 3L);
        TestUtils.waitForCondition(() -> this.consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(3L)), (String)"Couldn't read record");
        Assertions.assertEquals((int)3, (int)this.handler.metadataCounter);
    }

    @Test
    public void testCanReprocessSkippedRecords() throws InterruptedException {
        Uuid topicId = Uuid.fromString((String)"Bp9TDduJRGa9Q5rlvCJOxg");
        TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0));
        TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1));
        TopicIdPartition tpId3 = new TopicIdPartition(topicId, new TopicPartition("sample", 3));
        Assertions.assertEquals((int)this.partitioner.metadataPartition(tpId0), (int)this.partitioner.metadataPartition(tpId1));
        Assertions.assertNotEquals((int)this.partitioner.metadataPartition(tpId3), (int)this.partitioner.metadataPartition(tpId0));
        int metadataPartition = this.partitioner.metadataPartition(tpId0);
        int anotherMetadataPartition = this.partitioner.metadataPartition(tpId3);
        ((MockConsumer)Mockito.doAnswer(invocation -> {
            if (this.consumerTask.isUserPartitionAssigned(tpId1) && !this.consumerTask.isUserPartitionAssigned(tpId3)) {
                return ConsumerRecords.empty();
            }
            return invocation.callRealMethod();
        }).when(this.consumer)).poll((Duration)ArgumentMatchers.any());
        this.consumer.updateEndOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition((int)metadataPartition), 0L));
        this.consumer.updateEndOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition((int)anotherMetadataPartition), 0L));
        Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
        this.consumerTask.addAssignmentsForPartitions(assignments);
        this.thread.start();
        TestUtils.waitForCondition(() -> this.consumerTask.isUserPartitionAssigned(tpId0), (String)("Timed out waiting for " + tpId0 + " to be assigned"));
        this.addRecord(this.consumer, metadataPartition, tpId1, 0L);
        this.addRecord(this.consumer, metadataPartition, tpId0, 1L);
        TestUtils.waitForCondition(() -> this.consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), (String)"Couldn't read record");
        Assertions.assertEquals((int)1, (int)this.handler.metadataCounter);
        this.consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
        TestUtils.waitForCondition(() -> this.consumerTask.isUserPartitionAssigned(tpId1), (String)("Timed out waiting for " + tpId1 + " to be assigned"));
        HashSet<TopicIdPartition> partitions = new HashSet<TopicIdPartition>();
        partitions.add(tpId0);
        partitions.add(tpId3);
        this.consumerTask.addAssignmentsForPartitions(partitions);
        this.addRecord(this.consumer, metadataPartition, tpId1, 0L);
        this.addRecord(this.consumer, metadataPartition, tpId0, 1L);
        TestUtils.waitForCondition(() -> this.consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), (String)"Couldn't read record");
        TestUtils.waitForCondition(() -> this.handler.metadataCounter == 2, (String)"Couldn't read record");
    }

    @Test
    public void testMaybeMarkUserPartitionsAsReady() throws InterruptedException {
        TopicIdPartition tpId = this.getIdPartitions("hello", 1).get(0);
        int metadataPartition = this.partitioner.metadataPartition(tpId);
        this.consumer.updateEndOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition((int)metadataPartition), 2L));
        this.consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
        this.thread.start();
        TestUtils.waitForCondition(() -> this.consumerTask.isUserPartitionAssigned(tpId), (String)("Waiting for " + tpId + " to be assigned"));
        Assertions.assertTrue((boolean)this.consumerTask.isMetadataPartitionAssigned(metadataPartition));
        Assertions.assertFalse((boolean)this.handler.isPartitionInitialized.containsKey(tpId));
        IntStream.range(0, 5).forEach(offset -> this.addRecord(this.consumer, metadataPartition, tpId, offset));
        TestUtils.waitForCondition(() -> this.consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)), (String)"Couldn't read record");
        Assertions.assertTrue((boolean)((Boolean)this.handler.isPartitionInitialized.get(tpId)));
    }

    @ParameterizedTest
    @CsvSource(value={"0, 0", "500, 500"})
    public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long beginOffset, long endOffset) throws InterruptedException {
        TopicIdPartition tpId = this.getIdPartitions("world", 1).get(0);
        int metadataPartition = this.partitioner.metadataPartition(tpId);
        this.consumer.updateBeginningOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition((int)metadataPartition), beginOffset));
        this.consumer.updateEndOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition((int)metadataPartition), endOffset));
        this.consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
        this.thread.start();
        TestUtils.waitForCondition(() -> this.consumerTask.isUserPartitionAssigned(tpId), (String)("Waiting for " + tpId + " to be assigned"));
        Assertions.assertTrue((boolean)this.consumerTask.isMetadataPartitionAssigned(metadataPartition));
        TestUtils.waitForCondition(() -> this.handler.isPartitionInitialized.containsKey(tpId), (String)"should have initialized the partition");
        Assertions.assertFalse((boolean)this.consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent());
    }

    @Test
    public void testConcurrentAccess() throws InterruptedException {
        this.thread.start();
        CountDownLatch latch = new CountDownLatch(1);
        TopicIdPartition tpId = this.getIdPartitions("concurrent", 1).get(0);
        this.consumer.updateEndOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition((int)this.partitioner.metadataPartition(tpId)), 0L));
        Thread assignmentThread = new Thread(() -> {
            try {
                latch.await();
                this.consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
            }
            catch (InterruptedException e) {
                Assertions.fail((String)"Shouldn't have thrown an exception");
            }
        });
        Thread closeThread = new Thread(() -> {
            try {
                latch.await();
                this.consumerTask.close();
            }
            catch (InterruptedException e) {
                Assertions.fail((String)"Shouldn't have thrown an exception");
            }
        });
        assignmentThread.start();
        closeThread.start();
        latch.countDown();
        assignmentThread.join();
        closeThread.join();
    }

    @Test
    public void testConsumerShouldNotCloseOnRetriableError() throws InterruptedException {
        TopicIdPartition tpId = this.getIdPartitions("world", 1).get(0);
        int metadataPartition = this.partitioner.metadataPartition(tpId);
        this.consumer.updateEndOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition((int)metadataPartition), 1L));
        this.consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
        this.thread.start();
        TestUtils.waitForCondition(() -> this.consumerTask.isUserPartitionAssigned(tpId), (String)("Waiting for " + tpId + " to be assigned"));
        Assertions.assertTrue((boolean)this.consumerTask.isMetadataPartitionAssigned(metadataPartition));
        this.consumer.setPollException((KafkaException)new LeaderNotAvailableException("leader not available!"));
        this.addRecord(this.consumer, metadataPartition, tpId, 0L);
        this.consumer.setPollException((KafkaException)new TimeoutException("Not able to complete the operation within the timeout"));
        this.addRecord(this.consumer, metadataPartition, tpId, 1L);
        TestUtils.waitForCondition(() -> this.consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), (String)"Couldn't read record");
        Assertions.assertEquals((int)2, (int)this.handler.metadataCounter);
    }

    @Test
    public void testConsumerShouldCloseOnNonRetriableError() throws InterruptedException {
        TopicIdPartition tpId = this.getIdPartitions("world", 1).get(0);
        int metadataPartition = this.partitioner.metadataPartition(tpId);
        this.consumer.updateEndOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition((int)metadataPartition), 1L));
        this.consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
        this.thread.start();
        TestUtils.waitForCondition(() -> this.consumerTask.isUserPartitionAssigned(tpId), (String)("Waiting for " + tpId + " to be assigned"));
        Assertions.assertTrue((boolean)this.consumerTask.isMetadataPartitionAssigned(metadataPartition));
        this.consumer.setPollException((KafkaException)new AuthorizationException("Unauthorized to read the topic!"));
        TestUtils.waitForCondition(() -> this.consumer.closed(), (String)"Should close the consume on non-retriable error");
    }

    private void addRecord(MockConsumer<byte[], byte[]> consumer, int metadataPartition, TopicIdPartition idPartition, long recordOffset) {
        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(idPartition, Uuid.randomUuid());
        RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(segmentId, 0L, 1L, 0L, 0, 0L, 1, Collections.singletonMap(0, 0L));
        ConsumerRecord record = new ConsumerRecord("__remote_log_metadata", metadataPartition, recordOffset, null, (Object)this.serde.serialize((RemoteLogMetadata)metadata));
        consumer.addRecord(record);
    }

    private List<TopicIdPartition> getIdPartitions(String topic, int partitionCount) {
        ArrayList<TopicIdPartition> idPartitions = new ArrayList<TopicIdPartition>();
        for (int partition = 0; partition < partitionCount; ++partition) {
            idPartitions.add(new TopicIdPartition(this.topicId, new TopicPartition(topic, partition)));
        }
        return idPartitions;
    }

    private static class DummyEventHandler
    extends RemotePartitionMetadataEventHandler {
        private int metadataCounter = 0;
        private final Map<TopicIdPartition, Boolean> isPartitionInitialized = new HashMap<TopicIdPartition, Boolean>();
        private final Map<TopicIdPartition, Boolean> isPartitionLoaded = new HashMap<TopicIdPartition, Boolean>();
        private final Map<TopicIdPartition, Boolean> isPartitionCleared = new HashMap<TopicIdPartition, Boolean>();

        private DummyEventHandler() {
        }

        protected void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
            ++this.metadataCounter;
        }

        protected void handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) {
        }

        protected void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) {
        }

        public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition, int metadataPartition, Long metadataPartitionOffset) {
        }

        public void clearTopicPartition(TopicIdPartition topicIdPartition) {
            this.isPartitionCleared.put(topicIdPartition, true);
        }

        public void markInitialized(TopicIdPartition partition) {
            this.isPartitionInitialized.put(partition, true);
        }

        public boolean isInitialized(TopicIdPartition partition) {
            return true;
        }

        public void maybeLoadPartition(TopicIdPartition partition) {
            this.isPartitionLoaded.put(partition, true);
        }
    }
}

