package org.apache.kafka.raft.internals;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.protocol.Writable;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/raft/internals/BatchAccumulatorTest.class */
class BatchAccumulatorTest {
    private final MemoryPool memoryPool = (MemoryPool) Mockito.mock(MemoryPool.class);
    private final MockTime time = new MockTime();
    private final StringSerde serde = new StringSerde();
    static final Appender APPEND_ATOMIC = new Appender() { // from class: org.apache.kafka.raft.internals.BatchAccumulatorTest.1
        @Override // org.apache.kafka.raft.internals.BatchAccumulatorTest.Appender
        public Long call(BatchAccumulator<String> batchAccumulator, int i, List<String> list) {
            return Long.valueOf(batchAccumulator.append(i, list, OptionalLong.empty(), true));
        }
    };
    static final Appender APPEND = new Appender() { // from class: org.apache.kafka.raft.internals.BatchAccumulatorTest.2
        @Override // org.apache.kafka.raft.internals.BatchAccumulatorTest.Appender
        public Long call(BatchAccumulator<String> batchAccumulator, int i, List<String> list) {
            return Long.valueOf(batchAccumulator.append(i, list, OptionalLong.empty(), false));
        }
    };

    /* loaded from: input_file:org/apache/kafka/raft/internals/BatchAccumulatorTest$Appender.class */
    interface Appender {
        Long call(BatchAccumulator<String> batchAccumulator, int i, List<String> list);
    }

    BatchAccumulatorTest() {
    }

    private BatchAccumulator<String> buildAccumulator(int i, long j, int i2, int i3) {
        return new BatchAccumulator<>(i, j, i2, i3, this.memoryPool, this.time, CompressionType.NONE, this.serde);
    }

    @Test
    public void testLeaderChangeMessageWritten() {
        ByteBuffer allocate = ByteBuffer.allocate(256);
        Mockito.when(this.memoryPool.tryAllocate(256)).thenReturn(allocate);
        BatchAccumulator<String> buildAccumulator = buildAccumulator(17, 0L, 50, 512);
        buildAccumulator.appendLeaderChangeMessage(new LeaderChangeMessage(), this.time.milliseconds());
        Assertions.assertTrue(buildAccumulator.needsDrain(this.time.milliseconds()));
        List drain = buildAccumulator.drain();
        Assertions.assertEquals(1, drain.size());
        ((BatchAccumulator.CompletedBatch) drain.get(0)).release();
        ((MemoryPool) Mockito.verify(this.memoryPool)).release(allocate);
    }

