package io.confluent.kafka.replication.push.buffer;

import io.confluent.kafka.replication.push.PushSession;
import io.confluent.kafka.replication.push.ReplicationConfig;
import io.confluent.kafka.replication.push.buffer.BufferingPartitionDataBuilder;
import io.confluent.kafka.replication.push.buffer.PushReplicationEvent;
import io.confluent.kafka.replication.push.utils.TestPushSession;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AppendRecordsRequestData;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/confluent/kafka/replication/push/buffer/BufferingAppendRecordsBuilderTest.class */
public class BufferingAppendRecordsBuilderTest {
    /** Id of the replica the builder under test pushes to. */
    private static final int REPLICA_ID = 1;

    /** Topic id shared by the two fixed "foo" partitions below. */
    private static final Uuid TOPIC_UUID = Uuid.randomUuid();

    /** Partition 0 of topic "foo". */
    private static final TopicIdPartition PARTITION_0 = new TopicIdPartition(TOPIC_UUID, new TopicPartition("foo", 0));

    /** Partition 1 of topic "foo"; the index is a plain partition number, unrelated to {@link #REPLICA_ID}. */
    private static final TopicIdPartition PARTITION_1 = new TopicIdPartition(TOPIC_UUID, new TopicPartition("foo", 1));

    /** Configuration handed to the builders under test; re-created from an empty property map in {@code setUp}. */
    private ReplicationConfig config;

    /** Mocked memory tracker, so individual tests can stub and count {@code countDown} calls. */
    @SuppressWarnings("unchecked") // Mockito can only mock the raw class; viewing the mock generically is harmless.
    private final RefCountingMemoryTracker<MemoryRecords> memoryTracker = Mockito.mock(RefCountingMemoryTracker.class);

    /** Re-creates {@code config} from an empty property map before every test. */
    @BeforeEach
    public void setUp() {
        // Diamond instead of a raw HashMap: still an empty map, but without the raw-type warning.
        this.config = new ReplicationConfig(new HashMap<>());
    }

    /**
     * Verifies that {@code processEvent} returns {@code false} for an event created by
     * {@code PushReplicationEvent.forStartPush}.
     */
    @Test
    public void testStartPushCannotBeProcessed() {
        // The first TestPushSession argument is the leader epoch, so it is a plain 1 rather than REPLICA_ID.
        TestPushSession session = new TestPushSession(1, 1L, 1L);
        BufferingAppendRecordsBuilder builder = new BufferingAppendRecordsBuilder(
                REPLICA_ID, session.replicaEpoch(), this.config, this.memoryTracker, Time.SYSTEM);
        PushReplicationEvent startPush = PushReplicationEvent.forStartPush(PARTITION_0, REPLICA_ID, session);

        Assertions.assertFalse(builder.processEvent(startPush, session));
    }

    /**
     * Verifies that {@code processEvent} returns {@code false} for a plain records event whose
     * records are {@link FileRecords}.
     */
    @Test
    public void testFileRecordsCannotBeProcessed() {
        // The first TestPushSession argument is the leader epoch, so it is a plain 1 rather than REPLICA_ID.
        TestPushSession session = new TestPushSession(1, 1L, 1L);
        BufferingAppendRecordsBuilder builder = new BufferingAppendRecordsBuilder(
                REPLICA_ID, session.replicaEpoch(), this.config, this.memoryTracker, Time.SYSTEM);
        // Declared as AbstractRecords so the same forRecords signature is selected as before.
        AbstractRecords fileRecords = Mockito.mock(FileRecords.class);
        PushReplicationEvent fileRecordsEvent =
                PushReplicationEvent.forRecords(PARTITION_0, REPLICA_ID, fileRecords, 2L, 2L);

        Assertions.assertFalse(builder.processEvent(fileRecordsEvent, session));
    }

