package kafka.tier.fetcher;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.stream.IntStream;
import kafka.log.AbortedTxn;
import kafka.log.Log;
import kafka.log.LogConfig;
import kafka.log.LogSegment;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.fetcher.offsetcache.FetchOffsetCache;
import kafka.tier.store.MockInMemoryTierObjectStore;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreConfig;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.collection.JavaConverters;
import scala.collection.immutable.HashSet;
import scala.compat.java8.OptionConverters;

/* loaded from: input_file:kafka/tier/fetcher/SegmentFileFetchRequestTest.class */
/**
 * Tests that exercise {@link PendingFetch} against a {@link MockInMemoryTierObjectStore}.
 *
 * <p>Each test builds a local {@link LogSegment}, uploads it to the in-memory store, runs a
 * {@link PendingFetch} synchronously on the calling thread and inspects the resulting
 * {@link TierFetchResult} (records, aborted transactions and exception).
 */
public class SegmentFileFetchRequestTest {
    private final MockTime mockTime = new MockTime();
    /** Runs submitted tasks inline so a fetch has completed by the time {@code execute} returns. */
    private final Executor currentThreadExecutor = Runnable::run;
    private final long baseTimestamp = 1500000000000L;

    /** Fetching the last offset of the segment returns a batch that contains that offset. */
    @Test
    public void targetOffsetTest() {
        final long targetOffset = 149L;
        CancellationContext ctx = CancellationContext.newContext();
        MockInMemoryTierObjectStore tierObjectStore = new MockInMemoryTierObjectStore(TierObjectStoreConfig.createEmpty());
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TopicPartition topicPartition = topicIdPartition.topicPartition();
        // 3 batches of 50 records: offsets 0..149.
        LogSegment segment = createSegment(0L, 3, 50);
        try {
            TierObjectStore.ObjectMetadata metadata = new TierObjectStore.ObjectMetadata(segmentMetadata(topicIdPartition, segment, false));
            putSegment(tierObjectStore, segment, metadata, Optional.empty());

            TierFetchResult result = fetch(ctx, tierObjectStore, metadata, topicPartition, targetOffset, segment, IsolationLevel.READ_UNCOMMITTED);

            Assertions.assertTrue(result.records.batches().iterator().hasNext(), "Records should be complete");
            Assertions.assertNotEquals(MemoryRecords.EMPTY, result.records, "Should return records");
            RecordBatch firstBatch = result.records.batches().iterator().next();
            Assertions.assertTrue(firstBatch.baseOffset() <= targetOffset && firstBatch.lastOffset() >= targetOffset, "Results should include target offset in the first record batch");
        } catch (IOException e) {
            Assertions.fail("Unexpected exception", e);
        } finally {
            ctx.close();
            segment.close();
            tierObjectStore.close();
        }
    }

    /** Fetching one past the last offset of the segment returns the empty record set. */
    @Test
    public void targetOffsetOutOfRangeTest() {
        final long targetOffset = 150L;
        CancellationContext ctx = CancellationContext.newContext();
        MockInMemoryTierObjectStore tierObjectStore = new MockInMemoryTierObjectStore(TierObjectStoreConfig.createEmpty());
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TopicPartition topicPartition = topicIdPartition.topicPartition();
        // 3 batches of 50 records: offsets 0..149, so offset 150 is out of range.
        LogSegment segment = createSegment(0L, 3, 50);
        try {
            TierObjectStore.ObjectMetadata metadata = new TierObjectStore.ObjectMetadata(segmentMetadata(topicIdPartition, segment, false));
            putSegment(tierObjectStore, segment, metadata, Optional.empty());

            TierFetchResult result = fetch(ctx, tierObjectStore, metadata, topicPartition, targetOffset, segment, IsolationLevel.READ_UNCOMMITTED);

            Assertions.assertFalse(result.records.batches().iterator().hasNext(), "Records should be incomplete");
            Assertions.assertEquals(ReclaimableMemoryRecords.EMPTY, result.records, "Should return empty records");
        } catch (IOException e) {
            Assertions.fail("Unexpected exception", e);
        } finally {
            ctx.close();
            segment.close();
            tierObjectStore.close();
        }
    }

