package kafka.tier.fetcher;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import junit.framework.TestCase;
import kafka.log.LogConfig;
import kafka.log.LogSegment;
import kafka.server.DelayedOperation;
import kafka.tier.TierTimestampAndOffset;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.mutable.Set;

/* loaded from: input_file:kafka/tier/fetcher/TierFetcherTest.class */
public class TierFetcherTest {
    // Mock clock handed to every LogSegment that the index-based tests in this class open.
    private MockTime mockTime = new MockTime();

    /* loaded from: input_file:kafka/tier/fetcher/TierFetcherTest$MockDelayedFetch.class */
    /**
     * Minimal {@link DelayedOperation} used by the tests to wrap a {@link PendingFetch}.
     * It can only be completed once the wrapped fetch reports completion, and completing it
     * calls {@code finish()} on that fetch.
     */
    class MockDelayedFetch extends DelayedOperation {
        PendingFetch fetch;

        /**
         * @param pendingFetch the in-flight fetch this delayed operation tracks
         */
        MockDelayedFetch(PendingFetch pendingFetch) {
            super(0L, Option.empty());
            fetch = pendingFetch;
        }

        /**
         * Forces completion when the wrapped fetch is complete.
         *
         * @return {@code false} while the fetch is still incomplete, otherwise whatever
         *         {@code forceComplete()} returns
         */
        public boolean tryComplete() {
            // Short-circuit: forceComplete() is only invoked once the fetch reports completion.
            return fetch.isComplete() && forceComplete();
        }

        /** Calls {@code finish()} on the wrapped fetch; the returned result is discarded. */
        public void onComplete() {
            fetch.finish();
        }

        /** Expiration needs no handling in this mock. */
        public void onExpiration() {
        }
    }

    /* loaded from: input_file:kafka/tier/fetcher/TierFetcherTest$MockedTierObjectStore.class */
    /**
     * In-memory {@link TierObjectStore} backed by three fixed buffers: one for the segment, one for
     * the offset index and one for the timestamp index. A single upcoming {@code getObject} call can
     * be made to fail via {@link #failNextRequest()}.
     */
    class MockedTierObjectStore implements TierObjectStore {
        private final ByteBuffer segmentByteBuffer;
        private final ByteBuffer offsetByteBuffer;
        private final ByteBuffer timestampByteBuffer;
        // One-shot failure switch; getObject() clears it when it throws.
        private final AtomicBoolean failNextRequest = new AtomicBoolean(false);

        /** Response that simply exposes a pre-built stream together with its length. */
        class MockTierObjectStoreResponse implements TierObjectStoreResponse {
            private final InputStream is;
            private final long size;

            /**
             * @param inputStream stream returned by {@link #getInputStream()}
             * @param j           value reported by {@link #getObjectSize()}
             */
            MockTierObjectStoreResponse(InputStream inputStream, long j) {
                this.is = inputStream;
                this.size = j;
            }

            public InputStream getInputStream() {
                return this.is;
            }

            public Long getObjectSize() {
                return Long.valueOf(this.size);
            }

            /** Nothing to release: the stream is backed by heap memory. */
            public void close() {
            }
        }

        /**
         * @param byteBuffer  contents served for {@code SEGMENT} requests
         * @param byteBuffer2 contents served for {@code OFFSET_INDEX} requests
         * @param byteBuffer3 contents served for {@code TIMESTAMP_INDEX} requests
         */
        MockedTierObjectStore(ByteBuffer byteBuffer, ByteBuffer byteBuffer2, ByteBuffer byteBuffer3) {
            this.segmentByteBuffer = byteBuffer;
            this.offsetByteBuffer = byteBuffer2;
            this.timestampByteBuffer = byteBuffer3;
        }

        /** Arms the store so that the next {@code getObject} call throws an {@link IOException}. */
        void failNextRequest() {
            this.failNextRequest.set(true);
        }

        public void close() {
        }