    /**
     * Feeds a transition-records event followed by offset updates into the builder and checks
     * that the built request carries the transition records together with the merged offsets.
     * Also checks that the session's start future completes only once the request is built.
     */
    @Test
    public void testStartPush() {
        // The first TestPushSession argument is the leader epoch, so it is a plain 1 rather than REPLICA_ID.
        TestPushSession session = new TestPushSession(1, 1L, 1L);
        BufferingAppendRecordsBuilder builder = new BufferingAppendRecordsBuilder(
                REPLICA_ID, session.replicaEpoch(), this.config, this.memoryTracker, Time.SYSTEM);

        // File records are accepted when they arrive wrapped in a transition-records event.
        PushReplicationEvent initialRecords = PushReplicationEvent.forRecords(
                PARTITION_0, REPLICA_ID, (AbstractRecords) Mockito.mock(FileRecords.class), 2L, 2L);
        PushReplicationEvent transition =
                PushReplicationEvent.forTransitionRecords(initialRecords, session.startFuture());
        Assertions.assertTrue(builder.processEvent(transition, session));

        // A further records event for the same partition is refused ...
        PushReplicationEvent moreRecords = PushReplicationEvent.forRecords(
                PARTITION_0, REPLICA_ID, (AbstractRecords) Mockito.mock(MemoryRecords.class), 1L, 1L);
        Assertions.assertFalse(builder.processEvent(moreRecords, session));

        // ... while offset-only updates are still accepted.
        PushReplicationEvent highWatermarkUpdate =
                PushReplicationEvent.forHighWatermarkUpdate(PARTITION_0, REPLICA_ID, 3L);
        Assertions.assertTrue(builder.processEvent(highWatermarkUpdate, session));
        PushReplicationEvent logStartOffsetUpdate =
                PushReplicationEvent.forLogStartOffsetUpdate(PARTITION_0, REPLICA_ID, 4L);
        Assertions.assertTrue(builder.processEvent(logStartOffsetUpdate, session));

        // Nothing has been built yet, so the start future must still be pending.
        Assertions.assertFalse(session.startFuture().isDone());

        AppendRecordsRequestData request = builder.build();
        Assertions.assertEquals(session.replicaEpoch(), request.replicaEpoch());
        Assertions.assertEquals(1, request.topics().size());

        PushReplicationEvent.RecordsPayload recordsPayload =
                (PushReplicationEvent.RecordsPayload) initialRecords.payload();
        PushReplicationEvent.OffsetPayload highWatermarkPayload =
                (PushReplicationEvent.OffsetPayload) highWatermarkUpdate.payload();
        PushReplicationEvent.OffsetPayload logStartOffsetPayload =
                (PushReplicationEvent.OffsetPayload) logStartOffsetUpdate.payload();

        for (AppendRecordsRequestData.TopicData topicData : request.topics()) {
            Assertions.assertEquals(PARTITION_0.topicId(), topicData.topicId());
            Assertions.assertEquals(1, topicData.partitions().size());
            for (AppendRecordsRequestData.PartitionData partitionData : topicData.partitions()) {
                Assertions.assertEquals(PARTITION_0.partition(), partitionData.partitionIndex());
                Assertions.assertEquals(session.replicationSessionId(), partitionData.replicationSessionId());
                Assertions.assertEquals(session.leaderEpoch(), partitionData.currentLeaderEpoch());
                Assertions.assertEquals(recordsPayload.records(), partitionData.records());
                Assertions.assertEquals(recordsPayload.appendOffset(), partitionData.appendOffset());
                // The request must carry the larger of the two high watermarks seen.
                Assertions.assertEquals(
                        Math.max(recordsPayload.highWatermark(), highWatermarkPayload.offset()),
                        partitionData.highWatermark());
                Assertions.assertEquals(logStartOffsetPayload.offset(), partitionData.logStartOffset());
            }
        }
        Assertions.assertTrue(session.startFuture().isDone());
    }

