package kafka.tier.fetcher;

import io.confluent.kafka.storage.checksum.ChecksumParams;
import io.confluent.kafka.storage.checksum.E2EChecksumStore;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import kafka.server.DelayedOperation;
import kafka.server.KafkaConfig;
import kafka.tier.TierUnfetchedTimestampAndOffset;
import kafka.tier.TopicIdPartition;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.fetcher.MemoryTracker;
import kafka.tier.fetcher.objectcache.ObjectCacheConfig;
import kafka.tier.fetcher.objectcache.PrefetchCache;
import kafka.tier.fetcher.offsetcache.FetchOffsetCache;
import kafka.tier.state.SegmentAndMetadataLayout;
import kafka.tier.store.BucketHealthResult;
import kafka.tier.store.MockInMemoryTierObjectStore;
import kafka.tier.store.MockInMemoryTierObjectStoreConfig;
import kafka.tier.store.OpaqueData;
import kafka.tier.store.TierObjectAttribute;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import kafka.tier.store.VersionInformation;
import kafka.tier.store.objects.FragmentType;
import kafka.tier.store.objects.ObjectType;
import kafka.tier.store.objects.TierSegmentUpload;
import kafka.tier.store.objects.metadata.ObjectMetadata;
import kafka.tier.store.objects.metadata.ObjectStoreMetadata;
import kafka.utils.TestUtils;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.storage.internals.log.FetchedTimestampAndOffset;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogSegment;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import scala.compat.java8.OptionConverters;

/* loaded from: input_file:kafka/tier/fetcher/TierFetcherTest.class */
public class TierFetcherTest {
    private MockTime mockTime = new MockTime();
    private E2EChecksumStore checksumStore = TestUtils.createChecksumStore();
    private ChecksumParams checksumParams = new ChecksumParams(Optional.of(this.checksumStore), true, true);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/tier/fetcher/TierFetcherTest$MockDelayedFetch.class */
    public static class MockDelayedFetch extends DelayedOperation {
        PendingFetch fetch;

        MockDelayedFetch(PendingFetch pendingFetch) {
            super(0L, OptionConverters.toScala(Optional.empty()));
            this.fetch = pendingFetch;
        }

        public void onExpiration() {
        }

        public void onComplete() {
            this.fetch.finish();
        }