    @Test
    public void testForceDrain() {
        Arrays.asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
            Mockito.when(this.memoryPool.tryAllocate(512)).thenReturn(ByteBuffer.allocate(512));
            BatchAccumulator<String> buildAccumulator = buildAccumulator(17, 157L, 50, 512);
            List asList = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
            Assertions.assertEquals(157L, appender.call(buildAccumulator, 17, asList.subList(0, 1)));
            Assertions.assertEquals(157 + 2, appender.call(buildAccumulator, 17, asList.subList(1, 3)));
            Assertions.assertEquals(157 + 5, appender.call(buildAccumulator, 17, asList.subList(3, 6)));
            Assertions.assertEquals(157 + 7, appender.call(buildAccumulator, 17, asList.subList(6, 8)));
            Assertions.assertEquals(157 + 8, appender.call(buildAccumulator, 17, asList.subList(8, 9)));
            Assertions.assertFalse(buildAccumulator.needsDrain(this.time.milliseconds()));
            buildAccumulator.forceDrain();
            Assertions.assertTrue(buildAccumulator.needsDrain(this.time.milliseconds()));
            Assertions.assertEquals(0L, buildAccumulator.timeUntilDrain(this.time.milliseconds()));
            List drain = buildAccumulator.drain();
            Assertions.assertEquals(1, drain.size());
            Assertions.assertFalse(buildAccumulator.needsDrain(this.time.milliseconds()));
            Assertions.assertEquals(Long.MAX_VALUE - this.time.milliseconds(), buildAccumulator.timeUntilDrain(this.time.milliseconds()));
            BatchAccumulator.CompletedBatch completedBatch = (BatchAccumulator.CompletedBatch) drain.get(0);
            Assertions.assertEquals(asList, completedBatch.records.get());
            Assertions.assertEquals(157L, completedBatch.baseOffset);
            Assertions.assertEquals(this.time.milliseconds(), completedBatch.appendTimestamp());
        });
    }

    @Test
    public void testForceDrainBeforeAppendLeaderChangeMessage() {
        Arrays.asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
            Mockito.when(this.memoryPool.tryAllocate(512)).thenReturn(ByteBuffer.allocate(512));
            Mockito.when(this.memoryPool.tryAllocate(256)).thenReturn(ByteBuffer.allocate(256));
            BatchAccumulator<String> buildAccumulator = buildAccumulator(17, 157L, 50, 512);
            List asList = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
            Assertions.assertEquals(157L, appender.call(buildAccumulator, 17, asList.subList(0, 1)));
            Assertions.assertEquals(157 + 2, appender.call(buildAccumulator, 17, asList.subList(1, 3)));
            Assertions.assertEquals(157 + 5, appender.call(buildAccumulator, 17, asList.subList(3, 6)));
            Assertions.assertEquals(157 + 7, appender.call(buildAccumulator, 17, asList.subList(6, 8)));
            Assertions.assertEquals(157 + 8, appender.call(buildAccumulator, 17, asList.subList(8, 9)));
            Assertions.assertFalse(buildAccumulator.needsDrain(this.time.milliseconds()));
            buildAccumulator.appendLeaderChangeMessage(new LeaderChangeMessage(), this.time.milliseconds());
            Assertions.assertTrue(buildAccumulator.needsDrain(this.time.milliseconds()));
            Assertions.assertEquals(0L, buildAccumulator.timeUntilDrain(this.time.milliseconds()));
            List drain = buildAccumulator.drain();
            Assertions.assertEquals(2, drain.size());
            Assertions.assertFalse(buildAccumulator.needsDrain(this.time.milliseconds()));
            Assertions.assertEquals(Long.MAX_VALUE - this.time.milliseconds(), buildAccumulator.timeUntilDrain(this.time.milliseconds()));
            BatchAccumulator.CompletedBatch completedBatch = (BatchAccumulator.CompletedBatch) drain.get(0);
            Assertions.assertEquals(asList, completedBatch.records.get());
            Assertions.assertEquals(157L, completedBatch.baseOffset);
            Assertions.assertEquals(this.time.milliseconds(), completedBatch.appendTimestamp());
        });
    }

    @Test
    public void testLingerIgnoredIfAccumulatorEmpty() {
        BatchAccumulator<String> buildAccumulator = buildAccumulator(17, 157L, 50, 512);
        Assertions.assertTrue(buildAccumulator.isEmpty());
        Assertions.assertFalse(buildAccumulator.needsDrain(this.time.milliseconds()));
        Assertions.assertEquals(Long.MAX_VALUE - this.time.milliseconds(), buildAccumulator.timeUntilDrain(this.time.milliseconds()));
    }

    @Test
    public void testLingerBeginsOnFirstWrite() {
        Mockito.when(this.memoryPool.tryAllocate(512)).thenReturn(ByteBuffer.allocate(512));
        BatchAccumulator<String> buildAccumulator = buildAccumulator(17, 157L, 50, 512);
        this.time.sleep(15L);
        Assertions.assertEquals(157L, buildAccumulator.append(17, Collections.singletonList("a"), OptionalLong.empty(), false));
        Assertions.assertEquals(50, buildAccumulator.timeUntilDrain(this.time.milliseconds()));
        Assertions.assertFalse(buildAccumulator.isEmpty());
        this.time.sleep(50 / 2);
        Assertions.assertEquals(50 / 2, buildAccumulator.timeUntilDrain(this.time.milliseconds()));
        Assertions.assertFalse(buildAccumulator.isEmpty());
        this.time.sleep(50 / 2);
        Assertions.assertEquals(0L, buildAccumulator.timeUntilDrain(this.time.milliseconds()));
        Assertions.assertTrue(buildAccumulator.needsDrain(this.time.milliseconds()));
        Assertions.assertFalse(buildAccumulator.isEmpty());
    }

    @Test
    public void testCompletedBatchReleaseBuffer() {
        ByteBuffer allocate = ByteBuffer.allocate(512);
        Mockito.when(this.memoryPool.tryAllocate(512)).thenReturn(allocate);
        BatchAccumulator<String> buildAccumulator = buildAccumulator(17, 157L, 50, 512);
        Assertions.assertEquals(157L, buildAccumulator.append(17, Collections.singletonList("a"), OptionalLong.empty(), false));
        this.time.sleep(50);
        List drain = buildAccumulator.drain();
        Assertions.assertEquals(1, drain.size());
        ((BatchAccumulator.CompletedBatch) drain.get(0)).release();
        ((MemoryPool) Mockito.verify(this.memoryPool)).release(allocate);
    }

    @Test
    public void testUnflushedBuffersReleasedByClose() {
        ByteBuffer allocate = ByteBuffer.allocate(512);
        Mockito.when(this.memoryPool.tryAllocate(512)).thenReturn(allocate);
        BatchAccumulator<String> buildAccumulator = buildAccumulator(17, 157L, 50, 512);
        Assertions.assertEquals(157L, buildAccumulator.append(17, Collections.singletonList("a"), OptionalLong.empty(), false));
        buildAccumulator.close();
        ((MemoryPool) Mockito.verify(this.memoryPool)).release(allocate);
    }

    @Test
    public void testSingleBatchAccumulation() {
        Arrays.asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
            Mockito.when(this.memoryPool.tryAllocate(512)).thenReturn(ByteBuffer.allocate(512));
            BatchAccumulator<String> buildAccumulator = buildAccumulator(17, 157L, 50, 512);
            List asList = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
            Assertions.assertEquals(157L, appender.call(buildAccumulator, 17, asList.subList(0, 1)));
            Assertions.assertEquals(157 + 2, appender.call(buildAccumulator, 17, asList.subList(1, 3)));
            Assertions.assertEquals(157 + 5, appender.call(buildAccumulator, 17, asList.subList(3, 6)));
            Assertions.assertEquals(157 + 7, appender.call(buildAccumulator, 17, asList.subList(6, 8)));
            Assertions.assertEquals(157 + 8, appender.call(buildAccumulator, 17, asList.subList(8, 9)));
            long milliseconds = this.time.milliseconds();
            this.time.sleep(50);
            Assertions.assertTrue(buildAccumulator.needsDrain(this.time.milliseconds()));
            List drain = buildAccumulator.drain();
            Assertions.assertEquals(1, drain.size());
            Assertions.assertFalse(buildAccumulator.needsDrain(this.time.milliseconds()));
            Assertions.assertEquals(Long.MAX_VALUE - this.time.milliseconds(), buildAccumulator.timeUntilDrain(this.time.milliseconds()));
            BatchAccumulator.CompletedBatch completedBatch = (BatchAccumulator.CompletedBatch) drain.get(0);
            Assertions.assertEquals(asList, completedBatch.records.get());
            Assertions.assertEquals(157L, completedBatch.baseOffset);
            Assertions.assertEquals(milliseconds, completedBatch.appendTimestamp());
        });
    }

    @Test
    public void testMultipleBatchAccumulation() {
        Arrays.asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
            int i = 256;
            Mockito.when(this.memoryPool.tryAllocate(256)).thenReturn(ByteBuffer.allocate(256));
            BatchAccumulator<String> buildAccumulator = buildAccumulator(17, 157L, 50, 256);
            while (buildAccumulator.numCompletedBatches() < 3) {
                appender.call(buildAccumulator, 17, Collections.singletonList("foo"));
            }
            List drain = buildAccumulator.drain();
            Assertions.assertEquals(4, drain.size());
            Assertions.assertTrue(drain.stream().allMatch(completedBatch -> {
                return completedBatch.data.sizeInBytes() <= i;
            }));
        });
    }

    @Test
    public void testRecordsAreSplit() {
        String str = "a";
        int recordBatchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes((byte) 2, CompressionType.NONE) + (2 * recordSizeInBytes("a", 2));
        Mockito.when(this.memoryPool.tryAllocate(recordBatchHeaderSizeInBytes)).thenReturn(ByteBuffer.allocate(recordBatchHeaderSizeInBytes));
        BatchAccumulator<String> buildAccumulator = buildAccumulator(17, 157L, 50, recordBatchHeaderSizeInBytes);
        List list = (List) Stream.generate(() -> {
            return str;
        }).limit(9).collect(Collectors.toList());
        Assertions.assertEquals((157 + 9) - 1, buildAccumulator.append(17, list, OptionalLong.empty(), false));
        this.time.sleep(50);
        Assertions.assertTrue(buildAccumulator.needsDrain(this.time.milliseconds()));
        List drain = buildAccumulator.drain();
        Assertions.assertEquals(((list.size() + 2) - 1) / 2, drain.size());
        Assertions.assertTrue(drain.stream().allMatch(completedBatch -> {
            return completedBatch.data.sizeInBytes() <= recordBatchHeaderSizeInBytes;
        }));
    }

    @Test
    public void testCloseWhenEmpty() {
        buildAccumulator(17, 157L, 50, 256).close();
        Mockito.verifyNoInteractions(new Object[]{this.memoryPool});
    }

    @Test
    public void testDrainDoesNotBlockWithConcurrentAppend() throws Exception {
        int i = 17;
        StringSerde stringSerde = (StringSerde) Mockito.spy(new StringSerde());
        BatchAccumulator batchAccumulator = new BatchAccumulator(17, 157L, 50, 256, this.memoryPool, this.time, CompressionType.NONE, stringSerde);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        Mockito.when(this.memoryPool.tryAllocate(256)).thenReturn(ByteBuffer.allocate(256));
        batchAccumulator.append(17, Collections.singletonList("a"), OptionalLong.empty(), false);
        ((StringSerde) Mockito.doAnswer(invocationOnMock -> {
            Writable writable = (Writable) invocationOnMock.getArgument(2);
            countDownLatch.countDown();
            countDownLatch2.await();
            writable.writeByteArray(Utils.utf8("b"));
            return null;
        }).when(stringSerde)).write((String) Mockito.eq("b"), (ObjectSerializationCache) Mockito.any(ObjectSerializationCache.class), (Writable) Mockito.any(Writable.class));
        Thread thread = new Thread(() -> {
            batchAccumulator.append(i, Collections.singletonList("b"), OptionalLong.empty(), false);
        });
        thread.start();
        countDownLatch.await();
        this.time.sleep(50);
        Assertions.assertTrue(batchAccumulator.needsDrain(this.time.milliseconds()));
        Assertions.assertEquals(Collections.emptyList(), batchAccumulator.drain());
        Assertions.assertTrue(batchAccumulator.needsDrain(this.time.milliseconds()));
        countDownLatch2.countDown();
        thread.join();
        List drain = batchAccumulator.drain();
        Assertions.assertEquals(1, drain.size());
        Assertions.assertEquals(Long.MAX_VALUE - this.time.milliseconds(), batchAccumulator.timeUntilDrain(this.time.milliseconds()));
        drain.stream().forEach(completedBatch -> {
            completedBatch.data.batches().forEach(mutableRecordBatch -> {
                Assertions.assertEquals(i, mutableRecordBatch.partitionLeaderEpoch());
            });
        });
    }

    int recordSizeInBytes(String str, int i) {
        int sizeOfBodyInBytes = DefaultRecord.sizeOfBodyInBytes(i, 0L, -1, this.serde.recordSize("a", new ObjectSerializationCache()), DefaultRecord.EMPTY_HEADERS);
        return ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
    }

    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testAppendWithRequiredBaseOffset(boolean z) {
        int i = 17;
        long j = 157;
        Mockito.when(this.memoryPool.tryAllocate(512)).thenReturn(ByteBuffer.allocate(512));
        BatchAccumulator<String> buildAccumulator = buildAccumulator(17, 157L, 50, 512);
        if (z) {
            Assertions.assertEquals(157L, buildAccumulator.append(17, Collections.singletonList("a"), OptionalLong.of(157L), true));
        } else {
            Assertions.assertEquals("Wanted base offset 156, but the next offset was 157", Assertions.assertThrows(UnexpectedBaseOffsetException.class, () -> {
                buildAccumulator.append(i, Collections.singletonList("a"), OptionalLong.of(j - 1), true);
            }).getMessage());
        }
        buildAccumulator.close();
    }
}
