package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.cdc.connectors.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.cdc.connectors.kafka.shaded.org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.class */
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {
    private static final long serialVersionUID = -6272159445203409112L;
    protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
    /** Same value as the bound snapshotState() applies when trimming {@code pendingOffsetsToCommit}. */
    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    /** Sentinel discovery interval; run() skips the discovery loop when the interval equals this value. */
    public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
    /** Property key; not read in this class - presumably consumed by subclasses (verify). */
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    /** Property key; not read in this class - presumably consumed by subclasses (verify). */
    public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
    /** Name under which the union list state of offsets is registered in initializeState(). */
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
    /** Built from the topic list / pattern given to the constructor. */
    private final KafkaTopicsDescriptor topicsDescriptor;
    /** Deserialization schema; opened in open() and queried for the produced type. */
    protected final KafkaDeserializationSchema<T> deserializer;
    /** Partitions of this subtask mapped to their start offsets; populated in open(), handed to the fetcher in run(). */
    private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
    /** Serialized watermark strategy; stays null unless assignTimestampsAndWatermarks() was called. */
    private SerializedValue<WatermarkStrategy<T>> watermarkStrategy;
    /** Resolved in open() via OffsetCommitModes.fromConfiguration(). */
    private OffsetCommitMode offsetCommitMode;
    /** Sleep time between discovery attempts; 0 means no sleep, PARTITION_DISCOVERY_DISABLED means no discovery loop. */
    private final long discoveryIntervalMillis;
    /** Only set by setStartFromSpecificOffsets(); every other setStartFrom* call resets it to null. */
    private Map<KafkaTopicPartition, Long> specificStartupOffsets;
    /** Only set by setStartFromTimestamp(); every other setStartFrom* call resets it to null. */
    private Long startupOffsetsTimestamp;
    /** Created in run(); null before that. */
    private volatile transient AbstractFetcher<T, ?> kafkaFetcher;
    /** Created and opened in open(); closed in close(). */
    private volatile transient AbstractPartitionDiscoverer partitionDiscoverer;
    /** Offsets read back in initializeState(); remains null when the context is not restored. */
    private volatile transient TreeMap<KafkaTopicPartition, Long> restoredState;
    /** Union list state holding (partition, offset) pairs; rewritten on every snapshotState(). */
    private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
    /** Thread running the partition discovery loop; only created when discovery is enabled. */
    private volatile transient Thread discoveryLoopThread;
    /** Passed through to createFetcher() in run(). */
    private final boolean useMetrics;
    /** Incremented by the commit callback's onSuccess(). */
    private transient Counter successfulCommits;
    /** Incremented by the commit callback's onException(). */
    private transient Counter failedCommits;
    /** Callback created in run() and handed to the fetcher on offset commits. */
    private transient KafkaCommitCallback offsetCommitCallback;
    /** Input to the offset commit mode resolution in open(). */
    private boolean enableCommitOnCheckpoints = true;
    /** When true, open() drops restored partitions whose topic no longer matches topicsDescriptor. */
    private boolean filterRestoredPartitionsWithCurrentTopicsDescriptor = true;
    /** Startup mode used by open() when there is no restored state. */
    private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
    /** Checkpoint id mapped to the offsets snapshotted for it, in insertion order. */
    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
    /** Cleared by cancel(); checked by the discovery loop and the checkpoint hooks. */
    private volatile boolean running = true;

    public FlinkKafkaConsumerBase(List<String> list, Pattern pattern, KafkaDeserializationSchema<T> kafkaDeserializationSchema, long j, boolean z) {
        this.topicsDescriptor = new KafkaTopicsDescriptor(list, pattern);
        this.deserializer = (KafkaDeserializationSchema) Preconditions.checkNotNull(kafkaDeserializationSchema, "valueDeserializer");
        Preconditions.checkArgument(j == Long.MIN_VALUE || j >= 0, "Cannot define a negative value for the topic / partition discovery interval.");
        this.discoveryIntervalMillis = j;
        this.useMetrics = z;
    }

    /**
     * Switches off Kafka's own auto commit in {@code properties} when the given commit mode is
     * {@code ON_CHECKPOINTS} or {@code DISABLED}; any other mode leaves the properties untouched.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally {@code protected}.
     *
     * @param properties consumer properties to adjust in place
     * @param offsetCommitMode resolved offset commit mode
     */
    public static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
        final boolean kafkaAutoCommitUnwanted =
                offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS
                        || offsetCommitMode == OffsetCommitMode.DISABLED;
        if (!kafkaAutoCommitUnwanted) {
            return;
        }
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    }

    /**
     * Stores the given watermark strategy in serialized form, after running the closure cleaner
     * over it.
     *
     * @param watermarkStrategy strategy to use; must not be null
     * @return this consumer, for call chaining
     * @throws IllegalArgumentException if cleaning or serializing the strategy fails
     */
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy) {
        Preconditions.checkNotNull(watermarkStrategy);

        final SerializedValue<WatermarkStrategy<T>> serializedStrategy;
        try {
            ClosureCleaner.clean(watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
            serializedStrategy = new SerializedValue<>(watermarkStrategy);
        } catch (Exception e) {
            throw new IllegalArgumentException("The given WatermarkStrategy is not serializable", e);
        }

        this.watermarkStrategy = serializedStrategy;
        return this;
    }

    /**
     * Wraps a punctuated-watermark assigner into a {@link WatermarkStrategy} and registers it.
     *
     * @param assignerWithPunctuatedWatermarks assigner to adapt; must not be null
     * @return this consumer, for call chaining
     * @throws IllegalStateException if a watermark strategy has already been set
     * @throws IllegalArgumentException if cleaning or registering the assigner fails
     */
    @Deprecated
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assignerWithPunctuatedWatermarks) {
        Preconditions.checkNotNull(assignerWithPunctuatedWatermarks);
        if (this.watermarkStrategy != null) {
            throw new IllegalStateException("Some watermark strategy has already been set.");
        }
        try {
            ClosureCleaner.clean(assignerWithPunctuatedWatermarks, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
            // The static type of the local selects the WatermarkStrategy overload.
            final WatermarkStrategy<T> adaptedStrategy =
                    new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(assignerWithPunctuatedWatermarks);
            return assignTimestampsAndWatermarks(adaptedStrategy);
        } catch (Exception e) {
            throw new IllegalArgumentException("The given assigner is not serializable", e);
        }
    }

    /**
     * Wraps a periodic-watermark assigner into a {@link WatermarkStrategy} and registers it.
     *
     * @param assignerWithPeriodicWatermarks assigner to adapt; must not be null
     * @return this consumer, for call chaining
     * @throws IllegalStateException if a watermark strategy has already been set
     * @throws IllegalArgumentException if cleaning or registering the assigner fails
     */
    @Deprecated
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assignerWithPeriodicWatermarks) {
        Preconditions.checkNotNull(assignerWithPeriodicWatermarks);
        if (this.watermarkStrategy != null) {
            throw new IllegalStateException("Some watermark strategy has already been set.");
        }
        try {
            ClosureCleaner.clean(assignerWithPeriodicWatermarks, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
            // The static type of the local selects the WatermarkStrategy overload.
            final WatermarkStrategy<T> adaptedStrategy =
                    new AssignerWithPeriodicWatermarksAdapter.Strategy<>(assignerWithPeriodicWatermarks);
            return assignTimestampsAndWatermarks(adaptedStrategy);
        } catch (Exception e) {
            throw new IllegalArgumentException("The given assigner is not serializable", e);
        }
    }

    /**
     * Sets the flag that open() feeds into the offset commit mode resolution.
     *
     * @param z whether offsets should be committed on checkpoints
     * @return this consumer, for call chaining
     */
    public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean z) {
        this.enableCommitOnCheckpoints = z;
        return this;
    }

    /**
     * Selects the {@code EARLIEST} startup mode and clears any previously configured startup
     * timestamp or specific offsets.
     *
     * @return this consumer, for call chaining
     */
    public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
        // Reset the inputs that belong to the TIMESTAMP / SPECIFIC_OFFSETS modes.
        this.specificStartupOffsets = null;
        this.startupOffsetsTimestamp = null;
        this.startupMode = StartupMode.EARLIEST;
        return this;
    }

    /**
     * Selects the {@code LATEST} startup mode and clears any previously configured startup
     * timestamp or specific offsets.
     *
     * @return this consumer, for call chaining
     */
    public FlinkKafkaConsumerBase<T> setStartFromLatest() {
        // Reset the inputs that belong to the TIMESTAMP / SPECIFIC_OFFSETS modes.
        this.specificStartupOffsets = null;
        this.startupOffsetsTimestamp = null;
        this.startupMode = StartupMode.LATEST;
        return this;
    }

    /**
     * Selects the {@code TIMESTAMP} startup mode with the given timestamp and clears any
     * previously configured specific offsets.
     *
     * @param j startup timestamp; must be non-negative and not later than the current
     *     {@link System#currentTimeMillis()}
     * @return this consumer, for call chaining
     * @throws IllegalArgumentException if the timestamp is negative or lies in the future
     */
    public FlinkKafkaConsumerBase<T> setStartFromTimestamp(long j) {
        Preconditions.checkArgument(j >= 0, "The provided value for the startup offsets timestamp is invalid.");

        final long now = System.currentTimeMillis();
        Preconditions.checkArgument(j <= now, "Startup time[%s] must be before current time[%s].", j, now);

        this.specificStartupOffsets = null;
        this.startupOffsetsTimestamp = j;
        this.startupMode = StartupMode.TIMESTAMP;
        return this;
    }

    /**
     * Selects the {@code GROUP_OFFSETS} startup mode and clears any previously configured startup
     * timestamp or specific offsets.
     *
     * @return this consumer, for call chaining
     */
    public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
        // Reset the inputs that belong to the TIMESTAMP / SPECIFIC_OFFSETS modes.
        this.specificStartupOffsets = null;
        this.startupOffsetsTimestamp = null;
        this.startupMode = StartupMode.GROUP_OFFSETS;
        return this;
    }

    /**
     * Selects the {@code SPECIFIC_OFFSETS} startup mode with the given per-partition offsets and
     * clears any previously configured startup timestamp.
     *
     * @param map start offset per partition; must not be null
     * @return this consumer, for call chaining
     * @throws NullPointerException if {@code map} is null (mode and timestamp are already updated
     *     at that point)
     */
    public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> map) {
        this.startupOffsetsTimestamp = null;
        this.startupMode = StartupMode.SPECIFIC_OFFSETS;
        this.specificStartupOffsets = Preconditions.checkNotNull(map);
        return this;
    }

    /**
     * Turns off the step in open() that removes restored partitions whose topic no longer matches
     * the current topics descriptor.
     *
     * @return this consumer, for call chaining
     */
    public FlinkKafkaConsumerBase<T> disableFilterRestoredPartitionsWithSubscribedTopics() {
        this.filterRestoredPartitionsWithCurrentTopicsDescriptor = false;
        return this;
    }

    /**
     * Prepares this subtask for reading: resolves the offset commit mode, creates and opens the
     * partition discoverer, computes {@code subscribedPartitionsToStartOffsets} (from restored
     * state when present, otherwise from the configured startup mode) and opens the deserializer.
     *
     * @param configuration not used by this method
     * @throws IllegalStateException if the startup mode is {@code SPECIFIC_OFFSETS} or
     *     {@code TIMESTAMP} but the matching offsets / timestamp were never provided
     * @throws Exception if the discoverer or the deserializer fails to open, or discovery fails
     */
    public void open(Configuration configuration) throws Exception {
        // isCheckpointingEnabled() is declared on StreamingRuntimeContext, not on the general
        // RuntimeContext, so the context has to be downcast explicitly.
        final StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
        final int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
        final int parallelism = runtimeContext.getNumberOfParallelSubtasks();

        this.offsetCommitMode = OffsetCommitModes.fromConfiguration(getIsAutoCommitEnabled(), this.enableCommitOnCheckpoints, runtimeContext.isCheckpointingEnabled());

        this.partitionDiscoverer = createPartitionDiscoverer(this.topicsDescriptor, subtaskIndex, parallelism);
        this.partitionDiscoverer.open();

        this.subscribedPartitionsToStartOffsets = new HashMap<>();
        final List<KafkaTopicPartition> discoveredPartitions = this.partitionDiscoverer.discoverPartitions();
        if (this.restoredState != null) {
            assignStartOffsetsFromRestoredState(discoveredPartitions, subtaskIndex, parallelism);
        } else {
            assignStartOffsetsFromStartupMode(discoveredPartitions);
            logFreshStartPartitions(subtaskIndex);
        }

        // NOTE(review): ClientQuotaEntity.USER looks like a decompiler-resolved string constant
        // used as the metric group name - confirm against the original source.
        this.deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(runtimeContext, metricGroup -> metricGroup.addGroup(ClientQuotaEntity.USER)));
    }

    /**
     * Restore path: adds newly discovered partitions to the restored state with the
     * EARLIEST_OFFSET sentinel, keeps the partitions assigned to this subtask and optionally
     * drops those whose topic is no longer subscribed.
     */
    private void assignStartOffsetsFromRestoredState(List<KafkaTopicPartition> discoveredPartitions, int subtaskIndex, int parallelism) {
        // Partitions unknown to the restored state are read from the earliest offset.
        for (KafkaTopicPartition partition : discoveredPartitions) {
            if (!this.restoredState.containsKey(partition)) {
                this.restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
            }
        }

        // The restored state is a union over all subtasks; keep only this subtask's share.
        for (Map.Entry<KafkaTopicPartition, Long> restoredEntry : this.restoredState.entrySet()) {
            if (KafkaTopicPartitionAssigner.assign(restoredEntry.getKey(), parallelism) == subtaskIndex) {
                this.subscribedPartitionsToStartOffsets.put(restoredEntry.getKey(), restoredEntry.getValue());
            }
        }

        if (this.filterRestoredPartitionsWithCurrentTopicsDescriptor) {
            this.subscribedPartitionsToStartOffsets.entrySet().removeIf(subscribed -> {
                if (this.topicsDescriptor.isMatchingTopic(subscribed.getKey().getTopic())) {
                    return false;
                }
                LOG.warn("{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.", subscribed.getKey());
                return true;
            });
        }

        LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}", subtaskIndex, this.subscribedPartitionsToStartOffsets.size(), this.subscribedPartitionsToStartOffsets);
    }

    /** Fresh-start path: derives the start offset of every discovered partition from the startup mode. */
    private void assignStartOffsetsFromStartupMode(List<KafkaTopicPartition> discoveredPartitions) {
        switch (this.startupMode) {
            case SPECIFIC_OFFSETS: {
                if (this.specificStartupOffsets == null) {
                    throw new IllegalStateException("Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS + ", but no specific offsets were specified.");
                }
                for (KafkaTopicPartition partition : discoveredPartitions) {
                    final Long specifiedOffset = this.specificStartupOffsets.get(partition);
                    // The stored value is one less than the user-specified offset; partitions
                    // without a specified offset get the GROUP_OFFSET sentinel.
                    final long startOffset = specifiedOffset != null
                            ? specifiedOffset - 1
                            : KafkaTopicPartitionStateSentinel.GROUP_OFFSET;
                    this.subscribedPartitionsToStartOffsets.put(partition, startOffset);
                }
                break;
            }
            case TIMESTAMP: {
                if (this.startupOffsetsTimestamp == null) {
                    throw new IllegalStateException("Startup mode for the consumer set to " + StartupMode.TIMESTAMP + ", but no startup timestamp was specified.");
                }
                final Map<KafkaTopicPartition, Long> offsetsForTimestamp = fetchOffsetsWithTimestamp(discoveredPartitions, this.startupOffsetsTimestamp);
                for (Map.Entry<KafkaTopicPartition, Long> fetched : offsetsForTimestamp.entrySet()) {
                    final Long fetchedOffset = fetched.getValue();
                    // A null offset falls back to the LATEST_OFFSET sentinel.
                    final long startOffset = fetchedOffset == null
                            ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                            : fetchedOffset - 1;
                    this.subscribedPartitionsToStartOffsets.put(fetched.getKey(), startOffset);
                }
                break;
            }
            default: {
                // Every remaining mode is represented by its sentinel value.
                final long sentinel = this.startupMode.getStateSentinel();
                for (KafkaTopicPartition partition : discoveredPartitions) {
                    this.subscribedPartitionsToStartOffsets.put(partition, sentinel);
                }
                break;
            }
        }
    }

    /** Logs which partitions this subtask starts with on a fresh (non-restored) start. */
    private void logFreshStartPartitions(int subtaskIndex) {
        if (this.subscribedPartitionsToStartOffsets.isEmpty()) {
            LOG.info("Consumer subtask {} initially has no partitions to read from.", subtaskIndex);
            return;
        }

        final int partitionCount = this.subscribedPartitionsToStartOffsets.size();
        switch (this.startupMode) {
            case SPECIFIC_OFFSETS: {
                LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}", subtaskIndex, partitionCount, this.specificStartupOffsets, this.subscribedPartitionsToStartOffsets.keySet());

                final List<KafkaTopicPartition> defaultedToGroupOffsets = new ArrayList<>(partitionCount);
                for (Map.Entry<KafkaTopicPartition, Long> subscribed : this.subscribedPartitionsToStartOffsets.entrySet()) {
                    if (subscribed.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                        defaultedToGroupOffsets.add(subscribed.getKey());
                    }
                }
                if (!defaultedToGroupOffsets.isEmpty()) {
                    LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}; their startup offsets will be defaulted to their committed group offsets in Kafka.", subtaskIndex, defaultedToGroupOffsets.size(), defaultedToGroupOffsets);
                }
                break;
            }
            case TIMESTAMP:
                LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}", subtaskIndex, partitionCount, this.startupOffsetsTimestamp, this.subscribedPartitionsToStartOffsets.keySet());
                break;
            case EARLIEST:
                LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}", subtaskIndex, partitionCount, this.subscribedPartitionsToStartOffsets.keySet());
                break;
            case LATEST:
                LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}", subtaskIndex, partitionCount, this.subscribedPartitionsToStartOffsets.keySet());
                break;
            case GROUP_OFFSETS:
                LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}", subtaskIndex, partitionCount, this.subscribedPartitionsToStartOffsets.keySet());
                break;
            default:
                // No log line for any other mode.
                break;
        }
    }

    /**
     * Registers the commit metrics and commit callback, creates the fetcher and runs the fetch
     * loop, together with the partition discovery loop unless discovery is disabled.
     *
     * @param sourceContext context used to emit records; marked temporarily idle when this subtask
     *     has no partitions
     * @throws Exception if open() has not populated the partitions, or the fetcher / discovery
     *     loop fails
     */
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        if (this.subscribedPartitionsToStartOffsets == null) {
            throw new Exception("The partitions were not set for the consumer");
        }

        // createFetcher() requires a StreamingRuntimeContext, which getRuntimeContext() does not
        // return statically; downcast once and reuse.
        final StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
        final int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();

        this.successfulCommits = runtimeContext.getMetricGroup().counter("commitsSucceeded");
        this.failedCommits = runtimeContext.getMetricGroup().counter("commitsFailed");
        this.offsetCommitCallback = new KafkaCommitCallback() {
            @Override
            public void onSuccess() {
                FlinkKafkaConsumerBase.this.successfulCommits.inc();
            }

            @Override
            public void onException(Throwable th) {
                LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", indexOfThisSubtask), th);
                FlinkKafkaConsumerBase.this.failedCommits.inc();
            }
        };

        if (this.subscribedPartitionsToStartOffsets.isEmpty()) {
            sourceContext.markAsTemporarilyIdle();
        }

        LOG.info("Consumer subtask {} creating fetcher with offsets {}.", indexOfThisSubtask, this.subscribedPartitionsToStartOffsets);
        this.kafkaFetcher = createFetcher(sourceContext, this.subscribedPartitionsToStartOffsets, this.watermarkStrategy, runtimeContext, this.offsetCommitMode, runtimeContext.getMetricGroup().addGroup("KafkaConsumer"), this.useMetrics);

        // cancel() may have been called while the fetcher was being created.
        if (!this.running) {
            return;
        }
        if (this.discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
            this.kafkaFetcher.runFetchLoop();
        } else {
            runWithPartitionDiscovery();
        }
    }

    /**
     * Starts the discovery thread, runs the fetch loop on the calling thread, and once the fetch
     * loop returns wakes up the discoverer, waits for the discovery thread and rethrows any error
     * it recorded.
     *
     * @throws RuntimeException wrapping the exception captured by the discovery loop, if any
     * @throws Exception if the fetch loop fails or joining the discovery thread is interrupted
     */
    private void runWithPartitionDiscovery() throws Exception {
        final AtomicReference<Exception> discoveryLoopError = new AtomicReference<>();
        createAndStartDiscoveryLoop(discoveryLoopError);

        this.kafkaFetcher.runFetchLoop();

        // The fetch loop is done: wake the discoverer so the discovery thread can finish.
        this.partitionDiscoverer.wakeup();
        joinDiscoveryLoopThread();

        final Exception captured = discoveryLoopError.get();
        if (captured == null) {
            return;
        }
        throw new RuntimeException(captured);
    }

    /**
     * Waits for the discovery thread to finish; does nothing when no such thread was created.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @VisibleForTesting
    void joinDiscoveryLoopThread() throws InterruptedException {
        final Thread discoveryThread = this.discoveryLoopThread;
        if (discoveryThread == null) {
            return;
        }
        discoveryThread.join();
    }

    /**
     * Creates and starts the thread that periodically discovers new partitions and hands them to
     * the fetcher.
     *
     * <p>The loop keeps running until {@code running} is cleared, the discoverer is woken up or
     * closed, or the sleep between two discoveries is interrupted. Any other exception is stored
     * in {@code atomicReference}. When the loop ends for whatever reason while the consumer is
     * still running, {@link #cancel()} is invoked so that the fetch loop terminates as well.
     *
     * @param atomicReference receives the exception that terminated the loop, if any
     */
    private void createAndStartDiscoveryLoop(AtomicReference<Exception> atomicReference) {
        this.discoveryLoopThread = new Thread(() -> {
            // The try/finally must wrap the WHOLE loop: a finally inside the loop body would
            // cancel the consumer right after the first discovery pass.
            try {
                while (this.running) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Consumer subtask {} is trying to discover new partitions ...", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
                    }

                    final List<KafkaTopicPartition> discoveredPartitions;
                    try {
                        discoveredPartitions = this.partitionDiscoverer.discoverPartitions();
                    } catch (AbstractPartitionDiscoverer.ClosedException | AbstractPartitionDiscoverer.WakeupException ignored) {
                        // cancel() wakes the discoverer and close() closes it; either way the
                        // consumer is shutting down, so leave the loop.
                        break;
                    }

                    // Skip the hand-over if the consumer was cancelled in the meantime.
                    if (this.running && !discoveredPartitions.isEmpty()) {
                        this.kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                    }

                    if (this.running && this.discoveryIntervalMillis != 0) {
                        try {
                            Thread.sleep(this.discoveryIntervalMillis);
                        } catch (InterruptedException ignored) {
                            // cancel() interrupts this thread to cut the sleep short; leave the loop.
                            break;
                        }
                    }
                }
            } catch (Exception e) {
                atomicReference.set(e);
            } finally {
                // If running is already false, cancel() has been called before.
                if (this.running) {
                    cancel();
                }
            }
        }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
        this.discoveryLoopThread.start();
    }

    /**
     * Stops the consumer: clears the {@code running} flag, wakes up the discoverer and interrupts
     * the discovery thread (when one exists), and cancels the fetcher (when one exists).
     */
    public void cancel() {
        this.running = false;

        final Thread discoveryThread = this.discoveryLoopThread;
        if (discoveryThread != null) {
            final AbstractPartitionDiscoverer discoverer = this.partitionDiscoverer;
            if (discoverer != null) {
                // Only a wake-up here; the discoverer itself is closed in close().
                discoverer.wakeup();
            }
            // The discovery thread may be sleeping between two discovery attempts.
            discoveryThread.interrupt();
        }

        final AbstractFetcher<T, ?> fetcher = this.kafkaFetcher;
        if (fetcher != null) {
            fetcher.cancel();
        }
    }

    /**
     * Cancels the consumer, waits for the discovery thread, then closes the partition discoverer
     * and the superclass. Both close steps are always attempted; their failures are combined via
     * {@code ExceptionUtils.firstOrSuppressed} and thrown at the end.
     *
     * @throws Exception the combined failure of the close steps, or an interruption while waiting
     *     for the discovery thread
     */
    public void close() throws Exception {
        cancel();
        joinDiscoveryLoopThread();

        Exception failure = null;

        final AbstractPartitionDiscoverer discoverer = this.partitionDiscoverer;
        if (discoverer != null) {
            try {
                discoverer.close();
            } catch (Exception discovererFailure) {
                failure = discovererFailure;
            }
        }

        try {
            super.close();
        } catch (Exception superFailure) {
            failure = ExceptionUtils.firstOrSuppressed(superFailure, failure);
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Registers the union list state that holds the partition offsets and, when the context is
     * restored, copies its content into {@code restoredState}.
     *
     * @param functionInitializationContext context giving access to the operator state store
     * @throws Exception if the state cannot be registered or read
     */
    public final void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        final ListStateDescriptor<Tuple2<KafkaTopicPartition, Long>> offsetsDescriptor =
                new ListStateDescriptor<>(OFFSETS_STATE_NAME, createStateSerializer(getRuntimeContext().getExecutionConfig()));
        this.unionOffsetStates = functionInitializationContext.getOperatorStateStore().getUnionListState(offsetsDescriptor);

        if (!functionInitializationContext.isRestored()) {
            LOG.info("Consumer subtask {} has no restore state.", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
            return;
        }

        this.restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
        for (Tuple2<KafkaTopicPartition, Long> restoredOffset : this.unionOffsetStates.get()) {
            this.restoredState.put(restoredOffset.f0, restoredOffset.f1);
        }
        LOG.info("Consumer subtask {} restored state: {}.", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), this.restoredState);
    }

    /**
     * Rewrites the union offset state for a checkpoint. With a fetcher present its current
     * offsets are stored; without one the initial start offsets are stored. In
     * {@code ON_CHECKPOINTS} mode the offsets to commit are additionally remembered under the
     * checkpoint id, and the pending map is trimmed to {@link #MAX_NUM_PENDING_CHECKPOINTS}
     * entries (oldest first).
     *
     * @param functionSnapshotContext supplies the checkpoint id
     * @throws Exception if updating the state fails
     */
    public final void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (!this.running) {
            LOG.debug("snapshotState() called on closed source");
            return;
        }

        this.unionOffsetStates.clear();

        final boolean commitOnCheckpoints = this.offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS;
        final AbstractFetcher<T, ?> fetcher = this.kafkaFetcher;
        if (fetcher != null) {
            final HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
            if (commitOnCheckpoints) {
                this.pendingOffsetsToCommit.put(Long.valueOf(functionSnapshotContext.getCheckpointId()), currentOffsets);
            }
            for (Map.Entry<KafkaTopicPartition, Long> current : currentOffsets.entrySet()) {
                this.unionOffsetStates.add(Tuple2.of(current.getKey(), current.getValue()));
            }
        } else {
            // No fetcher yet: checkpoint the initial start offsets.
            for (Map.Entry<KafkaTopicPartition, Long> initial : this.subscribedPartitionsToStartOffsets.entrySet()) {
                this.unionOffsetStates.add(Tuple2.of(initial.getKey(), initial.getValue()));
            }
            if (commitOnCheckpoints) {
                // restoredState may be null here; notifyCheckpointComplete() handles that.
                this.pendingOffsetsToCommit.put(Long.valueOf(functionSnapshotContext.getCheckpointId()), this.restoredState);
            }
        }

        if (commitOnCheckpoints) {
            // Bound the pending map by dropping the oldest entries.
            while (this.pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                this.pendingOffsetsToCommit.remove(0);
            }
        }
    }

    /**
     * In {@code ON_CHECKPOINTS} mode, commits the offsets remembered for the completed checkpoint
     * through the fetcher and discards that entry together with all older pending entries. Does
     * nothing when the source is not running or no fetcher exists yet.
     *
     * @param j id of the completed checkpoint
     * @throws Exception if committing fails while the source is still running
     */
    public final void notifyCheckpointComplete(long j) throws Exception {
        if (!this.running) {
            LOG.debug("notifyCheckpointComplete() called on closed source");
            return;
        }

        final AbstractFetcher<T, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
            return;
        }

        if (this.offsetCommitMode != OffsetCommitMode.ON_CHECKPOINTS) {
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), Long.valueOf(j));
        }

        try {
            final int position = this.pendingOffsetsToCommit.indexOf(Long.valueOf(j));
            if (position == -1) {
                LOG.warn("Consumer subtask {} received confirmation for unknown checkpoint id {}", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), Long.valueOf(j));
                return;
            }

            @SuppressWarnings("unchecked")
            final Map<KafkaTopicPartition, Long> offsetsToCommit =
                    (Map<KafkaTopicPartition, Long>) this.pendingOffsetsToCommit.remove(position);

            // Everything in front of the confirmed checkpoint is older and no longer needed.
            int olderEntries = position;
            while (olderEntries > 0) {
                this.pendingOffsetsToCommit.remove(0);
                olderEntries--;
            }

            if (offsetsToCommit == null || offsetsToCommit.size() == 0) {
                LOG.debug("Consumer subtask {} has empty checkpoint state.", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
                return;
            }
            fetcher.commitInternalOffsetsToKafka(offsetsToCommit, this.offsetCommitCallback);
        } catch (Exception e) {
            // Failures after the source stopped running are deliberately dropped.
            if (this.running) {
                throw e;
            }
        }
    }

    /**
     * Intentionally a no-op: nothing is done when a checkpoint is aborted.
     *
     * @param j id of the aborted checkpoint
     */
    public void notifyCheckpointAborted(long j) {
    }

    /**
     * Creates the fetcher that run() uses to read from Kafka. run() passes the source context,
     * the partitions with their start offsets, the serialized watermark strategy (may be null),
     * the runtime context, the resolved offset commit mode, a "KafkaConsumer" metric group and
     * the {@code useMetrics} flag.
     */
    protected abstract AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> map, SerializedValue<WatermarkStrategy<T>> serializedValue, StreamingRuntimeContext streamingRuntimeContext, OffsetCommitMode offsetCommitMode, MetricGroup metricGroup, boolean z) throws Exception;

    /**
     * Creates the partition discoverer. open() calls this with the topics descriptor, the index
     * of this subtask ({@code i}) and the number of parallel subtasks ({@code i2}).
     */
    protected abstract AbstractPartitionDiscoverer createPartitionDiscoverer(KafkaTopicsDescriptor kafkaTopicsDescriptor, int i, int i2);

    /** Supplies the auto-commit flag that open() feeds into the offset commit mode resolution. */
    protected abstract boolean getIsAutoCommitEnabled();

    /**
     * Looks up an offset per partition for the given timestamp ({@code j}). open() uses the
     * result in TIMESTAMP startup mode: a null value is replaced by the LATEST_OFFSET sentinel,
     * any other value is stored decremented by one.
     */
    protected abstract Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(Collection<KafkaTopicPartition> collection, long j);

    /** Returns the produced type as reported by the deserialization schema. */
    public TypeInformation<T> getProducedType() {
        return this.deserializer.getProducedType();
    }

    /** Returns the live start-offset map (no copy); null until open() has run. */
    @VisibleForTesting
    Map<KafkaTopicPartition, Long> getSubscribedPartitionsToStartOffsets() {
        return this.subscribedPartitionsToStartOffsets;
    }

    /** Returns the live restored-offsets map (no copy); null unless initializeState() restored state. */
    @VisibleForTesting
    TreeMap<KafkaTopicPartition, Long> getRestoredState() {
        return this.restoredState;
    }

    /** Returns the offset commit mode; null until open() has resolved it. */
    @VisibleForTesting
    OffsetCommitMode getOffsetCommitMode() {
        return this.offsetCommitMode;
    }

    /** Returns the live map (no copy) of checkpoint id to offsets awaiting commit. */
    @VisibleForTesting
    LinkedMap getPendingOffsetsToCommit() {
        return this.pendingOffsetsToCommit;
    }

    /** Returns the flag set through setCommitOffsetsOnCheckpoints(); true by default. */
    @VisibleForTesting
    public boolean getEnableCommitOnCheckpoints() {
        return this.enableCommitOnCheckpoints;
    }

    /**
     * Builds the serializer for the (partition, offset) tuples kept in the union list state: a
     * Kryo serializer for the partition field and {@link LongSerializer} for the offset field.
     *
     * @param executionConfig execution config handed to the Kryo serializer
     * @return serializer for {@code Tuple2<KafkaTopicPartition, Long>}
     */
    @VisibleForTesting
    static TupleSerializer<Tuple2<KafkaTopicPartition, Long>> createStateSerializer(ExecutionConfig executionConfig) {
        final TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[] {
            new KryoSerializer<>(KafkaTopicPartition.class, executionConfig),
            LongSerializer.INSTANCE
        };
        // Tuple2.class is only a Class<Tuple2>; the parameterized class token has to be obtained
        // through an unchecked cast, which is safe because of type erasure.
        @SuppressWarnings("unchecked")
        final Class<Tuple2<KafkaTopicPartition, Long>> tupleClass =
                (Class<Tuple2<KafkaTopicPartition, Long>>) (Class<?>) Tuple2.class;
        return new TupleSerializer<>(tupleClass, fieldSerializers);
    }
}