    /**
     * Fills two partitions up to the per-partition byte limit and verifies that each partition
     * refuses one more batch, while the built request still contains every accepted batch for
     * both partitions.
     */
    @Test
    public void testPartitionDataLimitReachedOnMultiplePartitions() {
        final int batchesPerPartition = 10;
        Random random = new Random();
        TopicIdPartition[] partitions = {PARTITION_0, PARTITION_1};
        this.config = new ReplicationConfig(
                Collections.singletonMap("confluent.replication.request.max.partition.bytes", 5242880L));

        // Bounded/masked draws instead of Math.abs(next*()): Math.abs(MIN_VALUE) is still negative.
        int leaderEpoch = random.nextInt(Integer.MAX_VALUE);
        long replicaEpoch = random.nextInt(Integer.MAX_VALUE);
        HashMap<Integer, PushSession> sessionsByPartition = new HashMap<>();
        for (TopicIdPartition partition : partitions) {
            long sessionId = random.nextLong() & Long.MAX_VALUE;
            sessionsByPartition.put(partition.partition(), new TestPushSession(leaderEpoch, replicaEpoch, sessionId));
        }

        BufferingAppendRecordsBuilder builder = new BufferingAppendRecordsBuilder(
                REPLICA_ID, replicaEpoch, this.config, this.memoryTracker, Time.SYSTEM);

        // Each batch is a tenth of the partition limit, so ten batches fill a partition exactly.
        long batchSize = this.config.maxRequestPartitionSizeBytes() / batchesPerPartition;
        MemoryRecords memoryRecords = Mockito.mock(MemoryRecords.class);
        Mockito.when(memoryRecords.sizeInBytes()).thenReturn((int) batchSize);

        long firstAppendOffset = -1;
        long maxHighWatermark = -1;
        for (TopicIdPartition partition : partitions) {
            PushSession session = sessionsByPartition.get(partition.partition());
            for (int i = 0; i < batchesPerPartition; i++) {
                PushReplicationEvent event =
                        PushReplicationEvent.forRecords(partition, REPLICA_ID, memoryRecords, i + 1, i);
                Assertions.assertTrue(builder.processEvent(event, session));
                PushReplicationEvent.RecordsPayload payload = (PushReplicationEvent.RecordsPayload) event.payload();
                if (firstAppendOffset == -1) {
                    firstAppendOffset = payload.appendOffset();
                }
                maxHighWatermark = Math.max(maxHighWatermark, payload.highWatermark());
            }
            // The partition is full now; one more batch must be refused.
            PushReplicationEvent overflow =
                    PushReplicationEvent.forRecords(partition, REPLICA_ID, memoryRecords, 1L, 0L);
            Assertions.assertFalse(builder.processEvent(overflow, session));
        }

        AppendRecordsRequestData request = builder.build();
        Assertions.assertEquals(replicaEpoch, request.replicaEpoch());
        Assertions.assertEquals(1, request.topics().size());
        for (AppendRecordsRequestData.TopicData topicData : request.topics()) {
            Assertions.assertEquals(TOPIC_UUID, topicData.topicId());
            Assertions.assertEquals(partitions.length, topicData.partitions().size());
            for (AppendRecordsRequestData.PartitionData partitionData : topicData.partitions()) {
                int partitionIndex = partitionData.partitionIndex();
                Assertions.assertTrue(sessionsByPartition.containsKey(partitionIndex));
                Assertions.assertEquals(
                        sessionsByPartition.get(partitionIndex).replicationSessionId(),
                        partitionData.replicationSessionId());
                Assertions.assertEquals(leaderEpoch, partitionData.currentLeaderEpoch());
                Assertions.assertTrue(partitionData.records() instanceof BufferingPartitionDataBuilder.PartitionRecords);
                // Explicit downcast: records() is not statically typed as PartitionRecords.
                BufferingPartitionDataBuilder.PartitionRecords records =
                        (BufferingPartitionDataBuilder.PartitionRecords) partitionData.records();
                Assertions.assertEquals(batchesPerPartition * batchSize, records.sizeInBytes());
                Assertions.assertEquals(batchesPerPartition, records.buffers().length);
                Assertions.assertEquals(firstAppendOffset, partitionData.appendOffset());
                Assertions.assertEquals(maxHighWatermark, partitionData.highWatermark());
                Assertions.assertEquals(-1L, partitionData.logStartOffset());
            }
        }
    }

