package io.confluent.connect.elasticsearch;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;

/* loaded from: input_file:io/confluent/connect/elasticsearch/AsyncOffsetTrackerTest.class */
public class AsyncOffsetTrackerTest {
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = (Map) Mockito.mock(Map.class);
    private SinkTaskContext context = (SinkTaskContext) Mockito.mock(SinkTaskContext.class);

    @Test
    public void testHappyPath() {
        AsyncOffsetTracker asyncOffsetTracker = new AsyncOffsetTracker(this.context);
        TopicPartition topicPartition = new TopicPartition("t1", 0);
        Mockito.when(this.context.assignment()).thenReturn(Collections.singleton(topicPartition));
        SinkRecord sinkRecord = sinkRecord(topicPartition, 0L);
        SinkRecord sinkRecord2 = sinkRecord(topicPartition, 1L);
        SinkRecord sinkRecord3 = sinkRecord(topicPartition, 2L);
        OffsetState addPendingRecord = asyncOffsetTracker.addPendingRecord(sinkRecord);
        OffsetState addPendingRecord2 = asyncOffsetTracker.addPendingRecord(sinkRecord2);
        OffsetState addPendingRecord3 = asyncOffsetTracker.addPendingRecord(sinkRecord3);
        Assertions.assertThat(asyncOffsetTracker.offsets(this.currentOffsets)).isEmpty();
        addPendingRecord2.markProcessed();
        Assertions.assertThat(asyncOffsetTracker.offsets(this.currentOffsets)).isEmpty();
        addPendingRecord.markProcessed();
        asyncOffsetTracker.updateOffsets();
        Map offsets = asyncOffsetTracker.offsets(this.currentOffsets);
        Assertions.assertThat(offsets).hasSize(1);
        Assertions.assertThat(((OffsetAndMetadata) offsets.get(topicPartition)).offset()).isEqualTo(2L);
        addPendingRecord3.markProcessed();
        asyncOffsetTracker.updateOffsets();
        Map offsets2 = asyncOffsetTracker.offsets(this.currentOffsets);
        Assertions.assertThat(offsets2).hasSize(1);
        Assertions.assertThat(((OffsetAndMetadata) offsets2.get(topicPartition)).offset()).isEqualTo(3L);
        asyncOffsetTracker.updateOffsets();
        Assertions.assertThat(((OffsetAndMetadata) offsets2.get(topicPartition)).offset()).isEqualTo(3L);
    }

    @Test
    public void testBelowWatermark() {
        AsyncOffsetTracker asyncOffsetTracker = new AsyncOffsetTracker(this.context);
        TopicPartition topicPartition = new TopicPartition("t1", 0);
        Mockito.when(this.context.assignment()).thenReturn(Collections.singleton(topicPartition));
        SinkRecord sinkRecord = sinkRecord(topicPartition, 0L);
        SinkRecord sinkRecord2 = sinkRecord(topicPartition, 1L);
        OffsetState addPendingRecord = asyncOffsetTracker.addPendingRecord(sinkRecord);
        OffsetState addPendingRecord2 = asyncOffsetTracker.addPendingRecord(sinkRecord2);
        addPendingRecord.markProcessed();
        addPendingRecord2.markProcessed();
        asyncOffsetTracker.updateOffsets();
        Assertions.assertThat(((OffsetAndMetadata) asyncOffsetTracker.offsets(this.currentOffsets).get(topicPartition)).offset()).isEqualTo(2L);
        OffsetState addPendingRecord3 = asyncOffsetTracker.addPendingRecord(sinkRecord2);
        asyncOffsetTracker.updateOffsets();
        Assertions.assertThat(((OffsetAndMetadata) asyncOffsetTracker.offsets(this.currentOffsets).get(topicPartition)).offset()).isEqualTo(2L);
        addPendingRecord3.markProcessed();
        asyncOffsetTracker.updateOffsets();
        Assertions.assertThat(((OffsetAndMetadata) asyncOffsetTracker.offsets(this.currentOffsets).get(topicPartition)).offset()).isEqualTo(2L);
    }

    @Test
    public void testBatchRetry() {
        AsyncOffsetTracker asyncOffsetTracker = new AsyncOffsetTracker(this.context);
        TopicPartition topicPartition = new TopicPartition("t1", 0);
        Mockito.when(this.context.assignment()).thenReturn(Collections.singleton(topicPartition));
        SinkRecord sinkRecord = sinkRecord(topicPartition, 0L);
        SinkRecord sinkRecord2 = sinkRecord(topicPartition, 1L);
        asyncOffsetTracker.addPendingRecord(sinkRecord);
        asyncOffsetTracker.addPendingRecord(sinkRecord2).markProcessed();
        asyncOffsetTracker.updateOffsets();
        Assertions.assertThat(asyncOffsetTracker.offsets(this.currentOffsets)).isEmpty();
        OffsetState addPendingRecord = asyncOffsetTracker.addPendingRecord(sinkRecord);
        asyncOffsetTracker.addPendingRecord(sinkRecord2).markProcessed();
        addPendingRecord.markProcessed();
        asyncOffsetTracker.updateOffsets();
        Assertions.assertThat(((OffsetAndMetadata) asyncOffsetTracker.offsets(this.currentOffsets).get(topicPartition)).offset()).isEqualTo(2L);
    }

    @Test
    public void testRebalance() {
        AsyncOffsetTracker asyncOffsetTracker = new AsyncOffsetTracker(this.context);
        TopicPartition topicPartition = new TopicPartition("t1", 0);
        TopicPartition topicPartition2 = new TopicPartition("t2", 0);
        TopicPartition topicPartition3 = new TopicPartition("t3", 0);
        Mockito.when(this.context.assignment()).thenReturn(new HashSet(Arrays.asList(topicPartition, topicPartition2, topicPartition3)));
        asyncOffsetTracker.addPendingRecord(sinkRecord(topicPartition, 0L)).markProcessed();
        asyncOffsetTracker.addPendingRecord(sinkRecord(topicPartition, 1L));
        asyncOffsetTracker.addPendingRecord(sinkRecord(topicPartition2, 0L)).markProcessed();
        Assertions.assertThat(asyncOffsetTracker.numOffsetStateEntries()).isEqualTo(3L);
        asyncOffsetTracker.updateOffsets();
        Assertions.assertThat(asyncOffsetTracker.offsets(this.currentOffsets).size()).isEqualTo(2);
        Assertions.assertThat(asyncOffsetTracker.numOffsetStateEntries()).isEqualTo(1L);
        asyncOffsetTracker.closePartitions(ImmutableList.of(topicPartition, topicPartition3));
        Assertions.assertThat(asyncOffsetTracker.offsets(this.currentOffsets).keySet()).containsExactly(new TopicPartition[]{topicPartition2});
        Assertions.assertThat(asyncOffsetTracker.numOffsetStateEntries()).isEqualTo(0L);
    }

    private SinkRecord sinkRecord(TopicPartition topicPartition, long j) {
        return sinkRecord(topicPartition.topic(), topicPartition.partition(), j);
    }

    private SinkRecord sinkRecord(String str, int i, long j) {
        return new SinkRecord(str, i, (Schema) null, "testKey", (Schema) null, "testValue" + j, j);
    }
}