        /**
         * Returns a copy of the requested byte range of the buffer matching the file type.
         *
         * <p>The range is taken from the buffer's whole backing array (position and limit are
         * ignored) and is clamped to that array, so a request reaching past the stored data yields
         * a shorter (possibly empty) object instead of overflowing the response buffer.
         *
         * @param tierObjectMetadata      ignored by this mock
         * @param tierObjectStoreFileType which of the three buffers to read
         * @param num                     first byte to return, or {@code null} for the start
         * @param num2                    exclusive end of the range, or {@code null} for the end of the data
         * @return response whose stream holds the selected bytes and whose size is their count
         * @throws IOException                   if a failure was armed with {@link #failNextRequest()}
         * @throws UnsupportedOperationException for any other file type
         */
        public TierObjectStoreResponse getObject(TierObjectMetadata tierObjectMetadata, TierObjectStore.TierObjectStoreFileType tierObjectStoreFileType, Integer num, Integer num2) throws IOException {
            if (this.failNextRequest.compareAndSet(true, false)) {
                throw new IOException("Failed to retrieve object.");
            }
            ByteBuffer source;
            if (tierObjectStoreFileType == TierObjectStore.TierObjectStoreFileType.OFFSET_INDEX) {
                source = this.offsetByteBuffer;
            } else if (tierObjectStoreFileType == TierObjectStore.TierObjectStoreFileType.SEGMENT) {
                source = this.segmentByteBuffer;
            } else if (tierObjectStoreFileType == TierObjectStore.TierObjectStoreFileType.TIMESTAMP_INDEX) {
                source = this.timestampByteBuffer;
            } else {
                throw new UnsupportedOperationException();
            }
            byte[] stored = source.array();
            int start = num == null ? 0 : num.intValue();
            int requestedEnd = num2 == null ? stored.length : num2.intValue();
            // Never read past the stored bytes, and never produce a negative length.
            int end = Math.min(requestedEnd, stored.length);
            int size = Math.max(end - start, 0);
            ByteBuffer slice = ByteBuffer.allocate(size);
            if (size > 0) {
                // Copy exactly as many bytes as were allocated.
                slice.put(stored, start, size);
            }
            slice.flip();
            return new MockTierObjectStoreResponse(new ByteBufferInputStream(slice), size);
        }

        /** Uploading is not supported by this mock. */
        public TierObjectMetadata putSegment(TierObjectMetadata tierObjectMetadata, File file, File file2, File file3, File file4, File file5, Optional<File> optional) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Arms the mocked object store to fail its next request and checks that the fetch completion
     * callback still fires and that the bytes-fetched metric stays at zero.
     */
    @Test
    public void tierFetcherExceptionCausesOnComplete() throws Exception {
        MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(ByteBuffer.allocate(1), ByteBuffer.allocate(1), ByteBuffer.allocate(1));
        TierObjectMetadata tierObjectMetadata = new TierObjectMetadata(new TopicPartition("foo", 0), 0, 0L, 0, 0L, 0L, 0, false, false, (byte) 1);
        Metrics metrics = new Metrics();
        TierFetcher tierFetcher = new TierFetcher(tierObjectStore, metrics);
        try {
            TierFetchMetadata fetchMetadata = new TierFetchMetadata(0L, Option.apply(1000L), 600, 1000L, true, tierObjectMetadata, Option.empty(), 0L, 1000);
            CompletableFuture<Boolean> callbackFired = new CompletableFuture<>();
            tierObjectStore.failNextRequest();
            // Expected value goes first so that a failure message reads correctly.
            Assert.assertEquals(Double.valueOf(0.0d), metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue());
            // The delayed operation is only constructed around the pending fetch; it is not used further.
            new MockDelayedFetch(tierFetcher.fetch(new ArrayList<>(Arrays.asList(fetchMetadata)), delayedOperationKey -> {
                callbackFired.complete(true);
            }));
            Assert.assertTrue(callbackFired.get(2000L, TimeUnit.MILLISECONDS));
            Assert.assertEquals(0.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue()).doubleValue(), 0.0d);
        } finally {
            // Close exactly once, whether or not the assertions passed.
            tierFetcher.close();
        }
    }

