/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.log.remote.metadata.storage;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataManagerTestUtils;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataTopicPartitioner;
import org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

@Tag(value="integration")
@ExtendWith(value={ClusterTestExtensions.class})
@ClusterTestDefaults(brokers=3)
public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
    private final ClusterInstance clusterInstance;
    private final Time time = new SystemTime();

    TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest(ClusterInstance clusterInstance) {
        this.clusterInstance = clusterInstance;
    }

    @ClusterTest
    public void testMultiplePartitionSubscriptions() throws Exception {
        String leaderTopic = "leader";
        this.createTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 1, 2)));
        String followerTopic = "follower";
        this.createTopic(followerTopic, Collections.singletonMap(0, Arrays.asList(1, 2, 0)));
        String topicWithNoMessages = "no-messages-topic";
        this.createTopic(topicWithNoMessages, Collections.singletonMap(0, Arrays.asList(1, 2, 0)));
        TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
        TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
        final TopicIdPartition emptyTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0));
        RemotePartitionMetadataStore spyRemotePartitionMetadataStore = (RemotePartitionMetadataStore)Mockito.spy((Object)new RemotePartitionMetadataStore());
        Phaser initializationPhaser = new Phaser(2);
        ((RemotePartitionMetadataStore)Mockito.doAnswer(invocationOnMock -> {
            Object result = invocationOnMock.callRealMethod();
            initializationPhaser.arriveAndDeregister();
            return result;
        }).when((Object)spyRemotePartitionMetadataStore)).markInitialized((TopicIdPartition)ArgumentMatchers.any());
        Phaser handleRemoteLogSegmentMetadataPhaser = new Phaser(2);
        ((RemotePartitionMetadataStore)Mockito.doAnswer(invocationOnMock -> {
            Object result = invocationOnMock.callRealMethod();
            handleRemoteLogSegmentMetadataPhaser.arriveAndDeregister();
            return result;
        }).when((Object)spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadata((RemoteLogSegmentMetadata)ArgumentMatchers.any());
        try (TopicBasedRemoteLogMetadataManager remoteLogMetadataManager = RemoteLogMetadataManagerTestUtils.builder().bootstrapServers(this.clusterInstance.bootstrapServers()).startConsumerThread(true).remoteLogMetadataTopicPartitioner(numMetadataTopicPartitions -> new RemoteLogMetadataTopicPartitioner((int)numMetadataTopicPartitions){

            public int metadataPartition(TopicIdPartition topicIdPartition) {
                if (emptyTopicIdPartition.equals((Object)topicIdPartition)) {
                    return 1;
                }
                return 0;
            }
        }).remotePartitionMetadataStore(() -> spyRemotePartitionMetadataStore).build();){
            int segSize = 0x100000;
            RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), 0L, 100L, -1L, 0, this.time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
            ExecutionException exception = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> {
                Void cfr_ignored_0 = (Void)remoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get();
            });
            Assertions.assertEquals((Object)"org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []", (Object)exception.getMessage());
            RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()), 0L, 100L, -1L, 0, this.time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
            exception = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> {
                Void cfr_ignored_0 = (Void)remoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get();
            });
            Assertions.assertEquals((Object)"org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []", (Object)exception.getMessage());
            Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition));
            Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition));
            remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition), Collections.emptySet());
            initializationPhaser.awaitAdvanceInterruptibly(initializationPhaser.arrive(), 30000L, TimeUnit.MILLISECONDS);
            handleRemoteLogSegmentMetadataPhaser.awaitAdvanceInterruptibly(handleRemoteLogSegmentMetadataPhaser.arrive(), 30000L, TimeUnit.MILLISECONDS);
            ((RemotePartitionMetadataStore)Mockito.verify((Object)spyRemotePartitionMetadataStore)).markInitialized(leaderTopicIdPartition);
            ((RemotePartitionMetadataStore)Mockito.verify((Object)spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadata(leaderSegmentMetadata);
            Mockito.clearInvocations((Object[])new RemotePartitionMetadataStore[]{spyRemotePartitionMetadataStore});
            Assertions.assertTrue((boolean)remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext());
            Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition));
            initializationPhaser.bulkRegister(2);
            handleRemoteLogSegmentMetadataPhaser.register();
            remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition), Collections.singleton(followerTopicIdPartition));
            initializationPhaser.awaitAdvanceInterruptibly(initializationPhaser.arrive(), 30000L, TimeUnit.MILLISECONDS);
            handleRemoteLogSegmentMetadataPhaser.awaitAdvanceInterruptibly(handleRemoteLogSegmentMetadataPhaser.arrive(), 30000L, TimeUnit.MILLISECONDS);
            ((RemotePartitionMetadataStore)Mockito.verify((Object)spyRemotePartitionMetadataStore)).markInitialized(followerTopicIdPartition);
            ((RemotePartitionMetadataStore)Mockito.verify((Object)spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadata(followerSegmentMetadata);
            Assertions.assertTrue((boolean)remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext(), (String)"No segments found");
            Assertions.assertTrue((boolean)remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition).hasNext(), (String)"No segments found");
        }
    }

    private void createTopic(String topic, Map<Integer, List<Integer>> replicasAssignments) {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()));){
            admin.createTopics(Collections.singletonList(new NewTopic(topic, replicasAssignments)));
            Assertions.assertDoesNotThrow(() -> this.clusterInstance.waitForTopic(topic, replicasAssignments.size()));
        }
    }
}

