package org.apache.flink.streaming.connectors.kafka;

import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.class */
public class FlinkKafkaConsumerBaseMigrationTest {

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest$DummyFlinkKafkaConsumer.class */
    private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
        private static final long serialVersionUID = 1;
        private final FetcherFactory<T> fetcherFactory;
        private final List<KafkaTopicPartition> partitions;

        DummyFlinkKafkaConsumer(FetcherFactory<T> fetcherFactory, List<KafkaTopicPartition> list) {
            super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema) Mockito.mock(KeyedDeserializationSchema.class));
            this.fetcherFactory = fetcherFactory;
            this.partitions = list;
        }

        protected AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> sourceContext, List<KafkaTopicPartition> list, SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue, SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2, StreamingRuntimeContext streamingRuntimeContext) throws Exception {
            return this.fetcherFactory.createFetcher();
        }

        protected List<KafkaTopicPartition> getKafkaPartitions(List<String> list) {
            return this.partitions;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest$DummySourceContext.class */
    private static abstract class DummySourceContext implements SourceFunction.SourceContext<String> {
        private final Object lock;

        private DummySourceContext() {
            this.lock = new Object();
        }

        public void collectWithTimestamp(String str, long j) {
        }

        public void emitWatermark(Watermark watermark) {
        }

        public Object getCheckpointLock() {
            return this.lock;
        }

        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest$FetcherFactory.class */
    private interface FetcherFactory<T> extends Serializable {
        AbstractFetcher<T, ?> createFetcher();
    }

    private static String getResourceFilename(String str) {
        URL resource = FlinkKafkaConsumerBaseMigrationTest.class.getClassLoader().getResource(str);
        if (resource == null) {
            throw new NullPointerException("Missing snapshot resource.");
        }
        return resource.getFile();
    }

    @Test
    public void testRestoreFromFlink11WithEmptyStateNoPartitions() throws Exception {
        final OneShotLatch oneShotLatch = new OneShotLatch();
        final AbstractFetcher abstractFetcher = (AbstractFetcher) Mockito.mock(AbstractFetcher.class);
        ((AbstractFetcher) Mockito.doAnswer(new Answer() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.1
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                oneShotLatch.trigger();
                Assert.fail("This should never be called");
                return null;
            }
        }).when(abstractFetcher)).restoreOffsets(Matchers.anyMapOf(KafkaTopicPartition.class, Long.class));
        ((AbstractFetcher) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.2
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m0answer(InvocationOnMock invocationOnMock) throws Throwable {
                oneShotLatch.trigger();
                Assert.fail("This should never be called");
                return null;
            }
        }).when(abstractFetcher)).runFetchLoop();
        final DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer(new FetcherFactory<String>() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.3
            private static final long serialVersionUID = -2803131905656983619L;

            @Override // org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.FetcherFactory
            public AbstractFetcher<String, ?> createFetcher() {
                return abstractFetcher;
            }
        }, Collections.emptyList());
        StreamSource streamSource = new StreamSource(dummyFlinkKafkaConsumer);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness(streamSource, 1, 1, 0);
        abstractStreamOperatorTestHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        abstractStreamOperatorTestHarness.setup();
        abstractStreamOperatorTestHarness.initializeStateFromLegacyCheckpoint(getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
        abstractStreamOperatorTestHarness.open();
        final Throwable[] thArr = new Throwable[1];
        Thread thread = new Thread() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.4
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    dummyFlinkKafkaConsumer.run(new DummySourceContext() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.4.1
                        public void collect(String str) {
                            oneShotLatch.trigger();
                            Assert.fail("This should never be called.");
                        }

                        @Override // org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.DummySourceContext
                        public void emitWatermark(Watermark watermark) {
                            oneShotLatch.trigger();
                            Assert.assertEquals(Long.MAX_VALUE, watermark.getTimestamp());
                        }
                    });
                } catch (Throwable th) {
                    th.printStackTrace();
                    thArr[0] = th;
                }
            }
        };
        thread.start();
        if (!oneShotLatch.isTriggered()) {
            oneShotLatch.await();
        }
        streamSource.close();
        streamSource.cancel();
        thread.interrupt();
        thread.join();
        Assert.assertNull(thArr[0]);
    }

    @Test
    public void testRestoreFromFlink11WithEmptyStateWithPartitions() throws Exception {
        final OneShotLatch oneShotLatch = new OneShotLatch();
        final AbstractFetcher abstractFetcher = (AbstractFetcher) Mockito.mock(AbstractFetcher.class);
        ((AbstractFetcher) Mockito.doAnswer(new Answer() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.5
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                oneShotLatch.trigger();
                Assert.fail("This should never be called");
                return null;
            }
        }).when(abstractFetcher)).restoreOffsets(Matchers.anyMapOf(KafkaTopicPartition.class, Long.class));
        ((AbstractFetcher) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.6
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m1answer(InvocationOnMock invocationOnMock) throws Throwable {
                oneShotLatch.trigger();
                return null;
            }
        }).when(abstractFetcher)).runFetchLoop();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KafkaTopicPartition("abc", 13));
        arrayList.add(new KafkaTopicPartition("def", 7));
        final DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer(new FetcherFactory<String>() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.7
            private static final long serialVersionUID = -2803131905656983619L;

            @Override // org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.FetcherFactory
            public AbstractFetcher<String, ?> createFetcher() {
                return abstractFetcher;
            }
        }, arrayList);
        StreamSource streamSource = new StreamSource(dummyFlinkKafkaConsumer);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness(streamSource, 1, 1, 0);
        abstractStreamOperatorTestHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        abstractStreamOperatorTestHarness.setup();
        abstractStreamOperatorTestHarness.initializeStateFromLegacyCheckpoint(getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
        abstractStreamOperatorTestHarness.open();
        final Throwable[] thArr = new Throwable[1];
        Thread thread = new Thread() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.8
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    dummyFlinkKafkaConsumer.run(new DummySourceContext() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.8.1
                        public void collect(String str) {
                            oneShotLatch.trigger();
                            Assert.fail("This should never be called.");
                        }

                        @Override // org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.DummySourceContext
                        public void emitWatermark(Watermark watermark) {
                            oneShotLatch.trigger();
                            Assert.fail("This should never be called.");
                        }
                    });
                } catch (Throwable th) {
                    th.printStackTrace();
                    thArr[0] = th;
                }
            }
        };
        thread.start();
        if (!oneShotLatch.isTriggered()) {
            oneShotLatch.await();
        }
        streamSource.close();
        thread.join();
        Assert.assertNull(thArr[0]);
    }

    @Test
    public void testRestoreFromFlink11() throws Exception {
        final HashMap hashMap = new HashMap();
        hashMap.put(new KafkaTopicPartition("abc", 13), 16768L);
        hashMap.put(new KafkaTopicPartition("def", 7), 987654321L);
        final OneShotLatch oneShotLatch = new OneShotLatch();
        final AbstractFetcher abstractFetcher = (AbstractFetcher) Mockito.mock(AbstractFetcher.class);
        ((AbstractFetcher) Mockito.doAnswer(new Answer() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.9
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                HashMap hashMap2 = (HashMap) invocationOnMock.getArguments()[0];
                oneShotLatch.trigger();
                Assert.assertEquals(hashMap, hashMap2);
                return null;
            }
        }).when(abstractFetcher)).restoreOffsets(Matchers.anyMapOf(KafkaTopicPartition.class, Long.class));
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KafkaTopicPartition("abc", 13));
        arrayList.add(new KafkaTopicPartition("def", 7));
        final DummyFlinkKafkaConsumer dummyFlinkKafkaConsumer = new DummyFlinkKafkaConsumer(new FetcherFactory<String>() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.10
            private static final long serialVersionUID = -2803131905656983619L;

            @Override // org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.FetcherFactory
            public AbstractFetcher<String, ?> createFetcher() {
                return abstractFetcher;
            }
        }, arrayList);
        StreamSource streamSource = new StreamSource(dummyFlinkKafkaConsumer);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness(streamSource, 1, 1, 0);
        abstractStreamOperatorTestHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        abstractStreamOperatorTestHarness.setup();
        abstractStreamOperatorTestHarness.initializeStateFromLegacyCheckpoint(getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot"));
        abstractStreamOperatorTestHarness.open();
        final Throwable[] thArr = new Throwable[1];
        Thread thread = new Thread() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.11
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    dummyFlinkKafkaConsumer.run(new DummySourceContext() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseMigrationTest.11.1
                        public void collect(String str) {
                        }
                    });
                } catch (Throwable th) {
                    th.printStackTrace();
                    thArr[0] = th;
                }
            }
        };
        thread.start();
        if (!oneShotLatch.isTriggered()) {
            oneShotLatch.await();
        }
        streamSource.close();
        thread.join();
        Assert.assertNull(thArr[0]);
    }
}
