/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror;

import java.util.OptionalLong;
import java.util.PrimitiveIterator;
import java.util.Random;
import java.util.stream.LongStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.OffsetSync;
import org.apache.kafka.connect.mirror.OffsetSyncStore;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class OffsetSyncStoreTest {
    static TopicPartition tp = new TopicPartition("topic1", 2);

    @Test
    public void testOffsetTranslation() {
        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore();){
            store.start(true);
            store.sync(tp, 100L, 200L);
            Assertions.assertEquals((Object)OptionalLong.of(201L), (Object)store.translateDownstream(null, tp, 150L));
            store.sync(tp, 150L, 251L);
            Assertions.assertEquals((Object)OptionalLong.of(251L), (Object)store.translateDownstream(null, tp, 150L));
            Assertions.assertEquals((Object)OptionalLong.of(-1L), (Object)store.translateDownstream(null, tp, 5L));
            store.sync(tp, 200L, 10L);
            Assertions.assertEquals((Object)OptionalLong.of(10L), (Object)store.translateDownstream(null, tp, 200L));
            store.sync(tp, 20L, 20L);
            Assertions.assertEquals((Object)OptionalLong.of(20L), (Object)store.translateDownstream(null, tp, 20L));
        }
    }

    @Test
    public void testNoTranslationIfStoreNotStarted() {
        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore(){

            @Override
            void backingStoreStart() {
                this.sync(tp, 100L, 200L);
                Assertions.assertEquals((Object)OptionalLong.empty(), (Object)this.translateDownstream(null, tp, 0L));
                Assertions.assertEquals((Object)OptionalLong.empty(), (Object)this.translateDownstream(null, tp, 100L));
                Assertions.assertEquals((Object)OptionalLong.empty(), (Object)this.translateDownstream(null, tp, 200L));
            }
        };){
            Assertions.assertEquals((Object)OptionalLong.empty(), (Object)store.translateDownstream(null, tp, 0L));
            Assertions.assertEquals((Object)OptionalLong.empty(), (Object)store.translateDownstream(null, tp, 100L));
            Assertions.assertEquals((Object)OptionalLong.empty(), (Object)store.translateDownstream(null, tp, 200L));
            store.start(true);
            Assertions.assertEquals((Object)OptionalLong.of(-1L), (Object)store.translateDownstream(null, tp, 0L));
            Assertions.assertEquals((Object)OptionalLong.of(200L), (Object)store.translateDownstream(null, tp, 100L));
            Assertions.assertEquals((Object)OptionalLong.of(201L), (Object)store.translateDownstream(null, tp, 200L));
        }
    }

    @Test
    public void testNoTranslationIfNoOffsetSync() {
        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore();){
            store.start(true);
            Assertions.assertEquals((Object)OptionalLong.empty(), (Object)store.translateDownstream(null, tp, 0L));
        }
    }

    @Test
    public void testPastOffsetTranslation() {
        final int maxOffsetLag = 10;
        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore(){

            @Override
            void backingStoreStart() {
                for (int offset = 0; offset <= 1000; offset += maxOffsetLag) {
                    this.sync(tp, offset, offset);
                    OffsetSyncStoreTest.this.assertSparseSyncInvariant(this, tp);
                }
            }
        };){
            store.start(true);
            this.assertSparseSync(store, 1000L, -1L);
            for (int offset = 1000 + maxOffsetLag; offset <= 10000; offset += maxOffsetLag) {
                store.sync(tp, offset, offset);
                this.assertSparseSyncInvariant(store, tp);
            }
            this.assertSparseSync(store, 1000L, -1L);
            this.assertSparseSync(store, 4840L, 1000L);
            this.assertSparseSync(store, 6760L, 4840L);
            this.assertSparseSync(store, 8680L, 6760L);
            this.assertSparseSync(store, 9160L, 8680L);
            this.assertSparseSync(store, 9640L, 9160L);
            this.assertSparseSync(store, 9880L, 9640L);
            this.assertSparseSync(store, 9940L, 9880L);
            this.assertSparseSync(store, 9970L, 9940L);
            this.assertSparseSync(store, 9990L, 9970L);
            this.assertSparseSync(store, 10000L, 9990L);
            store.sync(tp, 1500L, 11000L);
            this.assertSparseSyncInvariant(store, tp);
            Assertions.assertEquals((Object)OptionalLong.of(-1L), (Object)store.translateDownstream(null, tp, 1499L));
            Assertions.assertEquals((Object)OptionalLong.of(11000L), (Object)store.translateDownstream(null, tp, 1500L));
            Assertions.assertEquals((Object)OptionalLong.of(11001L), (Object)store.translateDownstream(null, tp, 2000L));
        }
    }

    @Test
    public void testPastOffsetTranslationWithoutInitializationReadToEnd() {
        int maxOffsetLag = 10;
        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore(){

            @Override
            void backingStoreStart() {
                for (int offset = 0; offset <= 1000; offset += 10) {
                    this.sync(tp, offset, offset);
                    OffsetSyncStoreTest.this.assertSparseSyncInvariant(this, tp);
                }
            }
        };){
            store.start(false);
            this.assertSparseSync(store, 480L, 0L);
            this.assertSparseSync(store, 720L, 480L);
            this.assertSparseSync(store, 1000L, 990L);
            for (int offset = 1000; offset <= 10000; offset += 10) {
                store.sync(tp, offset, offset);
                this.assertSparseSyncInvariant(store, tp);
            }
            this.assertSparseSync(store, 3840L, 0L);
            this.assertSparseSync(store, 7680L, 3840L);
            this.assertSparseSync(store, 8640L, 7680L);
            this.assertSparseSync(store, 9120L, 8640L);
            this.assertSparseSync(store, 9600L, 9120L);
            this.assertSparseSync(store, 9840L, 9600L);
            this.assertSparseSync(store, 9900L, 9840L);
            this.assertSparseSync(store, 9960L, 9900L);
            this.assertSparseSync(store, 9990L, 9960L);
            this.assertSparseSync(store, 10000L, 9990L);
            store.sync(tp, 1500L, 11000L);
            this.assertSparseSyncInvariant(store, tp);
            Assertions.assertEquals((Object)OptionalLong.of(-1L), (Object)store.translateDownstream(null, tp, 1499L));
            Assertions.assertEquals((Object)OptionalLong.of(11000L), (Object)store.translateDownstream(null, tp, 1500L));
            Assertions.assertEquals((Object)OptionalLong.of(11001L), (Object)store.translateDownstream(null, tp, 2000L));
        }
    }

    @Test
    public void testConsistentlySpacedSyncs() {
        long iterations = 100L;
        long maxStep = Long.MAX_VALUE / iterations;
        long step = 1L;
        while (step < maxStep) {
            for (long firstOffset = 0L; firstOffset < 30L; ++firstOffset) {
                long finalStep = step;
                this.assertSyncSpacingHasBoundedExpirations(firstOffset, LongStream.generate(() -> finalStep).limit(iterations), 1);
            }
            step = step * 2L + 1L;
        }
    }

    @Test
    public void testRandomlySpacedSyncs() {
        Random random = new Random(0L);
        int iterationBits = 10;
        long iterations = 1 << iterationBits;
        for (int n = 1; n < 64 - iterationBits; ++n) {
            long maximumDifference = 1L << n;
            int maximumExpirations = n + 2;
            this.assertSyncSpacingHasBoundedExpirations(0L, random.longs(iterations, 0L, maximumDifference), maximumExpirations);
            long offsetLagMax = 65536L;
            this.assertSyncSpacingHasBoundedExpirations(0L, random.longs(iterations, offsetLagMax, offsetLagMax + maximumDifference), maximumExpirations);
        }
    }

    @Test
    public void testDroppedSyncsSpacing() {
        Random random = new Random(0L);
        long iterations = 10000L;
        long offsetLagMax = 100L;
        LongStream stream = random.doubles().mapToLong(d -> (long)(d < 0.5 ? 2 : 1) * offsetLagMax).limit(iterations);
        this.assertSyncSpacingHasBoundedExpirations(0L, stream, 2);
    }

    private void assertSyncSpacingHasBoundedExpirations(long firstOffset, LongStream steps, int maximumExpirations) {
        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore();){
            store.start(true);
            store.sync(tp, firstOffset, firstOffset);
            PrimitiveIterator.OfLong iterator = steps.iterator();
            long offset = firstOffset;
            int lastCount = 1;
            while (iterator.hasNext()) {
                Assertions.assertTrue(((offset += iterator.nextLong()) >= 0L ? 1 : 0) != 0, (String)"Test is invalid, offset overflowed");
                store.sync(tp, offset, offset);
                Assertions.assertEquals((long)offset, (long)store.syncFor(tp, 0).upstreamOffset());
                Assertions.assertEquals((long)firstOffset, (long)store.syncFor(tp, 63).upstreamOffset());
                int count = this.countDistinctStoredSyncs(store, tp);
                int expiredSyncs = lastCount - count + 1;
                Assertions.assertTrue((expiredSyncs <= maximumExpirations ? 1 : 0) != 0, (String)("Store expired too many syncs: " + expiredSyncs + " > " + maximumExpirations + " after receiving offset " + offset));
                lastCount = count;
            }
        }
    }

    private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, long previousOffset) {
        Assertions.assertEquals((Object)OptionalLong.of(previousOffset == -1L ? previousOffset : previousOffset + 1L), (Object)store.translateDownstream(null, tp, syncOffset - 1L));
        Assertions.assertEquals((Object)OptionalLong.of(syncOffset), (Object)store.translateDownstream(null, tp, syncOffset));
        Assertions.assertEquals((Object)OptionalLong.of(syncOffset + 1L), (Object)store.translateDownstream(null, tp, syncOffset + 1L));
        Assertions.assertEquals((Object)OptionalLong.of(syncOffset + 1L), (Object)store.translateDownstream(null, tp, syncOffset + 2L));
    }

    private int countDistinctStoredSyncs(FakeOffsetSyncStore store, TopicPartition topicPartition) {
        int count = 1;
        for (int i = 1; i < 64; ++i) {
            if (store.syncFor(topicPartition, i - 1) == store.syncFor(topicPartition, i)) continue;
            ++count;
        }
        return count;
    }

    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
        for (int j = 0; j < 64; ++j) {
            for (int i = 0; i < j; ++i) {
                int exponent;
                long iUpstreamLowerBound;
                long iUpstream;
                long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
                if (jUpstream == (iUpstream = store.syncFor(topicPartition, i).upstreamOffset()) || (iUpstreamLowerBound = jUpstream + (1L << (exponent = Math.max(i - 2, 0)))) < 0L) continue;
                Assertions.assertTrue((iUpstream >= iUpstreamLowerBound ? 1 : 0) != 0, (String)("Invariant C(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i + " should be at least " + iUpstreamLowerBound + " (" + jUpstream + " + 2^" + exponent + ")"));
                long iUpstreamUpperBound = jUpstream + (1L << j) - (1L << i);
                if (iUpstreamUpperBound < 0L) continue;
                Assertions.assertTrue((iUpstream <= iUpstreamUpperBound ? 1 : 0) != 0, (String)("Invariant B(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i + " should be no greater than " + iUpstreamUpperBound + " (" + jUpstream + " + 2^" + j + " - 2^" + i + ")"));
            }
        }
    }

    static class FakeOffsetSyncStore
    extends OffsetSyncStore {
        private boolean startCalled = false;

        FakeOffsetSyncStore() {
        }

        public void start(boolean initializationMustReadToEnd) {
            this.startCalled = true;
            super.start(initializationMustReadToEnd);
        }

        void backingStoreStart() {
        }

        void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
            Assertions.assertTrue((boolean)this.startCalled);
            OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
            byte[] key = offsetSync.recordKey();
            byte[] value = offsetSync.recordValue();
            ConsumerRecord record = new ConsumerRecord("test.offsets.internal", 0, 3L, (Object)key, (Object)value);
            this.handleRecord(record);
        }
    }
}

