package org.apache.flink.streaming.connectors.kafka.internals;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.testutils.TestSourceContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.class */
public class AbstractFetcherTest {

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest$TestFetcher.class */
    private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
        Map<KafkaTopicPartition, Long> lastCommittedOffsets;
        private final OneShotLatch fetchLoopWaitLatch;
        private final OneShotLatch stateIterationBlockLatch;

        TestFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> map, SerializedValue<WatermarkStrategy<T>> serializedValue, ProcessingTimeService processingTimeService, long j) throws Exception {
            this(sourceContext, map, serializedValue, processingTimeService, j, null, null);
        }

        TestFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> map, SerializedValue<WatermarkStrategy<T>> serializedValue, ProcessingTimeService processingTimeService, long j, OneShotLatch oneShotLatch, OneShotLatch oneShotLatch2) throws Exception {
            super(sourceContext, map, serializedValue, processingTimeService, j, TestFetcher.class.getClassLoader(), new UnregisteredMetricsGroup(), false);
            this.lastCommittedOffsets = null;
            this.fetchLoopWaitLatch = oneShotLatch;
            this.stateIterationBlockLatch = oneShotLatch2;
        }

        public void runFetchLoop() throws Exception {
            if (this.fetchLoopWaitLatch == null) {
                throw new UnsupportedOperationException();
            }
            for (KafkaTopicPartitionState kafkaTopicPartitionState : subscribedPartitionStates()) {
                this.fetchLoopWaitLatch.trigger();
                this.stateIterationBlockLatch.await();
            }
        }

        public void cancel() {
            throw new UnsupportedOperationException();
        }

        public Object createKafkaPartitionHandle(KafkaTopicPartition kafkaTopicPartition) {
            return new Object();
        }

        protected void doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> map, @Nonnull KafkaCommitCallback kafkaCommitCallback) {
            this.lastCommittedOffsets = map;
            kafkaCommitCallback.onSuccess();
        }

        public Optional<Map<KafkaTopicPartition, Long>> getLastCommittedOffsets() {
            return Optional.ofNullable(this.lastCommittedOffsets);
        }
    }

    @Test
    public void testIgnorePartitionStateSentinelInSnapshot() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(new KafkaTopicPartition("test topic name", 1), -915623761774L);
        hashMap.put(new KafkaTopicPartition("test topic name", 2), -915623761773L);
        hashMap.put(new KafkaTopicPartition("test topic name", 3), -915623761775L);
        TestSourceContext testSourceContext = new TestSourceContext();
        TestFetcher testFetcher = new TestFetcher(testSourceContext, hashMap, null, new TestProcessingTimeService(), 0L);
        synchronized (testSourceContext.getCheckpointLock()) {
            testFetcher.commitInternalOffsetsToKafka(testFetcher.snapshotCurrentState(), new KafkaCommitCallback() { // from class: org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcherTest.1
                public void onSuccess() {
                }

                public void onException(Throwable th) {
                    throw new RuntimeException("Callback failed", th);
                }
            });
            Assert.assertTrue(testFetcher.getLastCommittedOffsets().isPresent());
            Assert.assertEquals(Collections.emptyMap(), testFetcher.getLastCommittedOffsets().get());
        }
    }

    @Test
    public void testSkipCorruptedRecord() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(new KafkaTopicPartition("test topic name", 1), -915623761774L);
        TestSourceContext testSourceContext = new TestSourceContext();
        TestFetcher testFetcher = new TestFetcher(testSourceContext, hashMap, null, new TestProcessingTimeService(), 0L);
        KafkaTopicPartitionState kafkaTopicPartitionState = (KafkaTopicPartitionState) testFetcher.subscribedPartitionStates().get(0);
        emitRecord(testFetcher, 1L, kafkaTopicPartitionState, 1L);
        emitRecord(testFetcher, 2L, kafkaTopicPartitionState, 2L);
        Assert.assertEquals(2L, ((Long) testSourceContext.getLatestElement().getValue()).longValue());
        Assert.assertEquals(2L, kafkaTopicPartitionState.getOffset());
        testFetcher.emitRecordsWithTimestamps(emptyQueue(), kafkaTopicPartitionState, 3L, Long.MIN_VALUE);
        Assert.assertEquals(2L, ((Long) testSourceContext.getLatestElement().getValue()).longValue());
        Assert.assertEquals(3L, kafkaTopicPartitionState.getOffset());
    }

    @Test
    public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
        KafkaTopicPartition kafkaTopicPartition = new KafkaTopicPartition("test", 42);
        TestSourceContext testSourceContext = new TestSourceContext();
        Map singletonMap = Collections.singletonMap(kafkaTopicPartition, -915623761773L);
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        final TestFetcher testFetcher = new TestFetcher(testSourceContext, singletonMap, null, new TestProcessingTimeService(), 10L, oneShotLatch, oneShotLatch2);
        CheckedThread checkedThread = new CheckedThread() { // from class: org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcherTest.2
            public void go() throws Exception {
                testFetcher.runFetchLoop();
            }
        };
        checkedThread.start();
        oneShotLatch.await();
        testFetcher.addDiscoveredPartitions(Collections.singletonList(kafkaTopicPartition));
        oneShotLatch2.trigger();
        checkedThread.sync();
    }

    private static <T, KPH> void emitRecord(AbstractFetcher<T, KPH> abstractFetcher, T t, KafkaTopicPartitionState<T, KPH> kafkaTopicPartitionState, long j) {
        ArrayDeque arrayDeque = new ArrayDeque();
        arrayDeque.add(t);
        abstractFetcher.emitRecordsWithTimestamps(arrayDeque, kafkaTopicPartitionState, j, Long.MIN_VALUE);
    }

    private static <T> Queue<T> emptyQueue() {
        return new ArrayDeque();
    }
}