    /**
     * Keeps adding batches partition by partition until the overall request size limit is hit.
     * Verifies that the first batch exceeding the limit is refused and that the built request
     * contains every partition up to and including the one where the limit was reached.
     */
    @Test
    public void testRequestDataLimitReached() {
        final int partitionCount = 100;
        final int batchesPerPartition = 101;
        Random random = new Random();
        TopicIdPartition[] partitions = new TopicIdPartition[partitionCount];
        for (int i = 0; i < partitions.length; i++) {
            partitions[i] = new TopicIdPartition(TOPIC_UUID, new TopicPartition("bar", i));
        }

        // Size the per-partition limit so that 90% of the partitions add up to 52428800 bytes
        // (presumably the default request size limit -- TODO confirm against ReplicationConfig).
        long partitionsToFill = (partitionCount * 9) / 10;
        this.config = new ReplicationConfig(Collections.singletonMap(
                "confluent.replication.request.max.partition.bytes", 52428800L / partitionsToFill));

        // Bounded/masked draws instead of Math.abs(next*()): Math.abs(MIN_VALUE) is still negative.
        int leaderEpoch = random.nextInt(Integer.MAX_VALUE);
        long replicaEpoch = random.nextInt(Integer.MAX_VALUE);
        HashMap<Integer, PushSession> sessionsByPartition = new HashMap<>();
        for (TopicIdPartition partition : partitions) {
            long sessionId = random.nextLong() & Long.MAX_VALUE;
            sessionsByPartition.put(partition.partition(), new TestPushSession(leaderEpoch, replicaEpoch, sessionId));
        }

        BufferingAppendRecordsBuilder builder = new BufferingAppendRecordsBuilder(
                REPLICA_ID, replicaEpoch, this.config, this.memoryTracker, Time.SYSTEM);

        // 101 batches of this size stay within the per-partition limit (integer division rounds down).
        long batchSize = this.config.maxRequestPartitionSizeBytes() / batchesPerPartition;
        MemoryRecords memoryRecords = Mockito.mock(MemoryRecords.class);
        Mockito.when(memoryRecords.sizeInBytes()).thenReturn((int) batchSize);

        long[] firstAppendOffsets = new long[partitionCount];
        Arrays.fill(firstAppendOffsets, -1L);
        long[] maxHighWatermarks = new long[partitionCount];
        Arrays.fill(maxHighWatermarks, -1L);

        long bufferedBytes = 0;
        TopicIdPartition limitPartition = null;
        for (TopicIdPartition partition : partitions) {
            if (limitPartition != null) {
                break;
            }
            long offset = 0;
            int partitionIndex = partition.partition();
            PushSession session = sessionsByPartition.get(partitionIndex);
            // Bounded loop: move on to the next partition after 101 accepted batches.
            for (int i = 0; i < batchesPerPartition; i++) {
                offset += random.nextInt(1000);
                PushReplicationEvent event =
                        PushReplicationEvent.forRecords(partition, REPLICA_ID, memoryRecords, offset, offset - 1);
                if (bufferedBytes + memoryRecords.sizeInBytes() > this.config.maxRequestSizeBytes()) {
                    // This batch would push the request over its size limit, so it must be refused.
                    Assertions.assertFalse(builder.processEvent(event, session));
                    limitPartition = partition;
                    break;
                }
                Assertions.assertTrue(builder.processEvent(event, session));
                PushReplicationEvent.RecordsPayload payload = (PushReplicationEvent.RecordsPayload) event.payload();
                if (firstAppendOffsets[partitionIndex] == -1) {
                    firstAppendOffsets[partitionIndex] = payload.appendOffset();
                }
                maxHighWatermarks[partitionIndex] =
                        Math.max(maxHighWatermarks[partitionIndex], payload.highWatermark());
                bufferedBytes += memoryRecords.sizeInBytes();
            }
        }
        Assertions.assertNotNull(limitPartition);

        AppendRecordsRequestData request = builder.build();
        Assertions.assertEquals(replicaEpoch, request.replicaEpoch());
        Assertions.assertEquals(1, request.topics().size());
        for (AppendRecordsRequestData.TopicData topicData : request.topics()) {
            Assertions.assertEquals(TOPIC_UUID, topicData.topicId());
            // Partitions 0..limit (inclusive) are expected in the request.
            Assertions.assertEquals(limitPartition.partition() + 1, topicData.partitions().size());
            for (AppendRecordsRequestData.PartitionData partitionData : topicData.partitions()) {
                int partitionIndex = partitionData.partitionIndex();
                Assertions.assertEquals(
                        sessionsByPartition.get(partitionIndex).replicationSessionId(),
                        partitionData.replicationSessionId());
                Assertions.assertEquals(leaderEpoch, partitionData.currentLeaderEpoch());
                Assertions.assertTrue(partitionData.records() instanceof BufferingPartitionDataBuilder.PartitionRecords);
                // Explicit downcast: records() is not statically typed as PartitionRecords.
                BufferingPartitionDataBuilder.PartitionRecords records =
                        (BufferingPartitionDataBuilder.PartitionRecords) partitionData.records();
                // Only partitions before the limit are guaranteed to hold the full batch count.
                if (partitionIndex != limitPartition.partition()) {
                    Assertions.assertEquals(batchesPerPartition * batchSize, records.sizeInBytes());
                    Assertions.assertEquals(batchesPerPartition, records.buffers().length);
                }
                Assertions.assertEquals(firstAppendOffsets[partitionIndex], partitionData.appendOffset());
                Assertions.assertEquals(maxHighWatermarks[partitionIndex], partitionData.highWatermark());
                Assertions.assertEquals(-1L, partitionData.logStartOffset());
            }
        }
    }

