package kafka.tier.fetcher;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import kafka.log.LogConfig;
import kafka.log.LogSegment;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.store.MockInMemoryTierObjectStore;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import scala.collection.immutable.HashSet;

/* loaded from: input_file:kafka/tier/fetcher/SegmentFileFetchRequestTest.class */
/**
 * Tests that upload a locally built {@link LogSegment} to a {@link MockInMemoryTierObjectStore}
 * and then read it back through a {@link PendingFetch} executed on the calling thread.
 *
 * <p>Every test builds its segment with {@code createSegment(0L, 3, 50)}, i.e. three appended
 * batches of fifty records each on a segment whose base offset is 0.
 */
public class SegmentFileFetchRequestTest {
    private final MockTime mockTime = new MockTime();

    /** Runs each submitted task synchronously on the thread that calls {@code execute}. */
    private final Executor currentThreadExecutor = Runnable::run;

    /** Base value added to offsets to derive the timestamps used by the fixtures. */
    private final long baseTimestamp = 1500000000000L;

    /**
     * Fetches at offset 149 with no effective upper offset bound and checks that records come
     * back and that the first returned batch covers the requested offset.
     */
    @Test
    public void targetOffsetTest() {
        long targetOffset = 149L;
        CancellationContext ctx = CancellationContext.newContext();
        MockInMemoryTierObjectStore tierObjectStore = new MockInMemoryTierObjectStore(new TierObjectStoreConfig());
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        LogSegment segment = createSegment(0L, 3, 50);
        try {
            TierObjectMetadata metadata = segmentMetadata(topicPartition, segment);
            Metrics metrics = new Metrics();
            putSegment(tierObjectStore, segment, metadata);
            // Typed as PendingFetch (not Runnable) because finish() is called on it below.
            // NOTE(review): 1024 is presumably a byte limit for the fetch - confirm against PendingFetch.
            PendingFetch pendingFetch = new PendingFetch(ctx, tierObjectStore, metrics.sensor("recordBytesFetched"), metadata, delayedOperationKey -> {
            }, targetOffset, 1024, Long.MAX_VALUE, Collections.emptyList());
            currentThreadExecutor.execute(pendingFetch);
            TierFetchResult result = (TierFetchResult) pendingFetch.finish().get(topicPartition);
            Assert.assertTrue("Records should be complete", result.records.batches().iterator().hasNext());
            Assert.assertNotEquals("Should return records", result.records, MemoryRecords.EMPTY);
            RecordBatch firstBatch = result.records.batches().iterator().next();
            Assert.assertTrue("Results should include target offset in the first record batch", firstBatch.baseOffset() <= targetOffset && firstBatch.lastOffset() >= targetOffset);
        } catch (IOException e) {
            // Fail with the same message as before, but keep the cause attached to the failure.
            throw new AssertionError("Unexpected exception", e);
        } finally {
            // Single cleanup path: runs exactly once whether the test passes, fails or throws.
            ctx.close();
            segment.close();
            tierObjectStore.close();
        }
    }

    /**
     * Fetches at offset 150, which the test expects to lie past the data written to the segment,
     * and checks that the result is {@link MemoryRecords#EMPTY} with no batches.
     */
    @Test
    public void targetOffsetOutOfRangeTest() {
        long targetOffset = 150L;
        CancellationContext ctx = CancellationContext.newContext();
        MockInMemoryTierObjectStore tierObjectStore = new MockInMemoryTierObjectStore(new TierObjectStoreConfig());
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        LogSegment segment = createSegment(0L, 3, 50);
        try {
            TierObjectMetadata metadata = segmentMetadata(topicPartition, segment);
            putSegment(tierObjectStore, segment, metadata);
            // Typed as PendingFetch (not Runnable) because finish() is called on it below.
            PendingFetch pendingFetch = new PendingFetch(ctx, tierObjectStore, new Metrics().sensor("recordBytesFetched"), metadata, delayedOperationKey -> {
            }, targetOffset, 1024, Long.MAX_VALUE, Collections.emptyList());
            currentThreadExecutor.execute(pendingFetch);
            TierFetchResult result = (TierFetchResult) pendingFetch.finish().get(topicPartition);
            Assert.assertFalse("Records should be incomplete", result.records.batches().iterator().hasNext());
            Assert.assertEquals("Should return empty records", result.records, MemoryRecords.EMPTY);
        } catch (IOException e) {
            // Fail with the same message as before, but keep the cause attached to the failure.
            throw new AssertionError("Unexpected exception", e);
        } finally {
            // Single cleanup path: runs exactly once whether the test passes, fails or throws.
            ctx.close();
            segment.close();
            tierObjectStore.close();
        }
    }

