package kafka.tier.tools;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.log.MergedLog;
import kafka.server.KafkaConfig;
import kafka.server.ReplicaManager;
import kafka.tier.TopicIdPartition;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.TierPartitionState;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import kafka.tier.store.objects.FragmentLocation;
import kafka.tier.store.objects.FragmentType;
import kafka.tier.store.objects.ObjectType;
import kafka.tier.store.objects.metadata.FileTierPartitionStateRecoveryUploadMetadata;
import kafka.tier.store.objects.metadata.ObjectStoreMetadata;
import kafka.utils.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.Option;
import scala.collection.JavaConverters;

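/**
 * Tests for {@link TierRecoveryDataUploadCoordinator}, run against Mockito mocks of the replica
 * manager, the log manager and the tier object store.
 *
 * <p>Loaded from: input_file:kafka/tier/tools/TierRecoveryDataUploadCoordinatorTest.class
 */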
public class TierRecoveryDataUploadCoordinatorTest {
    /** Broker configuration; (re)built from {@link #props} by {@link #setupLogManager()}. */
    private KafkaConfig config;

    /** Thread count passed to the coordinator when an upload is initiated. */
    private final int numThreads = 3;

    /** Broker id used when creating the broker properties. */
    private final int broker = 5;

    /** Identifier passed to the coordinator when an upload is initiated. */
    private final String identifier = "rcca-1234";

    private final Properties props = TestUtils.createBrokerConfig(
        broker, TestUtils.MockZkConnect(), true, true, TestUtils.MockZkPort(),
        Option.empty(), Option.empty(), Option.empty(), true, false, TestUtils.RandomPort(),
        false, TestUtils.RandomPort(), false, TestUtils.RandomPort(), Option.empty(),
        1, false, 1, 1, false);

    private final ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
    private final LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
    private final TierObjectStore objectStore = (TierObjectStore) Mockito.mock(TierObjectStore.class);

    /** All partitions known to the mocked log manager: three topics with three partitions each. */
    private final List<TopicIdPartition> topicIdPartitions = new ArrayList<>(Arrays.asList(
        new TopicIdPartition("topic0", UUID.randomUUID(), 0),
        new TopicIdPartition("topic0", UUID.randomUUID(), 1),
        new TopicIdPartition("topic0", UUID.randomUUID(), 2),
        new TopicIdPartition("topic1", UUID.randomUUID(), 0),
        new TopicIdPartition("topic1", UUID.randomUUID(), 1),
        new TopicIdPartition("topic1", UUID.randomUUID(), 2),
        new TopicIdPartition("topic2", UUID.randomUUID(), 0),
        new TopicIdPartition("topic2", UUID.randomUUID(), 1),
        new TopicIdPartition("topic2", UUID.randomUUID(), 2)));

    /** Partitions reported as led by this broker: topic0-0, topic1-1 and topic2-2. */
    private final List<TopicIdPartition> leaderTopicIdPartitions = Arrays.asList(
        this.topicIdPartitions.get(0), this.topicIdPartitions.get(4), this.topicIdPartitions.get(8));

    /** Serialized tier offsets returned by the mocked {@code LogManager.readTierOffsets()}. */
    private final Map<String, ByteBuffer> tierOffsets = new HashMap<>();

    private final ObjectMapper objectMapper = new ObjectMapper();

    /** Payloads captured from {@code putBuffer} calls of type TIER_RECOVERY_METADATA_UPLOAD, in call order. */
    private final List<String> uploadJobsMetadata = new ArrayList<>();

    /** Parsed payloads captured from {@code putBuffer} calls of type TIER_OFFSETS_UPLOAD, in call order. */
    private final ConcurrentLinkedDeque<Map<Integer, OffsetAndEpoch>> uploadJobsTierOffsets = new ConcurrentLinkedDeque<>();

    /** Object path recorded for each partition whose state file was passed to {@code putObject}. */
    private final Map<TopicIdPartition, String> topicIdPartitionToUploadPath = new ConcurrentHashMap<>();

    /** Last response handed out by the mocked {@code getObjectStoreFragment}; null until first use. */
    private TierObjectStoreResponse storedMetadata = null;

    /**
     * Prepares the mocks before every test: leader partitions, the log manager, a fresh set of
     * tier offsets, and the four object store operations stubbed below.
     *
     * @throws IOException declared because the stubbed calls and {@link #setupLogManager()} declare it
     */
    @BeforeEach
    public void setup() throws IOException {
        setupLeaderPartitions();
        Mockito.when(this.replicaManager.logManager()).thenReturn(this.logManager);
        setupLogManager();
        setupTierOffsets();
        stubPutBuffer();
        stubPutObject();
        stubListObject();
        stubGetObjectStoreFragment();
    }

    /** Captures buffers written through {@code putBuffer}, keyed by the object type being uploaded. */
    private void stubPutBuffer() throws IOException {
        Mockito.doAnswer(invocation -> {
            ByteBuffer payload = invocation.getArgument(1);
            ObjectType objectType = invocation.getArgument(2);
            if (ObjectType.TIER_RECOVERY_METADATA_UPLOAD.equals(objectType)) {
                this.uploadJobsMetadata.add(new String(payload.array(), StandardCharsets.UTF_8));
            } else if (ObjectType.TIER_OFFSETS_UPLOAD.equals(objectType)) {
                this.uploadJobsTierOffsets.add(byteBufToTierOffsets(payload));
            } else {
                Assertions.fail("Received unexpected object type: " + objectType);
            }
            return null;
        }).when(this.objectStore).putBuffer((ObjectStoreMetadata) ArgumentMatchers.any(), (ByteBuffer) ArgumentMatchers.any(), (ObjectType) ArgumentMatchers.any());
    }

    /** Records, per partition, the object path derived from the metadata passed to {@code putObject}. */
    private void stubPutObject() throws IOException {
        Mockito.doAnswer(invocation -> {
            // Only partition-state uploads are expected here, so narrow the metadata explicitly.
            FileTierPartitionStateRecoveryUploadMetadata metadata =
                (FileTierPartitionStateRecoveryUploadMetadata) invocation.getArgument(0);
            ObjectType objectType = invocation.getArgument(2);
            Assertions.assertEquals(ObjectType.FILE_TIER_PARTITION_STATE_UPLOAD, objectType);
            String objectPath = metadata
                .toFragmentLocation("", FragmentType.FILE_TIER_PARTITION_STATE_UPLOAD)
                .get()
                .objectPath();
            this.topicIdPartitionToUploadPath.put(metadata.topicIdPartition(), objectPath);
            return objectPath;
        }).when(this.objectStore).putObject((ObjectStoreMetadata) ArgumentMatchers.any(), (File) ArgumentMatchers.any(), (ObjectType) ArgumentMatchers.any());
    }

    /** Reports an empty listing until some metadata was uploaded, then a single empty entry. */
    private void stubListObject() throws IOException {
        Mockito.doAnswer(invocation -> {
            Map<String, List<Object>> listing = new HashMap<>();
            if (!this.uploadJobsMetadata.isEmpty()) {
                listing.put("", new ArrayList<>());
            }
            return listing;
        }).when(this.objectStore).listObject(ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean());
    }

    /** Serves the most recently captured metadata payload (or an empty stream) as the stored fragment. */
    private void stubGetObjectStoreFragment() throws IOException {
        Mockito.doAnswer(invocation -> {
            FragmentType fragmentType = invocation.getArgument(1);
            Assertions.assertEquals(FragmentType.TIER_RECOVERY_METADATA_UPLOAD, fragmentType);
            this.storedMetadata = new TierObjectStoreResponse() {
                @Override
                public InputStream getInputStream() {
                    // Evaluated lazily so the stream reflects uploads made after the response was created.
                    List<String> uploads = TierRecoveryDataUploadCoordinatorTest.this.uploadJobsMetadata;
                    String latest = uploads.isEmpty() ? "" : uploads.get(uploads.size() - 1);
                    return new ByteArrayInputStream(latest.getBytes(StandardCharsets.UTF_8));
                }

                @Override
                public void close() throws IOException {
                    // Nothing to release: the response is backed by an in-memory string.
                }
            };
            return this.storedMetadata;
        }).when(this.objectStore).getObjectStoreFragment((ObjectStoreMetadata) ArgumentMatchers.any(), (FragmentType) ArgumentMatchers.any());
    }

    /**
     * Stubs the replica manager so that its leader-partition iterator yields one mocked
     * {@link Partition} per entry of {@code leaderTopicIdPartitions}. A fresh iterator is
     * installed on every call because an iterator can only be consumed once.
     */
    private void setupLeaderPartitions() {
        List<Partition> leaders = new ArrayList<>();
        for (TopicIdPartition leader : this.leaderTopicIdPartitions) {
            Partition mockedPartition = Mockito.mock(Partition.class);
            Mockito.when(mockedPartition.topicPartition()).thenReturn(leader.topicPartition());
            Mockito.when(mockedPartition.topicId()).thenReturn(Option.apply(leader.kafkaTopicId()));
            leaders.add(mockedPartition);
        }
        Mockito.when(this.replicaManager.leaderPartitionsIterator())
            .thenReturn(JavaConverters.asScalaIterator(leaders.iterator()));
    }

    /**
     * Builds the broker config and stubs the log manager: one mocked log with a mocked tier
     * partition state per entry of {@code topicIdPartitions}, plus the tier offsets map.
     *
     * @throws IOException declared by the stubbed {@code backupStateForRecovery()} call
     */
    private void setupLogManager() throws IOException {
        this.props.put("confluent.checksum.enabled.files", "all");
        this.config = KafkaConfig.fromProps(this.props);

        List<AbstractLog> logs = new ArrayList<>();
        for (TopicIdPartition partition : this.topicIdPartitions) {
            AbstractLog log = Mockito.mock(MergedLog.class);
            TierPartitionState state = Mockito.mock(FileTierPartitionState.class);

            // Path the mocked partition state reports as its recovery backup file.
            String backupFile = String.format(
                "recovery-uploads/%s_%d_00000000000000000000.tierstate.recoveryupload.adler",
                partition.topic(), partition.partition());
            Path backupPath = Paths.get(backupFile).toAbsolutePath();

            Mockito.when(log.topicIdPartition()).thenReturn(Option.apply(partition));
            Mockito.when(log.tierPartitionState()).thenReturn(state);
            Mockito.when(state.backupStateForRecovery()).thenReturn(backupPath);
            logs.add(log);
        }

        Mockito.when(this.logManager.allLogs()).thenReturn(JavaConverters.collectionAsScalaIterable(logs));
        Mockito.when(this.logManager.readTierOffsets()).thenReturn(this.tierOffsets);
    }

    /**
     * Replaces the "logDir" entry of {@code tierOffsets} with 50 freshly randomized
     * offset/epoch pairs keyed 0..49, serialized by {@link #tierOffsetsToByteBuf(Map)}.
     */
    private void setupTierOffsets() {
        Random rng = new Random();
        Map<Integer, OffsetAndEpoch> randomOffsets = new HashMap<>();
        int key = 0;
        while (key < 50) {
            long offset = rng.nextInt();
            Optional<Integer> epoch = Optional.of(rng.nextInt());
            randomOffsets.put(key, new OffsetAndEpoch(offset, epoch));
            key++;
        }
        ByteBuffer serialized = tierOffsetsToByteBuf(randomOffsets);
        this.tierOffsets.put("logDir", serialized);
    }

    /**
     * Serializes tier offsets into the line-oriented text form that
     * {@link #byteBufToTierOffsets(ByteBuffer)} parses: a single-token header line followed by
     * one {@code "<key> <offset> <epoch>"} line per entry. A missing epoch is written as -1.
     *
     * @param map offsets keyed by integer id
     * @return a buffer wrapping the UTF-8 bytes of the serialized text
     */
    private ByteBuffer tierOffsetsToByteBuf(Map<Integer, OffsetAndEpoch> map) {
        StringBuilder sb = new StringBuilder();
        // The header must sit on its own line; otherwise it fuses with the first entry's key
        // (e.g. "1" + "0 ..." reads back as key 10) and that entry is lost on parsing.
        sb.append("1\n");
        for (Map.Entry<Integer, OffsetAndEpoch> entry : map.entrySet()) {
            OffsetAndEpoch offsetAndEpoch = entry.getValue();
            // Plain appends keep the digits locale-independent, unlike String.format("%d").
            sb.append(entry.getKey())
                .append(' ')
                .append(offsetAndEpoch.offset())
                .append(' ')
                .append(offsetAndEpoch.epoch().orElse(-1))
                .append('\n');
        }
        return ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Parses the text form produced by {@link #tierOffsetsToByteBuf(Map)} back into a map.
     * Lines holding a single token (such as a header) are skipped; every other line must hold
     * exactly three space-separated numbers: key, offset and epoch.
     *
     * <p>The whole backing array is read, regardless of the buffer's position and limit.
     *
     * @param byteBuffer array-backed buffer containing the serialized offsets
     * @return the parsed offsets; the epoch is always present, holding whatever number was written
     * @throws UncheckedIOException if reading the in-memory stream fails
     */
    private Map<Integer, OffsetAndEpoch> byteBufToTierOffsets(ByteBuffer byteBuffer) {
        Map<Integer, OffsetAndEpoch> parsed = new HashMap<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(byteBuffer.array()), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] tokens = line.split(" ");
                if (tokens.length == 1) {
                    continue;
                }
                Assertions.assertEquals(3, tokens.length);
                int key = Integer.parseInt(tokens[0]);
                // Offsets are written as longs, so read them back with the same width.
                long offset = Long.parseLong(tokens[1]);
                int epoch = Integer.parseInt(tokens[2]);
                parsed.put(key, new OffsetAndEpoch(offset, Optional.of(epoch)));
            }
        } catch (IOException e) {
            // Fail loudly: swallowing the error inside the read loop would retry forever.
            throw new UncheckedIOException("Failed to parse tier offsets buffer", e);
        }
        return parsed;
    }

    /**
     * Resets every piece of state that the mocked object store or the tests accumulate, so no
     * captured upload can leak into a later test even if the test instance is reused.
     */
    @AfterEach
    public void cleanup() {
        this.uploadJobsMetadata.clear();
        this.uploadJobsTierOffsets.clear();
        this.topicIdPartitionToUploadPath.clear();
        this.tierOffsets.clear();
        this.storedMetadata = null;
    }

    /** Runs a single upload job to completion and verifies its result and uploaded metadata. */
    @Test
    public void testUploadInitiate() throws InterruptedException, IOException {
        TierRecoveryDataUploadCoordinator coordinator =
            new TierRecoveryDataUploadCoordinator(this.config, this.replicaManager, this.objectStore);
        UUID jobId = maybeInitiateAndCompleteUpload(coordinator);
        // The metadata is only captured once the job has run, so read it after completion.
        String uploadedMetadata = this.uploadJobsMetadata.get(0);
        verifyJobResult(coordinator, jobId, uploadedMetadata);
    }

    /**
     * Verifies job status lookups: an id that was never issued reports UNKNOWN, and completed
     * jobs keep reporting COMPLETED after a second job has run.
     */
    @Test
    public void testGetJobStatus() throws InterruptedException, IOException {
        TierRecoveryDataUploadCoordinator coordinator =
            new TierRecoveryDataUploadCoordinator(this.config, this.replicaManager, this.objectStore);
        UUID firstJobId = maybeInitiateAndCompleteUpload(coordinator);

        // Pick an id that differs from the real job id by value, not by reference.
        UUID unknownJobId = UUID.randomUUID();
        while (unknownJobId.equals(firstJobId)) {
            unknownJobId = UUID.randomUUID();
        }
        Assertions.assertEquals(TierRecoveryDataUploadJobStatus.UNKNOWN, coordinator.getJobResult(unknownJobId).status());

        verifyJobResult(coordinator, firstJobId, this.uploadJobsMetadata.get(0));

        UUID secondJobId = maybeInitiateAndCompleteUpload(coordinator);
        Assertions.assertEquals(TierRecoveryDataUploadJobStatus.COMPLETED, coordinator.getJobResult(firstJobId).status());
        Assertions.assertEquals(TierRecoveryDataUploadJobStatus.COMPLETED, coordinator.getJobResult(secondJobId).status());
    }

    /**
     * Runs eleven jobs back to back, verifying each one, then checks that looking up the very
     * first job yields the dummy result.
     * NOTE(review): presumably the coordinator retains only the ten most recent results; confirm.
     */
    @Test
    public void testGetJobResults() throws InterruptedException, IOException {
        TierRecoveryDataUploadCoordinator coordinator =
            new TierRecoveryDataUploadCoordinator(this.config, this.replicaManager, this.objectStore);
        List<UUID> jobIds = new ArrayList<>();
        while (jobIds.size() < 10) {
            int jobIndex = jobIds.size();
            setupTierOffsets();
            UUID jobId = maybeInitiateAndCompleteUpload(coordinator);
            jobIds.add(jobId);
            verifyJobResult(coordinator, jobId, this.uploadJobsMetadata.get(jobIndex));
        }

        // Eleventh job, run without refreshing the tier offsets.
        UUID eleventhJobId = maybeInitiateAndCompleteUpload(coordinator);
        verifyJobResult(coordinator, eleventhJobId, this.uploadJobsMetadata.get(10));

        UUID oldestJobId = jobIds.get(0);
        Assertions.assertEquals(TierRecoveryDataUploadResult.makeDummyJobResult(), coordinator.getJobResult(oldestJobId));
    }

    /**
     * Runs one job, adds a partition of a new topic to the known partitions, re-stubs the log
     * manager, and verifies that a second job covers the enlarged partition set.
     */
    @Test
    public void testRerunJobWithAdditionalPartitions() throws IOException, InterruptedException {
        TierRecoveryDataUploadCoordinator coordinator =
            new TierRecoveryDataUploadCoordinator(this.config, this.replicaManager, this.objectStore);

        UUID initialJobId = maybeInitiateAndCompleteUpload(coordinator);
        verifyJobResult(coordinator, initialJobId, this.uploadJobsMetadata.get(0));

        TopicIdPartition addedPartition = new TopicIdPartition("topic3", UUID.randomUUID(), 0);
        this.topicIdPartitions.add(addedPartition);
        setupLogManager();

        UUID rerunJobId = maybeInitiateAndCompleteUpload(coordinator);
        verifyJobResult(coordinator, rerunJobId, this.uploadJobsMetadata.get(1));
    }

    /**
     * Starts an upload for all known partitions and polls every 100 ms until the job reports
     * COMPLETED. On each poll where the job is not yet completed, asserts that initiating
     * another upload throws {@link IllegalStateException}.
     *
     * @param tierRecoveryDataUploadCoordinator coordinator under test
     * @return id of the completed job
     */
    private UUID maybeInitiateAndCompleteUpload(TierRecoveryDataUploadCoordinator tierRecoveryDataUploadCoordinator) throws InterruptedException, IOException {
        // Install a fresh leader iterator; a previous job may have consumed the old one.
        setupLeaderPartitions();
        UUID jobId = tierRecoveryDataUploadCoordinator.initiateTierRecoveryDataUpload(
            new HashSet<>(this.topicIdPartitions), this.identifier, this.numThreads);

        while (true) {
            boolean completed = tierRecoveryDataUploadCoordinator.getJobResult(jobId).status()
                == TierRecoveryDataUploadJobStatus.COMPLETED;
            if (completed) {
                break;
            }
            Assertions.assertThrows(
                IllegalStateException.class,
                () -> tierRecoveryDataUploadCoordinator.initiateTierRecoveryDataUpload(
                    new HashSet<>(this.topicIdPartitions), this.identifier, this.numThreads));
            Thread.sleep(100L);
        }

        Assertions.assertEquals(
            TierRecoveryDataUploadJobStatus.COMPLETED,
            tierRecoveryDataUploadCoordinator.getJobResult(jobId).status());
        return jobId;
    }

    /**
     * Asserts that a job finished cleanly and that what it uploaded matches the test fixtures:
     * no failed partitions, metadata and tier-offset uploads completed without error, every
     * known partition listed with the right leader flag and object path, the current metadata
     * version, and the most recently uploaded tier offsets equal to the "logDir" fixture.
     *
     * @param tierRecoveryDataUploadCoordinator coordinator that ran the job
     * @param uuid id of the job to verify
     * @param str JSON metadata captured from the job's metadata upload
     */
    private void verifyJobResult(TierRecoveryDataUploadCoordinator tierRecoveryDataUploadCoordinator, UUID uuid, String str) throws IOException, InterruptedException {
        TierRecoveryDataUploadResult result = tierRecoveryDataUploadCoordinator.getJobResult(uuid);
        Assertions.assertEquals(new HashMap<>(), result.failedPartitions());
        Assertions.assertTrue(result.metadataUploadCompleted());
        Assertions.assertNull(result.metadataUploadFailedExceptionMessage());

        TierRecoveryUploadMetadataJson uploaded = this.objectMapper.readValue(str, TierRecoveryUploadMetadataJson.class);
        uploaded.partitions.forEach((partitionKey, uploadInfo) -> {
            Assertions.assertEquals(this.topicIdPartitions.size(), this.topicIdPartitionToUploadPath.size());
            boolean expectedLeader = this.leaderTopicIdPartitions.contains(TopicIdPartition.fromString(partitionKey));
            Assertions.assertEquals(expectedLeader, uploadInfo.isLeader);
            // "null" is the placeholder expected for a partition that has no recorded upload path.
            String expectedPath = this.topicIdPartitionToUploadPath.getOrDefault(TopicIdPartition.fromString(partitionKey), "null");
            Assertions.assertEquals(expectedPath, uploadInfo.objectStorePath);
        });

        Assertions.assertEquals(
            new HashSet<>(this.topicIdPartitions),
            uploaded.partitions.keySet().stream().map(TopicIdPartition::fromString).collect(Collectors.toSet()));
        Assertions.assertEquals(TierRecoveryDataUploadCoordinator.CURRENT_METADATA_VERSION, uploaded.version);

        Assertions.assertTrue(result.tierOffsetsUploadCompleted());
        Assertions.assertNull(result.tierOffsetsUploadFailedExceptionMessage());
        Assertions.assertEquals(byteBufToTierOffsets(this.tierOffsets.get("logDir")), this.uploadJobsTierOffsets.getLast());
    }
}