    /**
     * Buffers records for partition 0, then stops the push for it, and buffers records for
     * partition 1. Verifies that the stop counts down every buffered batch on the memory
     * tracker, that the built request ends the session for partition 0 without records while
     * partition 1 keeps its data, and that the stop future completes only after the build.
     */
    @Test
    public void testStopPush() {
        final int batchesPerPartition = 10;
        TestPushSession stoppedSession = new TestPushSession(12, 1L, 34L);
        TestPushSession activeSession = new TestPushSession(56, 1L, 78L);

        // Count every countDown call the builder makes on the tracker.
        AtomicInteger countDownCalls = new AtomicInteger();
        Mockito.when(this.memoryTracker.countDown(ArgumentMatchers.any())).then(invocation -> {
            countDownCalls.incrementAndGet();
            return 0;
        });

        BufferingAppendRecordsBuilder builder = new BufferingAppendRecordsBuilder(
                REPLICA_ID, 1L, this.config, this.memoryTracker, Time.SYSTEM);
        PushReplicationEvent stopPush = PushReplicationEvent.forStopPush(PARTITION_0, REPLICA_ID);

        long batchSize = this.config.maxRequestPartitionSizeBytes() / batchesPerPartition;
        MemoryRecords memoryRecords = Mockito.mock(MemoryRecords.class);
        Mockito.when(memoryRecords.sizeInBytes()).thenReturn((int) batchSize);

        for (int i = 0; i < batchesPerPartition; i++) {
            PushReplicationEvent event =
                    PushReplicationEvent.forRecords(PARTITION_0, REPLICA_ID, memoryRecords, i + 1, i);
            Assertions.assertTrue(builder.processEvent(event, stoppedSession));
        }

        // Stopping the push counts down all ten buffered batches, but the future stays pending until build().
        Assertions.assertTrue(builder.processEvent(stopPush, stoppedSession));
        Assertions.assertEquals(batchesPerPartition, countDownCalls.get());
        Assertions.assertFalse(((CompletableFuture<?>) stopPush.payload()).isDone());

        long firstAppendOffset = -1;
        long maxHighWatermark = -1;
        for (int i = 0; i < batchesPerPartition; i++) {
            PushReplicationEvent event =
                    PushReplicationEvent.forRecords(PARTITION_1, REPLICA_ID, memoryRecords, i + 1, i);
            Assertions.assertTrue(builder.processEvent(event, activeSession));
            PushReplicationEvent.RecordsPayload payload = (PushReplicationEvent.RecordsPayload) event.payload();
            if (firstAppendOffset == -1) {
                firstAppendOffset = payload.appendOffset();
            }
            maxHighWatermark = Math.max(maxHighWatermark, payload.highWatermark());
        }

        AppendRecordsRequestData request = builder.build();
        Assertions.assertEquals(1L, request.replicaEpoch());
        Assertions.assertEquals(1, request.topics().size());
        for (AppendRecordsRequestData.TopicData topicData : request.topics()) {
            Assertions.assertEquals(TOPIC_UUID, topicData.topicId());
            Assertions.assertEquals(2, topicData.partitions().size());
            for (AppendRecordsRequestData.PartitionData partitionData : topicData.partitions()) {
                if (partitionData.partitionIndex() == PARTITION_0.partition()) {
                    // Stopped partition: session-end marker only, no records and no offsets.
                    Assertions.assertTrue(partitionData.endReplicationSession());
                    Assertions.assertEquals(stoppedSession.replicationSessionId(), partitionData.replicationSessionId());
                    Assertions.assertEquals(stoppedSession.leaderEpoch(), partitionData.currentLeaderEpoch());
                    Assertions.assertNull(partitionData.records());
                    Assertions.assertEquals(-1L, partitionData.appendOffset());
                    Assertions.assertEquals(-1L, partitionData.highWatermark());
                    Assertions.assertEquals(-1L, partitionData.logStartOffset());
                } else {
                    Assertions.assertFalse(partitionData.endReplicationSession());
                    Assertions.assertEquals(activeSession.replicationSessionId(), partitionData.replicationSessionId());
                    Assertions.assertEquals(activeSession.leaderEpoch(), partitionData.currentLeaderEpoch());
                    Assertions.assertTrue(partitionData.records() instanceof BufferingPartitionDataBuilder.PartitionRecords);
                    // Explicit downcast: records() is not statically typed as PartitionRecords.
                    BufferingPartitionDataBuilder.PartitionRecords records =
                            (BufferingPartitionDataBuilder.PartitionRecords) partitionData.records();
                    Assertions.assertEquals(batchesPerPartition, records.buffers().length);
                    Assertions.assertEquals(firstAppendOffset, partitionData.appendOffset());
                    Assertions.assertEquals(maxHighWatermark, partitionData.highWatermark());
                    Assertions.assertEquals(-1L, partitionData.logStartOffset());
                }
            }
        }
        Assertions.assertTrue(((CompletableFuture<?>) stopPush.payload()).isDone());
    }