    /**
     * Fetches at offset 51 with an upper offset bound of 100 and checks that records come back
     * and that none of them has an offset at or beyond that bound.
     */
    @Test
    public void targetOffsetAndMaxOffsetTest() {
        long targetOffset = 51L;
        long maxOffset = 100L;
        CancellationContext ctx = CancellationContext.newContext();
        MockInMemoryTierObjectStore tierObjectStore = new MockInMemoryTierObjectStore(new TierObjectStoreConfig());
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        LogSegment segment = createSegment(0L, 3, 50);
        try {
            TierObjectMetadata metadata = segmentMetadata(topicPartition, segment);
            putSegment(tierObjectStore, segment, metadata);
            // Typed as PendingFetch (not Runnable) because finish() is called on it below.
            PendingFetch pendingFetch = new PendingFetch(ctx, tierObjectStore, new Metrics().sensor("recordBytesFetched"), metadata, delayedOperationKey -> {
            }, targetOffset, 1024, maxOffset, Collections.emptyList());
            currentThreadExecutor.execute(pendingFetch);
            TierFetchResult result = (TierFetchResult) pendingFetch.finish().get(topicPartition);
            Assert.assertTrue("Records should be complete", result.records.batches().iterator().hasNext());
            Assert.assertNotEquals("Should return records", result.records, MemoryRecords.EMPTY);
            Assert.assertFalse("Results should not include records at or beyond max offset",
                    StreamSupport.stream(result.records.records().spliterator(), false)
                            .anyMatch(record -> record.offset() >= maxOffset));
        } catch (IOException e) {
            // Fail with the same message as before, but keep the cause attached to the failure.
            throw new AssertionError("Unexpected exception", e);
        } finally {
            // Single cleanup path: runs exactly once whether the test passes, fails or throws.
            ctx.close();
            segment.close();
            tierObjectStore.close();
        }
    }

    /**
     * Builds the {@link TierObjectMetadata} describing {@code logSegment} for {@code topicPartition}.
     *
     * <p>The values derived from the segment are its base offset, the distance from the base
     * offset to {@code readNextOffset() - 1}, its largest timestamp and its size; the remaining
     * arguments are fixed literals.
     * NOTE(review): the meaning of the positional literals (0, 1L, true, false, (byte) 1) is not
     * visible here - confirm against the TierObjectMetadata constructor.
     */
    private TierObjectMetadata segmentMetadata(TopicPartition topicPartition, LogSegment logSegment) {
        long baseOffset = logSegment.baseOffset();
        int offsetDelta = (int) ((logSegment.readNextOffset() - 1) - baseOffset);
        return new TierObjectMetadata(topicPartition, 0, baseOffset, offsetDelta, 1L, logSegment.largestTimestamp(), logSegment.size(), true, false, (byte) 1);
    }

    /**
     * Builds uncompressed, magic-v2 {@link MemoryRecords} holding {@code count} records at
     * consecutive offsets starting from {@code baseOffset}.
     *
     * <p>Every record carries the same timestamp ({@code baseTimestamp + baseOffset}), key "a"
     * and value "v".
     */
    private MemoryRecords createRecords(long baseOffset, int count) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(2048), RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset);
        IntStream.range(0, count).forEach(index ->
                builder.appendWithOffset(baseOffset + index, this.baseTimestamp + baseOffset, "a".getBytes(), "v".getBytes()));
        return builder.build();
    }

    /**
     * Opens a {@link LogSegment} at {@code baseOffset} inside a fresh temporary directory and
     * appends {@code batchCount} batches of {@code recordsPerBatch} records to it, flushing the
     * segment after every append.
     *
     * <p>The log config sets the index interval bytes property to 1. After the appends the
     * offset index is flushed and trimmed to its valid size. The caller is responsible for
     * closing the returned segment.
     */
    private LogSegment createSegment(long baseOffset, int batchCount, int recordsPerBatch) {
        File logDir = TestUtils.tempDirectory();
        logDir.deleteOnExit();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), 1);
        LogSegment segment = LogSegment.open(logDir, baseOffset, LogConfig.apply(logProps, new HashSet<>()), this.mockTime, false, 4096, false, "");
        IntStream.range(0, batchCount).forEach(batch -> {
            // Each batch starts where the previous one ended.
            long nextOffset = segment.readNextOffset();
            MemoryRecords records = createRecords(nextOffset, recordsPerBatch);
            long largestOffset = (nextOffset + recordsPerBatch) - 1;
            // NOTE(review): the third argument is passed as recordsPerBatch - 1 regardless of the
            // batch's starting offset - confirm this matches what LogSegment.append expects.
            segment.append(largestOffset, this.baseTimestamp + largestOffset, recordsPerBatch - 1, records);
            segment.flush();
        });
        segment.offsetIndex().flush();
        segment.offsetIndex().trimToValidSize();
        return segment;
    }

    /**
     * Uploads the files of {@code logSegment} to {@code tierObjectStore} under
     * {@code tierObjectMetadata}.
     *
     * @throws IOException declared for the upload call {@code tierObjectStore.putSegment}
     */
    private void putSegment(TierObjectStore tierObjectStore, LogSegment logSegment, TierObjectMetadata tierObjectMetadata) throws IOException {
        File logFile = logSegment.log().file();
        File offsetIndexFile = logSegment.offsetIndex().file();
        File timeIndexFile = logSegment.timeIndex().file();
        // NOTE(review): the time index file is passed for every remaining file argument,
        // presumably as a placeholder for files these tests never read back - confirm against
        // the TierObjectStore.putSegment signature.
        tierObjectStore.putSegment(tierObjectMetadata, logFile, offsetIndexFile, timeIndexFile, timeIndexFile, timeIndexFile, Optional.of(timeIndexFile));
    }
}
