package org.apache.kafka.connect.runtime;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.runtime.SubmittedRecords;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/connect/runtime/SubmittedRecordsTest.class */
public class SubmittedRecordsTest {
    private static final Map<String, Object> PARTITION1 = Collections.singletonMap("subreddit", "apachekafka");
    private static final Map<String, Object> PARTITION2 = Collections.singletonMap("subreddit", "adifferentvalue");
    private static final Map<String, Object> PARTITION3 = Collections.singletonMap("subreddit", "asdfqweoicus");
    private AtomicInteger offset;
    SubmittedRecords submittedRecords;

    @Before
    public void setup() {
        this.submittedRecords = new SubmittedRecords();
        this.offset = new AtomicInteger();
    }

    @Test
    public void testNoRecords() {
        Assert.assertTrue(this.submittedRecords.committableOffsets().isEmpty());
        Assert.assertTrue(this.submittedRecords.committableOffsets().isEmpty());
        Assert.assertTrue(this.submittedRecords.committableOffsets().isEmpty());
        assertNoRemainingDeques();
    }

    @Test
    public void testNoCommittedRecords() {
        for (int i = 0; i < 3; i++) {
            Iterator it = Arrays.asList(PARTITION1, PARTITION2, PARTITION3).iterator();
            while (it.hasNext()) {
                this.submittedRecords.submit((Map) it.next(), newOffset());
            }
        }
        SubmittedRecords.CommittableOffsets committableOffsets = this.submittedRecords.committableOffsets();
        assertMetadata(committableOffsets, 0, 9, 3, 3, PARTITION1, PARTITION2, PARTITION3);
        Assert.assertEquals(Collections.emptyMap(), committableOffsets.offsets());
        SubmittedRecords.CommittableOffsets committableOffsets2 = this.submittedRecords.committableOffsets();
        assertMetadata(committableOffsets2, 0, 9, 3, 3, PARTITION1, PARTITION2, PARTITION3);
        Assert.assertEquals(Collections.emptyMap(), committableOffsets2.offsets());
        SubmittedRecords.CommittableOffsets committableOffsets3 = this.submittedRecords.committableOffsets();
        assertMetadata(committableOffsets3, 0, 9, 3, 3, PARTITION1, PARTITION2, PARTITION3);
        Assert.assertEquals(Collections.emptyMap(), committableOffsets3.offsets());
    }

    @Test
    public void testSingleAck() {
        Map<String, Object> newOffset = newOffset();
        SubmittedRecords.SubmittedRecord submit = this.submittedRecords.submit(PARTITION1, newOffset);
        SubmittedRecords.CommittableOffsets committableOffsets = this.submittedRecords.committableOffsets();
        Assert.assertFalse(committableOffsets.isEmpty());
        Assert.assertEquals(Collections.emptyMap(), committableOffsets.offsets());
        assertMetadata(committableOffsets, 0, 1, 1, 1, PARTITION1);
        assertNoEmptyDeques();
        submit.ack();
        SubmittedRecords.CommittableOffsets committableOffsets2 = this.submittedRecords.committableOffsets();
        Assert.assertFalse(committableOffsets2.isEmpty());
        Assert.assertEquals(Collections.singletonMap(PARTITION1, newOffset), committableOffsets2.offsets());
        assertMetadataNoPending(committableOffsets2, 1);
        assertNoRemainingDeques();
        SubmittedRecords.CommittableOffsets committableOffsets3 = this.submittedRecords.committableOffsets();
        Assert.assertEquals(Collections.emptyMap(), committableOffsets3.offsets());
        Assert.assertTrue(committableOffsets3.isEmpty());
    }

    @Test
    public void testMultipleAcksAcrossMultiplePartitions() {
        Map<String, Object> newOffset = newOffset();
        Map<String, Object> newOffset2 = newOffset();
        Map<String, Object> newOffset3 = newOffset();
        Map<String, Object> newOffset4 = newOffset();
        SubmittedRecords.SubmittedRecord submit = this.submittedRecords.submit(PARTITION1, newOffset);
        SubmittedRecords.SubmittedRecord submit2 = this.submittedRecords.submit(PARTITION1, newOffset2);
        SubmittedRecords.SubmittedRecord submit3 = this.submittedRecords.submit(PARTITION2, newOffset3);
        SubmittedRecords.SubmittedRecord submit4 = this.submittedRecords.submit(PARTITION2, newOffset4);
        SubmittedRecords.CommittableOffsets committableOffsets = this.submittedRecords.committableOffsets();
        Assert.assertEquals(Collections.emptyMap(), committableOffsets.offsets());
        assertMetadata(committableOffsets, 0, 4, 2, 2, PARTITION1, PARTITION2);
        assertNoEmptyDeques();
        submit2.ack();
        SubmittedRecords.CommittableOffsets committableOffsets2 = this.submittedRecords.committableOffsets();
        Assert.assertEquals(Collections.emptyMap(), committableOffsets2.offsets());
        assertMetadata(committableOffsets2, 0, 4, 2, 2, PARTITION1, PARTITION2);
        assertNoEmptyDeques();
        submit3.ack();
        SubmittedRecords.CommittableOffsets committableOffsets3 = this.submittedRecords.committableOffsets();
        Assert.assertEquals(Collections.singletonMap(PARTITION2, newOffset3), committableOffsets3.offsets());
        assertMetadata(committableOffsets3, 1, 3, 2, 2, PARTITION1);
        assertNoEmptyDeques();
        SubmittedRecords.CommittableOffsets committableOffsets4 = this.submittedRecords.committableOffsets();
        Assert.assertEquals(Collections.emptyMap(), committableOffsets4.offsets());
        assertMetadata(committableOffsets4, 0, 3, 2, 2, PARTITION1);
        assertNoEmptyDeques();
        submit.ack();
        submit4.ack();
        SubmittedRecords.CommittableOffsets committableOffsets5 = this.submittedRecords.committableOffsets();
        HashMap hashMap = new HashMap();
        hashMap.put(PARTITION1, newOffset2);
        hashMap.put(PARTITION2, newOffset4);
        Assert.assertEquals(hashMap, committableOffsets5.offsets());
        assertMetadataNoPending(committableOffsets5, 3);
        assertNoRemainingDeques();
        Assert.assertTrue(this.submittedRecords.committableOffsets().isEmpty());
    }