    /**
     * Verifies that {@code processEvent} throws {@link IllegalArgumentException} when an event
     * arrives with a session whose replica epoch (2) differs from the epoch the builder was
     * created with (1).
     */
    @Test
    public void testInconsistentReplicaEpochs() {
        final int acceptedBatches = 10;
        ReplicationConfig replicationConfig = new ReplicationConfig(new HashMap<>());
        // TestPushSession arguments are (leader epoch, replica epoch, session id).
        TestPushSession matchingSession = new TestPushSession(1, 1L, 1L);
        TestPushSession mismatchedSession = new TestPushSession(1, 2L, 1L);

        BufferingAppendRecordsBuilder builder = new BufferingAppendRecordsBuilder(
                REPLICA_ID, 1L, replicationConfig, this.memoryTracker, Time.SYSTEM);
        MemoryRecords memoryRecords = Mockito.mock(MemoryRecords.class);

        // Events carrying the builder's own replica epoch are accepted.
        for (int i = 0; i < acceptedBatches; i++) {
            PushReplicationEvent event =
                    PushReplicationEvent.forRecords(PARTITION_0, REPLICA_ID, memoryRecords, i + 1, i);
            Assertions.assertTrue(builder.processEvent(event, matchingSession));
        }

        PushReplicationEvent nextEvent = PushReplicationEvent.forRecords(
                PARTITION_0, REPLICA_ID, memoryRecords, acceptedBatches + 1, acceptedBatches);
        Assertions.assertThrows(
                IllegalArgumentException.class, () -> builder.processEvent(nextEvent, mismatchedSession));
    }