        public boolean tryComplete() {
            if (this.fetch.isComplete()) {
                return forceComplete();
            }
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/tier/fetcher/TierFetcherTest$MockedTierObjectStore.class */
    public class MockedTierObjectStore extends TierObjectStore {
        private final ByteBuffer segmentByteBuffer;
        private final ByteBuffer offsetByteBuffer;
        private final ByteBuffer timestampByteBuffer;
        private final AtomicBoolean failNextRequest = new AtomicBoolean(false);
        int segmentReads = 0;
        int offsetIndexReads = 0;
        int timestampIndexReads = 0;

        /* loaded from: input_file:kafka/tier/fetcher/TierFetcherTest$MockedTierObjectStore$MockTierObjectStoreResponse.class */
        class MockTierObjectStoreResponse implements TierObjectStoreResponse {
            private final InputStream is;

            MockTierObjectStoreResponse(InputStream inputStream) {
                this.is = inputStream;
            }

            public InputStream getInputStream() {
                return this.is;
            }

            public void close() {
            }
        }

        MockedTierObjectStore(ByteBuffer byteBuffer, ByteBuffer byteBuffer2, ByteBuffer byteBuffer3) {
            this.segmentByteBuffer = byteBuffer;
            this.offsetByteBuffer = byteBuffer2;
            this.timestampByteBuffer = byteBuffer3;
        }

        void failNextRequest() {
            this.failNextRequest.set(true);
        }

        public String keyPrefix() {
            return "";
        }

        public BucketHealthResult checkBucketHealth() {
            return BucketHealthResult.HEALTHY;
        }

        public TierObjectStore.Backend getBackend() {
            return TierObjectStore.Backend.Mock;
        }

        public Map<String, List<VersionInformation>> listObject(String str, boolean z) {
            return null;
        }

        public ByteBuffer getSnapshot(ObjectStoreMetadata objectStoreMetadata, FragmentType fragmentType, int i) {
            return null;
        }

        public TierObjectStoreResponse getObject(ObjectStoreMetadata objectStoreMetadata, ObjectType objectType, Long l, Long l2, VersionInformation versionInformation) throws IOException {
            ByteBuffer byteBuffer;
            if (this.failNextRequest.compareAndSet(true, false)) {
                throw new IOException("Failed to retrieve object.");
            }
            if (l != null && l.longValue() > 2147483647L) {
                throw new IllegalArgumentException("MockedTierObjectStore does not support byteOffset over Integer.MAX_VALUE");
            }
            if (objectType == ObjectType.OFFSET_INDEX) {
                this.offsetIndexReads++;
                byteBuffer = this.offsetByteBuffer;
            } else if (objectType == ObjectType.SEGMENT) {
                this.segmentReads++;
                byteBuffer = this.segmentByteBuffer;
            } else {
                if (objectType != ObjectType.TIMESTAMP_INDEX) {
                    throw new UnsupportedOperationException();
                }
                this.timestampIndexReads++;
                byteBuffer = this.timestampByteBuffer;
            }
            int intValue = l != null ? l.intValue() : 0;
            int min = Math.min(((int) Math.min(l2 != null ? l2.longValue() : Long.MAX_VALUE, byteBuffer.limit())) - intValue, byteBuffer.array().length);
            ByteBuffer allocate = ByteBuffer.allocate(min);
            allocate.put(byteBuffer.array(), intValue, min);
            allocate.flip();
            return new MockTierObjectStoreResponse(new ByteBufferInputStream(allocate));
        }

        public TierObjectAttribute objectExists(ObjectStoreMetadata objectStoreMetadata, ObjectType objectType) throws IOException, TierObjectStoreRetriableException {
            TierObjectAttribute tierObjectAttribute = new TierObjectAttribute(false);
            if (this.segmentByteBuffer.limit() > 0) {
                tierObjectAttribute.exist = true;
                tierObjectAttribute.size = this.segmentByteBuffer.capacity();
            }
            return tierObjectAttribute;
        }

        public OpaqueData prepPutSegment() throws TierObjectStoreRetriableException, IOException {
            return OpaqueData.ZEROED;
        }

        public void putSegment(TierSegmentUpload<?> tierSegmentUpload) {
            throw new UnsupportedOperationException();
        }

        public String putObject(ObjectStoreMetadata objectStoreMetadata, File file, ObjectType objectType) throws IOException {
            throw new IOException("");
        }

        public String putBuffer(ObjectStoreMetadata objectStoreMetadata, ByteBuffer byteBuffer, ObjectType objectType) throws IOException {
            throw new IOException("");
        }

        public void restoreObjectByCopy(ObjectMetadata objectMetadata, String str, VersionInformation versionInformation) {
        }

        public void deleteSegment(ObjectMetadata objectMetadata) {
        }

        public void deleteVersions(List<TierObjectStore.KeyAndVersion> list) {
        }
    }

    private boolean futureReady(long j, CompletableFuture<?> completableFuture) {
        try {
            completableFuture.get(j, TimeUnit.MILLISECONDS);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Test
    public void tierFetcherCancellationUnblocksWaitingForMemory() throws InterruptedException {
        MockedTierObjectStore mockedTierObjectStore = new MockedTierObjectStore(getMemoryRecordsBuffer(), ByteBuffer.allocate(0), ByteBuffer.allocate(0));
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        ObjectMetadata objectMetadata = new ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
        Metrics metrics = new Metrics();
        KafkaScheduler kafkaScheduler = (KafkaScheduler) Mockito.mock(KafkaScheduler.class);
        TierFetcher tierFetcher = new TierFetcher(this.mockTime, new TierFetcherConfig(1, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, 1L, false, 1), mockedTierObjectStore, kafkaScheduler, metrics, new LogContext());
        tierFetcher.memoryTracker().newLease(CancellationContext.newContext(), 1048576L);
        Assertions.assertFalse(tierFetcher.memoryTracker().tryLease(100L).isPresent(), "expected tierfetcher to have less than zero bytes available in pool, further lease attempts should fail");
        TierFetchMetadata tierFetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 0L, 600, 1000L, true, objectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
        CompletableFuture<?> completableFuture = new CompletableFuture<>();
        PendingFetch fetch = tierFetcher.fetch(new ArrayList(Collections.singletonList(tierFetchMetadata)), IsolationLevel.READ_UNCOMMITTED, delayedOperationKey -> {
            completableFuture.complete(true);
        }, 0);
        Thread thread = new Thread((Runnable) fetch);
        Assertions.assertFalse(futureReady(100L, completableFuture), "expected fetch to be blocked on memory allocation");
        fetch.cancel();
        Assertions.assertTrue(futureReady(1000L, completableFuture), "expected canceling the fetch to unblock the memory allocation");
        thread.join();
    }

    @Test
    public void tierFetcherExceptionCausesOnComplete() throws Exception {
        MockedTierObjectStore mockedTierObjectStore = new MockedTierObjectStore(getMemoryRecordsBuffer(), ByteBuffer.allocate(0), ByteBuffer.allocate(0));
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        ObjectMetadata objectMetadata = new ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
        Metrics metrics = new Metrics();
        TierFetcher tierFetcher = new TierFetcher(this.mockTime, mockedTierObjectStore, (KafkaScheduler) Mockito.mock(KafkaScheduler.class), metrics);
        try {
            TierFetchMetadata tierFetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 0L, 600, 1000L, true, objectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
            CompletableFuture completableFuture = new CompletableFuture();
            mockedTierObjectStore.failNextRequest();
            Assertions.assertEquals(0.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue()).doubleValue(), 0.0d);
            Assertions.assertEquals(0.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchExceptionTotalMetricName).metricValue()).doubleValue(), 0.0d);
            PendingFetch fetch = tierFetcher.fetch(new ArrayList(Collections.singletonList(tierFetchMetadata)), IsolationLevel.READ_UNCOMMITTED, delayedOperationKey -> {
                completableFuture.complete(true);
            }, 0);
            Assertions.assertTrue(((Boolean) completableFuture.get(2000L, TimeUnit.MILLISECONDS)).booleanValue());
            Assertions.assertEquals(ReclaimableMemoryRecords.EMPTY, ((TierFetchResult) fetch.finish().get(objectMetadata.topicIdPartition().topicPartition())).records, "expected returned records to be empty due to exception thrown");
            Assertions.assertEquals(0.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue()).doubleValue(), 0.0d);
            Assertions.assertEquals(1.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchExceptionTotalMetricName).metricValue()).doubleValue(), 0.0d);
            tierFetcher.close();
            Assertions.assertEquals(0L, tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
        } catch (Throwable th) {
            tierFetcher.close();
            throw th;
        }
    }

    @Test
    public void tierFetcherFetchCancelled() throws Exception {
        MockedTierObjectStore mockedTierObjectStore = new MockedTierObjectStore(getMemoryRecordsBuffer(), ByteBuffer.allocate(0), ByteBuffer.allocate(0));
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        ObjectMetadata objectMetadata = new ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
        Metrics metrics = new Metrics();
        TierFetcher tierFetcher = new TierFetcher(this.mockTime, mockedTierObjectStore, (KafkaScheduler) Mockito.mock(KafkaScheduler.class), metrics);
        try {
            TierFetchMetadata tierFetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 0L, 600, 1000L, true, objectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
            CompletableFuture completableFuture = new CompletableFuture();
            Assertions.assertEquals(0.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue()).doubleValue(), 0.0d);
            Assertions.assertEquals(0.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchCancellationTotalMetricName).metricValue()).doubleValue(), 0.0d);
            PendingFetch fetch = tierFetcher.fetch(new ArrayList(Collections.singletonList(tierFetchMetadata)), IsolationLevel.READ_UNCOMMITTED, delayedOperationKey -> {
                completableFuture.complete(true);
            }, 0);
            fetch.cancel();
            Assertions.assertTrue(((Boolean) completableFuture.get(2000L, TimeUnit.MILLISECONDS)).booleanValue());
            TierFetchResult tierFetchResult = (TierFetchResult) fetch.finish().get(topicIdPartition.topicPartition());
            if (tierFetchResult.records.sizeInBytes() > 0) {
                Assertions.assertTrue(((Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue()).doubleValue() > 0.0d);
            }
            if (tierFetchResult.exception == null) {
                Assertions.assertEquals(0.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchExceptionTotalMetricName).metricValue()).doubleValue(), 0.0d);
            }
            fetch.markFetchExpired();
            Assertions.assertEquals(1.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchCancellationTotalMetricName).metricValue()).doubleValue(), 0.0d, "expected 1 cancellation");
            tierFetchResult.records.release();
            tierFetcher.close();
            Assertions.assertEquals(0L, tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
        } catch (Throwable th) {
            tierFetcher.close();
            throw th;
        }
    }

    @Test
    public void testTierFetcherWithBacklog() throws Exception {
        ByteBuffer buffer = MemoryRecords.withRecords((byte) 2, 0L, Compression.NONE, TimestampType.CREATE_TIME, new SimpleRecord[]{new SimpleRecord(1L, "foo".getBytes(), "1".getBytes()), new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), new SimpleRecord(3L, "c".getBytes(), "3".getBytes())}).buffer();
        int length = buffer.array().length;
        File tempFile = org.apache.kafka.test.TestUtils.tempFile();
        FileChannel channel = new FileOutputStream(tempFile, false).getChannel();
        channel.write(buffer);
        channel.close();
        MockInMemoryTierObjectStore mockInMemoryTierObjectStore = (MockInMemoryTierObjectStore) Mockito.mock(MockInMemoryTierObjectStore.class);
        ((MockInMemoryTierObjectStore) Mockito.doAnswer(new Answer<TierObjectStoreResponse>() { // from class: kafka.tier.fetcher.TierFetcherTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public TierObjectStoreResponse m319answer(InvocationOnMock invocationOnMock) throws Throwable {
                return (TierObjectStoreResponse) invocationOnMock.callRealMethod();
            }
        }).when(mockInMemoryTierObjectStore)).getObjectStoreFragment((ObjectStoreMetadata) ArgumentMatchers.any(), (FragmentType) ArgumentMatchers.any(), (Long) ArgumentMatchers.any());
        ((MockInMemoryTierObjectStore) Mockito.doAnswer(new Answer<TierObjectStoreResponse>() { // from class: kafka.tier.fetcher.TierFetcherTest.2
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public TierObjectStoreResponse m320answer(InvocationOnMock invocationOnMock) throws Throwable {
                return (TierObjectStoreResponse) invocationOnMock.callRealMethod();
            }
        }).when(mockInMemoryTierObjectStore)).getObjectStoreFragment((ObjectStoreMetadata) ArgumentMatchers.any(), (FragmentType) ArgumentMatchers.any(), (Long) ArgumentMatchers.any(), (Long) ArgumentMatchers.any(), (VersionInformation) ArgumentMatchers.eq((Object) null));
        ((MockInMemoryTierObjectStore) Mockito.doAnswer(new Answer<Void>() { // from class: kafka.tier.fetcher.TierFetcherTest.3
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m321answer(InvocationOnMock invocationOnMock) throws Throwable {
                invocationOnMock.callRealMethod();
                return null;
            }
        }).when(mockInMemoryTierObjectStore)).putObject((ObjectStoreMetadata) ArgumentMatchers.any(), (File) ArgumentMatchers.any(), (ObjectType) ArgumentMatchers.any());
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        ObjectMetadata objectMetadata = new ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
        mockInMemoryTierObjectStore.putObject(objectMetadata, tempFile, ObjectType.SEGMENT);
        TierFetchMetadata tierFetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 0L, Integer.valueOf(length), 1000L, true, objectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
        Metrics metrics = new Metrics();
        KafkaScheduler kafkaScheduler = (KafkaScheduler) Mockito.mock(KafkaScheduler.class);
        HashMap hashMap = new HashMap();
        hashMap.put("zookeeper.connect", "127.0.0.1:0000");
        hashMap.put(KafkaConfig.TierFetcherNumThreadsProp(), "1");
        TierFetcher tierFetcher = new TierFetcher(this.mockTime, new TierFetcherConfig(new KafkaConfig(hashMap)), mockInMemoryTierObjectStore, kafkaScheduler, metrics, new LogContext());
        CompletableFuture completableFuture = new CompletableFuture();
        PendingFetch fetch = tierFetcher.fetch(new ArrayList(Collections.singletonList(tierFetchMetadata)), IsolationLevel.READ_UNCOMMITTED, delayedOperationKey -> {
            completableFuture.complete(true);
        }, 0);
        CompletableFuture completableFuture2 = new CompletableFuture();
        PendingFetch fetch2 = tierFetcher.fetch(new ArrayList(Collections.singletonList(tierFetchMetadata)), IsolationLevel.READ_UNCOMMITTED, delayedOperationKey2 -> {
            completableFuture2.complete(true);
        }, 0);
        completableFuture2.getClass();
        TestUtils.waitUntilTrue(completableFuture2::isDone, () -> {
            return "PendingFetch fails to complete within the expected time";
        }, 5000L, 200L);
        Assertions.assertTrue(tierFetcher.executorService().getQueue().isEmpty());
        Assertions.assertEquals(0, ((Integer) metrics.metric(tierFetcher.tierFetcherMetrics.queueSizeMetricName).metricValue()).intValue());
        Assertions.assertEquals(1, fetch.finish().size());
        Assertions.assertEquals(1, fetch2.finish().size());
        final CompletableFuture completableFuture3 = new CompletableFuture();
        ((MockInMemoryTierObjectStore) Mockito.doAnswer(new Answer<TierObjectStoreResponse>() { // from class: kafka.tier.fetcher.TierFetcherTest.4
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public TierObjectStoreResponse m322answer(InvocationOnMock invocationOnMock) throws Throwable {
                completableFuture3.get();
                return (TierObjectStoreResponse) invocationOnMock.callRealMethod();
            }
        }).when(mockInMemoryTierObjectStore)).getObject((ObjectStoreMetadata) ArgumentMatchers.any(), (ObjectType) ArgumentMatchers.any(), (Long) ArgumentMatchers.any(), (Long) ArgumentMatchers.any(), (VersionInformation) ArgumentMatchers.eq((Object) null));
        CompletableFuture completableFuture4 = new CompletableFuture();
        tierFetcher.fetch(new ArrayList(Collections.singletonList(tierFetchMetadata)), IsolationLevel.READ_UNCOMMITTED, delayedOperationKey3 -> {
            completableFuture4.complete(true);
        }, 0);
        CompletableFuture completableFuture5 = new CompletableFuture();
        tierFetcher.fetch(new ArrayList(Collections.singletonList(tierFetchMetadata)), IsolationLevel.READ_UNCOMMITTED, delayedOperationKey4 -> {
            completableFuture5.complete(true);
        }, 0);
        CompletableFuture completableFuture6 = new CompletableFuture();
        PendingFetch fetch3 = tierFetcher.fetch(new ArrayList(Collections.singletonList(tierFetchMetadata)), IsolationLevel.READ_UNCOMMITTED, delayedOperationKey5 -> {
            completableFuture6.complete(true);
        }, 0);
        Assertions.assertFalse(tierFetcher.executorService().getQueue().isEmpty());
        Assertions.assertTrue(tierFetcher.executorService().getQueue().contains(fetch3));
        Assertions.assertTrue(((Integer) metrics.metric(tierFetcher.tierFetcherMetrics.queueSizeMetricName).metricValue()).intValue() > 0);
        Map finish = fetch3.finish();
        Assertions.assertEquals(1, finish.size());
        Assertions.assertEquals(ReclaimableMemoryRecords.EMPTY, ((TierFetchResult) finish.get(topicIdPartition.topicPartition())).records);
        Assertions.assertFalse(tierFetcher.executorService().getQueue().contains(fetch3));
        Assertions.assertFalse(completableFuture4.isDone());
        completableFuture3.complete(true);
        completableFuture5.getClass();
        TestUtils.waitUntilTrue(completableFuture5::isDone, () -> {
            return "PendingFetch fails to complete within the expected time";
        }, 5000L, 200L);
        Assertions.assertTrue(tierFetcher.executorService().getQueue().isEmpty());
        tierFetcher.close();
    }

    private ByteBuffer getMemoryRecordsBuffer() {
        ByteBuffer allocate = ByteBuffer.allocate(2048);
        ByteBuffer allocate2 = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder builder = MemoryRecords.builder(allocate, (byte) 2, Compression.NONE, TimestampType.CREATE_TIME, 0L);
        MemoryRecordsBuilder builder2 = MemoryRecords.builder(allocate2, (byte) 2, Compression.NONE, TimestampType.CREATE_TIME, 0L);
        IntStream.range(0, 50).forEach(i -> {
            builder.appendWithOffset(i, 1L, "a".getBytes(), "v".getBytes());
        });
        IntStream.range(50, 101).forEach(i2 -> {
            builder2.appendWithOffset(i2, 1L, "a".getBytes(), "v".getBytes());
        });
        builder.build();
        builder2.build();
        allocate.flip();
        allocate2.flip();
        ByteBuffer put = ByteBuffer.allocate(4194304).put(allocate).put(allocate2);
        put.flip();
        return put;
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void tierFetcherRequestEmptyIndexTest(boolean z) throws Exception {
        MockedTierObjectStore mockedTierObjectStore = new MockedTierObjectStore(getMemoryRecordsBuffer(), ByteBuffer.allocate(0), ByteBuffer.allocate(0));
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        ObjectMetadata objectMetadata = new ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
        Metrics metrics = new Metrics();
        TierFetcher tierFetcher = new TierFetcher(this.mockTime, mockedTierObjectStore, (KafkaScheduler) Mockito.mock(KafkaScheduler.class), metrics);
        if (z) {
            tierFetcher.reconfigureObjectCache(ObjectCacheConfig.DEFAULT, new ObjectCacheConfig(true, 209715200L, 100L, 500L));
        }
        try {
            TierFetchMetadata tierFetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 0L, 10000, 1000L, true, objectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
            CompletableFuture completableFuture = new CompletableFuture();
            Assertions.assertEquals(metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue(), Double.valueOf(0.0d));
            PendingFetch fetch = tierFetcher.fetch(new ArrayList(Collections.singletonList(tierFetchMetadata)), IsolationLevel.READ_UNCOMMITTED, delayedOperationKey -> {
                completableFuture.complete(true);
            }, 0);
            MockDelayedFetch mockDelayedFetch = new MockDelayedFetch(fetch);
            Assertions.assertTrue(((Boolean) completableFuture.get(2000L, TimeUnit.MILLISECONDS)).booleanValue());
            Map finish = fetch.finish();
            Assertions.assertNotNull(finish, "expected non-null fetch result");
            Assertions.assertTrue(((Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue()).doubleValue() > 0.0d);
            Assertions.assertTrue(mockDelayedFetch.tryComplete());
            TierFetchResult tierFetchResult = (TierFetchResult) finish.get(topicIdPartition.topicPartition());
            long j = 0;
            Iterator it = tierFetchResult.records.records().iterator();
            while (it.hasNext()) {
                Assertions.assertEquals(((Record) it.next()).offset(), j, "Offset not expected");
                j++;
            }
            tierFetchResult.records.release();
            tierFetcher.close();
            Assertions.assertEquals(0L, tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
        } catch (Throwable th) {
            tierFetcher.close();
            throw th;
        }
    }

    @Test
    public void tierFetcherLocateTargetOffsetTest() throws Exception {
        MemoryRecords[] memoryRecordsArr = {buildWithOffset(0L, 50), buildWithOffset(50L, 50), buildWithOffset(100L, 50), buildWithOffset(150L, 50), buildWithOffset(200L, 50)};
        int sizeInBytes = memoryRecordsArr[0].sizeInBytes() + 1;
        File tempDirectory = org.apache.kafka.test.TestUtils.tempDirectory();
        Properties properties = new Properties();
        properties.put("index.interval.bytes", Integer.valueOf(sizeInBytes));
        LogSegment open = LogSegment.open(tempDirectory, 0L, new LogConfig(properties, Collections.emptySet()), this.mockTime, false, 4096, false, "", this.checksumParams);
        try {
            for (MemoryRecords memoryRecords : memoryRecordsArr) {
                long baseOffset = ((MutableRecordBatch) memoryRecords.batches().iterator().next()).baseOffset();
                Iterator it = memoryRecords.batches().iterator();
                while (it.hasNext()) {
                    baseOffset = ((MutableRecordBatch) it.next()).lastOffset();
                }
                open.append(baseOffset, memoryRecords);
            }
            open.flush();
            open.offsetIndex().flush();
            open.offsetIndex().trimToValidSize();
            long readNextOffset = open.readNextOffset() - 1;
            ByteBuffer wrap = ByteBuffer.wrap(Files.readAllBytes(open.offsetIndex().file().toPath()));
            ByteBuffer wrap2 = ByteBuffer.wrap(Files.readAllBytes(open.offsetIndex().file().toPath()));
            ByteBuffer wrap3 = ByteBuffer.wrap(Files.readAllBytes(open.log().file().toPath()));
            MockedTierObjectStore mockedTierObjectStore = new MockedTierObjectStore(wrap3, wrap, wrap2);
            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            ObjectMetadata objectMetadata = new ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
            Metrics metrics = new Metrics();
            TierFetcher tierFetcher = new TierFetcher(this.mockTime, mockedTierObjectStore, (KafkaScheduler) Mockito.mock(KafkaScheduler.class), metrics);
            int i = 0;
            long j = 150;
            while (j < readNextOffset) {
                try {
                    TierFetchMetadata tierFetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), j, 1000, 1000L, true, objectMetadata, OptionConverters.toScala(Optional.empty()), 0L, wrap3.limit());
                    CompletableFuture completableFuture = new CompletableFuture();
                    PendingFetch fetch = tierFetcher.fetch(new ArrayList(Collections.singletonList(tierFetchMetadata)), IsolationLevel.READ_UNCOMMITTED, delayedOperationKey -> {
                        completableFuture.complete(true);
                    }, 0);
                    MockDelayedFetch mockDelayedFetch = new MockDelayedFetch(fetch);
                    Assertions.assertTrue(((Boolean) completableFuture.get(4000L, TimeUnit.MILLISECONDS)).booleanValue());
                    Map finish = fetch.finish();
                    Assertions.assertNotNull(finish, "expected non-null fetch result");
                    Assertions.assertTrue(((Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue()).doubleValue() > 0.0d);
                    Assertions.assertTrue(mockDelayedFetch.tryComplete());
                    TierFetchResult tierFetchResult = (TierFetchResult) finish.get(topicIdPartition.topicPartition());
                    Iterator it2 = tierFetchResult.records.records().iterator();
                    while (it2.hasNext()) {
                        Assertions.assertEquals(j, ((Record) it2.next()).offset(), "Offset not expected");
                        j++;
                    }
                    if (j < readNextOffset) {
                        i++;
                    }
                    long j2 = i;
                    org.apache.kafka.test.TestUtils.waitForCondition(() -> {
                        return j2 == tierFetcher.offsetCache.size();
                    }, "cache not updated by timeout");
                    tierFetchResult.records.release();
                } catch (Throwable th) {
                    tierFetcher.close();
                    throw th;
                }
            }
            Assertions.assertEquals(j - 1, readNextOffset);
            Assertions.assertEquals(1, mockedTierObjectStore.offsetIndexReads, "offset index should have been used exactly once, for the initial fetch");
            tierFetcher.close();
            Assertions.assertEquals(0L, tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
            open.deleteIfExists();
        } catch (Throwable th2) {
            open.deleteIfExists();
            throw th2;
        }
    }

    /* JADX WARN: Finally extract failed */
    public void tierFetcherRepeatedFetchesViaOffsetCacheTest(boolean z, boolean z2, int i) throws Exception {
        TierFetcher tierFetcher;
        File tempDirectory = org.apache.kafka.test.TestUtils.tempDirectory();
        Properties properties = new Properties();
        properties.put("index.interval.bytes", 1);
        LogSegment open = LogSegment.open(tempDirectory, 0L, new LogConfig(properties), this.mockTime, false, 4096, false, "", this.checksumParams);
        try {
            open.append(open.readNextOffset() + 49, buildWithOffset(open.readNextOffset(), 50));
            open.flush();
            open.append(open.readNextOffset() + 49, buildWithOffset(open.readNextOffset(), 50));
            open.flush();
            open.append(open.readNextOffset() + 49, buildWithOffset(open.readNextOffset(), 50));
            open.flush();
            open.append(open.readNextOffset() + 49, buildWithOffset(open.readNextOffset(), 50));
            open.flush();
            open.offsetIndex().flush();
            open.offsetIndex().trimToValidSize();
            long readNextOffset = open.readNextOffset() - 1;
            ByteBuffer wrap = ByteBuffer.wrap(Files.readAllBytes(open.offsetIndex().file().toPath()));
            ByteBuffer wrap2 = ByteBuffer.wrap(Files.readAllBytes(open.offsetIndex().file().toPath()));
            ByteBuffer wrap3 = ByteBuffer.wrap(Files.readAllBytes(open.log().file().toPath()));
            MockedTierObjectStore mockedTierObjectStore = new MockedTierObjectStore(wrap3, wrap, wrap2);
            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            ObjectMetadata objectMetadata = new ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
            Metrics metrics = new Metrics();
            KafkaScheduler kafkaScheduler = (KafkaScheduler) Mockito.mock(KafkaScheduler.class);
            if (z) {
                tierFetcher = new TierFetcher(this.mockTime, mockedTierObjectStore, kafkaScheduler, metrics);
            } else {
                HashMap hashMap = new HashMap();
                hashMap.put("zookeeper.connect", "127.0.0.1:0000");
                hashMap.put(KafkaConfig.TierFetcherOffsetCacheSizeProp(), "0");
                tierFetcher = new TierFetcher(this.mockTime, new TierFetcherConfig(new KafkaConfig(hashMap)), mockedTierObjectStore, kafkaScheduler, metrics, new LogContext());
            }
            if (z2) {
                tierFetcher.reconfigureObjectCache(ObjectCacheConfig.DEFAULT, new ObjectCacheConfig(true, 209715200L, 1000L, 5000L));
            }
            int i2 = 0;
            long j = 0;
            while (j < readNextOffset) {
                try {
                    TierFetchMetadata tierFetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), j, Integer.valueOf(i), 1000L, true, objectMetadata, OptionConverters.toScala(Optional.empty()), 0L, wrap3.limit());
                    CompletableFuture completableFuture = new CompletableFuture();
                    PendingFetch fetch = tierFetcher.fetch(new ArrayList(Collections.singletonList(tierFetchMetadata)), IsolationLevel.READ_UNCOMMITTED, delayedOperationKey -> {
                        completableFuture.complete(true);
                    }, 0);
                    MockDelayedFetch mockDelayedFetch = new MockDelayedFetch(fetch);
                    Assertions.assertTrue(((Boolean) completableFuture.get(4000L, TimeUnit.MILLISECONDS)).booleanValue());
                    Map finish = fetch.finish();
                    Assertions.assertNotNull(finish, "expected non-null fetch result");
                    Assertions.assertTrue(((Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue()).doubleValue() > 0.0d);
                    Assertions.assertTrue(mockDelayedFetch.tryComplete());
                    TierFetchResult tierFetchResult = (TierFetchResult) finish.get(topicIdPartition.topicPartition());
                    if (z2) {
                        Assertions.assertTrue(((Long) metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheSizeMetricName).metricValue()).longValue() > 0);
                        Assertions.assertTrue(((Double) metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheAccessTotalMetricName).metricValue()).doubleValue() > 0.0d);
                    } else {
                        Assertions.assertEquals(0L, (Long) metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheSizeMetricName).metricValue());
                    }
                    for (Record record : tierFetchResult.records.records()) {
                        Assertions.assertEquals(j, record.offset(), "Offset not expected");
                        Assertions.assertEquals(String.valueOf(j % 10), Utils.utf8(record.value(), record.valueSize()));
                        j++;
                    }
                    if (z) {
                        if (j < readNextOffset) {
                            i2++;
                        }
                        long j2 = i2;
                        TierFetcher tierFetcher2 = tierFetcher;
                        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
                            return j2 == tierFetcher2.offsetCache.size();
                        }, "cache not updated by timeout");
                    }
                    tierFetchResult.records.release();
                } catch (Throwable th) {
                    tierFetcher.close();
                    throw th;
                }
            }
            Assertions.assertEquals(j - 1, readNextOffset);
            if (z) {
                Assertions.assertEquals(1.0d, tierFetcher.offsetCache.hitRatio(), 1.0E-4d);
                if (z2) {
                    Assertions.assertTrue(((double) mockedTierObjectStore.offsetIndexReads) > 0.0d);
                } else {
                    Assertions.assertEquals(0, mockedTierObjectStore.offsetIndexReads, "offset index should not have been used");
                }
            } else {
                Assertions.assertEquals(0L, tierFetcher.offsetCache.size());
                Assertions.assertEquals(0.0d, tierFetcher.offsetCache.hitRatio(), 0.0d);
            }
            tierFetcher.close();
            Assertions.assertEquals(0L, tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
            open.deleteIfExists();
        } catch (Throwable th2) {
            open.deleteIfExists();
            throw th2;
        }
    }

    @Test
    public void tierFetcherRepeatedFetchesWithDisabledOffsetCacheTest() throws Exception {
        tierFetcherRepeatedFetchesViaOffsetCacheTest(false, false, 100);
        tierFetcherRepeatedFetchesViaOffsetCacheTest(false, true, 100);
        tierFetcherRepeatedFetchesViaOffsetCacheTest(false, false, 1000);
        tierFetcherRepeatedFetchesViaOffsetCacheTest(false, true, 1000);
    }

    @Test
    public void tierFetcherRepeatedFetchesWithEnabledOffsetCacheTest() throws Exception {
        tierFetcherRepeatedFetchesViaOffsetCacheTest(true, true, 100);
        tierFetcherRepeatedFetchesViaOffsetCacheTest(true, false, 100);
        tierFetcherRepeatedFetchesViaOffsetCacheTest(true, true, 1000);
        tierFetcherRepeatedFetchesViaOffsetCacheTest(true, false, 1000);
    }

    private MemoryRecords buildWithOffset(long j, int i) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(2048), (byte) 2, Compression.NONE, TimestampType.CREATE_TIME, j);
        IntStream.range(0, i).forEach(i2 -> {
            builder.appendWithOffset(j + i2, j + i2, "a".getBytes(), String.valueOf((j + i2) % 10).getBytes());
        });
        return builder.build();
    }

    @Test
    public void tierFetcherIndexTest() throws Exception {
        File tempDirectory = org.apache.kafka.test.TestUtils.tempDirectory();
        Properties properties = new Properties();
        properties.put("index.interval.bytes", 1);
        LogSegment open = LogSegment.open(tempDirectory, 0L, new LogConfig(properties), this.mockTime, false, 4096, false, "", this.checksumParams);
        try {
            open.append(open.readNextOffset() + 49, buildWithOffset(open.readNextOffset(), 50));
            open.flush();
            open.append(open.readNextOffset() + 49, buildWithOffset(open.readNextOffset(), 50));
            open.flush();
            open.append(open.readNextOffset() + 49, buildWithOffset(open.readNextOffset(), 50));
            open.flush();
            open.offsetIndex().flush();
            open.offsetIndex().trimToValidSize();
            MockedTierObjectStore mockedTierObjectStore = new MockedTierObjectStore(ByteBuffer.wrap(Files.readAllBytes(open.log().file().toPath())), ByteBuffer.wrap(Files.readAllBytes(open.offsetIndex().file().toPath())), ByteBuffer.wrap(Files.readAllBytes(open.offsetIndex().file().toPath())));
            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            ObjectMetadata objectMetadata = new ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
            Metrics metrics = new Metrics();
            TierFetcher tierFetcher = new TierFetcher(this.mockTime, mockedTierObjectStore, (KafkaScheduler) Mockito.mock(KafkaScheduler.class), metrics);
            try {
                TierFetchMetadata tierFetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 100L, 10000, 1000L, true, objectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
                CompletableFuture completableFuture = new CompletableFuture();
                Assertions.assertEquals(metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue(), Double.valueOf(0.0d));
                PendingFetch fetch = tierFetcher.fetch(new ArrayList(Collections.singletonList(tierFetchMetadata)), IsolationLevel.READ_UNCOMMITTED, delayedOperationKey -> {
                    completableFuture.complete(true);
                }, 0);
                MockDelayedFetch mockDelayedFetch = new MockDelayedFetch(fetch);
                Assertions.assertTrue(((Boolean) completableFuture.get(2000L, TimeUnit.MILLISECONDS)).booleanValue());
                Map finish = fetch.finish();
                Assertions.assertNotNull(finish, "expected non-null fetch result");
                Assertions.assertTrue(((Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue()).doubleValue() > 0.0d);
                Assertions.assertTrue(mockDelayedFetch.tryComplete());
                TierFetchResult tierFetchResult = (TierFetchResult) finish.get(topicIdPartition.topicPartition());
                long j = 100;
                Iterator it = tierFetchResult.records.records().iterator();
                while (it.hasNext()) {
                    Assertions.assertEquals(j, ((Record) it.next()).offset(), "Offset not expected");
                    j++;
                }
                tierFetchResult.records.release();
                tierFetcher.close();
                Assertions.assertEquals(0L, tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
                open.close();
            } catch (Throwable th) {
                tierFetcher.close();
                throw th;
            }
        } catch (Throwable th2) {
            open.close();
            throw th2;
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void tierTimestampIndexTest(boolean z) throws Exception {
        File tempDirectory = org.apache.kafka.test.TestUtils.tempDirectory();
        Properties properties = new Properties();
        properties.put("index.interval.bytes", 1);
        LogSegment open = LogSegment.open(tempDirectory, 0L, new LogConfig(properties), this.mockTime, false, 4096, false, "", this.checksumParams);
        try {
            open.append(open.readNextOffset() + 49, buildWithOffset(open.readNextOffset(), 50));
            open.flush();
            open.append(open.readNextOffset() + 49, buildWithOffset(open.readNextOffset(), 50));
            open.flush();
            open.append(open.readNextOffset() + 49, buildWithOffset(open.readNextOffset(), 50));
            open.flush();
            open.append(open.readNextOffset() + 49, buildWithOffset(open.readNextOffset(), 50));
            open.offsetIndex().flush();
            open.offsetIndex().trimToValidSize();
            open.timeIndex().flush();
            open.timeIndex().trimToValidSize();
            ByteBuffer wrap = ByteBuffer.wrap(Files.readAllBytes(open.offsetIndex().file().toPath()));
            ByteBuffer wrap2 = ByteBuffer.wrap(Files.readAllBytes(open.timeIndex().file().toPath()));
            ByteBuffer wrap3 = ByteBuffer.wrap(Files.readAllBytes(open.log().file().toPath()));
            MockedTierObjectStore mockedTierObjectStore = new MockedTierObjectStore(wrap3, wrap, wrap2);
            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            ObjectMetadata objectMetadata = new ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
            Metrics metrics = new Metrics();
            TierFetcher tierFetcher = new TierFetcher(this.mockTime, mockedTierObjectStore, (KafkaScheduler) Mockito.mock(KafkaScheduler.class), metrics);
            if (z) {
                tierFetcher.asyncFetchEnabled = true;
                tierFetcher.asyncOffsetForTimestampParallelism = 5;
            }
            try {
                CompletableFuture completableFuture = new CompletableFuture();
                HashMap hashMap = new HashMap();
                hashMap.put(topicIdPartition.topicPartition(), new TierUnfetchedTimestampAndOffset(101L, objectMetadata, wrap3.limit()));
                PendingOffsetForTimestamp fetchOffsetForTimestamp = tierFetcher.fetchOffsetForTimestamp(hashMap, delayedOperationKey -> {
                    completableFuture.complete(true);
                });
                if (z) {
                    Assertions.assertTrue(fetchOffsetForTimestamp instanceof PendingOffsetForTimestampAsync);
                } else {
                    Assertions.assertTrue(fetchOffsetForTimestamp instanceof PendingOffsetForTimestampSync);
                }
                completableFuture.get(2000L, TimeUnit.MILLISECONDS);
                Assertions.assertEquals(Optional.of(new FetchedTimestampAndOffset(101L, 101L, Optional.empty())), fetchOffsetForTimestamp.results().get(topicIdPartition.topicPartition()), "incorrect offset for supplied timestamp returned");
                CompletableFuture completableFuture2 = new CompletableFuture();
                HashMap hashMap2 = new HashMap();
                hashMap2.put(topicIdPartition.topicPartition(), new TierUnfetchedTimestampAndOffset(2L, objectMetadata, wrap3.limit()));
                PendingOffsetForTimestamp fetchOffsetForTimestamp2 = tierFetcher.fetchOffsetForTimestamp(hashMap2, delayedOperationKey2 -> {
                    completableFuture2.complete(true);
                });
                if (z) {
                    Assertions.assertTrue(fetchOffsetForTimestamp2 instanceof PendingOffsetForTimestampAsync);
                } else {
                    Assertions.assertTrue(fetchOffsetForTimestamp2 instanceof PendingOffsetForTimestampSync);
                }
                completableFuture2.get(2000L, TimeUnit.MILLISECONDS);
                Assertions.assertEquals(Optional.of(new FetchedTimestampAndOffset(2L, 2L, Optional.empty())), fetchOffsetForTimestamp2.results().get(topicIdPartition.topicPartition()), "incorrect offset for supplied timestamp returned");
                TopicIdPartition topicIdPartition2 = new TopicIdPartition("foo", UUID.randomUUID(), 1);
                ObjectMetadata objectMetadata2 = new ObjectMetadata(topicIdPartition2, UUID.randomUUID(), 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
                mockedTierObjectStore.failNextRequest();
                CompletableFuture completableFuture3 = new CompletableFuture();
                LinkedHashMap linkedHashMap = new LinkedHashMap();
                linkedHashMap.put(topicIdPartition2.topicPartition(), new TierUnfetchedTimestampAndOffset(100L, objectMetadata2, wrap3.limit()));
                linkedHashMap.put(topicIdPartition.topicPartition(), new TierUnfetchedTimestampAndOffset(199L, objectMetadata, wrap3.limit()));
                PendingOffsetForTimestamp fetchOffsetForTimestamp3 = tierFetcher.fetchOffsetForTimestamp(linkedHashMap, delayedOperationKey3 -> {
                    completableFuture3.complete(true);
                });
                if (z) {
                    Assertions.assertTrue(fetchOffsetForTimestamp3 instanceof PendingOffsetForTimestampAsync);
                } else {
                    Assertions.assertTrue(fetchOffsetForTimestamp3 instanceof PendingOffsetForTimestampSync);
                }
                completableFuture3.get(2000L, TimeUnit.MILLISECONDS);
                Assertions.assertEquals(Optional.of(new FetchedTimestampAndOffset(199L, 199L, Optional.empty())), fetchOffsetForTimestamp3.results().get(topicIdPartition.topicPartition()), "incorrect offset for supplied timestamp returned");
                Exception exc = (Exception) ((FetchedTimestampAndOffset) ((Optional) fetchOffsetForTimestamp3.results().get(topicIdPartition2.topicPartition())).get()).responseException().get();
                Assertions.assertNotNull(exc, "tier object store through exception, pending result should have been completed exceptionally");
                Assertions.assertTrue(exc instanceof KafkaStorageException);
                Assertions.assertEquals(1.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchOffsetForTimestampExceptionTotalMetricName).metricValue()).doubleValue(), 0.0d);
                CompletableFuture completableFuture4 = new CompletableFuture();
                HashMap hashMap3 = new HashMap();
                hashMap3.put(topicIdPartition.topicPartition(), new TierUnfetchedTimestampAndOffset(101L, objectMetadata, wrap3.limit()));
                PendingOffsetForTimestamp fetchOffsetForTimestamp4 = tierFetcher.fetchOffsetForTimestamp(hashMap3, delayedOperationKey4 -> {
                    completableFuture4.complete(true);
                });
                completableFuture4.getClass();
                TestUtils.waitUntilTrue(completableFuture4::isDone, () -> {
                    return "fetchCompletionCallback fails to complete within the expected time";
                }, 5000L, 200L);
                fetchOffsetForTimestamp4.cancel();
                if (z) {
                    Assertions.assertTrue(fetchOffsetForTimestamp4 instanceof PendingOffsetForTimestampAsync);
                } else {
                    Assertions.assertTrue(fetchOffsetForTimestamp4 instanceof PendingOffsetForTimestampSync);
                }
                Assertions.assertEquals(0.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchOffsetForTimestampCancellationTotalMetricName).metricValue()).doubleValue(), 0.001d);
                Assertions.assertEquals(1, fetchOffsetForTimestamp4.results.size());
                tierFetcher.cancellationContext().cancel();
                CompletableFuture completableFuture5 = new CompletableFuture();
                HashMap hashMap4 = new HashMap();
                hashMap4.put(topicIdPartition.topicPartition(), new TierUnfetchedTimestampAndOffset(2L, objectMetadata, wrap3.limit()));
                PendingOffsetForTimestamp fetchOffsetForTimestamp5 = tierFetcher.fetchOffsetForTimestamp(hashMap4, delayedOperationKey5 -> {
                    completableFuture5.complete(true);
                });
                if (z) {
                    Assertions.assertTrue(fetchOffsetForTimestamp5 instanceof PendingOffsetForTimestampAsync);
                } else {
                    Assertions.assertTrue(fetchOffsetForTimestamp5 instanceof PendingOffsetForTimestampSync);
                }
                completableFuture5.get(2000L, TimeUnit.MILLISECONDS);
                Assertions.assertTrue(fetchOffsetForTimestamp5.results.isEmpty());
                tierFetcher.close();
                Assertions.assertEquals(0L, tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
                open.close();
            } catch (Throwable th) {
                tierFetcher.close();
                throw th;
            }
        } catch (Throwable th2) {
            open.close();
            throw th2;
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void tierFetcherMaxBytesTest() throws Exception {
        File tempDirectory = org.apache.kafka.test.TestUtils.tempDirectory();
        Properties properties = new Properties();
        properties.put("index.interval.bytes", 1);
        LogSegment open = LogSegment.open(tempDirectory, 0L, new LogConfig(properties), this.mockTime, false, 4096, false, "", this.checksumParams);
        try {
            TreeMap treeMap = new TreeMap();
            open.append(49L, buildWithOffset(0L, 50));
            treeMap.put(Integer.valueOf(open.size()), 49L);
            open.append(99L, buildWithOffset(50L, 50));
            treeMap.put(Integer.valueOf(open.size()), 99L);
            open.append(149L, buildWithOffset(100L, 50));
            treeMap.put(Integer.valueOf(open.size()), 149L);
            open.flush();
            open.offsetIndex().flush();
            open.offsetIndex().trimToValidSize();
            ByteBuffer wrap = ByteBuffer.wrap(Files.readAllBytes(open.offsetIndex().file().toPath()));
            ByteBuffer wrap2 = ByteBuffer.wrap(Files.readAllBytes(open.timeIndex().file().toPath()));
            ByteBuffer wrap3 = ByteBuffer.wrap(Files.readAllBytes(open.log().file().toPath()));
            MockedTierObjectStore mockedTierObjectStore = new MockedTierObjectStore(wrap3, wrap, wrap2);
            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            TopicPartition topicPartition = topicIdPartition.topicPartition();
            ObjectMetadata objectMetadata = new ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
            Metrics metrics = new Metrics();
            TierFetcher tierFetcher = new TierFetcher(this.mockTime, mockedTierObjectStore, (KafkaScheduler) Mockito.mock(KafkaScheduler.class), metrics);
            try {
                int[] iArr = {0, 400, 600, wrap3.remaining() - 1, wrap3.remaining(), wrap3.remaining() + 1};
                TierFetchMetadata tierFetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 0L, 600, 1000L, true, objectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
                Assertions.assertEquals(metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue(), Double.valueOf(0.0d));
                for (int i : iArr) {
                    CompletableFuture completableFuture = new CompletableFuture();
                    PendingFetch fetch = tierFetcher.fetch(new ArrayList(Collections.singletonList(tierFetchMetadata)), IsolationLevel.READ_UNCOMMITTED, delayedOperationKey -> {
                        completableFuture.complete(true);
                    }, i);
                    MockDelayedFetch mockDelayedFetch = new MockDelayedFetch(fetch);
                    Assertions.assertTrue(((Boolean) completableFuture.get(2000L, TimeUnit.MILLISECONDS)).booleanValue());
                    Map finish = fetch.finish();
                    Assertions.assertNotNull(finish, "expected non-null fetch result");
                    Assertions.assertTrue(((Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue()).doubleValue() > 0.0d);
                    Assertions.assertTrue(mockDelayedFetch.tryComplete());
                    TierFetchResult tierFetchResult = (TierFetchResult) finish.get(topicPartition);
                    ReclaimableMemoryRecords reclaimableMemoryRecords = tierFetchResult.records;
                    int min = Math.min(Math.max(600, i), wrap3.remaining());
                    Assertions.assertTrue(tierFetchResult.records.sizeInBytes() <= min);
                    long j = -1;
                    Iterator it = reclaimableMemoryRecords.records().iterator();
                    while (it.hasNext()) {
                        j++;
                        Assertions.assertEquals(((Record) it.next()).offset(), j, "Offset not expected");
                    }
                    Assertions.assertEquals(((Long) treeMap.floorEntry(Integer.valueOf(min)).getValue()).longValue(), j, "Unexpected lastOffset for overrideMaxBytes " + i);
                    tierFetchResult.records.release();
                }
                tierFetcher.close();
                Assertions.assertEquals(0L, tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
                open.close();
            } catch (Throwable th) {
                tierFetcher.close();
                throw th;
            }
        } catch (Throwable th2) {
            open.close();
            throw th2;
        }
    }

    @Test
    public void testTierObjectStoreExceptionReleasesLease() {
        CancellationContext newContext = CancellationContext.newContext();
        MockedTierObjectStore mockedTierObjectStore = new MockedTierObjectStore(getMemoryRecordsBuffer(), ByteBuffer.allocate(0), ByteBuffer.allocate(0));
        FetchOffsetCache fetchOffsetCache = new FetchOffsetCache(this.mockTime, 0, 0);
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        ObjectMetadata objectMetadata = new ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
        MemoryTracker memoryTracker = new MemoryTracker(this.mockTime, 1L);
        PendingFetch pendingFetch = new PendingFetch(newContext, mockedTierObjectStore, fetchOffsetCache, Optional.empty(), objectMetadata, delayedOperationKey -> {
        }, 0L, 1024, 1024, IsolationLevel.READ_UNCOMMITTED, memoryTracker, Collections.emptyList(), this.mockTime);
        mockedTierObjectStore.failNextRequest();
        pendingFetch.run();
        TierFetchResult tierFetchResult = (TierFetchResult) pendingFetch.finish().get(topicIdPartition.topicPartition());
        Assertions.assertNull(tierFetchResult.exception, "supress IOException as it is retriable");
        Assertions.assertFalse(tierFetchResult.records.records().iterator().hasNext(), "Expected to find 0 records since exception was thrown");
        Assertions.assertEquals(0L, memoryTracker.leased(), "expected all memory to be returned to the memory tracker");
    }

    @Test
    public void memoryLeaseWithKnownFirstBatchSize() throws InterruptedException {
        CancellationContext newContext = CancellationContext.newContext();
        MockedTierObjectStore mockedTierObjectStore = new MockedTierObjectStore(getMemoryRecordsBuffer(), ByteBuffer.allocate(0), ByteBuffer.allocate(0));
        FetchOffsetCache fetchOffsetCache = new FetchOffsetCache(this.mockTime, Integer.MAX_VALUE, Integer.MAX_VALUE);
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        ObjectMetadata objectMetadata = new ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
        fetchOffsetCache.put(objectMetadata.objectId(), 1L, 0, OptionalInt.of(2048));
        MemoryTracker memoryTracker = new MemoryTracker(this.mockTime, 1024L);
        MemoryTracker.MemoryLease newLease = memoryTracker.newLease(newContext, 1024 * 2);
        PendingFetch pendingFetch = new PendingFetch(newContext, mockedTierObjectStore, fetchOffsetCache, Optional.empty(), objectMetadata, delayedOperationKey -> {
        }, 1L, 1024, 1024, IsolationLevel.READ_UNCOMMITTED, memoryTracker, Collections.emptyList(), this.mockTime);
        Thread thread = new Thread((Runnable) pendingFetch);
        thread.start();
        Assertions.assertFalse(pendingFetch.isComplete(), "expected pending fetch to be blocked on memory allocation");
        newLease.release();
        memoryTracker.wakeup();
        thread.join();
        TierFetchResult tierFetchResult = (TierFetchResult) pendingFetch.finish().get(topicIdPartition.topicPartition());
        Assertions.assertEquals(2048, memoryTracker.leased(), "expected exactly firstBatchSize bytes to be leased");
        tierFetchResult.records.release();
        Assertions.assertEquals(0L, memoryTracker.leased(), "expected releasing the records returns leased memory to the MemoryTracker");
    }

    @Test
    public void testResizeTierFetcherMemoryPoolDynamically() {
        HashMap hashMap = new HashMap();
        hashMap.put("zookeeper.connect", "127.0.0.1:0000");
        hashMap.put(KafkaConfig.TierFetcherMemoryPoolSizeBytesProp(), "1024");
        KafkaConfig kafkaConfig = new KafkaConfig(hashMap);
        TierFetcher tierFetcher = new TierFetcher(this.mockTime, new TierFetcherConfig(kafkaConfig), new MockInMemoryTierObjectStore(this.mockTime, new MockInMemoryTierObjectStoreConfig()), (KafkaScheduler) Mockito.mock(KafkaScheduler.class), new Metrics(), new LogContext());
        Assertions.assertTrue(tierFetcher.reconfigurableConfigs().contains(KafkaConfig.TierFetcherMemoryPoolSizeBytesProp()), "expected TierFetcher memory pool size to be reconfigurable");
        Assertions.assertEquals(tierFetcher.memoryTracker().poolSize(), 1024L, "expected TierFetcher memory pool size to match what was set originally");
        hashMap.put(KafkaConfig.TierFetcherMemoryPoolSizeBytesProp(), "0");
        tierFetcher.reconfigure(kafkaConfig, new KafkaConfig(hashMap));
        Assertions.assertEquals(tierFetcher.memoryTracker().poolSize(), 0L, "expected TierFetcher memory pool size to be updated to the new size");
    }

    @Test
    public void testReconfigureTierFetcher() {
        HashMap hashMap = new HashMap();
        hashMap.put("zookeeper.connect", "127.0.0.1:0000");
        hashMap.put(KafkaConfig.TierFetcherMemoryPoolSizeBytesProp(), "1024");
        KafkaConfig kafkaConfig = new KafkaConfig(hashMap);
        TierFetcher tierFetcher = new TierFetcher(this.mockTime, new TierFetcherConfig(kafkaConfig), new MockInMemoryTierObjectStore(this.mockTime, new MockInMemoryTierObjectStoreConfig()), (KafkaScheduler) Mockito.mock(KafkaScheduler.class), new Metrics(), new LogContext());
        Assertions.assertTrue(tierFetcher.reconfigurableConfigs().contains(KafkaConfig.TierFetcherAsyncEnableProp()) && tierFetcher.reconfigurableConfigs().contains(KafkaConfig.TierFetcherAsyncOffsetForTimestampParallelismProp()), "expected TierFetcher async related configs to be reconfigurable");
        Assertions.assertTrue(tierFetcher.reconfigurableConfigs().contains(KafkaConfig.TierPrefetchCacheEnableProp()) && tierFetcher.reconfigurableConfigs().contains(KafkaConfig.TierPrefetchCacheEntrySizeBytesProp()) && tierFetcher.reconfigurableConfigs().contains(KafkaConfig.TierPrefetchCacheRangeBytesProp()) && tierFetcher.reconfigurableConfigs().contains(KafkaConfig.TierPrefetchCacheMaxSizeBytesProp()), "expected TierFetcher object cache related configs to be reconfigurable");
        Assertions.assertFalse(tierFetcher.asyncFetchEnabled, "expected TierFetcher async fetching is disabled");
        Assertions.assertFalse(tierFetcher.prefetchCacheOpt.isPresent(), "expected TierFetcher object cache is disabled");
        hashMap.put(KafkaConfig.TierFetcherAsyncEnableProp(), "true");
        hashMap.put(KafkaConfig.TierFetcherAsyncOffsetForTimestampParallelismProp(), "10");
        hashMap.put(KafkaConfig.TierPrefetchCacheEnableProp(), "true");
        tierFetcher.reconfigure(kafkaConfig, new KafkaConfig(hashMap));
        Assertions.assertTrue(tierFetcher.asyncFetchEnabled && tierFetcher.asyncOffsetForTimestampParallelism == 10, "expected TierFetcher async related configs to be updated");
        Assertions.assertTrue(tierFetcher.prefetchCacheOpt.isPresent());
        Assertions.assertThrows(ConfigException.class, () -> {
            TierFetcher.validateObjectCacheConfig(new ObjectCacheConfig(true, 2L, 5L, 25L));
        });
    }

    @Test
    public void testTierFetchTotalTimeMs() throws Exception {
        ByteBuffer buffer = MemoryRecords.withRecords((byte) 2, 0L, Compression.NONE, TimestampType.CREATE_TIME, new SimpleRecord[]{new SimpleRecord(1L, "foo".getBytes(), "1".getBytes()), new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), new SimpleRecord(3L, "c".getBytes(), "3".getBytes())}).buffer();
        File tempFile = org.apache.kafka.test.TestUtils.tempFile();
        FileChannel channel = new FileOutputStream(tempFile, false).getChannel();
        channel.write(buffer);
        channel.close();
        MockInMemoryTierObjectStore mockInMemoryTierObjectStore = (MockInMemoryTierObjectStore) Mockito.mock(MockInMemoryTierObjectStore.class);
        ((MockInMemoryTierObjectStore) Mockito.doAnswer(new Answer<TierObjectStoreResponse>() { // from class: kafka.tier.fetcher.TierFetcherTest.5
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public TierObjectStoreResponse m323answer(InvocationOnMock invocationOnMock) throws Throwable {
                return (TierObjectStoreResponse) invocationOnMock.callRealMethod();
            }
        }).when(mockInMemoryTierObjectStore)).getObjectStoreFragment((ObjectStoreMetadata) ArgumentMatchers.any(), (FragmentType) ArgumentMatchers.any(), (Long) ArgumentMatchers.any());
        ((MockInMemoryTierObjectStore) Mockito.doAnswer(new Answer<TierObjectStoreResponse>() { // from class: kafka.tier.fetcher.TierFetcherTest.6
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public TierObjectStoreResponse m324answer(InvocationOnMock invocationOnMock) throws Throwable {
                return (TierObjectStoreResponse) invocationOnMock.callRealMethod();
            }
        }).when(mockInMemoryTierObjectStore)).getObjectStoreFragment((ObjectStoreMetadata) ArgumentMatchers.any(), (FragmentType) ArgumentMatchers.any(), (Long) ArgumentMatchers.any(), (Long) ArgumentMatchers.any(), (VersionInformation) ArgumentMatchers.eq((Object) null));
        ((MockInMemoryTierObjectStore) Mockito.doAnswer(new Answer<Void>() { // from class: kafka.tier.fetcher.TierFetcherTest.7
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m325answer(InvocationOnMock invocationOnMock) throws Throwable {
                invocationOnMock.callRealMethod();
                return null;
            }
        }).when(mockInMemoryTierObjectStore)).putObject((ObjectStoreMetadata) ArgumentMatchers.any(), (File) ArgumentMatchers.any(), (ObjectType) ArgumentMatchers.any());
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        ObjectMetadata objectMetadata = new ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
        mockInMemoryTierObjectStore.putObject(objectMetadata, tempFile, ObjectType.SEGMENT);
        Metrics metrics = new Metrics();
        TierFetcher tierFetcher = new TierFetcher(this.mockTime, mockInMemoryTierObjectStore, (KafkaScheduler) Mockito.mock(KafkaScheduler.class), metrics);
        try {
            TierFetchMetadata tierFetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 0L, Integer.valueOf(buffer.array().length), 1000L, true, objectMetadata, OptionConverters.toScala(Optional.empty()), 0L, 1000);
            int i = 0;
            while (i < 100) {
                final int i2 = i < 89 ? 100 : i < 98 ? 500 : 1000;
                ((MockInMemoryTierObjectStore) Mockito.doAnswer(new Answer<TierObjectStoreResponse>() { // from class: kafka.tier.fetcher.TierFetcherTest.8
                    /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                    public TierObjectStoreResponse m326answer(InvocationOnMock invocationOnMock) throws Throwable {
                        TierFetcherTest.this.mockTime.sleep(i2);
                        return (TierObjectStoreResponse) invocationOnMock.callRealMethod();
                    }
                }).when(mockInMemoryTierObjectStore)).getObject((ObjectStoreMetadata) ArgumentMatchers.any(), (ObjectType) ArgumentMatchers.any(), (Long) ArgumentMatchers.any(), (Long) ArgumentMatchers.any(), (VersionInformation) ArgumentMatchers.eq((Object) null));
                CompletableFuture<?> completableFuture = new CompletableFuture<>();
                PendingFetch fetch = tierFetcher.fetch(new ArrayList(Collections.singletonList(tierFetchMetadata)), IsolationLevel.READ_UNCOMMITTED, delayedOperationKey -> {
                    completableFuture.complete(true);
                }, 0);
                futureReady(2000L, completableFuture);
                Assertions.assertNotNull(fetch.finish(), "expected non-null fetch result");
                i++;
            }
            Assertions.assertEquals(100.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchTotalTimeMs50PercentileMetricName).metricValue()).doubleValue(), 0.001d);
            Assertions.assertEquals(500.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchTotalTimeMs90PercentileMetricName).metricValue()).doubleValue(), 0.001d);
            Assertions.assertEquals(1000.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchTotalTimeMs99PercentileMetricName).metricValue()).doubleValue(), 0.001d);
            Assertions.assertEquals(1.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchPartitionCount50PercentileMetricName).metricValue()).doubleValue(), 0.001d);
            Assertions.assertEquals(1.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchPartitionCount90PercentileMetricName).metricValue()).doubleValue(), 0.001d);
            Assertions.assertEquals(1.0d, ((Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchPartitionCount99PercentileMetricName).metricValue()).doubleValue(), 0.001d);
            tierFetcher.close();
            Assertions.assertEquals(0L, tierFetcher.memoryTracker().leased(), "expected zero leased bytes");
        } catch (Throwable th) {
            tierFetcher.close();
            throw th;
        }
    }

    @Test
    public void testFetchFromObjectCache() throws ExecutionException, InterruptedException, IOException {
        int i = (25 * 10) + (10 / 2);
        byte[] randomBytes = org.apache.kafka.test.TestUtils.randomBytes(i);
        ByteBuffer wrap = ByteBuffer.wrap(randomBytes);
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        UUID randomUUID = UUID.randomUUID();
        ObjectMetadata objectMetadata = new ObjectMetadata(topicIdPartition, randomUUID, 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
        TierObjectStore tierObjectStore = (TierObjectStore) Mockito.spy(new MockedTierObjectStore(wrap, null, null));
        Metrics metrics = new Metrics();
        TierFetcher tierFetcher = new TierFetcher(this.mockTime, tierObjectStore, (KafkaScheduler) Mockito.mock(KafkaScheduler.class), metrics);
        Assertions.assertFalse(tierFetcher.prefetchCacheOpt.isPresent());
        Assertions.assertEquals(Double.valueOf(0.0d), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheHitTotalMetricName).metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheAccessTotalMetricName).metricValue());
        Assertions.assertEquals(0L, metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheSizeMetricName).metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), metrics.metric(tierFetcher.tierFetcherMetrics.bytesPrefetchedTotalMetricName).metricValue());
        ObjectCacheConfig objectCacheConfig = new ObjectCacheConfig(true, 15 * 10, 10, 5 * 10);
        tierFetcher.reconfigureObjectCache(ObjectCacheConfig.DEFAULT, objectCacheConfig);
        Assertions.assertTrue(tierFetcher.prefetchCacheOpt.isPresent());
        PrefetchCache prefetchCache = (PrefetchCache) tierFetcher.prefetchCacheOpt.get();
        Assertions.assertEquals(Double.valueOf(0.0d), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheHitTotalMetricName).metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheAccessTotalMetricName).metricValue());
        Assertions.assertEquals(0L, metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheSizeMetricName).metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), metrics.metric(tierFetcher.tierFetcherMetrics.bytesPrefetchedTotalMetricName).metricValue());
        InputStream inputStream = (InputStream) prefetchCache.get(objectMetadata, 1, 5, i).get();
        byte[] bArr = new byte[(5 - 1) * 2];
        int i2 = 5 - 1;
        Assertions.assertEquals(i2, inputStream.available());
        Assertions.assertEquals(i2, inputStream.read(bArr));
        int i3 = 0;
        int i4 = 1;
        while (i3 < i2) {
            Assertions.assertEquals(randomBytes[i4], bArr[i3]);
            i3++;
            i4++;
        }
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(prefetchCache.cacheMap.size() == 5);
        }, () -> {
            return prefetchCache.cacheMap.size() + " but expected 5";
        }, 5000L, 1000L);
        int i5 = 0;
        while (true) {
            int i6 = i5;
            if (i6 >= 50) {
                break;
            }
            PrefetchCache.CacheKey cacheKey = new PrefetchCache.CacheKey(randomUUID, i6);
            Assertions.assertTrue(prefetchCache.cacheMap.containsKey(cacheKey));
            ByteBuf byteBuf = (ByteBuf) ((PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(cacheKey)).future.get();
            for (int i7 = 0; i7 < 10; i7++) {
                Assertions.assertEquals(randomBytes[i6 + i7], byteBuf.getByte(i7));
            }
            i5 = i6 + 10;
        }
        int i8 = 0;
        while (true) {
            int i9 = i8;
            if (i9 >= 50) {
                break;
            }
            PrefetchCache.CacheEntry cacheEntry = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(new PrefetchCache.CacheKey(randomUUID, i9));
            Assertions.assertEquals(i9 == 0 ? 2 : 1, cacheEntry.refCnt.get());
            Assertions.assertEquals(i9 == 0 ? 2 : 1, ((ByteBuf) cacheEntry.future.get()).refCnt());
            i8 = i9 + 10;
        }
        inputStream.close();
        int i10 = 0;
        while (true) {
            int i11 = i10;
            if (i11 >= 50) {
                break;
            }
            PrefetchCache.CacheEntry cacheEntry2 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(new PrefetchCache.CacheKey(randomUUID, i11));
            Assertions.assertEquals(1, cacheEntry2.refCnt.get());
            Assertions.assertEquals(1, ((ByteBuf) cacheEntry2.future.get()).refCnt());
            i10 = i11 + 10;
        }
        Assertions.assertEquals(Double.valueOf(0.0d), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheHitTotalMetricName).metricValue());
        Assertions.assertEquals(Double.valueOf(1.0d), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheAccessTotalMetricName).metricValue());
        Assertions.assertEquals(Long.valueOf(5 * 10), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheSizeMetricName).metricValue());
        Assertions.assertEquals(Double.valueOf(50.0d), metrics.metric(tierFetcher.tierFetcherMetrics.bytesPrefetchedTotalMetricName).metricValue());
        InputStream inputStream2 = (InputStream) prefetchCache.get(objectMetadata, 70, 79, i).get();
        byte[] bArr2 = new byte[(79 - 70) * 2];
        int i12 = 79 - 70;
        Assertions.assertEquals(i12, inputStream2.available());
        Assertions.assertEquals(i12, inputStream2.read(bArr2));
        int i13 = 0;
        int i14 = 70;
        while (i13 < i12) {
            Assertions.assertEquals(randomBytes[i14], bArr2[i13]);
            i13++;
            i14++;
        }
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(prefetchCache.cacheMap.size() == 10);
        }, () -> {
            return prefetchCache.cacheMap.size() + " but expected 10";
        }, 5000L, 1000L);
        int i15 = 70;
        while (true) {
            int i16 = i15;
            if (i16 > 110) {
                break;
            }
            PrefetchCache.CacheKey cacheKey2 = new PrefetchCache.CacheKey(randomUUID, i16);
            Assertions.assertTrue(prefetchCache.cacheMap.containsKey(cacheKey2));
            ByteBuf byteBuf2 = (ByteBuf) ((PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(cacheKey2)).future.get();
            for (int i17 = 0; i17 < 10; i17++) {
                Assertions.assertEquals(randomBytes[i16 + i17], byteBuf2.getByte(i17));
            }
            i15 = i16 + 10;
        }
        int i18 = 70;
        while (true) {
            int i19 = i18;
            if (i19 > 110) {
                break;
            }
            PrefetchCache.CacheEntry cacheEntry3 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(new PrefetchCache.CacheKey(randomUUID, i19));
            Assertions.assertEquals(i19 == 70 ? 2 : 1, cacheEntry3.refCnt.get());
            Assertions.assertEquals(i19 == 70 ? 2 : 1, ((ByteBuf) cacheEntry3.future.get()).refCnt());
            i18 = i19 + 10;
        }
        inputStream2.close();
        int i20 = 70;
        while (true) {
            int i21 = i20;
            if (i21 > 110) {
                break;
            }
            PrefetchCache.CacheEntry cacheEntry4 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(new PrefetchCache.CacheKey(randomUUID, i21));
            Assertions.assertEquals(1, cacheEntry4.refCnt.get());
            Assertions.assertEquals(1, ((ByteBuf) cacheEntry4.future.get()).refCnt());
            i20 = i21 + 10;
        }
        Assertions.assertEquals(Double.valueOf(0.0d), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheHitTotalMetricName).metricValue());
        Assertions.assertEquals(Double.valueOf(2.0d), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheAccessTotalMetricName).metricValue());
        Assertions.assertEquals(Long.valueOf(10 * 10), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheSizeMetricName).metricValue());
        Assertions.assertEquals(Double.valueOf(100.0d), metrics.metric(tierFetcher.tierFetcherMetrics.bytesPrefetchedTotalMetricName).metricValue());
        InputStream inputStream3 = (InputStream) prefetchCache.get(objectMetadata, 52, 58, i).get();
        byte[] bArr3 = new byte[(58 - 52) * 2];
        int i22 = 58 - 52;
        Assertions.assertEquals(i22, inputStream3.available());
        Assertions.assertEquals(i22, inputStream3.read(bArr3));
        int i23 = 0;
        int i24 = 52;
        while (i23 < i22) {
            Assertions.assertEquals(randomBytes[i24], bArr3[i23]);
            i23++;
            i24++;
        }
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(prefetchCache.cacheMap.size() == 12);
        }, () -> {
            return prefetchCache.cacheMap.size() + " but expected 12";
        }, 5000L, 1000L);
        int i25 = 50;
        while (true) {
            int i26 = i25;
            if (i26 > 60) {
                break;
            }
            PrefetchCache.CacheKey cacheKey3 = new PrefetchCache.CacheKey(randomUUID, i26);
            Assertions.assertTrue(prefetchCache.cacheMap.containsKey(cacheKey3));
            ByteBuf byteBuf3 = (ByteBuf) ((PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(cacheKey3)).future.get();
            for (int i27 = 0; i27 < 10; i27++) {
                Assertions.assertEquals(randomBytes[i26 + i27], byteBuf3.getByte(i27));
            }
            i25 = i26 + 10;
        }
        int i28 = 50;
        while (true) {
            int i29 = i28;
            if (i29 > 60) {
                break;
            }
            PrefetchCache.CacheEntry cacheEntry5 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(new PrefetchCache.CacheKey(randomUUID, i29));
            Assertions.assertEquals(i29 == 50 ? 2 : 1, cacheEntry5.refCnt.get());
            Assertions.assertEquals(i29 == 50 ? 2 : 1, ((ByteBuf) cacheEntry5.future.get()).refCnt());
            i28 = i29 + 10;
        }
        inputStream3.close();
        int i30 = 50;
        while (true) {
            int i31 = i30;
            if (i31 > 60) {
                break;
            }
            PrefetchCache.CacheEntry cacheEntry6 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(new PrefetchCache.CacheKey(randomUUID, i31));
            Assertions.assertEquals(1, cacheEntry6.refCnt.get());
            Assertions.assertEquals(1, ((ByteBuf) cacheEntry6.future.get()).refCnt());
            i30 = i31 + 10;
        }
        Assertions.assertEquals(Double.valueOf(0.0d), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheHitTotalMetricName).metricValue());
        Assertions.assertEquals(Double.valueOf(3.0d), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheAccessTotalMetricName).metricValue());
        Assertions.assertEquals(Long.valueOf(12 * 10), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheSizeMetricName).metricValue());
        Assertions.assertEquals(Double.valueOf(120.0d), metrics.metric(tierFetcher.tierFetcherMetrics.bytesPrefetchedTotalMetricName).metricValue());
        InputStream inputStream4 = (InputStream) prefetchCache.get(objectMetadata, 58, 65, i).get();
        byte[] bArr4 = new byte[(65 - 58) * 2];
        int i32 = 65 - 58;
        Assertions.assertEquals(i32, inputStream4.available());
        Assertions.assertEquals(i32, inputStream4.read(bArr4));
        int i33 = 0;
        int i34 = 58;
        while (i33 < i32) {
            Assertions.assertEquals(randomBytes[i34], bArr4[i33]);
            i33++;
            i34++;
        }
        Thread.sleep(1000L);
        Assertions.assertEquals(12, prefetchCache.cacheMap.size());
        int i35 = 0;
        while (true) {
            int i36 = i35;
            if (i36 > 110) {
                break;
            }
            PrefetchCache.CacheKey cacheKey4 = new PrefetchCache.CacheKey(randomUUID, i36);
            Assertions.assertTrue(prefetchCache.cacheMap.containsKey(cacheKey4));
            ByteBuf byteBuf4 = (ByteBuf) ((PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(cacheKey4)).future.get();
            for (int i37 = 0; i37 < 10; i37++) {
                Assertions.assertEquals(randomBytes[i36 + i37], byteBuf4.getByte(i37));
            }
            i35 = i36 + 10;
        }
        int i38 = 0;
        while (true) {
            int i39 = i38;
            if (i39 > 110) {
                break;
            }
            PrefetchCache.CacheEntry cacheEntry7 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(new PrefetchCache.CacheKey(randomUUID, i39));
            Assertions.assertEquals((i39 == 50 || i39 == 60) ? 2 : 1, cacheEntry7.refCnt.get());
            Assertions.assertEquals((i39 == 50 || i39 == 60) ? 2 : 1, ((ByteBuf) cacheEntry7.future.get()).refCnt());
            i38 = i39 + 10;
        }
        inputStream4.close();
        int i40 = 0;
        while (true) {
            int i41 = i40;
            if (i41 > 110) {
                break;
            }
            PrefetchCache.CacheEntry cacheEntry8 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(new PrefetchCache.CacheKey(randomUUID, i41));
            Assertions.assertEquals(1, cacheEntry8.refCnt.get());
            Assertions.assertEquals(1, ((ByteBuf) cacheEntry8.future.get()).refCnt());
            i40 = i41 + 10;
        }
        Assertions.assertEquals(Double.valueOf(2.0d), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheHitTotalMetricName).metricValue());
        Assertions.assertEquals(Double.valueOf(5.0d), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheAccessTotalMetricName).metricValue());
        Assertions.assertEquals(Long.valueOf(12 * 10), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheSizeMetricName).metricValue());
        Assertions.assertEquals(Double.valueOf(120.0d), metrics.metric(tierFetcher.tierFetcherMetrics.bytesPrefetchedTotalMetricName).metricValue());
        PrefetchCache.CacheKey cacheKey5 = new PrefetchCache.CacheKey(randomUUID, 0L);
        PrefetchCache.CacheKey cacheKey6 = new PrefetchCache.CacheKey(randomUUID, 10L);
        PrefetchCache.CacheEntry cacheEntry9 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(cacheKey5);
        PrefetchCache.CacheEntry cacheEntry10 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(cacheKey6);
        InputStream inputStream5 = (InputStream) prefetchCache.get(objectMetadata, 152, 175, i).get();
        byte[] bArr5 = new byte[(175 - 152) + 5];
        int i42 = 175 - 152;
        Assertions.assertEquals(i42, inputStream5.available());
        Assertions.assertEquals(i42, inputStream5.read(bArr5));
        int i43 = 0;
        int i44 = 152;
        while (i43 < i42) {
            Assertions.assertEquals(randomBytes[i44], bArr5[i43]);
            i43++;
            i44++;
        }
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(prefetchCache.cacheMap.size() == 15 && !prefetchCache.cacheMap.containsKey(cacheKey5) && !prefetchCache.cacheMap.containsKey(cacheKey6) && cacheEntry9.refCnt.get() == 0 && cacheEntry10.refCnt.get() == 0);
        }, () -> {
            return prefetchCache.cacheMap.size() + " but expected 15";
        }, 5000L, 1000L);
        int i45 = 150;
        while (true) {
            int i46 = i45;
            if (i46 > 190) {
                break;
            }
            PrefetchCache.CacheKey cacheKey7 = new PrefetchCache.CacheKey(randomUUID, i46);
            Assertions.assertTrue(prefetchCache.cacheMap.containsKey(cacheKey7));
            ByteBuf byteBuf5 = (ByteBuf) ((PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(cacheKey7)).future.get();
            for (int i47 = 0; i47 < 10; i47++) {
                Assertions.assertEquals(randomBytes[i46 + i47], byteBuf5.getByte(i47));
            }
            i45 = i46 + 10;
        }
        int i48 = 150;
        while (true) {
            int i49 = i48;
            if (i49 > 190) {
                break;
            }
            PrefetchCache.CacheEntry cacheEntry11 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(new PrefetchCache.CacheKey(randomUUID, i49));
            Assertions.assertEquals((i49 == 150 || i49 == 160 || i49 == 170) ? 2 : 1, cacheEntry11.refCnt.get());
            Assertions.assertEquals((i49 == 150 || i49 == 160 || i49 == 170) ? 2 : 1, ((ByteBuf) cacheEntry11.future.get()).refCnt());
            i48 = i49 + 10;
        }
        inputStream5.close();
        int i50 = 150;
        while (true) {
            int i51 = i50;
            if (i51 > 190) {
                break;
            }
            PrefetchCache.CacheEntry cacheEntry12 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(new PrefetchCache.CacheKey(randomUUID, i51));
            Assertions.assertEquals(1, cacheEntry12.refCnt.get());
            Assertions.assertEquals(1, ((ByteBuf) cacheEntry12.future.get()).refCnt());
            i50 = i51 + 10;
        }
        Assertions.assertEquals(Double.valueOf(2.0d), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheHitTotalMetricName).metricValue());
        Assertions.assertEquals(Double.valueOf(8.0d), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheAccessTotalMetricName).metricValue());
        Assertions.assertEquals(Long.valueOf(15 * 10), metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheSizeMetricName).metricValue());
        Assertions.assertEquals(Double.valueOf(170.0d), metrics.metric(tierFetcher.tierFetcherMetrics.bytesPrefetchedTotalMetricName).metricValue());
        prefetchCache.cacheMap.values().forEach((v0) -> {
            v0.release();
        });
        prefetchCache.cacheMap.values().forEach(cacheEntry13 -> {
            Assertions.assertEquals(0, cacheEntry13.refCnt.get());
        });
        prefetchCache.cacheMap.values().forEach(cacheEntry14 -> {
            Assertions.assertEquals(0, ((ByteBuf) cacheEntry14.future.join()).refCnt());
        });
        prefetchCache.cacheMap.clear();
        InputStream inputStream6 = (InputStream) prefetchCache.get(objectMetadata, 251, 277, i).get();
        byte[] bArr6 = new byte[10];
        int i52 = i - 251;
        Assertions.assertEquals(i52, inputStream6.available());
        Assertions.assertEquals(i52, inputStream6.read(bArr6));
        int i53 = 0;
        int i54 = 251;
        while (i53 < i52) {
            Assertions.assertEquals(randomBytes[i54], bArr6[i53]);
            i53++;
            i54++;
        }
        Assertions.assertEquals(1, prefetchCache.cacheMap.size());
        PrefetchCache.CacheKey cacheKey8 = new PrefetchCache.CacheKey(randomUUID, 250L);
        Assertions.assertTrue(prefetchCache.cacheMap.containsKey(cacheKey8));
        ByteBuf byteBuf6 = (ByteBuf) ((PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(cacheKey8)).future.get();
        Assertions.assertEquals(2, byteBuf6.refCnt());
        Assertions.assertEquals(i - cacheKey8.bytePosition, byteBuf6.readableBytes());
        int i55 = 251;
        int i56 = 1;
        while (i55 < i) {
            Assertions.assertEquals(randomBytes[i55], byteBuf6.getByte(i56));
            i55++;
            i56++;
        }
        Assertions.assertEquals(Double.valueOf(175.0d), metrics.metric(tierFetcher.tierFetcherMetrics.bytesPrefetchedTotalMetricName).metricValue());
        inputStream6.close();
        Assertions.assertEquals(1, byteBuf6.refCnt());
        TopicIdPartition topicIdPartition2 = new TopicIdPartition("bar", UUID.randomUUID(), 0);
        UUID randomUUID2 = UUID.randomUUID();
        ObjectMetadata objectMetadata2 = new ObjectMetadata(topicIdPartition2, randomUUID2, 0, 0L, false, false, false, OpaqueData.ZEROED, (SegmentAndMetadataLayout) null);
        int i57 = 10 / 2;
        TierObjectStoreResponse tierObjectStoreResponse = (TierObjectStoreResponse) tierObjectStore.getObjectStoreFragmentAsync(objectMetadata2, FragmentType.SEGMENT, 0L, Long.valueOf(10)).get();
        CompletableFuture completableFuture = new CompletableFuture();
        ((TierObjectStore) Mockito.doAnswer(invocationOnMock -> {
            return completableFuture;
        }).when(tierObjectStore)).getObjectStoreFragmentAsync((ObjectStoreMetadata) ArgumentMatchers.eq(objectMetadata2), (FragmentType) ArgumentMatchers.eq(FragmentType.SEGMENT), Long.valueOf(ArgumentMatchers.eq(0L)), Long.valueOf(ArgumentMatchers.eq(10)));
        CompletableFuture completableFuture2 = prefetchCache.get(objectMetadata2, 1, i57, i);
        PrefetchCache.CacheKey cacheKey9 = new PrefetchCache.CacheKey(randomUUID2, 0L);
        PrefetchCache.CacheKey cacheKey10 = new PrefetchCache.CacheKey(randomUUID2, 10 * 4);
        TestUtils.waitUntilTrue(() -> {
            return Boolean.valueOf(prefetchCache.cacheMap.containsKey(cacheKey10));
        }, () -> {
            return "last fetch key not found";
        }, 5000L, 1000L);
        PrefetchCache.CacheEntry cacheEntry15 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(cacheKey9);
        Assertions.assertEquals(2, cacheEntry15.refCnt.get());
        int i58 = 10;
        while (true) {
            int i59 = i58;
            if (i59 > 4 * 10) {
                break;
            }
            PrefetchCache.CacheKey cacheKey11 = new PrefetchCache.CacheKey(randomUUID2, i59);
            Assertions.assertTrue(prefetchCache.cacheMap.containsKey(cacheKey11));
            ByteBuf byteBuf7 = (ByteBuf) ((PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(cacheKey11)).future.get();
            for (int i60 = 0; i60 < 10; i60++) {
                Assertions.assertEquals(randomBytes[i59 + i60], byteBuf7.getByte(i60));
            }
            i58 = i59 + 10;
        }
        Assertions.assertFalse(completableFuture2.isDone());
        completableFuture.complete(tierObjectStoreResponse);
        InputStream inputStream7 = (InputStream) completableFuture2.get();
        byte[] bArr7 = new byte[10];
        int i61 = i57 - 1;
        Assertions.assertEquals(i61, inputStream7.available());
        Assertions.assertEquals(i61, inputStream7.read(bArr7));
        int i62 = 0;
        int i63 = 1;
        while (i62 < i61) {
            Assertions.assertEquals(randomBytes[i63], bArr7[i62]);
            i62++;
            i63++;
        }
        Assertions.assertTrue(prefetchCache.cacheMap.containsKey(cacheKey9));
        ByteBuf byteBuf8 = (ByteBuf) ((PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(cacheKey9)).future.get();
        Assertions.assertEquals(2, cacheEntry15.refCnt.get());
        Assertions.assertEquals(2, byteBuf8.refCnt());
        for (int i64 = 1; i64 < i57; i64++) {
            Assertions.assertEquals(randomBytes[i64], byteBuf8.getByte(i64));
        }
        inputStream7.close();
        Assertions.assertEquals(1, cacheEntry15.refCnt.get());
        Assertions.assertEquals(1, byteBuf8.refCnt());
        int i65 = 10 / 2;
        PrefetchCache.CacheKey cacheKey12 = new PrefetchCache.CacheKey(randomUUID2, 0L);
        PrefetchCache.CacheEntry cacheEntry16 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.remove(cacheKey12);
        Assertions.assertNotNull(cacheEntry16);
        cacheEntry16.release();
        Assertions.assertEquals(0, cacheEntry16.refCnt.get());
        Assertions.assertEquals(0, ((ByteBuf) cacheEntry16.future.join()).refCnt());
        CompletableFuture completableFuture3 = new CompletableFuture();
        ((TierObjectStore) Mockito.doAnswer(invocationOnMock2 -> {
            return completableFuture3;
        }).when(tierObjectStore)).getObjectStoreFragmentAsync((ObjectStoreMetadata) ArgumentMatchers.eq(objectMetadata2), (FragmentType) ArgumentMatchers.eq(FragmentType.SEGMENT), Long.valueOf(ArgumentMatchers.eq(0L)), Long.valueOf(ArgumentMatchers.eq(10)));
        CompletableFuture completableFuture4 = prefetchCache.get(objectMetadata2, 1, i65, i);
        PrefetchCache.CacheEntry cacheEntry17 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(cacheKey12);
        Assertions.assertEquals(2, cacheEntry17.refCnt.get());
        Assertions.assertFalse(completableFuture4.isDone());
        completableFuture3.completeExceptionally(new TierObjectStoreRetriableException("Simulated error"));
        Assertions.assertThrows(TierObjectStoreRetriableException.class, () -> {
            try {
                completableFuture4.get();
            } catch (Exception e) {
                throw e.getCause();
            }
        });
        Assertions.assertEquals(1, cacheEntry17.refCnt.get());
        TierObjectStoreResponse objectStoreFragment = tierObjectStore.getObjectStoreFragment(objectMetadata2, FragmentType.SEGMENT, 0L, Long.valueOf(10));
        ((TierObjectStore) Mockito.doAnswer(invocationOnMock3 -> {
            return CompletableFuture.completedFuture(objectStoreFragment);
        }).when(tierObjectStore)).getObjectStoreFragmentAsync((ObjectStoreMetadata) ArgumentMatchers.eq(objectMetadata2), (FragmentType) ArgumentMatchers.eq(FragmentType.SEGMENT), Long.valueOf(ArgumentMatchers.eq(0L)), Long.valueOf(ArgumentMatchers.eq(10)));
        InputStream inputStream8 = (InputStream) prefetchCache.get(objectMetadata2, 1, i65, i).get();
        byte[] bArr8 = new byte[10];
        int i66 = i65 - 1;
        Assertions.assertEquals(i66, inputStream8.available());
        Assertions.assertEquals(i66, inputStream8.read(bArr8));
        int i67 = 0;
        int i68 = 1;
        while (i67 < i66) {
            Assertions.assertEquals(randomBytes[i68], bArr8[i67]);
            i67++;
            i68++;
        }
        Assertions.assertTrue(prefetchCache.cacheMap.containsKey(cacheKey12));
        PrefetchCache.CacheEntry cacheEntry18 = (PrefetchCache.CacheEntry) prefetchCache.cacheMap.get(cacheKey12);
        Assertions.assertNotEquals(cacheEntry17, cacheEntry18);
        ByteBuf byteBuf9 = (ByteBuf) cacheEntry18.future.get();
        Assertions.assertEquals(2, cacheEntry18.refCnt.get());
        Assertions.assertEquals(2, byteBuf9.refCnt());
        for (int i69 = 1; i69 < i65; i69++) {
            Assertions.assertEquals(randomBytes[i69], byteBuf9.getByte(i69));
        }
        inputStream8.close();
        Assertions.assertEquals(1, cacheEntry18.refCnt.get());
        Assertions.assertEquals(1, byteBuf9.refCnt());
        tierFetcher.reconfigureObjectCache(objectCacheConfig, ObjectCacheConfig.DEFAULT);
        Assertions.assertFalse(tierFetcher.prefetchCacheOpt.isPresent());
        Assertions.assertEquals(0L, metrics.metric(tierFetcher.tierFetcherMetrics.objectCacheSizeMetricName).metricValue());
        Assertions.assertFalse(prefetchCache.tierFetcherMetrics.isPresent());
        Assertions.assertEquals(0L, prefetchCache.cacheMaxSizeBytes);
        Assertions.assertTrue(prefetchCache.cacheMap.isEmpty());
        tierFetcher.close();
        tierObjectStore.close();
    }

    @Test
    public void testPrefetchCacheInputStream() throws IOException {
        PooledByteBufAllocator pooledByteBufAllocator = new PooledByteBufAllocator(true);
        ArrayList arrayList = new ArrayList();
        byte[] randomBytes = org.apache.kafka.test.TestUtils.randomBytes(10);
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(randomBytes);
        ByteBuf buffer = pooledByteBufAllocator.buffer(10);
        Assertions.assertEquals(randomBytes.length, buffer.writeBytes(byteArrayInputStream, randomBytes.length));
        PrefetchCache.CacheEntry cacheEntry = new PrefetchCache.CacheEntry(CompletableFuture.completedFuture(buffer));
        byteArrayInputStream.close();
        Assertions.assertTrue(buffer.isDirect());
        arrayList.add(cacheEntry);
        ByteArrayInputStream byteArrayInputStream2 = new ByteArrayInputStream(randomBytes);
        ByteBuf buffer2 = pooledByteBufAllocator.buffer(10);
        Assertions.assertEquals(randomBytes.length, buffer2.writeBytes(byteArrayInputStream2, randomBytes.length));
        PrefetchCache.CacheEntry cacheEntry2 = new PrefetchCache.CacheEntry(CompletableFuture.completedFuture(buffer2));
        byteArrayInputStream2.close();
        Assertions.assertTrue(buffer2.isDirect());
        arrayList.add(cacheEntry2);
        byte[] randomBytes2 = org.apache.kafka.test.TestUtils.randomBytes(10 / 2);
        ByteArrayInputStream byteArrayInputStream3 = new ByteArrayInputStream(randomBytes2);
        ByteBuf buffer3 = pooledByteBufAllocator.buffer(10);
        Assertions.assertEquals(randomBytes2.length, buffer3.writeBytes(byteArrayInputStream3, randomBytes2.length));
        byteArrayInputStream3.close();
        PrefetchCache.CacheEntry cacheEntry3 = new PrefetchCache.CacheEntry(CompletableFuture.completedFuture(buffer3));
        Assertions.assertTrue(buffer3.isDirect());
        arrayList.add(cacheEntry3);
        int i = 10 / 2;
        arrayList.forEach((v0) -> {
            v0.retain();
        });
        PrefetchCache.CacheEntryInputStream cacheEntryInputStream = new PrefetchCache.CacheEntryInputStream(arrayList, 0, i);
        Throwable th = null;
        try {
            try {
                int i2 = i - 0;
                Assertions.assertEquals(i2, cacheEntryInputStream.available());
                byte[] bArr = new byte[i2 + 2];
                Assertions.assertEquals(i2, cacheEntryInputStream.read(bArr));
                int i3 = 0;
                int i4 = 0;
                while (i3 < i2) {
                    Assertions.assertEquals(randomBytes[i4], bArr[i3]);
                    i3++;
                    i4++;
                }
                Assertions.assertEquals(2, cacheEntry.refCnt.get());
                Assertions.assertEquals(2, ((ByteBuf) cacheEntry.future.join()).refCnt());
                if (cacheEntryInputStream != null) {
                    if (0 != 0) {
                        try {
                            cacheEntryInputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        cacheEntryInputStream.close();
                    }
                }
                Assertions.assertEquals(1, cacheEntry.refCnt.get());
                Assertions.assertEquals(1, ((ByteBuf) cacheEntry.future.join()).refCnt());
                int i5 = 10 / 2;
                arrayList.forEach((v0) -> {
                    v0.retain();
                });
                PrefetchCache.CacheEntryInputStream cacheEntryInputStream2 = new PrefetchCache.CacheEntryInputStream(arrayList, 1, i5);
                Throwable th3 = null;
                try {
                    int i6 = i5 - 1;
                    Assertions.assertEquals(i6, cacheEntryInputStream2.available());
                    byte[] bArr2 = new byte[i6 + 2];
                    Assertions.assertEquals(i6, cacheEntryInputStream2.read(bArr2));
                    int i7 = 0;
                    int i8 = 1;
                    while (i7 < i6) {
                        Assertions.assertEquals(randomBytes[i8], bArr2[i7]);
                        i7++;
                        i8++;
                    }
                    Assertions.assertEquals(2, cacheEntry.refCnt.get());
                    Assertions.assertEquals(2, ((ByteBuf) cacheEntry.future.join()).refCnt());
                    if (cacheEntryInputStream2 != null) {
                        if (0 != 0) {
                            try {
                                cacheEntryInputStream2.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            cacheEntryInputStream2.close();
                        }
                    }
                    Assertions.assertEquals(1, cacheEntry.refCnt.get());
                    Assertions.assertEquals(1, ((ByteBuf) cacheEntry.future.join()).refCnt());
                    int i9 = (10 / 2) + 10;
                    arrayList.forEach((v0) -> {
                        v0.retain();
                    });
                    cacheEntryInputStream = new PrefetchCache.CacheEntryInputStream(arrayList, 1, i9);
                    Throwable th5 = null;
                    try {
                        try {
                            int i10 = i9 - 1;
                            Assertions.assertEquals(i10, cacheEntryInputStream.available());
                            byte[] bArr3 = new byte[i10 + 2];
                            Assertions.assertEquals(i10, cacheEntryInputStream.read(bArr3));
                            int i11 = 0;
                            int i12 = 1;
                            while (i11 < i10) {
                                Assertions.assertEquals(randomBytes[i12 % 10], bArr3[i11]);
                                i11++;
                                i12++;
                            }
                            Assertions.assertEquals(2, cacheEntry.refCnt.get());
                            Assertions.assertEquals(2, ((ByteBuf) cacheEntry.future.join()).refCnt());
                            Assertions.assertEquals(2, cacheEntry2.refCnt.get());
                            Assertions.assertEquals(2, ((ByteBuf) cacheEntry2.future.join()).refCnt());
                            if (cacheEntryInputStream != null) {
                                if (0 != 0) {
                                    try {
                                        cacheEntryInputStream.close();
                                    } catch (Throwable th6) {
                                        th5.addSuppressed(th6);
                                    }
                                } else {
                                    cacheEntryInputStream.close();
                                }
                            }
                            Assertions.assertEquals(1, cacheEntry.refCnt.get());
                            Assertions.assertEquals(1, ((ByteBuf) cacheEntry.future.join()).refCnt());
                            Assertions.assertEquals(1, cacheEntry2.refCnt.get());
                            Assertions.assertEquals(1, ((ByteBuf) cacheEntry2.future.join()).refCnt());
                            int i13 = (10 * 3) + 5;
                            arrayList.forEach((v0) -> {
                                v0.retain();
                            });
                            cacheEntryInputStream = new PrefetchCache.CacheEntryInputStream(arrayList, 2, i13);
                            Throwable th7 = null;
                            try {
                                try {
                                    byte[] bArr4 = new byte[i13 - 2];
                                    int length = ((2 * 10) - 2) + randomBytes2.length;
                                    Assertions.assertEquals(length, cacheEntryInputStream.available());
                                    Assertions.assertEquals(length, cacheEntryInputStream.read(bArr4));
                                    int i14 = 0;
                                    for (int i15 = 2; i15 < 10 * 2; i15++) {
                                        Assertions.assertEquals(randomBytes[i15 % 10], bArr4[i14]);
                                        i14++;
                                    }
                                    for (byte b : randomBytes2) {
                                        Assertions.assertEquals(b, bArr4[i14]);
                                        i14++;
                                    }
                                    Assertions.assertEquals(-1, cacheEntryInputStream.read());
                                    arrayList.forEach(cacheEntry4 -> {
                                        Assertions.assertEquals(2, cacheEntry4.refCnt.get());
                                        Assertions.assertEquals(2, ((ByteBuf) cacheEntry4.future.join()).refCnt());
                                    });
                                    if (cacheEntryInputStream != null) {
                                        if (0 != 0) {
                                            try {
                                                cacheEntryInputStream.close();
                                            } catch (Throwable th8) {
                                                th7.addSuppressed(th8);
                                            }
                                        } else {
                                            cacheEntryInputStream.close();
                                        }
                                    }
                                    arrayList.forEach(cacheEntry5 -> {
                                        Assertions.assertEquals(1, cacheEntry5.refCnt.get());
                                        Assertions.assertEquals(1, ((ByteBuf) cacheEntry5.future.join()).refCnt());
                                    });
                                    arrayList.forEach((v0) -> {
                                        v0.retain();
                                    });
                                    PrefetchCache.CacheEntryInputStream cacheEntryInputStream3 = new PrefetchCache.CacheEntryInputStream(arrayList, 3, (10 * 2) + r0);
                                    Throwable th9 = null;
                                    try {
                                        try {
                                            byte[] bArr5 = new byte[10 * 3];
                                            int length2 = ((2 * 10) - 3) + randomBytes2.length;
                                            Assertions.assertEquals(length2, cacheEntryInputStream3.available());
                                            Assertions.assertEquals(length2, cacheEntryInputStream3.read(bArr5));
                                            int i16 = 0;
                                            for (int i17 = 3; i17 < 10 * 2; i17++) {
                                                Assertions.assertEquals(randomBytes[i17 % 10], bArr5[i16]);
                                                i16++;
                                            }
                                            for (byte b2 : randomBytes2) {
                                                Assertions.assertEquals(b2, bArr5[i16]);
                                                i16++;
                                            }
                                            Assertions.assertEquals(-1, cacheEntryInputStream3.read());
                                            arrayList.forEach(cacheEntry6 -> {
                                                Assertions.assertEquals(2, cacheEntry6.refCnt.get());
                                                Assertions.assertEquals(2, ((ByteBuf) cacheEntry6.future.join()).refCnt());
                                            });
                                            if (cacheEntryInputStream3 != null) {
                                                if (0 != 0) {
                                                    try {
                                                        cacheEntryInputStream3.close();
                                                    } catch (Throwable th10) {
                                                        th9.addSuppressed(th10);
                                                    }
                                                } else {
                                                    cacheEntryInputStream3.close();
                                                }
                                            }
                                            arrayList.forEach(cacheEntry7 -> {
                                                Assertions.assertEquals(1, cacheEntry7.refCnt.get());
                                                Assertions.assertEquals(1, ((ByteBuf) cacheEntry7.future.join()).refCnt());
                                            });
                                            arrayList.forEach((v0) -> {
                                                v0.release();
                                            });
                                            arrayList.forEach(cacheEntry8 -> {
                                                Assertions.assertEquals(0, ((ByteBuf) cacheEntry8.future.join()).refCnt());
                                            });
                                            arrayList.forEach(cacheEntry9 -> {
                                                Assertions.assertEquals(0, cacheEntry9.refCnt.get());
                                            });
                                        } finally {
                                        }
                                    } finally {
                                        if (cacheEntryInputStream3 != null) {
                                            if (th9 != null) {
                                                try {
                                                    cacheEntryInputStream3.close();
                                                } catch (Throwable th11) {
                                                    th9.addSuppressed(th11);
                                                }
                                            } else {
                                                cacheEntryInputStream3.close();
                                            }
                                        }
                                    }
                                } finally {
                                }
                            } finally {
                                if (cacheEntryInputStream != null) {
                                    if (th7 != null) {
                                        try {
                                            cacheEntryInputStream.close();
                                        } catch (Throwable th12) {
                                            th7.addSuppressed(th12);
                                        }
                                    } else {
                                        cacheEntryInputStream.close();
                                    }
                                }
                            }
                        } finally {
                        }
                    } finally {
                    }
                } catch (Throwable th13) {
                    if (cacheEntryInputStream2 != null) {
                        if (0 != 0) {
                            try {
                                cacheEntryInputStream2.close();
                            } catch (Throwable th14) {
                                th3.addSuppressed(th14);
                            }
                        } else {
                            cacheEntryInputStream2.close();
                        }
                    }
                    throw th13;
                }
            } finally {
            }
        } finally {
        }
    }
}