    @Test
    public void testRemoveLastSubmittedRecord() {
        SubmittedRecords.SubmittedRecord submit = this.submittedRecords.submit(PARTITION1, newOffset());
        SubmittedRecords.CommittableOffsets committableOffsets = this.submittedRecords.committableOffsets();
        Assert.assertEquals(Collections.emptyMap(), committableOffsets.offsets());
        assertMetadata(committableOffsets, 0, 1, 1, 1, PARTITION1);
        Assert.assertTrue("First attempt to remove record from submitted queue should succeed", this.submittedRecords.removeLastOccurrence(submit));
        Assert.assertFalse("Attempt to remove already-removed record from submitted queue should fail", this.submittedRecords.removeLastOccurrence(submit));
        Assert.assertTrue(this.submittedRecords.committableOffsets().isEmpty());
        submit.ack();
        Assert.assertTrue(this.submittedRecords.committableOffsets().isEmpty());
    }

    @Test
    public void testRemoveNotLastSubmittedRecord() {
        Map<String, Object> newOffset = newOffset();
        Map<String, Object> newOffset2 = newOffset();
        SubmittedRecords.SubmittedRecord submit = this.submittedRecords.submit(PARTITION1, newOffset);
        SubmittedRecords.SubmittedRecord submit2 = this.submittedRecords.submit(PARTITION2, newOffset2);
        assertMetadata(this.submittedRecords.committableOffsets(), 0, 2, 2, 1, PARTITION1, PARTITION2);
        assertNoEmptyDeques();
        Assert.assertTrue("First attempt to remove record from submitted queue should succeed", this.submittedRecords.removeLastOccurrence(submit));
        SubmittedRecords.CommittableOffsets committableOffsets = this.submittedRecords.committableOffsets();
        Assert.assertEquals(Collections.emptyMap(), committableOffsets.offsets());
        assertMetadata(committableOffsets, 0, 1, 1, 1, PARTITION2);
        assertNoEmptyDeques();
        assertRemovedDeques(PARTITION1);
        submit.ack();
        SubmittedRecords.CommittableOffsets committableOffsets2 = this.submittedRecords.committableOffsets();
        Assert.assertEquals(Collections.emptyMap(), committableOffsets2.offsets());
        assertMetadata(committableOffsets2, 0, 1, 1, 1, PARTITION2);
        assertNoEmptyDeques();
        submit2.ack();
        SubmittedRecords.CommittableOffsets committableOffsets3 = this.submittedRecords.committableOffsets();
        Assert.assertEquals(Collections.singletonMap(PARTITION2, newOffset2), committableOffsets3.offsets());
        assertMetadata(committableOffsets3, 1, 0, 0, 0, (Map) null);
        Assert.assertFalse(committableOffsets3.hasPending());
        assertNoRemainingDeques();
        Assert.assertTrue(this.submittedRecords.committableOffsets().isEmpty());
    }

    @Test
    public void testNullPartitionAndOffset() {
        SubmittedRecords.SubmittedRecord submit = this.submittedRecords.submit((Map) null, (Map) null);
        assertMetadata(this.submittedRecords.committableOffsets(), 0, 1, 1, 1, (Map) null);
        submit.ack();
        SubmittedRecords.CommittableOffsets committableOffsets = this.submittedRecords.committableOffsets();
        Assert.assertEquals(Collections.singletonMap(null, null), committableOffsets.offsets());
        assertMetadataNoPending(committableOffsets, 1);
        assertNoEmptyDeques();
    }

    @Test
    public void testAwaitMessagesNoneSubmitted() {
        Assert.assertTrue(this.submittedRecords.awaitAllMessages(0L, TimeUnit.MILLISECONDS));
    }