    /**
     * Verifies the max-wait behaviour of {@code isRequestReady}: a builder that has processed
     * no event is not ready even after {@code maxWaitMs} has passed, while a builder holding
     * only an offset update becomes ready exactly when {@code maxWaitMs} has elapsed.
     */
    @Test
    public void testIsRequestReadyWithMaxWaitMs() {
        MockTime mockTime = new MockTime();
        TestPushSession session = new TestPushSession(12, 1L, 34L);

        // No events at all: never ready, however long we wait.
        BufferingAppendRecordsBuilder emptyBuilder = new BufferingAppendRecordsBuilder(
                REPLICA_ID, 1L, this.config, this.memoryTracker, mockTime);
        mockTime.sleep(this.config.maxWaitMs() + 1);
        Assertions.assertFalse(emptyBuilder.isRequestReady());

        // High watermark update only: ready once maxWaitMs has elapsed, not a millisecond earlier.
        BufferingAppendRecordsBuilder highWatermarkBuilder = new BufferingAppendRecordsBuilder(
                REPLICA_ID, 1L, this.config, this.memoryTracker, mockTime);
        highWatermarkBuilder.processEvent(
                PushReplicationEvent.forHighWatermarkUpdate(PARTITION_0, REPLICA_ID, 5L), session);
        Assertions.assertFalse(highWatermarkBuilder.isRequestReady());
        mockTime.sleep(this.config.maxWaitMs() - 1);
        Assertions.assertFalse(highWatermarkBuilder.isRequestReady());
        mockTime.sleep(1L);
        Assertions.assertTrue(highWatermarkBuilder.isRequestReady());

        // Log start offset update only: same timing.
        BufferingAppendRecordsBuilder logStartOffsetBuilder = new BufferingAppendRecordsBuilder(
                REPLICA_ID, 1L, this.config, this.memoryTracker, mockTime);
        logStartOffsetBuilder.processEvent(
                PushReplicationEvent.forLogStartOffsetUpdate(PARTITION_0, REPLICA_ID, 3L), session);
        Assertions.assertFalse(logStartOffsetBuilder.isRequestReady());
        mockTime.sleep(this.config.maxWaitMs() - 1);
        Assertions.assertFalse(logStartOffsetBuilder.isRequestReady());
        mockTime.sleep(1L);
        Assertions.assertTrue(logStartOffsetBuilder.isRequestReady());
    }

    /**
     * Verifies that an offset update alone does not make the request ready after
     * {@code lingerMs}, and that once records are added the request becomes ready exactly
     * {@code lingerMs} later.
     */
    @Test
    public void testIsRequestReadyWithLingerMs() {
        MockTime mockTime = new MockTime();
        TestPushSession session = new TestPushSession(12, 1L, 34L);
        // Held in a local because the linger value is read back from it below.
        ReplicationConfig lingerConfig =
                new ReplicationConfig(Collections.singletonMap("confluent.replication.linger.ms", 10));
        BufferingAppendRecordsBuilder builder = new BufferingAppendRecordsBuilder(
                REPLICA_ID, 1L, lingerConfig, this.memoryTracker, mockTime);

        // Offset update only: waiting out the linger time is not enough.
        builder.processEvent(PushReplicationEvent.forHighWatermarkUpdate(PARTITION_0, REPLICA_ID, 5L), session);
        mockTime.sleep(lingerConfig.lingerMs());
        Assertions.assertFalse(builder.isRequestReady());

        // After records arrive, the request is ready once lingerMs has elapsed.
        PushReplicationEvent recordsEvent = PushReplicationEvent.forRecords(
                PARTITION_0, REPLICA_ID, new MemoryRecords(ByteBuffer.allocate(1024)), 6L, 5L);
        Assertions.assertTrue(builder.processEvent(recordsEvent, session));
        Assertions.assertFalse(builder.isRequestReady());
        mockTime.sleep(lingerConfig.lingerMs() - 1);
        Assertions.assertFalse(builder.isRequestReady());
        mockTime.sleep(1L);
        Assertions.assertTrue(builder.isRequestReady());
    }

