/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.log.remote.storage;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.InmemoryRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link InmemoryRemoteStorageManager}: copying a log segment and its indexes,
 * fetching them back (whole and by byte range), rejecting invalid ranges, and deleting
 * the segment data.
 *
 * <p>Every test creates its own manager instance and its own randomly named fixture files
 * under {@link #DIR}.
 */
public class InmemoryRemoteStorageManagerTest {
    private static final Logger log = LoggerFactory.getLogger(InmemoryRemoteStorageManagerTest.class);
    private static final TopicPartition TP = new TopicPartition("foo", 1);
    /** Directory (obtained from {@code TestUtils.tempDirectory}) that holds the generated fixture files. */
    private static final File DIR = TestUtils.tempDirectory("inmem-rsm-");
    private static final Random RANDOM = new Random();

    /**
     * Copies a segment and checks that the manager holds an entry for the segment itself
     * and for every {@link RemoteStorageManager.IndexType}.
     */
    @Test
    public void testCopyLogSegment() throws Exception {
        InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
        RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
        LogSegmentData logSegmentData = createLogSegmentData();

        rsm.copyLogSegmentData(segmentMetadata, logSegmentData);

        String segmentKeyMessage = "segment entry missing after copy";
        Assertions.assertTrue(
                rsm.containsKey(InmemoryRemoteStorageManager.generateKeyForSegment(segmentMetadata)),
                segmentKeyMessage);
        for (RemoteStorageManager.IndexType indexType : RemoteStorageManager.IndexType.values()) {
            Assertions.assertTrue(
                    rsm.containsKey(InmemoryRemoteStorageManager.generateKeyForIndex(segmentMetadata, indexType)),
                    "index entry missing after copy: " + indexType);
        }
    }

    /**
     * Builds segment metadata for {@link #TP} with a random topic id and a random segment id.
     *
     * @return a new metadata instance
     */
    private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
        TopicIdPartition topicPartition = new TopicIdPartition(Uuid.randomUuid(), TP);
        RemoteLogSegmentId id = new RemoteLogSegmentId(topicPartition, Uuid.randomUuid());
        return new RemoteLogSegmentMetadata(id, 100L, 200L, System.currentTimeMillis(), 0, System.currentTimeMillis(), 100, Collections.singletonMap(1, 100L));
    }

    /**
     * Copies a segment, then fetches the segment and each index and compares the fetched
     * bytes with the data that was copied.
     */
    @Test
    public void testFetchLogSegmentIndexes() throws Exception {
        InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
        RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
        int segSize = 100;
        LogSegmentData logSegmentData = createLogSegmentData(segSize);

        rsm.copyLogSegmentData(segmentMetadata, logSegmentData);

        // The fetched segment must match the segment file that was copied.
        try (InputStream segmentStream = rsm.fetchLogSegment(segmentMetadata, 0)) {
            checkContentSame(segmentStream, logSegmentData.logSegment());
        }

        // File-backed indexes; the transaction index is optional in LogSegmentData.
        Map<RemoteStorageManager.IndexType, Path> expectedIndexToPaths = new HashMap<>();
        expectedIndexToPaths.put(RemoteStorageManager.IndexType.OFFSET, logSegmentData.offsetIndex());
        expectedIndexToPaths.put(RemoteStorageManager.IndexType.TIMESTAMP, logSegmentData.timeIndex());
        expectedIndexToPaths.put(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, logSegmentData.producerSnapshotIndex());
        logSegmentData.transactionIndex()
                .ifPresent(txnIndex -> expectedIndexToPaths.put(RemoteStorageManager.IndexType.TRANSACTION, txnIndex));

        for (Map.Entry<RemoteStorageManager.IndexType, Path> entry : expectedIndexToPaths.entrySet()) {
            RemoteStorageManager.IndexType indexType = entry.getKey();
            Path indexPath = entry.getValue();
            log.debug("Fetching index type: {}, indexPath: {}", indexType, indexPath);

            // try-with-resources closes the stream and lets a failed comparison propagate;
            // leaving a finally block via `continue` would discard the pending failure.
            try (InputStream indexStream = rsm.fetchIndex(segmentMetadata, indexType)) {
                checkContentSame(indexStream, indexPath);
            }
        }

        // The leader epoch index is supplied as a ByteBuffer rather than a file.
        try (InputStream leaderEpochIndexStream = rsm.fetchIndex(segmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH)) {
            ByteBuffer leaderEpochIndex = logSegmentData.leaderEpochIndex();
            Assertions.assertEquals(
                    leaderEpochIndex,
                    readAsByteBuffer(leaderEpochIndexStream, leaderEpochIndex.array().length));
        }
    }

    /**
     * Copies a 100-byte segment and fetches several byte ranges of it, including the full
     * segment, single bytes at both ends, and ranges in the middle.
     */
    @Test
    public void testFetchSegmentsForRange() throws Exception {
        InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
        RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
        int segSize = 100;
        LogSegmentData logSegmentData = createLogSegmentData(segSize);
        Path path = logSegmentData.logSegment();

        rsm.copyLogSegmentData(segmentMetadata, logSegmentData);

        doTestFetchForRange(rsm, segmentMetadata, path, 0, 40);
        doTestFetchForRange(rsm, segmentMetadata, path, 0, segSize);
        doTestFetchForRange(rsm, segmentMetadata, path, 90, segSize - 90);
        doTestFetchForRange(rsm, segmentMetadata, path, 0, 1);
        doTestFetchForRange(rsm, segmentMetadata, path, segSize - 1, 1);
        doTestFetchForRange(rsm, segmentMetadata, path, 3, 90);
    }

    /**
     * Reads {@code len} bytes starting at {@code startPos} from the local segment file and
     * asserts that fetching the same range from {@code rsm} yields equal bytes.
     *
     * @param rsm      manager that already holds the segment
     * @param rlsm     metadata identifying the segment
     * @param path     local file the segment was copied from
     * @param startPos first byte position of the range
     * @param len      number of bytes in the range
     */
    private void doTestFetchForRange(InmemoryRemoteStorageManager rsm, RemoteLogSegmentMetadata rlsm, Path path, int startPos, int len) throws Exception {
        ByteBuffer expectedSegRangeBytes = ByteBuffer.allocate(len);
        try (SeekableByteChannel seekableByteChannel = Files.newByteChannel(path)) {
            seekableByteChannel.position(startPos).read(expectedSegRangeBytes);
        }
        expectedSegRangeBytes.rewind();

        ByteBuffer fetchedSegRangeBytes = ByteBuffer.allocate(len);
        // The end position passed is startPos + len - 1, i.e. this test treats it as inclusive.
        try (InputStream segmentRangeStream = rsm.fetchLogSegment(rlsm, startPos, startPos + len - 1)) {
            Utils.readFully(segmentRangeStream, fetchedSegRangeBytes);
        }
        fetchedSegRangeBytes.rewind();

        Assertions.assertEquals(expectedSegRangeBytes, fetchedSegRangeBytes);
    }

    /**
     * Checks that fetching with an end position before the start position, or with negative
     * positions, throws an exception.
     */
    @Test
    public void testFetchInvalidRange() throws Exception {
        InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
        RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata();
        int segSize = 100;
        LogSegmentData logSegmentData = createLogSegmentData(segSize);

        rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData);

        // End position before start position.
        Assertions.assertThrows(Exception.class, () -> rsm.fetchLogSegment(remoteLogSegmentMetadata, 2, 1));
        // Negative start position.
        Assertions.assertThrows(Exception.class, () -> rsm.fetchLogSegment(remoteLogSegmentMetadata, -1, 0));
        // Negative start and end positions.
        Assertions.assertThrows(Exception.class, () -> rsm.fetchLogSegment(remoteLogSegmentMetadata, -2, -1));
    }

    /**
     * Copies a segment, verifies it can be fetched, deletes it, and then checks that fetching
     * the segment or any of its indexes throws {@link RemoteResourceNotFoundException}.
     */
    @Test
    public void testDeleteSegment() throws Exception {
        InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
        RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
        LogSegmentData logSegmentData = createLogSegmentData();

        rsm.copyLogSegmentData(segmentMetadata, logSegmentData);

        // Sanity check: the segment is retrievable before deletion.
        try (InputStream segmentStream = rsm.fetchLogSegment(segmentMetadata, 0)) {
            checkContentSame(segmentStream, logSegmentData.logSegment());
        }

        rsm.deleteLogSegmentData(segmentMetadata);

        Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> rsm.fetchLogSegment(segmentMetadata, 0));
        Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> rsm.fetchLogSegment(segmentMetadata, 0, 1));
        for (RemoteStorageManager.IndexType indexType : RemoteStorageManager.IndexType.values()) {
            Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> rsm.fetchIndex(segmentMetadata, indexType));
        }
    }

    /**
     * Asserts that the first {@code n} bytes of {@code segmentStream} equal the content of the
     * file at {@code path}, where {@code n} is the file's length.
     *
     * @param segmentStream stream to read the actual bytes from; not closed by this method
     * @param path          file holding the expected bytes
     * @throws IOException if reading the file or the stream fails
     */
    private void checkContentSame(InputStream segmentStream, Path path) throws IOException {
        byte[] expectedBytes = Files.readAllBytes(path);
        ByteBuffer actual = readAsByteBuffer(segmentStream, expectedBytes.length);
        Assertions.assertEquals(ByteBuffer.wrap(expectedBytes), actual);
    }

    /**
     * Reads from {@code segmentStream} into a new array-backed buffer of capacity {@code len}
     * using {@code Utils.readFully}, then rewinds the buffer.
     *
     * @param segmentStream stream to read from; not closed by this method
     * @param len           capacity of the returned buffer
     * @return the rewound buffer
     * @throws IOException if reading the stream fails
     */
    private ByteBuffer readAsByteBuffer(InputStream segmentStream, int len) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[len]);
        Utils.readFully(segmentStream, byteBuffer);
        byteBuffer.rewind();
        return byteBuffer;
    }

    /**
     * Creates segment data with a 100-byte segment file.
     *
     * @return the generated segment data
     * @see #createLogSegmentData(int)
     */
    private LogSegmentData createLogSegmentData() throws Exception {
        return createLogSegmentData(100);
    }

    /**
     * Writes a segment file of {@code segSize} random bytes and 10-byte offset, time,
     * transaction and producer-snapshot index files under {@link #DIR}, all sharing one random
     * numeric file-name prefix, and pairs them with a 10-byte in-memory leader epoch index.
     *
     * @param segSize size in bytes of the segment file
     * @return segment data referencing the generated files
     */
    private LogSegmentData createLogSegmentData(int segSize) throws Exception {
        // Bounded nextInt is never negative, unlike Math.abs(nextInt()) for Integer.MIN_VALUE.
        int prefix = RANDOM.nextInt(Integer.MAX_VALUE);

        Path segment = new File(DIR, prefix + ".seg").toPath();
        Files.write(segment, TestUtils.randomBytes(segSize));

        Path offsetIndex = new File(DIR, prefix + ".oi").toPath();
        Files.write(offsetIndex, TestUtils.randomBytes(10));

        Path timeIndex = new File(DIR, prefix + ".ti").toPath();
        Files.write(timeIndex, TestUtils.randomBytes(10));

        Path txnIndex = new File(DIR, prefix + ".txni").toPath();
        Files.write(txnIndex, TestUtils.randomBytes(10));

        Path producerSnapshotIndex = new File(DIR, prefix + ".psi").toPath();
        Files.write(producerSnapshotIndex, TestUtils.randomBytes(10));

        ByteBuffer leaderEpochIndex = ByteBuffer.wrap(TestUtils.randomBytes(10));
        return new LogSegmentData(segment, offsetIndex, timeIndex, Optional.of(txnIndex), producerSnapshotIndex, leaderEpochIndex);
    }
}