    @Test
    public void testAwaitMessagesAfterAllAcknowledged() {
        SubmittedRecords.SubmittedRecord submit = this.submittedRecords.submit(PARTITION1, newOffset());
        Assert.assertFalse(this.submittedRecords.awaitAllMessages(0L, TimeUnit.MILLISECONDS));
        submit.ack();
        Assert.assertTrue(this.submittedRecords.awaitAllMessages(0L, TimeUnit.MILLISECONDS));
    }

    @Test
    public void testAwaitMessagesAfterAllRemoved() {
        SubmittedRecords.SubmittedRecord submit = this.submittedRecords.submit(PARTITION1, newOffset());
        SubmittedRecords.SubmittedRecord submit2 = this.submittedRecords.submit(PARTITION1, newOffset());
        Assert.assertFalse("Await should fail since neither of the in-flight records has been removed so far", this.submittedRecords.awaitAllMessages(0L, TimeUnit.MILLISECONDS));
        this.submittedRecords.removeLastOccurrence(submit);
        Assert.assertFalse("Await should fail since only one of the two submitted records has been removed so far", this.submittedRecords.awaitAllMessages(0L, TimeUnit.MILLISECONDS));
        this.submittedRecords.removeLastOccurrence(submit);
        Assert.assertFalse("Await should fail since only one of the two submitted records has been removed so far, even though that record has been removed twice", this.submittedRecords.awaitAllMessages(0L, TimeUnit.MILLISECONDS));
        this.submittedRecords.removeLastOccurrence(submit2);
        Assert.assertTrue("Await should succeed since both submitted records have now been removed", this.submittedRecords.awaitAllMessages(0L, TimeUnit.MILLISECONDS));
    }

    @Test
    public void testAwaitMessagesTimesOut() {
        this.submittedRecords.submit(PARTITION1, newOffset());
        Assert.assertFalse(this.submittedRecords.awaitAllMessages(10L, TimeUnit.MILLISECONDS));
    }

    @Test
    public void testAwaitMessagesReturnsAfterAsynchronousAck() throws Exception {
        SubmittedRecords.SubmittedRecord submit = this.submittedRecords.submit(PARTITION1, newOffset());
        SubmittedRecords.SubmittedRecord submit2 = this.submittedRecords.submit(PARTITION2, newOffset());
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        new Thread(() -> {
            atomicBoolean.set(this.submittedRecords.awaitAllMessages(5L, TimeUnit.SECONDS));
            countDownLatch.countDown();
        }).start();
        Assert.assertTrue("Should not have finished awaiting message delivery before either in-flight record was acknowledged", countDownLatch.getCount() > 0);
        submit.ack();
        Assert.assertTrue("Should not have finished awaiting message delivery before one in-flight record was acknowledged", countDownLatch.getCount() > 0);
        submit.ack();
        Assert.assertTrue("Should not have finished awaiting message delivery before one in-flight record was acknowledged, even though the other record has been acknowledged twice", countDownLatch.getCount() > 0);
        submit2.ack();
        Assert.assertTrue("Should have finished awaiting message delivery after both in-flight records were acknowledged", countDownLatch.await(1L, TimeUnit.SECONDS));
        Assert.assertTrue("Await of in-flight messages should have succeeded", atomicBoolean.get());
    }

    private void assertNoRemainingDeques() {
        Assert.assertEquals("Internal records map should be completely empty", Collections.emptyMap(), this.submittedRecords.records);
    }

    @SafeVarargs
    private final void assertRemovedDeques(Map<String, ?>... mapArr) {
        for (Map<String, ?> map : mapArr) {
            Assert.assertFalse("Deque for partition " + map + " should have been cleaned up from internal records map", this.submittedRecords.records.containsKey(map));
        }
    }

    private void assertNoEmptyDeques() {
        this.submittedRecords.records.forEach((map, deque) -> {
            Assert.assertFalse("Empty deque for partition " + map + " should have been cleaned up from internal records map", deque.isEmpty());
        });
    }

    private Map<String, Object> newOffset() {
        return Collections.singletonMap("timestamp", Integer.valueOf(this.offset.getAndIncrement()));
    }

    private void assertMetadataNoPending(SubmittedRecords.CommittableOffsets committableOffsets, int i) {
        Assert.assertEquals(i, committableOffsets.numCommittableMessages());
        Assert.assertFalse(committableOffsets.hasPending());
    }

    @SafeVarargs
    private final void assertMetadata(SubmittedRecords.CommittableOffsets committableOffsets, int i, int i2, int i3, int i4, Map<String, Object>... mapArr) {
        Assert.assertEquals(i, committableOffsets.numCommittableMessages());
        Assert.assertEquals(i2, committableOffsets.numUncommittableMessages());
        Assert.assertEquals(i3, committableOffsets.numDeques());
        Assert.assertEquals(i4, committableOffsets.largestDequeSize());
        Assert.assertTrue(Arrays.asList(mapArr).contains(committableOffsets.largestDequePartition()));
    }
}
