package kafka.tier.fetcher;

import io.confluent.kafka.storage.checksum.ChecksumParams;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import kafka.tier.fetcher.TierSegmentReader;
import kafka.utils.TestUtils;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.storage.internals.log.TimeIndex;
import org.apache.kafka.storage.internals.log.TimestampOffset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:kafka/tier/fetcher/TierSegmentReaderTest.class */
public class TierSegmentReaderTest {
    private final TierSegmentReader reader = new TierSegmentReader("");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kafka/tier/fetcher/TierSegmentReaderTest$BatchAndPosition.class */
    /**
     * Pairs one batch of records with the byte position at which that batch
     * starts inside the concatenated stream built from all batches.
     */
    public static class BatchAndPosition {
        /** The records that make up this batch. */
        MemoryRecords records;

        /** Sum of the sizes of all batches that precede this one. */
        int bytePosition;

        /**
         * @param memoryRecords the records of the batch
         * @param i             the byte position of the batch's first byte
         */
        BatchAndPosition(MemoryRecords memoryRecords, int i) {
            records = memoryRecords;
            bytePosition = i;
        }
    }

    /**
     * Reads from a segment made of two back-to-back batches (base offsets 0 and 3,
     * three records each). A target offset in 0-2 must return data starting at
     * base offset 0, a target in 3-5 must start at base offset 3, and in both
     * cases the last returned offset is 5. Targets past the end must raise
     * {@link EOFException}.
     */
    @Test
    public void homogenousRecordBatchTest() throws IOException {
        SimpleRecord[] payload = {
            new SimpleRecord(1L, "foo".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
        };
        ByteBuffer firstBatch = MemoryRecords.withRecords((byte) 2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, payload).buffer();
        ByteBuffer secondBatch = MemoryRecords.withRecords((byte) 2, 3L, CompressionType.NONE, TimestampType.CREATE_TIME, payload).buffer();

        ByteBuffer segment = ByteBuffer.allocate(firstBatch.limit() + secondBatch.limit());
        segment.put(firstBatch).put(secondBatch);
        segment.flip();

        // Targets inside the first batch.
        for (long target = 0L; target <= 2L; target++) {
            testExpected(segment, target, 0L, 5L);
        }
        // Targets inside the second batch.
        for (long target = 3L; target <= 5L; target++) {
            testExpected(segment, target, 3L, 5L);
        }
        // Targets beyond the last offset in the segment.
        for (long target = 6L; target <= 7L; target++) {
            testThrows(this.reader, segment, target, EOFException.class);
        }
    }

    /**
     * Verifies that reading stops cleanly when the underlying stream fails at its
     * end instead of reporting EOF.
     *
     * <p>The batches are wrapped in a stream that throws {@link IOException} from
     * {@code read()} once every byte of the batches has been consumed, and the
     * reader is told the segment is twice as large as it really is. The records
     * that are returned must start at offset 100 and increase by one each, and no
     * {@link IOException} may escape {@code readRecords}.
     */
    @Test
    public void testAbortOnEOF() {
        List<MemoryRecords> batches = createBatches();
        final int totalBytes = batches.stream().mapToInt(MemoryRecords::sizeInBytes).sum();

        // try-with-resources guarantees the backing stream is closed on every path.
        try (InputStream source = toStream(batches)) {
            InputStream failingAtEnd = new InputStream() {
                // Index of the last byte handed out; -1 means nothing read yet.
                private int bytesRead = -1;

                @Override
                public int read() throws IOException {
                    if (bytesRead >= totalBytes - 1) {
                        throw new IOException("hit eof!");
                    }
                    bytesRead++;
                    return source.read();
                }
            };

            TierSegmentReader.RecordsAndNextBatchMetadata result = new TierSegmentReader("").readRecords(
                    CancellationContext.newContext(), Optional.empty(), failingAtEnd, 1048576, 100L, 0, totalBytes * 2);

            long expectedOffset = 100L;
            for (RecordBatch batch : result.records.batches()) {
                for (Record record : batch) {
                    Assertions.assertEquals(expectedOffset, record.offset(), "expected to find target offset 100 and all offsets after to be linearly increasing.");
                    expectedOffset++;
                }
            }
        } catch (IOException e) {
            // Keep the cause so the failure report shows where the exception came from.
            Assertions.fail("expected no exception to be thrown", e);
        }
    }

    /**
     * Reads from a segment whose three batches start at base offsets 3, 6 and 12,
     * so offsets 0-2 and 9-11 are absent. Each target offset from 0 to 14 must
     * return data starting at the expected batch (3 for targets up to 5, 6 for
     * targets up to 8, 12 otherwise) and ending at offset 14. Target 15 must raise
     * {@link EOFException}.
     */
    @Test
    public void testReadRecordsMissingRecordsBetweenBatches() throws IOException {
        SimpleRecord[] payload = {
            new SimpleRecord(1L, "foo".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
        };
        ByteBuffer batchAt3 = MemoryRecords.withRecords((byte) 2, 3L, CompressionType.NONE, TimestampType.CREATE_TIME, payload).buffer();
        ByteBuffer batchAt6 = MemoryRecords.withRecords((byte) 2, 6L, CompressionType.NONE, TimestampType.CREATE_TIME, payload).buffer();
        ByteBuffer batchAt12 = MemoryRecords.withRecords((byte) 2, 12L, CompressionType.NONE, TimestampType.CREATE_TIME, payload).buffer();

        ByteBuffer segment = ByteBuffer.allocate(batchAt3.limit() + batchAt6.limit() + batchAt12.limit());
        segment.put(batchAt3).put(batchAt6).put(batchAt12);
        segment.flip();

        for (long target = 0L; target <= 14L; target++) {
            long expectedBaseOffset;
            if (target <= 5L) {
                expectedBaseOffset = 3L;
            } else if (target <= 8L) {
                expectedBaseOffset = 6L;
            } else {
                expectedBaseOffset = 12L;
            }
            testExpected(segment, target, expectedBaseOffset, 14L);
        }
        testThrows(this.reader, segment, 15L, EOFException.class);
    }

    /**
     * Reads from a single batch built with base offset 1 that holds only two
     * records, appended at offsets 3 and 5. Every target offset up to 5 must
     * return the batch (base offset 1, last offset 5); target 6 must raise
     * {@link EOFException}.
     */
    @Test
    public void testReadRecordsMissingRecordsWithinBatch() throws IOException {
        SimpleRecord[] payload = {
            new SimpleRecord(1L, "foo".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
        };
        ByteBuffer scratch = ByteBuffer.allocate(payload.length);
        MemoryRecordsBuilder sparseBatch = MemoryRecords.builder(scratch, (byte) 2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, 1L, System.currentTimeMillis(), 0);
        sparseBatch.appendWithOffset(3L, payload[0]);
        sparseBatch.appendWithOffset(5L, payload[1]);
        ByteBuffer built = sparseBatch.build().buffer();

        ByteBuffer segment = ByteBuffer.allocate(built.limit());
        segment.put(built);
        segment.flip();

        // Probe order is kept as-is: 5 is checked before 4.
        long[] targets = {0L, 1L, 2L, 3L, 5L, 4L};
        for (long target : targets) {
            testExpected(segment, target, 1L, 5L);
        }
        testThrows(this.reader, segment, 6L, EOFException.class);
    }

    /**
     * Runs the segment-read check with a byte budget of exactly one batch, one
     * byte less than a batch, and exactly two batches.
     */
    @Test
    public void testReadRecordsOneBatchAlignedBoundaries() throws IOException {
        List<MemoryRecords> batches = createBatches();
        int batchSize = batches.get(0).sizeInBytes();
        int[] budgets = {batchSize, batchSize - 1, batchSize * 2};
        for (int maxBytes : budgets) {
            testReadSegment(batches, maxBytes);
        }
    }

    /** Runs the segment-read check with a byte budget one byte larger than a single batch. */
    @Test
    public void testReadRecordsOneBatchUnalignedBoundaries() throws IOException {
        List<MemoryRecords> batches = createBatches();
        int batchSize = batches.get(0).sizeInBytes();
        testReadSegment(batches, batchSize + 1);
    }

    /** Runs the segment-read check with a byte budget two bytes smaller than a single batch. */
    @Test
    public void testReadRecordsOneBatchMaxLessThanBatchSize() throws IOException {
        List<MemoryRecords> batches = createBatches();
        int batchSize = batches.get(0).sizeInBytes();
        testReadSegment(batches, batchSize - 2);
    }

    /** Runs the segment-read check with a byte budget of exactly three batches. */
    @Test
    public void testReadRecordsMultipleBatchesAligned() throws IOException {
        List<MemoryRecords> batches = createBatches();
        int batchSize = batches.get(0).sizeInBytes();
        testReadSegment(batches, batchSize * 3);
    }

    /** Runs the segment-read check with a byte budget of three batches plus two bytes. */
    @Test
    public void testReadRecordsMultipleBatchesUnaligned1() throws IOException {
        List<MemoryRecords> batches = createBatches();
        int threeBatches = batches.get(0).sizeInBytes() * 3;
        testReadSegment(batches, threeBatches + 2);
    }

    /** Runs the segment-read check with a byte budget of three batches minus two bytes. */
    @Test
    public void testReadRecordsMultipleBatchesUnaligned2() throws IOException {
        List<MemoryRecords> batches = createBatches();
        int threeBatches = batches.get(0).sizeInBytes() * 3;
        testReadSegment(batches, threeBatches - 2);
    }

    /**
     * Looks up offsets by timestamp in a segment of two batches: base offset 0
     * with timestamps 1, 2, 3 and base offset 3 with timestamps 2, 5, 6. For each
     * probed timestamp the expected offset is the earliest record that carries it
     * (timestamp 2 appears in both batches and must resolve to offset 1).
     */
    @Test
    public void offsetForTimestampTest() {
        SimpleRecord[] firstPayload = {
            new SimpleRecord(1L, "foo".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
        };
        SimpleRecord[] secondPayload = {
            new SimpleRecord(2L, "foo".getBytes(), "1".getBytes()),
            new SimpleRecord(5L, "b".getBytes(), "2".getBytes()),
            new SimpleRecord(6L, "c".getBytes(), "3".getBytes())
        };
        ByteBuffer firstBatch = MemoryRecords.withRecords((byte) 2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, firstPayload).buffer();
        ByteBuffer secondBatch = MemoryRecords.withRecords((byte) 2, 3L, CompressionType.NONE, TimestampType.CREATE_TIME, secondPayload).buffer();

        // Not flipped: the buffer is filled to capacity and the helper rewinds it.
        ByteBuffer segment = ByteBuffer.allocate(firstBatch.limit() + secondBatch.limit());
        segment.put(firstBatch).put(secondBatch);

        // Each row is {target timestamp, expected offset}.
        long[][] cases = {{1L, 0L}, {2L, 1L}, {3L, 2L}, {5L, 4L}, {6L, 5L}};
        for (long[] timestampAndOffset : cases) {
            assertCorrectOffsetForTimestamp(segment, timestampAndOffset[0], Optional.of(timestampAndOffset[1]));
        }
    }

    /**
     * Writes four timestamp/offset entries to a {@link TimeIndex} backed by a
     * temporary file, then reads the file back through
     * {@code TierTimestampIndexIterator} and expects the same entries in the same
     * order.
     *
     * <p>The index, the input stream and the temporary file are all released in
     * {@code finally} / try-with-resources blocks so nothing leaks when an
     * assertion or I/O call fails.
     */
    @Test
    public void timestampIndexIteratorTest() {
        try {
            File indexFile = File.createTempFile("kafka", ".tmp");
            try {
                List<TimestampOffset> expected = new ArrayList<>();
                TimeIndex timeIndex = new TimeIndex(indexFile, 1000L, 800, false, true, new ChecksumParams(Optional.of(TestUtils.createChecksumStore()), true, true));
                try {
                    timeIndex.resize(800);
                    timeIndex.flush();
                    expected.add(new TimestampOffset(20000L, 14000L));
                    expected.add(new TimestampOffset(30000L, 18000L));
                    expected.add(new TimestampOffset(40000L, 20000L));
                    expected.add(new TimestampOffset(40001L, 20001L));
                    for (TimestampOffset entry : expected) {
                        timeIndex.maybeAppend(entry.timestamp, entry.offset, false);
                    }
                    timeIndex.flush();
                } finally {
                    timeIndex.close();
                }

                // NOTE(review): the iterator's element type is not visible from this file,
                // so the entries are collected as Object; narrow to TimestampOffset if confirmed.
                List<Object> actual = new ArrayList<>();
                try (FileInputStream indexStream = new FileInputStream(indexFile)) {
                    TierTimestampIndexIterator entries = new TierTimestampIndexIterator(indexStream, 1000L);
                    while (entries.hasNext()) {
                        actual.add(entries.next());
                    }
                }
                Assertions.assertEquals(expected, actual);
            } finally {
                indexFile.delete();
            }
        } catch (IOException e) {
            // Keep the cause so the stack trace of the I/O failure is reported.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Builds one uncompressed, magic-2 batch containing the given records.
     *
     * @param list the records to append, in order
     * @param j    passed to the builder as its fifth argument; the callers in this
     *             file use it as the batch's base offset
     * @param i    passed to the builder as its last argument
     *             (NOTE(review): presumably the partition leader epoch - confirm)
     * @return the built records
     */
    private MemoryRecords createRecords(List<SimpleRecord> list, long j, int i) {
        ByteBuffer backing = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(list));
        MemoryRecordsBuilder batchBuilder = MemoryRecords.builder(backing, (byte) 2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, j, System.currentTimeMillis(), i);
        for (SimpleRecord record : list) {
            batchBuilder.append(record);
        }
        return batchBuilder.build();
    }

    /**
     * Concatenates the buffers of all given batches, in list order, and exposes
     * the result as an input stream positioned at the first byte.
     *
     * @param list the batches to concatenate
     * @return a stream over the concatenated bytes
     */
    private InputStream toStream(List<MemoryRecords> list) {
        int totalBytes = 0;
        for (MemoryRecords batch : list) {
            totalBytes += batch.sizeInBytes();
        }
        ByteBuffer combined = ByteBuffer.allocate(totalBytes);
        for (MemoryRecords batch : list) {
            combined.put(batch.buffer());
        }
        combined.flip();
        return new ByteBufferInputStream(combined);
    }

    /**
     * Creates four batches that each hold the same three records, with base
     * offsets 100, 103, 106 and 109.
     *
     * @return the batches in ascending base-offset order
     */
    private List<MemoryRecords> createBatches() {
        // Typed collections replace the raw ArrayLists and their unchecked conversions.
        List<SimpleRecord> records = new ArrayList<>();
        records.add(new SimpleRecord(1L, "k1".getBytes(), "v1".getBytes()));
        records.add(new SimpleRecord(2L, "k2".getBytes(), "v2".getBytes()));
        records.add(new SimpleRecord(3L, "k3".getBytes(), "v3".getBytes()));

        List<MemoryRecords> batches = new ArrayList<>();
        batches.add(createRecords(records, 100L, 0));
        batches.add(createRecords(records, 103L, 0));
        batches.add(createRecords(records, 106L, 0));
        batches.add(createRecords(records, 109L, 0));
        return batches;
    }

    /**
     * Rewinds the buffer, reads records for a target offset with a 1000-byte
     * budget, and checks the base offset of the first returned batch and the last
     * offset of the final returned batch.
     *
     * @param byteBuffer the segment bytes; its position is reset to 0
     * @param l          the target offset to read from
     * @param l2         expected base offset of the first returned batch, or
     *                   {@code null} if no batch is expected
     * @param l3         expected last offset of the final returned batch, or
     *                   {@code null} if no batch is expected
     * @throws IOException if reading fails
     */
    private void testExpected(ByteBuffer byteBuffer, Long l, Long l2, Long l3) throws IOException {
        byteBuffer.position(0);
        CancellationContext context = CancellationContext.newContext().subContext();
        ByteBufferInputStream segmentStream = new ByteBufferInputStream(byteBuffer);
        ReclaimableMemoryRecords fetched = this.reader.readRecords(context, Optional.empty(), segmentStream, 1000, l.longValue(), 0, byteBuffer.limit()).records;

        Long firstBaseOffset = null;
        Long finalLastOffset = null;
        boolean hasData = fetched.sizeInBytes() != 0;
        if (hasData) {
            for (MutableRecordBatch batch : fetched.batches()) {
                if (firstBaseOffset == null) {
                    firstBaseOffset = batch.baseOffset();
                }
                finalLastOffset = batch.lastOffset();
            }
        }
        Assertions.assertEquals(l2, firstBaseOffset);
        Assertions.assertEquals(l3, finalLastOffset);
    }

    /**
     * Rewinds the buffer and asserts that reading records for the given target
     * offset throws the expected exception type.
     *
     * @param tierSegmentReader the reader under test
     * @param byteBuffer        the segment bytes; its position is reset to 0
     * @param j                 the target offset to read from
     * @param cls               the exception type that must be thrown
     */
    private static <T extends Throwable> void testThrows(TierSegmentReader tierSegmentReader, ByteBuffer byteBuffer, long j, Class<T> cls) {
        byteBuffer.position(0);
        ByteBufferInputStream segmentStream = new ByteBufferInputStream(byteBuffer);
        CancellationContext rootContext = CancellationContext.newContext();
        Assertions.assertThrows(
            cls,
            () -> tierSegmentReader.readRecords(rootContext.subContext(), Optional.empty(), segmentStream, 1000, j, 0, byteBuffer.limit()));
    }

    /**
     * Reads the concatenated batches once for every batch's base offset and base
     * offset + 1, and checks which batches come back and what "next batch"
     * metadata is reported.
     *
     * <p>For each target offset the expected result starts at the batch whose base
     * offset is the greatest one not above the target, and spans
     * {@code max(1, i / sizeOfThatBatch)} consecutive batches (fewer if the list
     * ends first). If a batch follows the expected ones, the reported next offset
     * and byte position must point at it; otherwise the next-batch metadata must
     * be {@code null}.
     *
     * @param list the batches forming the segment, in ascending offset order
     * @param i    the maximum number of bytes passed to {@code readRecords}
     * @throws IOException if reading fails
     */
    private void testReadSegment(List<MemoryRecords> list, int i) throws IOException {
        TreeMap<Long, BatchAndPosition> batchesByBaseOffset = new TreeMap<>();
        List<Long> targetOffsets = new ArrayList<>();
        CancellationContext context = CancellationContext.newContext();
        // Segment size is derived from the first batch only, i.e. it treats all batches as equally sized.
        int segmentSize = list.get(0).firstBatchSize().intValue() * list.size();

        int position = 0;
        for (MemoryRecords batchRecords : list) {
            RecordBatch firstBatch = batchRecords.firstBatch();
            batchesByBaseOffset.put(firstBatch.baseOffset(), new BatchAndPosition(batchRecords, position));
            targetOffsets.add(firstBatch.baseOffset());
            targetOffsets.add(firstBatch.baseOffset() + 1);
            position += batchRecords.sizeInBytes();
        }

        for (long targetOffset : targetOffsets) {
            // Base offset of the batch that contains the target. The decompiled code
            // referred to this value through an undeclared identifier.
            long startBaseOffset = batchesByBaseOffset.floorKey(targetOffset);
            int startBatchSize = batchesByBaseOffset.get(startBaseOffset).records.sizeInBytes();

            // Always holds at least startBaseOffset: tailSet is inclusive and the limit is >= 1.
            TreeSet<Long> expectedBaseOffsets = batchesByBaseOffset.navigableKeySet().tailSet(startBaseOffset).stream()
                    .limit(Math.max(1, i / startBatchSize))
                    .collect(Collectors.toCollection(TreeSet::new));
            Long nextBaseOffset = batchesByBaseOffset.higherKey(expectedBaseOffsets.last());

            TierSegmentReader.RecordsAndNextBatchMetadata result = this.reader.readRecords(context, Optional.empty(), toStream(list), i, targetOffset, 0, segmentSize);

            Iterator<Long> expectedIterator = expectedBaseOffsets.iterator();
            RecordBatch lastBatch = null;
            for (RecordBatch batch : result.records.batches()) {
                Assertions.assertEquals(expectedIterator.next().longValue(), batch.baseOffset());
                lastBatch = batch;
            }

            if (nextBaseOffset != null) {
                Assertions.assertNotNull(lastBatch, "expected at least one batch to be returned");
                BatchAndPosition nextBatch = batchesByBaseOffset.get(nextBaseOffset);
                Assertions.assertEquals(lastBatch.nextOffset(), result.nextOffsetAndBatchMetadata.nextOffset);
                Assertions.assertEquals(nextBaseOffset.longValue(), result.nextOffsetAndBatchMetadata.nextOffset);
                Assertions.assertEquals(nextBatch.bytePosition, result.nextOffsetAndBatchMetadata.nextBatchMetadata.bytePosition);
                // The batch size is optional metadata; only verify it when reported.
                if (result.nextOffsetAndBatchMetadata.nextBatchMetadata.recordBatchSize.isPresent()) {
                    Assertions.assertEquals(nextBatch.records.sizeInBytes(), result.nextOffsetAndBatchMetadata.nextBatchMetadata.recordBatchSize.getAsInt());
                }
            } else {
                Assertions.assertNull(result.nextOffsetAndBatchMetadata);
            }
        }
    }

    /**
     * Rewinds the buffer and asserts that looking up the given timestamp yields
     * the expected offset. An {@link IOException} from the lookup fails the test.
     *
     * @param byteBuffer the segment bytes; its position is reset to 0 and its
     *                   limit is passed as the segment size
     * @param j          the target timestamp
     * @param optional   the expected lookup result
     */
    private void assertCorrectOffsetForTimestamp(ByteBuffer byteBuffer, long j, Optional<Long> optional) {
        byteBuffer.position(0);
        try {
            Assertions.assertEquals(optional, this.reader.offsetForTimestamp(CancellationContext.newContext().subContext(), new ByteBufferInputStream(byteBuffer), j, byteBuffer.limit()));
        } catch (IOException e) {
            // Attach the cause so the failure report includes the original stack trace.
            Assertions.fail("IOexception encountered", e);
        }
    }
}