    /**
     * Builds tier metadata describing {@code logSegment}.
     *
     * @param topicIdPartition partition the segment belongs to
     * @param logSegment       segment whose offsets, timestamp and size are recorded
     * @param hasAbortedTxns   flag forwarded to the metadata; callers pass whether serialized
     *                         aborted transactions accompany the segment
     */
    private TierObjectMetadata segmentMetadata(TopicIdPartition topicIdPartition, LogSegment logSegment, boolean hasAbortedTxns) {
        long lastOffset = logSegment.readNextOffset() - 1;
        return new TierObjectMetadata(
                topicIdPartition,
                0,
                UUID.randomUUID(),
                logSegment.baseOffset(),
                lastOffset,
                logSegment.largestTimestamp(),
                logSegment.size(),
                TierObjectMetadata.State.SEGMENT_UPLOAD_INITIATE,
                false,
                hasAbortedTxns,
                true);
    }

    /**
     * Builds one uncompressed v2 batch of {@code recordCount} records with consecutive offsets
     * starting at {@code firstOffset}. Every record carries the same timestamp
     * ({@code baseTimestamp + firstOffset}).
     */
    private MemoryRecords createRecords(long firstOffset, int recordCount) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(2048), RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.CREATE_TIME, firstOffset, 0L);
        long timestamp = this.baseTimestamp + firstOffset;
        for (int n = 0; n < recordCount; n++) {
            builder.appendWithOffset(firstOffset + n, timestamp, "a".getBytes(), "v".getBytes());
        }
        return builder.build();
    }

    /** Aborted transactions survive a serialize / {@link TierAbortedTxnReader#readInto} round trip. */
    @Test
    public void testSerializingAbortedTransactions() {
        AbortedTxn[] abortedTxns = {abortedTxn(0L, 5L), abortedTxn(10L, 20L), abortedTxn(50L, 100L)};
        ByteBuffer serialized = serializeAbortedTxns(abortedTxns)
                .orElseThrow(() -> new AssertionError("Expected aborted transactions to serialize to a buffer"));
        CancellationContext ctx = CancellationContext.newContext();
        try {
            Assertions.assertEquals(Arrays.asList(abortedTxns), TierAbortedTxnReader.readInto(ctx, new ByteBufferInputStream(serialized), 0L, 100L));
        } finally {
            ctx.close();
        }
    }

    /** A READ_COMMITTED fetch of an empty segment yields no records and no aborted transactions. */
    @Test
    public void testReadCommittedEmptyBatch() {
        CancellationContext ctx = CancellationContext.newContext();
        MockInMemoryTierObjectStore tierObjectStore = new MockInMemoryTierObjectStore(TierObjectStoreConfig.createEmpty());
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TopicPartition topicPartition = topicIdPartition.topicPartition();
        // No batches are appended, so the segment is empty.
        LogSegment segment = createSegment(0L, 0, 0);
        try {
            Optional<ByteBuffer> serializedTxns = serializeAbortedTxns(abortedTxn(0L, 5L), abortedTxn(10L, 20L), abortedTxn(50L, 100L), abortedTxn(101L, 150L));
            TierObjectStore.ObjectMetadata metadata = new TierObjectStore.ObjectMetadata(segmentMetadata(topicIdPartition, segment, serializedTxns.isPresent()));
            putSegment(tierObjectStore, segment, metadata, serializedTxns);

            TierFetchResult result = fetch(ctx, tierObjectStore, metadata, topicPartition, 0L, segment, IsolationLevel.READ_COMMITTED);

            Assertions.assertFalse(result.records.records().iterator().hasNext(), "expected to find 0 records");
            Assertions.assertEquals(Collections.emptyList(), result.abortedTxns, "expected to find 0 aborted transactions overlapping the fetched range");
        } catch (IOException e) {
            Assertions.fail("Unexpected exception", e);
        } finally {
            ctx.close();
            segment.close();
            tierObjectStore.close();
        }
    }

    /**
     * A READ_COMMITTED fetch returns the fetched records together with only those aborted
     * transactions that overlap the fetched offset range.
     */
    @Test
    public void testFetchingReadCommitted() {
        CancellationContext ctx = CancellationContext.newContext();
        MockInMemoryTierObjectStore tierObjectStore = new MockInMemoryTierObjectStore(TierObjectStoreConfig.createEmpty());
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TopicPartition topicPartition = topicIdPartition.topicPartition();
        LogSegment segment = createSegment(0L, 3, 50);
        try {
            Optional<ByteBuffer> serializedTxns = serializeAbortedTxns(abortedTxn(0L, 5L), abortedTxn(10L, 20L), abortedTxn(50L, 100L), abortedTxn(101L, 150L));
            TierObjectStore.ObjectMetadata metadata = new TierObjectStore.ObjectMetadata(segmentMetadata(topicIdPartition, segment, serializedTxns.isPresent()));
            putSegment(tierObjectStore, segment, metadata, serializedTxns);

            TierFetchResult result = fetch(ctx, tierObjectStore, metadata, topicPartition, 0L, segment, IsolationLevel.READ_COMMITTED);

            int recordCount = 0;
            for (Record record : result.records.records()) {
                Assertions.assertTrue(record.isValid(), "Expected all records to be valid");
                recordCount++;
            }
            Assertions.assertEquals(100, recordCount, "expected to find 100 records");
            Assertions.assertEquals(Arrays.asList(abortedTxn(0L, 5L), abortedTxn(10L, 20L), abortedTxn(50L, 100L)), result.abortedTxns, "expected to find the 3 aborted transactions overlapping the fetch range");
        } catch (IOException e) {
            Assertions.fail("Unexpected exception", e);
        } finally {
            ctx.close();
            segment.close();
            tierObjectStore.close();
        }
    }

    /**
     * When the object store fails either the aborted-transaction fetch or the segment fetch, the
     * result carries an {@link IOException} and exposes neither records nor aborted transactions.
     */
    @Test
    public void testFetchingReadCommittedException() {
        CancellationContext txnFailureCtx = CancellationContext.newContext();
        // Created only once the first fetch has completed; closed in finally if it was created.
        CancellationContext segmentFailureCtx = null;
        MockInMemoryTierObjectStore tierObjectStore = new MockInMemoryTierObjectStore(TierObjectStoreConfig.createEmpty());
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TopicPartition topicPartition = topicIdPartition.topicPartition();
        LogSegment segment = createSegment(0L, 3, 50);
        try {
            Optional<ByteBuffer> serializedTxns = serializeAbortedTxns(abortedTxn(0L, 5L), abortedTxn(10L, 20L), abortedTxn(50L, 100L), abortedTxn(101L, 150L));
            TierObjectStore.ObjectMetadata metadata = new TierObjectStore.ObjectMetadata(segmentMetadata(topicIdPartition, segment, serializedTxns.isPresent()));
            putSegment(tierObjectStore, segment, metadata, serializedTxns);

            // First fetch: the store fails while serving the aborted transactions.
            tierObjectStore.throwExceptionOnTransactionFetch = true;
            TierFetchResult txnFailure = fetch(txnFailureCtx, tierObjectStore, metadata, topicPartition, 0L, segment, IsolationLevel.READ_COMMITTED);
            Assertions.assertFalse(txnFailure.records.records().iterator().hasNext(), "Expected to find 0 records, because an exception was thrown");
            Assertions.assertTrue(txnFailure.exception instanceof IOException);
            Assertions.assertEquals(Collections.emptyList(), txnFailure.abortedTxns, "Expected to find 0 aborted transaction because an exception was thrown");

            // Second fetch: the store fails while serving the segment itself.
            segmentFailureCtx = CancellationContext.newContext();
            tierObjectStore.throwExceptionOnTransactionFetch = false;
            tierObjectStore.throwExceptionOnSegmentFetch = true;
            TierFetchResult segmentFailure = fetch(segmentFailureCtx, tierObjectStore, metadata, topicPartition, 0L, segment, IsolationLevel.READ_COMMITTED);
            Assertions.assertFalse(segmentFailure.records.records().iterator().hasNext(), "Expected to find 0 records, because an exception was thrown");
            Assertions.assertTrue(segmentFailure.exception instanceof IOException);
            Assertions.assertEquals(Collections.emptyList(), segmentFailure.abortedTxns, "Expected to find 0 aborted transaction because an exception was thrown");
        } catch (IOException e) {
            Assertions.fail("Unexpected exception", e);
        } finally {
            txnFailureCtx.close();
            if (segmentFailureCtx != null) {
                segmentFailureCtx.close();
            }
            segment.close();
            tierObjectStore.close();
        }
    }

    /**
     * Serializes the given aborted transactions via {@link Log#serializeAbortedTransactions},
     * converting the Scala {@code Option} result to a Java {@link Optional}.
     */
    Optional<ByteBuffer> serializeAbortedTxns(AbortedTxn... abortedTxnArr) {
        return OptionConverters.toJava(Log.serializeAbortedTransactions(JavaConverters.asScalaBuffer(Arrays.asList(abortedTxnArr))));
    }

    /**
     * Shorthand for an {@link AbortedTxn} whose first and fourth constructor arguments are 0.
     * NOTE(review): the two varying arguments are presumably the first and last offsets of the
     * transaction - confirm against the {@code AbortedTxn} constructor.
     */
    private static AbortedTxn abortedTxn(long firstOffset, long lastOffset) {
        return new AbortedTxn(0L, firstOffset, lastOffset, 0L);
    }

    /**
     * Creates a {@link PendingFetch} for {@code metadata} starting at {@code targetOffset}, runs it
     * inline on the current thread and returns the result recorded for {@code topicPartition}.
     *
     * <p>The fetch uses a fresh {@link FetchOffsetCache}, a no-op callback, a 1024 byte limit, the
     * size of {@code segment}, a fresh {@link MemoryTracker} and the shared mock time.
     */
    private TierFetchResult fetch(CancellationContext ctx,
                                  MockInMemoryTierObjectStore tierObjectStore,
                                  TierObjectStore.ObjectMetadata metadata,
                                  TopicPartition topicPartition,
                                  long targetOffset,
                                  LogSegment segment,
                                  IsolationLevel isolationLevel) {
        PendingFetch pendingFetch = new PendingFetch(ctx, tierObjectStore, new FetchOffsetCache(Time.SYSTEM, 10, 1000), Optional.empty(), metadata, delayedOperationKey -> {
        }, targetOffset, 1024, segment.size(), isolationLevel, new MemoryTracker(this.mockTime, 0L), Collections.emptyList(), this.mockTime);
        this.currentThreadExecutor.execute(pendingFetch);
        return (TierFetchResult) pendingFetch.finish().get(topicPartition);
    }

    /**
     * Opens a new segment in a temporary directory and appends {@code batchCount} batches of
     * {@code recordsPerBatch} records each, flushing after every append.
     *
     * @param baseOffset      base offset of the segment
     * @param batchCount      number of batches to append; 0 leaves the segment empty
     * @param recordsPerBatch number of records in each batch
     * @return the open segment; the caller is responsible for closing it
     */
    private LogSegment createSegment(long baseOffset, int batchCount, int recordsPerBatch) {
        File logDir = TestUtils.tempDirectory();
        logDir.deleteOnExit();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), 1);
        LogSegment segment = LogSegment.open(logDir, baseOffset, LogConfig.apply(logProps, new HashSet<>()), this.mockTime, false, 4096, false, "");
        for (int batch = 0; batch < batchCount; batch++) {
            long firstOffset = segment.readNextOffset();
            MemoryRecords records = createRecords(firstOffset, recordsPerBatch);
            long lastOffset = (firstOffset + recordsPerBatch) - 1;
            segment.append(lastOffset, this.baseTimestamp + lastOffset, recordsPerBatch - 1, records);
            segment.flush();
        }
        segment.offsetIndex().flush();
        segment.offsetIndex().trimToValidSize();
        return segment;
    }

    /**
     * Uploads the log file, offset index and time index of {@code logSegment} to
     * {@code tierObjectStore}, together with the optional serialized aborted transactions.
     *
     * @throws IOException if the object store fails to store the segment
     */
    private void putSegment(TierObjectStore tierObjectStore, LogSegment logSegment, TierObjectStore.ObjectMetadata objectMetadata, Optional<ByteBuffer> abortedTxns) throws IOException {
        tierObjectStore.putSegment(objectMetadata, logSegment.log().file(), logSegment.offsetIndex().file(), logSegment.timeIndex().file(), Optional.empty(), abortedTxns, Optional.empty());
    }
}
