/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror;

import java.util.Map;
import java.util.concurrent.Semaphore;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.OffsetSync;
import org.apache.kafka.connect.mirror.OffsetSyncWriter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Unit tests for {@link OffsetSyncWriter}: queueing offset syncs, dispatching
 * pending syncs to the producer, and promoting delayed syncs to pending.
 *
 * <p>All tests operate on a single partition ({@link #topicPartition}) and a
 * mocked {@link KafkaProducer}, so no broker is contacted.
 */
public class OffsetSyncWriterTest {
    String topicName = "topic";
    // Mockito.mock(Class) can only return the raw type; the conversion is safe for a mock.
    @SuppressWarnings("unchecked")
    KafkaProducer<byte[], byte[]> producer = Mockito.mock(KafkaProducer.class);
    TopicPartition topicPartition = new TopicPartition(this.topicName, 0);

    /**
     * Asserts that the first sync queued for a partition lands in the pending map
     * (not the delayed map) and updates {@code lastSyncDownstreamOffset}, while a
     * second sync one offset later lands in the delayed map and leaves
     * {@code lastSyncDownstreamOffset} untouched.
     */
    @Test
    public void testMaybeQueueOffsetSyncs() {
        long maxOffsetLag = 2L;
        Semaphore outstandingOffsetSyncs = new Semaphore(1);
        OffsetSyncWriter offsetSyncWriter =
                new OffsetSyncWriter(this.producer, this.topicName, outstandingOffsetSyncs, maxOffsetLag);

        offsetSyncWriter.maybeQueueOffsetSyncs(this.topicPartition, 0L, 1L);
        Assertions.assertFalse(offsetSyncWriter.getDelayedOffsetSyncs().containsKey(this.topicPartition));
        Assertions.assertTrue(offsetSyncWriter.getPendingOffsetSyncs().containsKey(this.topicPartition));
        Assertions.assertEquals(1L, lastSyncDownstreamOffset(offsetSyncWriter));

        offsetSyncWriter.maybeQueueOffsetSyncs(this.topicPartition, 1L, 2L);
        Assertions.assertTrue(offsetSyncWriter.getDelayedOffsetSyncs().containsKey(this.topicPartition));
        Assertions.assertEquals(1L, lastSyncDownstreamOffset(offsetSyncWriter));
    }

    /**
     * Asserts that firing pending syncs sends exactly one record to the producer,
     * and that a second fire issues no further {@code send} call.
     */
    @Test
    public void testFirePendingOffsetSyncs() {
        long maxOffsetLag = 1L;
        Semaphore outstandingOffsetSyncs = new Semaphore(1);
        OffsetSyncWriter offsetSyncWriter =
                new OffsetSyncWriter(this.producer, this.topicName, outstandingOffsetSyncs, maxOffsetLag);

        offsetSyncWriter.maybeQueueOffsetSyncs(this.topicPartition, 0L, 100L);
        Assertions.assertEquals(100L, lastSyncDownstreamOffset(offsetSyncWriter));

        // The first fire runs against the still-unstubbed mock, so the mock does not
        // invoke the callback passed to send().
        offsetSyncWriter.firePendingOffsetSyncs();

        // Deliberately stubbed only AFTER the first fire: from here on, any send()
        // would have its callback completed immediately. Do not move this above the
        // first firePendingOffsetSyncs() call; the test depends on this ordering.
        ArgumentCaptor<Callback> producerCallback = ArgumentCaptor.forClass(Callback.class);
        Mockito.when(this.producer.send(ArgumentMatchers.any(), producerCallback.capture()))
                .thenAnswer(mockInvocation -> {
                    producerCallback.getValue().onCompletion(null, null);
                    return null;
                });

        // The first sync must have been handed to the producer exactly once.
        Mockito.verify(this.producer, Mockito.times(1)).send(ArgumentMatchers.any(), ArgumentMatchers.any());

        offsetSyncWriter.maybeQueueOffsetSyncs(this.topicPartition, 2L, 102L);
        Assertions.assertEquals(102L, lastSyncDownstreamOffset(offsetSyncWriter));
        offsetSyncWriter.firePendingOffsetSyncs();

        // NOTE(review): presumably the single semaphore permit is still held by the
        // first, never-completed send, so the second fire must not reach the
        // producer - confirm against OffsetSyncWriter.firePendingOffsetSyncs().
        Mockito.verifyNoMoreInteractions(this.producer);
    }

    /**
     * Asserts that after queueing two syncs and calling
     * {@code promoteDelayedOffsetSyncs()}, the delayed map is empty and the pending
     * map holds exactly one sync for the partition, carrying the offsets of the
     * second queued sync (upstream 1, downstream 101).
     */
    @Test
    public void testPromoteDelayedOffsetSyncs() {
        long maxOffsetLag = 50L;
        Semaphore outstandingOffsetSyncs = new Semaphore(1);
        OffsetSyncWriter offsetSyncWriter =
                new OffsetSyncWriter(this.producer, this.topicName, outstandingOffsetSyncs, maxOffsetLag);

        offsetSyncWriter.maybeQueueOffsetSyncs(this.topicPartition, 0L, 100L);
        offsetSyncWriter.maybeQueueOffsetSyncs(this.topicPartition, 1L, 101L);
        offsetSyncWriter.promoteDelayedOffsetSyncs();

        Assertions.assertTrue(offsetSyncWriter.getDelayedOffsetSyncs().isEmpty());
        Map<TopicPartition, OffsetSync> pendingOffsetSyncs = offsetSyncWriter.getPendingOffsetSyncs();
        Assertions.assertEquals(1, pendingOffsetSyncs.size());
        OffsetSync promoted = pendingOffsetSyncs.get(this.topicPartition);
        Assertions.assertEquals(1L, promoted.upstreamOffset());
        Assertions.assertEquals(101L, promoted.downstreamOffset());
    }

    /**
     * Reads the {@code lastSyncDownstreamOffset} field of the partition state that
     * {@code writer} holds for {@link #topicPartition}.
     *
     * @param writer the writer under test
     * @return the current value of {@code lastSyncDownstreamOffset}
     * @throws NullPointerException if the writer has no state for the partition
     */
    private long lastSyncDownstreamOffset(OffsetSyncWriter writer) {
        return writer.partitionStates().get(this.topicPartition).lastSyncDownstreamOffset;
    }
}