    /**
     * Builds two record batches (offsets 0-49 and 50-100, every record with timestamp 1, key "a"
     * and value "v") and concatenates them into one buffer.
     *
     * @return a flipped buffer, allocated with 4194304 bytes of capacity, containing both batches
     *         back to back
     */
    private ByteBuffer getMemoryRecordsBuffer() {
        ByteBuffer firstBatchBuffer = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder firstBatch = MemoryRecords.builder(firstBatchBuffer, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        ByteBuffer secondBatchBuffer = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder secondBatch = MemoryRecords.builder(secondBatchBuffer, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);

        for (int offset = 0; offset < 50; offset++) {
            firstBatch.appendWithOffset(offset, 1L, "a".getBytes(), "v".getBytes());
        }
        for (int offset = 50; offset < 101; offset++) {
            secondBatch.appendWithOffset(offset, 1L, "a".getBytes(), "v".getBytes());
        }

        firstBatch.build();
        secondBatch.build();

        // Make the written bytes of each batch readable before copying them out.
        firstBatchBuffer.flip();
        secondBatchBuffer.flip();

        ByteBuffer combined = ByteBuffer.allocate(4194304);
        combined.put(firstBatchBuffer);
        combined.put(secondBatchBuffer);
        combined.flip();
        return combined;
    }

    /**
     * Fetches from a store whose offset and timestamp index buffers are empty and checks that the
     * callback fires, bytes are counted in the metric, and the returned records carry consecutive
     * offsets starting at 0.
     */
    @Test
    public void tierFetcherRequestEmptyIndexTest() throws Exception {
        MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(getMemoryRecordsBuffer(), ByteBuffer.allocate(0), ByteBuffer.allocate(0));
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        TierObjectMetadata tierObjectMetadata = new TierObjectMetadata(topicPartition, 0, 0L, 101, 0L, 0L, 0, false, false, (byte) 1);
        Metrics metrics = new Metrics();
        TierFetcher tierFetcher = new TierFetcher(tierObjectStore, metrics);
        try {
            TierFetchMetadata fetchMetadata = new TierFetchMetadata(0L, Option.apply(1000L), 10000, 1000L, true, tierObjectMetadata, Option.empty(), 0L, 1000);
            CompletableFuture<Boolean> callbackFired = new CompletableFuture<>();
            // Expected value goes first so that a failure message reads correctly.
            Assert.assertEquals(Double.valueOf(0.0d), metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue());
            PendingFetch pendingFetch = tierFetcher.fetch(new ArrayList<>(Arrays.asList(fetchMetadata)), delayedOperationKey -> {
                callbackFired.complete(true);
            });
            MockDelayedFetch delayedFetch = new MockDelayedFetch(pendingFetch);
            Assert.assertTrue(callbackFired.get(2000L, TimeUnit.MILLISECONDS));
            Map<?, ?> fetchResults = pendingFetch.finish();
            Assert.assertNotNull("expected non-null fetch result", fetchResults);
            Assert.assertTrue(((Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue()).doubleValue() > 0.0d);
            Assert.assertTrue(delayedFetch.tryComplete());
            long expectedOffset = 0;
            for (Record record : ((TierFetchResult) fetchResults.get(topicPartition)).records.records()) {
                Assert.assertEquals("Offset not expected", expectedOffset, record.offset());
                expectedOffset++;
            }
        } finally {
            tierFetcher.close();
        }
    }

    /**
     * Builds one batch of 50 records with offsets {@code j .. j + 49}. Each record's timestamp
     * equals its offset, its key is "a" and its value is "v".
     *
     * @param j base offset of the batch and offset of its first record
     * @return the built records
     */
    private MemoryRecords buildWithOffset(long j) {
        ByteBuffer backing = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder batch = MemoryRecords.builder(backing, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, j);
        for (int delta = 0; delta < 50; delta++) {
            long offsetAndTimestamp = j + delta;
            batch.appendWithOffset(offsetAndTimestamp, offsetAndTimestamp, "a".getBytes(), "v".getBytes());
        }
        return batch.build();
    }

    /**
     * Writes three 50-record batches into a real log segment, serves the segment and its index
     * files through the mocked object store, fetches with a first {@code TierFetchMetadata}
     * argument of 100 and checks that the returned records carry consecutive offsets starting
     * at 100.
     */
    @Test
    public void tierFetcherIndexTest() throws Exception {
        File logDir = TestUtils.tempDirectory();
        Properties logProps = new Properties();
        properties: {
            logProps.put(LogConfig.IndexIntervalBytesProp(), 1);
        }
        LogSegment segment = LogSegment.open(logDir, 0L, LogConfig.apply(logProps, ((Set) JavaConverters.asScalaSetConverter(Collections.emptySet()).asScala()).toSet()), this.mockTime, false, 4096, false, "");
        try {
            for (int batch = 0; batch < 3; batch++) {
                segment.append(segment.readNextOffset() + 49, 1L, 1L, buildWithOffset(segment.readNextOffset()));
                segment.flush();
            }
            int nextOffset = (int) segment.readNextOffset();
            segment.offsetIndex().flush();
            segment.offsetIndex().trimToValidSize();
            ByteBuffer segmentBytes = ByteBuffer.wrap(Files.readAllBytes(segment.log().file().toPath()));
            ByteBuffer offsetIndexBytes = ByteBuffer.wrap(Files.readAllBytes(segment.offsetIndex().file().toPath()));
            // Serve the real time index here; previously the offset index file was passed twice.
            ByteBuffer timeIndexBytes = ByteBuffer.wrap(Files.readAllBytes(segment.timeIndex().file().toPath()));
            MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentBytes, offsetIndexBytes, timeIndexBytes);
            TopicPartition topicPartition = new TopicPartition("foo", 0);
            TierObjectMetadata tierObjectMetadata = new TierObjectMetadata(topicPartition, 0, 0L, nextOffset, 0L, 0L, 0, false, false, (byte) 1);
            Metrics metrics = new Metrics();
            TierFetcher tierFetcher = new TierFetcher(tierObjectStore, metrics);
            try {
                TierFetchMetadata fetchMetadata = new TierFetchMetadata(100L, Option.apply(1000L), 10000, 1000L, true, tierObjectMetadata, Option.empty(), 0L, 1000);
                CompletableFuture<Boolean> callbackFired = new CompletableFuture<>();
                // Expected value goes first so that a failure message reads correctly.
                Assert.assertEquals(Double.valueOf(0.0d), metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue());
                PendingFetch pendingFetch = tierFetcher.fetch(new ArrayList<>(Arrays.asList(fetchMetadata)), delayedOperationKey -> {
                    callbackFired.complete(true);
                });
                MockDelayedFetch delayedFetch = new MockDelayedFetch(pendingFetch);
                Assert.assertTrue(callbackFired.get(2000L, TimeUnit.MILLISECONDS));
                Map<?, ?> fetchResults = pendingFetch.finish();
                Assert.assertNotNull("expected non-null fetch result", fetchResults);
                Assert.assertTrue(((Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue()).doubleValue() > 0.0d);
                Assert.assertTrue(delayedFetch.tryComplete());
                long expectedOffset = 100;
                for (Record record : ((TierFetchResult) fetchResults.get(topicPartition)).records.records()) {
                    Assert.assertEquals("Offset not expected", expectedOffset, record.offset());
                    expectedOffset++;
                }
            } finally {
                // Close exactly once, whether or not the assertions passed.
                tierFetcher.close();
            }
        } finally {
            segment.close();
        }
    }

    /**
     * Writes batches whose record timestamps equal their offsets into a real log segment, serves
     * the segment plus its offset and time indexes through the mocked object store, and checks
     * that looking up timestamp 101 yields offset 101. A second lookup is made with the store
     * armed to fail and must surface an exception in its result.
     */
    @Test
    public void tierTimestampIndexTest() throws Exception {
        File logDir = TestUtils.tempDirectory();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), 1);
        LogSegment segment = LogSegment.open(logDir, 0L, LogConfig.apply(logProps, ((Set) JavaConverters.asScalaSetConverter(Collections.emptySet()).asScala()).toSet()), this.mockTime, false, 4096, false, "");
        try {
            MemoryRecords lastBatch = null;
            for (int batch = 0; batch < 3; batch++) {
                lastBatch = buildWithOffset(segment.readNextOffset());
                long largest = segment.readNextOffset() + 49;
                segment.append(largest, largest, largest, lastBatch);
                segment.flush();
            }
            // NOTE(review): this fourth append re-uses the third batch's records (built for the
            // previous base offset) and is not followed by a flush. Kept exactly as it was;
            // confirm whether a freshly built batch was intended.
            long fourthLargest = segment.readNextOffset() + 49;
            segment.append(fourthLargest, fourthLargest, fourthLargest, lastBatch);
            int nextOffset = (int) segment.readNextOffset();
            segment.offsetIndex().flush();
            segment.offsetIndex().trimToValidSize();
            segment.timeIndex().flush();
            segment.timeIndex().trimToValidSize();
            ByteBuffer segmentBytes = ByteBuffer.wrap(Files.readAllBytes(segment.log().file().toPath()));
            ByteBuffer offsetIndexBytes = ByteBuffer.wrap(Files.readAllBytes(segment.offsetIndex().file().toPath()));
            ByteBuffer timeIndexBytes = ByteBuffer.wrap(Files.readAllBytes(segment.timeIndex().file().toPath()));
            MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentBytes, offsetIndexBytes, timeIndexBytes);
            TopicPartition topicPartition = new TopicPartition("foo", 0);
            TierObjectMetadata tierObjectMetadata = new TierObjectMetadata(topicPartition, 0, 0L, nextOffset, 0L, 0L, 0, false, false, (byte) 1);
            TierFetcher tierFetcher = new TierFetcher(tierObjectStore, new Metrics());
            try {
                // First lookup: the store answers normally.
                CompletableFuture<Boolean> firstCallbackFired = new CompletableFuture<>();
                // NOTE(review): the map is left raw because the element types expected by
                // fetchOffsetForTimestamp are not visible in this file.
                HashMap firstRequest = new HashMap();
                firstRequest.put(topicPartition, new TierTimestampAndOffset(101L, tierObjectMetadata));
                PendingOffsetForTimestamp firstLookup = tierFetcher.fetchOffsetForTimestamp(firstRequest, Optional.of(IsolationLevel.READ_UNCOMMITTED), delayedOperationKey -> {
                    firstCallbackFired.complete(true);
                });
                firstCallbackFired.get(2000L, TimeUnit.MILLISECONDS);
                Assert.assertEquals("incorrect offset for supplied timestamp returned", Optional.of(new FileRecords.FileTimestampAndOffset(101L, 101L, Optional.empty())), firstLookup.results().get(topicPartition));

                // Second lookup: the next object store request is made to fail.
                tierObjectStore.failNextRequest();
                CompletableFuture<Boolean> secondCallbackFired = new CompletableFuture<>();
                HashMap secondRequest = new HashMap();
                secondRequest.put(topicPartition, new TierTimestampAndOffset(101L, tierObjectMetadata));
                PendingOffsetForTimestamp secondLookup = tierFetcher.fetchOffsetForTimestamp(secondRequest, Optional.of(IsolationLevel.READ_UNCOMMITTED), delayedOperationKey2 -> {
                    secondCallbackFired.complete(true);
                });
                secondCallbackFired.get(2000L, TimeUnit.MILLISECONDS);
                Optional<?> failedResult = (Optional<?>) secondLookup.results().get(topicPartition);
                Assert.assertNotNull("tier object store through exception, pending result should have been completed exceptionally", ((FileRecords.FileTimestampAndOffset) failedResult.get()).exception);
            } finally {
                // Close exactly once, whether or not the assertions passed.
                tierFetcher.close();
            }
        } finally {
            segment.close();
        }
    }

    /**
     * Writes three 50-record batches into a real log segment and fetches from offset 0 with a
     * {@code TierFetchMetadata} whose third argument is 600 (presumably the max-bytes limit, given
     * the assertions below — confirm against {@code TierFetchMetadata}). Checks that the result is
     * at most 600 bytes and contains exactly the first 50 records.
     */
    @Test
    public void tierFetcherMaxBytesTest() throws Exception {
        File logDir = TestUtils.tempDirectory();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), 1);
        LogSegment segment = LogSegment.open(logDir, 0L, LogConfig.apply(logProps, ((Set) JavaConverters.asScalaSetConverter(Collections.emptySet()).asScala()).toSet()), this.mockTime, false, 4096, false, "");
        try {
            for (int batch = 0; batch < 3; batch++) {
                segment.append(segment.readNextOffset() + 49, 1L, 1L, buildWithOffset(segment.readNextOffset()));
                segment.flush();
            }
            int nextOffset = (int) segment.readNextOffset();
            segment.offsetIndex().flush();
            segment.offsetIndex().trimToValidSize();
            ByteBuffer segmentBytes = ByteBuffer.wrap(Files.readAllBytes(segment.log().file().toPath()));
            ByteBuffer offsetIndexBytes = ByteBuffer.wrap(Files.readAllBytes(segment.offsetIndex().file().toPath()));
            ByteBuffer timeIndexBytes = ByteBuffer.wrap(Files.readAllBytes(segment.timeIndex().file().toPath()));
            MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentBytes, offsetIndexBytes, timeIndexBytes);
            TopicPartition topicPartition = new TopicPartition("foo", 0);
            TierObjectMetadata tierObjectMetadata = new TierObjectMetadata(topicPartition, 0, 0L, nextOffset, 0L, 0L, 0, false, false, (byte) 1);
            Metrics metrics = new Metrics();
            TierFetcher tierFetcher = new TierFetcher(tierObjectStore, metrics);
            try {
                TierFetchMetadata fetchMetadata = new TierFetchMetadata(0L, Option.apply(1000L), 600, 1000L, true, tierObjectMetadata, Option.empty(), 0L, 1000);
                CompletableFuture<Boolean> callbackFired = new CompletableFuture<>();
                // Expected value goes first so that a failure message reads correctly.
                Assert.assertEquals(Double.valueOf(0.0d), metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue());
                PendingFetch pendingFetch = tierFetcher.fetch(new ArrayList<>(Arrays.asList(fetchMetadata)), delayedOperationKey -> {
                    callbackFired.complete(true);
                });
                MockDelayedFetch delayedFetch = new MockDelayedFetch(pendingFetch);
                Assert.assertTrue(callbackFired.get(2000L, TimeUnit.MILLISECONDS));
                Map<?, ?> fetchResults = pendingFetch.finish();
                Assert.assertNotNull("expected non-null fetch result", fetchResults);
                Assert.assertTrue(((Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue()).doubleValue() > 0.0d);
                Assert.assertTrue(delayedFetch.tryComplete());
                Records fetched = ((TierFetchResult) fetchResults.get(topicPartition)).records;
                Assert.assertTrue(fetched.sizeInBytes() <= 600);
                long expectedOffset = 0;
                for (Record record : fetched.records()) {
                    Assert.assertEquals("Offset not expected", expectedOffset, record.offset());
                    expectedOffset++;
                }
                Assert.assertEquals("When we set maxBytes low, we just read the first 50 records successfully.", 50L, expectedOffset);
            } finally {
                // Close exactly once, whether or not the assertions passed.
                tierFetcher.close();
            }
        } finally {
            segment.close();
        }
    }
}
