/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class FlinkKafkaConsumerBaseFrom11MigrationTest {
    @Test
    public void testRestoreFromFlink11WithEmptyStateNoPartitions() throws Exception {
        DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer(Collections.emptyList());
        StreamSource consumerOperator = new StreamSource(consumerFunction);
        AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness((StreamOperator)consumerOperator, 1, 1, 0);
        testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        testHarness.setup();
        testHarness.initializeStateFromLegacyCheckpoint(FlinkKafkaConsumerBaseFrom11MigrationTest.getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot"));
        testHarness.open();
        Assert.assertTrue((consumerFunction.getSubscribedPartitionsToStartOffsets() != null ? 1 : 0) != 0);
        Assert.assertTrue((boolean)consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
        Assert.assertTrue((consumerFunction.getRestoredState() == null ? 1 : 0) != 0);
        consumerOperator.close();
        consumerOperator.cancel();
    }

    @Test
    public void testRestoreFromFlink11WithEmptyStateWithPartitions() throws Exception {
        ArrayList<KafkaTopicPartition> partitions = new ArrayList<KafkaTopicPartition>();
        partitions.add(new KafkaTopicPartition("abc", 13));
        partitions.add(new KafkaTopicPartition("def", 7));
        DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer(partitions);
        StreamSource consumerOperator = new StreamSource(consumerFunction);
        AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness((StreamOperator)consumerOperator, 1, 1, 0);
        testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        testHarness.setup();
        testHarness.initializeStateFromLegacyCheckpoint(FlinkKafkaConsumerBaseFrom11MigrationTest.getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot"));
        testHarness.open();
        HashMap<KafkaTopicPartition, Long> expectedSubscribedPartitionsWithStartOffsets = new HashMap<KafkaTopicPartition, Long>();
        expectedSubscribedPartitionsWithStartOffsets.put(new KafkaTopicPartition("abc", 13), -915623761773L);
        expectedSubscribedPartitionsWithStartOffsets.put(new KafkaTopicPartition("def", 7), -915623761773L);
        Assert.assertTrue((consumerFunction.getSubscribedPartitionsToStartOffsets() != null ? 1 : 0) != 0);
        Assert.assertTrue((!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty() ? 1 : 0) != 0);
        Assert.assertEquals(expectedSubscribedPartitionsWithStartOffsets, (Object)consumerFunction.getSubscribedPartitionsToStartOffsets());
        Assert.assertTrue((consumerFunction.getRestoredState() == null ? 1 : 0) != 0);
        consumerOperator.close();
        consumerOperator.cancel();
    }

    @Test
    public void testRestoreFromFlink11() throws Exception {
        ArrayList<KafkaTopicPartition> partitions = new ArrayList<KafkaTopicPartition>();
        partitions.add(new KafkaTopicPartition("abc", 13));
        partitions.add(new KafkaTopicPartition("def", 7));
        DummyFlinkKafkaConsumer consumerFunction = new DummyFlinkKafkaConsumer(partitions);
        StreamSource consumerOperator = new StreamSource(consumerFunction);
        AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness((StreamOperator)consumerOperator, 1, 1, 0);
        testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        testHarness.setup();
        testHarness.initializeStateFromLegacyCheckpoint(FlinkKafkaConsumerBaseFrom11MigrationTest.getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot"));
        testHarness.open();
        HashMap<KafkaTopicPartition, Long> expectedState = new HashMap<KafkaTopicPartition, Long>();
        expectedState.put(new KafkaTopicPartition("abc", 13), 16768L);
        expectedState.put(new KafkaTopicPartition("def", 7), 987654321L);
        Assert.assertTrue((consumerFunction.getSubscribedPartitionsToStartOffsets() != null ? 1 : 0) != 0);
        Assert.assertTrue((!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty() ? 1 : 0) != 0);
        Assert.assertEquals(expectedState, (Object)consumerFunction.getSubscribedPartitionsToStartOffsets());
        Assert.assertTrue((consumerFunction.getRestoredState() != null ? 1 : 0) != 0);
        Assert.assertEquals(expectedState, (Object)consumerFunction.getRestoredState());
        consumerOperator.close();
        consumerOperator.cancel();
    }

    private static String getResourceFilename(String filename) {
        ClassLoader cl = FlinkKafkaConsumerBaseFrom11MigrationTest.class.getClassLoader();
        URL resource = cl.getResource(filename);
        if (resource == null) {
            throw new NullPointerException("Missing snapshot resource.");
        }
        return resource.getFile();
    }

    private static class DummyFlinkKafkaConsumer<T>
    extends FlinkKafkaConsumerBase<T> {
        private static final long serialVersionUID = 1L;
        private final List<KafkaTopicPartition> partitions;

        DummyFlinkKafkaConsumer(List<KafkaTopicPartition> partitions) {
            super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema)Mockito.mock(KeyedDeserializationSchema.class));
            this.partitions = partitions;
        }

        protected AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, OffsetCommitMode offsetCommitMode) throws Exception {
            return (AbstractFetcher)Mockito.mock(AbstractFetcher.class);
        }

        protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
            return this.partitions;
        }

        protected boolean getIsAutoCommitEnabled() {
            return false;
        }
    }
}

