/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Verifies that {@link FlinkKafkaConsumerBase} can be initialized from the snapshot files
 * {@code kafka-consumer-migration-test-flink1.2-snapshot} and
 * {@code kafka-consumer-migration-test-flink1.2-empty-state-snapshot} found on the test
 * resource path.
 *
 * <p>The (ignored) {@link #writeSnapshot()} method regenerates those files; it is meant to be
 * run manually. NOTE(review): judging by the file names, the snapshots are expected to be
 * produced on a Flink 1.2 code base - confirm before regenerating them.
 */
public class FlinkKafkaConsumerBaseFrom12MigrationTest {

    /** Partition offsets that are stored in, and expected back from, the non-empty snapshot. */
    static final HashMap<KafkaTopicPartition, Long> PARTITION_STATE = new HashMap<KafkaTopicPartition, Long>();

    static {
        PARTITION_STATE.put(new KafkaTopicPartition("abc", 13), 16768L);
        PARTITION_STATE.put(new KafkaTopicPartition("def", 7), 987654321L);
    }

    /** Resource name of the snapshot that contains {@link #PARTITION_STATE}. */
    private static final String SNAPSHOT_RESOURCE = "kafka-consumer-migration-test-flink1.2-snapshot";

    /** Resource name of the snapshot that was taken with an empty offset map. */
    private static final String EMPTY_STATE_SNAPSHOT_RESOURCE = "kafka-consumer-migration-test-flink1.2-empty-state-snapshot";

    /** Directory (relative to the module root) into which {@link #writeSnapshot()} writes. */
    private static final String RESOURCE_DIRECTORY = "src/test/resources/";

    /**
     * Start offset the consumer is expected to assign to a partition for which no offset was
     * restored.
     *
     * <p>NOTE(review): this presumably mirrors the connector's "use committed group offsets"
     * sentinel (KafkaTopicPartitionStateSentinel.GROUP_OFFSET) - confirm, and reference that
     * constant directly if it is accessible from this test.
     */
    private static final long EXPECTED_DEFAULT_START_OFFSET = -915623761773L;

    /**
     * Manually run this to (re)generate both snapshot files used by the restore tests.
     *
     * @throws Exception if creating or writing either snapshot fails
     */
    @Ignore
    @Test
    public void writeSnapshot() throws Exception {
        this.writeSnapshot(RESOURCE_DIRECTORY + SNAPSHOT_RESOURCE, PARTITION_STATE);

        final HashMap<KafkaTopicPartition, Long> emptyState = new HashMap<KafkaTopicPartition, Long>();
        this.writeSnapshot(RESOURCE_DIRECTORY + EMPTY_STATE_SNAPSHOT_RESOURCE, emptyState);
    }

    /**
     * Runs a dummy consumer backed by a mocked fetcher, snapshots the operator once the fetch
     * loop has been entered and writes the snapshot to {@code path}.
     *
     * <p>The source function runs on a helper thread. Any failure on that thread is propagated
     * to the caller as an {@link AssertionError} instead of being dropped, and it also releases
     * the latch so that a failure before the fetch loop cannot block this method forever.
     *
     * @param path file the snapshot is written to
     * @param state offsets the mocked fetcher reports as its current state
     * @throws Exception if the harness, the snapshot or the file write fails
     */
    private void writeSnapshot(String path, HashMap<KafkaTopicPartition, Long> state) throws Exception {
        final OneShotLatch fetchLoopEntered = new OneShotLatch();

        @SuppressWarnings("unchecked") // mocking a generic class necessarily yields a raw mock
        final AbstractFetcher<String, ?> fetcher = Mockito.mock(AbstractFetcher.class);

        // Entering the fetch loop signals that the consumer is fully set up.
        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                fetchLoopEntered.trigger();
                return null;
            }
        }).when(fetcher).runFetchLoop();
        Mockito.when(fetcher.snapshotCurrentState()).thenReturn(state);

        final List<KafkaTopicPartition> partitions = new ArrayList<KafkaTopicPartition>(PARTITION_STATE.keySet());
        final DummyFlinkKafkaConsumer<String> consumerFunction =
                new DummyFlinkKafkaConsumer<String>(fetcher, partitions);
        final StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
                new StreamSource<String, DummyFlinkKafkaConsumer<String>>(consumerFunction);
        final AbstractStreamOperatorTestHarness<String> testHarness =
                new AbstractStreamOperatorTestHarness<String>(consumerOperator, 1, 1, 0);
        testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        testHarness.setup();
        testHarness.open();

        final AtomicReference<Throwable> sourceFailure = new AtomicReference<Throwable>();

        // Run the source asynchronously so that this thread can take the snapshot.
        final Thread runner = new Thread("kafka-consumer-snapshot-source") {
            @Override
            public void run() {
                try {
                    consumerFunction.run(new DummySourceContext() {
                        @Override
                        public void collect(String element) {
                            // emitted records are irrelevant for the snapshot
                        }
                    });
                } catch (Throwable t) {
                    sourceFailure.set(t);
                    // Wake up the waiting thread; otherwise a failure that happens before
                    // runFetchLoop() is reached would leave it blocked on the latch.
                    fetchLoopEntered.trigger();
                }
            }
        };
        runner.start();

        try {
            if (!fetchLoopEntered.isTriggered()) {
                fetchLoopEntered.await();
            }
            // Do not overwrite the snapshot file if the source never got going.
            failIfSourceFailed(sourceFailure);

            final OperatorStateHandles snapshot;
            synchronized (testHarness.getCheckpointLock()) {
                snapshot = testHarness.snapshot(0L, 0L);
            }
            OperatorSnapshotUtil.writeStateHandle(snapshot, path);
        } finally {
            consumerOperator.close();
            runner.join();
        }

        // Thread.join() makes any late failure of the source thread visible here.
        failIfSourceFailed(sourceFailure);
    }

    /**
     * Throws an {@link AssertionError} wrapping the recorded source failure, if there is one.
     *
     * @param sourceFailure holder filled by the source thread when it fails
     */
    private static void failIfSourceFailed(AtomicReference<Throwable> sourceFailure) {
        final Throwable failure = sourceFailure.get();
        if (failure != null) {
            throw new AssertionError("Source function failed while preparing the snapshot", failure);
        }
    }

    /**
     * Wraps {@code consumerFunction} in a source operator, initializes it from the given snapshot
     * resource and opens it.
     *
     * @param consumerFunction consumer under test
     * @param snapshotResource name of the snapshot file on the test resource path
     * @return the opened operator; the caller is responsible for closing it
     * @throws Exception if reading the snapshot or setting up the harness fails
     */
    private static StreamSource<String, DummyFlinkKafkaConsumer<String>> openRestoredOperator(
            DummyFlinkKafkaConsumer<String> consumerFunction, String snapshotResource) throws Exception {
        final StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
                new StreamSource<String, DummyFlinkKafkaConsumer<String>>(consumerFunction);
        final AbstractStreamOperatorTestHarness<String> testHarness =
                new AbstractStreamOperatorTestHarness<String>(consumerOperator, 1, 1, 0);
        testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        testHarness.setup();
        testHarness.initializeState(
                OperatorSnapshotUtil.readStateHandle(OperatorSnapshotUtil.getResourceFilename(snapshotResource)));
        testHarness.open();
        return consumerOperator;
    }

    /**
     * Restoring the empty-state snapshot with no partitions available must yield an empty
     * (non-null) subscription and no restored state.
     */
    @Test
    public void testRestoreFromEmptyStateNoPartitions() throws Exception {
        final DummyFlinkKafkaConsumer<String> consumerFunction =
                new DummyFlinkKafkaConsumer<String>(Collections.<KafkaTopicPartition>emptyList());
        final StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
                openRestoredOperator(consumerFunction, EMPTY_STATE_SNAPSHOT_RESOURCE);

        Assert.assertNotNull(consumerFunction.getSubscribedPartitionsToStartOffsets());
        Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
        Assert.assertNull(consumerFunction.getRestoredState());

        consumerOperator.close();
        consumerOperator.cancel();
    }

    /**
     * Restoring the empty-state snapshot with partitions available must subscribe to all of them
     * with the default start offset and report no restored state.
     */
    @Test
    public void testRestoreFromEmptyStateWithPartitions() throws Exception {
        final List<KafkaTopicPartition> partitions = new ArrayList<KafkaTopicPartition>(PARTITION_STATE.keySet());
        final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<String>(partitions);
        final StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
                openRestoredOperator(consumerFunction, EMPTY_STATE_SNAPSHOT_RESOURCE);

        final Map<KafkaTopicPartition, Long> expectedSubscribedPartitionsWithStartOffsets =
                new HashMap<KafkaTopicPartition, Long>();
        for (KafkaTopicPartition partition : PARTITION_STATE.keySet()) {
            expectedSubscribedPartitionsWithStartOffsets.put(partition, EXPECTED_DEFAULT_START_OFFSET);
        }

        Assert.assertNotNull(consumerFunction.getSubscribedPartitionsToStartOffsets());
        Assert.assertFalse(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
        Assert.assertEquals(
                expectedSubscribedPartitionsWithStartOffsets,
                consumerFunction.getSubscribedPartitionsToStartOffsets());
        Assert.assertNull(consumerFunction.getRestoredState());

        consumerOperator.close();
        consumerOperator.cancel();
    }

    /**
     * Restoring the non-empty snapshot must bring back exactly {@link #PARTITION_STATE}, both as
     * the subscription's start offsets and as the restored state.
     */
    @Test
    public void testRestore() throws Exception {
        final List<KafkaTopicPartition> partitions = new ArrayList<KafkaTopicPartition>(PARTITION_STATE.keySet());
        final DummyFlinkKafkaConsumer<String> consumerFunction = new DummyFlinkKafkaConsumer<String>(partitions);
        final StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
                openRestoredOperator(consumerFunction, SNAPSHOT_RESOURCE);

        Assert.assertNotNull(consumerFunction.getSubscribedPartitionsToStartOffsets());
        Assert.assertFalse(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
        Assert.assertEquals(PARTITION_STATE, consumerFunction.getSubscribedPartitionsToStartOffsets());
        Assert.assertNotNull(consumerFunction.getRestoredState());
        Assert.assertEquals(PARTITION_STATE, consumerFunction.getRestoredState());

        consumerOperator.close();
        consumerOperator.cancel();
    }

    /**
     * Source context whose callbacks are all no-ops; subclasses only need to supply
     * {@code collect}. It hands out a private object as the checkpoint lock.
     */
    private static abstract class DummySourceContext
    implements SourceFunction.SourceContext<String> {
        private final Object lock = new Object();

        private DummySourceContext() {
        }

        @Override
        public void collectWithTimestamp(String element, long timestamp) {
        }

        @Override
        public void emitWatermark(Watermark mark) {
        }

        @Override
        public Object getCheckpointLock() {
            return this.lock;
        }

        @Override
        public void close() {
        }

        @Override
        public void markAsTemporarilyIdle() {
        }
    }

    /**
     * Consumer that returns a fixed fetcher and a fixed partition list instead of talking to
     * Kafka, and reports auto-commit as disabled.
     *
     * @param <T> type of the records produced by the consumer
     */
    private static class DummyFlinkKafkaConsumer<T>
    extends FlinkKafkaConsumerBase<T> {
        private static final long serialVersionUID = 1L;
        private final List<KafkaTopicPartition> partitions;
        private final AbstractFetcher<T, ?> fetcher;

        /**
         * @param fetcher fetcher handed out by {@code createFetcher}
         * @param partitions partitions reported by {@code getKafkaPartitions}
         */
        @SuppressWarnings("unchecked") // the deserialization schema is a raw Mockito mock
        DummyFlinkKafkaConsumer(AbstractFetcher<T, ?> fetcher, List<KafkaTopicPartition> partitions) {
            super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema<T>) Mockito.mock(KeyedDeserializationSchema.class));
            this.fetcher = fetcher;
            this.partitions = partitions;
        }

        /**
         * Creates a consumer backed by a freshly mocked fetcher.
         *
         * @param partitions partitions reported by {@code getKafkaPartitions}
         */
        @SuppressWarnings("unchecked") // the fetcher is a raw Mockito mock
        DummyFlinkKafkaConsumer(List<KafkaTopicPartition> partitions) {
            this((AbstractFetcher<T, ?>) Mockito.mock(AbstractFetcher.class), partitions);
        }

        @Override
        protected AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode) throws Exception {
            return this.fetcher;
        }

        @Override
        protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
            return this.partitions;
        }

        @Override
        protected boolean getIsAutoCommitEnabled() {
            return false;
        }
    }
}