    /**
     * Verifies that time passing before any records are added does not count towards
     * {@code lingerMs}: the request becomes ready only {@code lingerMs} after the records
     * event was processed.
     */
    @Test
    public void testLingerTimerBeginsOnceRecordsAdded() {
        MockTime mockTime = new MockTime();
        TestPushSession session = new TestPushSession(12, 1L, 34L);
        // Held in a local because the linger value is read back from it below.
        ReplicationConfig lingerConfig =
                new ReplicationConfig(Collections.singletonMap("confluent.replication.linger.ms", 10));
        BufferingAppendRecordsBuilder builder = new BufferingAppendRecordsBuilder(
                REPLICA_ID, 1L, lingerConfig, this.memoryTracker, mockTime);

        // More than lingerMs passes while the builder is still empty.
        mockTime.sleep(lingerConfig.lingerMs() + 1);

        PushReplicationEvent recordsEvent = PushReplicationEvent.forRecords(
                PARTITION_0, REPLICA_ID, new MemoryRecords(ByteBuffer.allocate(1024)), 6L, 5L);
        Assertions.assertTrue(builder.processEvent(recordsEvent, session));
        Assertions.assertFalse(builder.isRequestReady());
        mockTime.sleep(lingerConfig.lingerMs() - 1);
        Assertions.assertFalse(builder.isRequestReady());
        mockTime.sleep(1L);
        Assertions.assertTrue(builder.isRequestReady());
    }

    /**
     * Verifies that a request which became ready after {@code lingerMs} is no longer ready
     * once a stop-push event has been processed for the only partition holding records.
     */
    @Test
    public void testLingerTimerResetWhenAllRecordsRemoved() {
        MockTime mockTime = new MockTime();
        TestPushSession session = new TestPushSession(12, 1L, 34L);
        // Held in a local because the linger value is read back from it below.
        ReplicationConfig lingerConfig =
                new ReplicationConfig(Collections.singletonMap("confluent.replication.linger.ms", 10));
        BufferingAppendRecordsBuilder builder = new BufferingAppendRecordsBuilder(
                REPLICA_ID, 1L, lingerConfig, this.memoryTracker, mockTime);

        PushReplicationEvent recordsEvent = PushReplicationEvent.forRecords(
                PARTITION_0, REPLICA_ID, new MemoryRecords(ByteBuffer.allocate(1024)), 6L, 5L);
        Assertions.assertTrue(builder.processEvent(recordsEvent, session));
        mockTime.sleep(lingerConfig.lingerMs());
        Assertions.assertTrue(builder.isRequestReady());

        // Stopping the push for the only partition with records makes the request not ready again.
        PushReplicationEvent stopPush = PushReplicationEvent.forStopPush(PARTITION_0, REPLICA_ID);
        Assertions.assertTrue(builder.processEvent(stopPush, session));
        Assertions.assertFalse(builder.isRequestReady());
    }

    /**
     * Verifies that the request stays ready when a stop-push event is processed for one
     * partition while another partition still has records buffered.
     */
    @Test
    public void testLingerTimerNotResetWhenSomeRecordsRemoved() {
        MockTime mockTime = new MockTime();
        TestPushSession session = new TestPushSession(12, 1L, 34L);
        // Held in a local because the linger value is read back from it below.
        ReplicationConfig lingerConfig =
                new ReplicationConfig(Collections.singletonMap("confluent.replication.linger.ms", 10));
        BufferingAppendRecordsBuilder builder = new BufferingAppendRecordsBuilder(
                REPLICA_ID, 1L, lingerConfig, this.memoryTracker, mockTime);

        PushReplicationEvent firstPartitionRecords = PushReplicationEvent.forRecords(
                PARTITION_0, REPLICA_ID, new MemoryRecords(ByteBuffer.allocate(1024)), 6L, 5L);
        Assertions.assertTrue(builder.processEvent(firstPartitionRecords, session));
        mockTime.sleep(lingerConfig.lingerMs());

        // Records for a second partition arrive after the linger time has already elapsed.
        PushReplicationEvent secondPartitionRecords = PushReplicationEvent.forRecords(
                PARTITION_1, REPLICA_ID, new MemoryRecords(ByteBuffer.allocate(1024)), 6L, 5L);
        Assertions.assertTrue(builder.processEvent(secondPartitionRecords, session));
        Assertions.assertTrue(builder.isRequestReady());

        // Stopping partition 0 leaves partition 1's records in place, so the request is still ready.
        PushReplicationEvent stopPush = PushReplicationEvent.forStopPush(PARTITION_0, REPLICA_ID);
        Assertions.assertTrue(builder.processEvent(stopPush, session));
        Assertions.assertTrue(builder.isRequestReady());
    }
}
