/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
import org.apache.flink.streaming.connectors.pulsar.internal.MessageIdSerializer;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCommitCallback;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarSourceStateSerializer;
import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscription;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer;
import org.apache.flink.streaming.connectors.pulsar.serialization.PulsarDeserializationSchema;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkPulsarSource<T>
extends RichParallelSourceFunction<T>
implements ResultTypeQueryable<T>,
CheckpointListener,
CheckpointedFunction {
    private static final Logger log = LoggerFactory.getLogger(FlinkPulsarSource.class);
    private static final long serialVersionUID = -6080350107046202906L;
    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
    private static final String OFFSETS_STATE_NAME_V3 = "topic-offset-states";
    protected String adminUrl;
    protected ClientConfigurationData clientConfigurationData;
    protected final Map<String, String> caseInsensitiveParams;
    protected final Map<String, Object> readerConf;
    protected volatile PulsarDeserializationSchema<T> deserializer;
    private Map<TopicRange, MessageId> ownedTopicStarts;
    private SerializedValue<WatermarkStrategy<T>> watermarkStrategy;
    private final long discoveryIntervalMillis;
    protected final int pollTimeoutMs;
    protected final int commitMaxRetries;
    private StartupMode startupMode = StartupMode.LATEST;
    private transient Map<TopicRange, MessageId> specificStartupOffsets;
    private String externalSubscriptionName;
    private MessageId subscriptionPosition = MessageId.latest;
    private Map<TopicRange, byte[]> specificStartupOffsetsAsBytes;
    protected final Properties properties;
    protected final UUID uuid = UUID.randomUUID();
    private final LinkedHashMap<Long, Map<TopicRange, MessageId>> pendingOffsetsToCommit = new LinkedHashMap();
    private volatile transient PulsarFetcher<T> pulsarFetcher;
    protected volatile transient PulsarMetadataReader metadataReader;
    private volatile transient TreeMap<TopicRange, MessageId> restoredState;
    private volatile transient Set<TopicRange> excludeStartMessageIds;
    private transient ListState<Tuple2<TopicSubscription, MessageId>> unionOffsetStates;
    private int oldStateVersion = 2;
    private volatile boolean stateSubEqualexternalSub = false;
    private volatile transient Thread discoveryLoopThread;
    private volatile boolean running = true;
    private final boolean useMetrics;
    private transient Counter successfulCommits;
    private transient Counter failedCommits;
    private transient PulsarCommitCallback offsetCommitCallback;
    private transient int taskIndex;
    private transient int numParallelTasks;
    private long startupOffsetsTimestamp = -1L;

    public FlinkPulsarSource(String adminUrl, ClientConfigurationData clientConf, PulsarDeserializationSchema<T> deserializer, Properties properties) {
        this.adminUrl = (String)Preconditions.checkNotNull((Object)adminUrl);
        this.clientConfigurationData = (ClientConfigurationData)Preconditions.checkNotNull((Object)clientConf);
        this.deserializer = deserializer;
        this.properties = properties;
        this.caseInsensitiveParams = SourceSinkUtils.validateStreamSourceOptions((Map<String, String>)Maps.fromProperties((Properties)properties));
        this.readerConf = SourceSinkUtils.getReaderParams((Map<String, String>)Maps.fromProperties((Properties)properties));
        this.discoveryIntervalMillis = SourceSinkUtils.getPartitionDiscoveryIntervalInMillis(this.caseInsensitiveParams);
        this.pollTimeoutMs = SourceSinkUtils.getPollTimeoutMs(this.caseInsensitiveParams);
        this.commitMaxRetries = SourceSinkUtils.getCommitMaxRetries(this.caseInsensitiveParams);
        this.useMetrics = SourceSinkUtils.getUseMetrics(this.caseInsensitiveParams);
        CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(this.caseInsensitiveParams));
        if (this.clientConfigurationData.getServiceUrl() == null) {
            throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration");
        }
        this.oldStateVersion = SourceSinkUtils.getOldStateVersion(this.caseInsensitiveParams, this.oldStateVersion);
    }

    public FlinkPulsarSource(String serviceUrl, String adminUrl, PulsarDeserializationSchema<T> deserializer, Properties properties) {
        this(adminUrl, PulsarClientUtils.newClientConf((String)Preconditions.checkNotNull((Object)serviceUrl), properties), deserializer, properties);
    }

    public FlinkPulsarSource(String serviceUrl, String adminUrl, DeserializationSchema<T> deserializer, Properties properties) {
        this(adminUrl, PulsarClientUtils.newClientConf((String)Preconditions.checkNotNull((Object)serviceUrl), properties), PulsarDeserializationSchema.valueOnly(deserializer), properties);
    }

    @Deprecated
    public FlinkPulsarSource<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
        Preconditions.checkNotNull(assigner);
        if (this.watermarkStrategy != null) {
            throw new IllegalStateException("Some watermark strategy has already been set.");
        }
        try {
            ClosureCleaner.clean(assigner, (ExecutionConfig.ClosureCleanerLevel)ExecutionConfig.ClosureCleanerLevel.RECURSIVE, (boolean)true);
            AssignerWithPunctuatedWatermarksAdapter.Strategy wms = new AssignerWithPunctuatedWatermarksAdapter.Strategy(assigner);
            return this.assignTimestampsAndWatermarks((WatermarkStrategy<T>)wms);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("The given assigner is not serializable", e);
        }
    }

    @Deprecated
    public FlinkPulsarSource<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
        Preconditions.checkNotNull(assigner);
        if (this.watermarkStrategy != null) {
            throw new IllegalStateException("Some watermark strategy has already been set.");
        }
        try {
            ClosureCleaner.clean(assigner, (ExecutionConfig.ClosureCleanerLevel)ExecutionConfig.ClosureCleanerLevel.RECURSIVE, (boolean)true);
            AssignerWithPeriodicWatermarksAdapter.Strategy wms = new AssignerWithPeriodicWatermarksAdapter.Strategy(assigner);
            return this.assignTimestampsAndWatermarks((WatermarkStrategy<T>)wms);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("The given assigner is not serializable", e);
        }
    }

    public FlinkPulsarSource<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy) {
        Preconditions.checkNotNull(watermarkStrategy);
        try {
            ClosureCleaner.clean(watermarkStrategy, (ExecutionConfig.ClosureCleanerLevel)ExecutionConfig.ClosureCleanerLevel.RECURSIVE, (boolean)true);
            this.watermarkStrategy = new SerializedValue(watermarkStrategy);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("The given WatermarkStrategy is not serializable", e);
        }
        return this;
    }

    public FlinkPulsarSource<T> setStartFromEarliest() {
        this.startupMode = StartupMode.EARLIEST;
        this.specificStartupOffsets = null;
        return this;
    }

    public FlinkPulsarSource<T> setStartFromLatest() {
        this.startupMode = StartupMode.LATEST;
        this.specificStartupOffsets = null;
        return this;
    }

    public FlinkPulsarSource<T> setStartFromSpecificOffsets(Map<String, MessageId> specificStartupOffsets) {
        Preconditions.checkNotNull(specificStartupOffsets);
        this.specificStartupOffsets = specificStartupOffsets.entrySet().stream().collect(Collectors.toMap(e -> new TopicRange((String)e.getKey()), Map.Entry::getValue));
        this.startupMode = StartupMode.SPECIFIC_OFFSETS;
        this.specificStartupOffsetsAsBytes = new HashMap<TopicRange, byte[]>();
        for (Map.Entry<TopicRange, MessageId> entry : this.specificStartupOffsets.entrySet()) {
            this.specificStartupOffsetsAsBytes.put(entry.getKey(), entry.getValue().toByteArray());
        }
        return this;
    }

    public FlinkPulsarSource<T> setStartFromSubscription(String externalSubscriptionName) {
        this.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
        this.externalSubscriptionName = (String)Preconditions.checkNotNull((Object)externalSubscriptionName);
        return this;
    }

    public FlinkPulsarSource<T> setStartFromSubscription(String externalSubscriptionName, MessageId subscriptionPosition) {
        this.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
        this.externalSubscriptionName = (String)Preconditions.checkNotNull((Object)externalSubscriptionName);
        this.subscriptionPosition = (MessageId)Preconditions.checkNotNull((Object)subscriptionPosition);
        return this;
    }

    public FlinkPulsarSource<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
        Preconditions.checkArgument((startupOffsetsTimestamp >= 0L ? 1 : 0) != 0, (Object)"The provided value for the startup offsets timestamp is invalid.");
        long currentTimestamp = System.currentTimeMillis();
        Preconditions.checkArgument((startupOffsetsTimestamp <= currentTimestamp ? 1 : 0) != 0, (String)"Startup time[%s] must be before current time[%s].", (Object[])new Object[]{startupOffsetsTimestamp, currentTimestamp});
        this.startupMode = StartupMode.TIMESTAMP;
        this.startupOffsetsTimestamp = startupOffsetsTimestamp;
        this.specificStartupOffsets = null;
        return this;
    }

    public void open(Configuration parameters) throws Exception {
        boolean usingRestoredState;
        if (this.deserializer != null) {
            this.deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter((RuntimeContext)this.getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
        }
        this.taskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
        this.numParallelTasks = this.getRuntimeContext().getNumberOfParallelSubtasks();
        this.metadataReader = this.createMetadataReader();
        this.ownedTopicStarts = new HashMap<TopicRange, MessageId>();
        this.excludeStartMessageIds = new HashSet<TopicRange>();
        Set<TopicRange> allTopics = this.metadataReader.discoverTopicChanges();
        if (this.specificStartupOffsets == null && this.specificStartupOffsetsAsBytes != null) {
            this.specificStartupOffsets = new HashMap<TopicRange, MessageId>();
            for (Map.Entry<TopicRange, byte[]> entry : this.specificStartupOffsetsAsBytes.entrySet()) {
                this.specificStartupOffsets.put(entry.getKey(), MessageId.fromByteArray((byte[])entry.getValue()));
            }
        }
        Map<TopicRange, MessageId> allTopicOffsets = this.offsetForEachTopic(allTopics, this.startupMode, this.specificStartupOffsets);
        boolean bl = usingRestoredState = this.startupMode != StartupMode.EXTERNAL_SUBSCRIPTION || this.stateSubEqualexternalSub;
        if (this.restoredState != null && usingRestoredState) {
            allTopicOffsets.entrySet().stream().filter(e -> !this.restoredState.containsKey(e.getKey())).forEach(e -> this.restoredState.put((TopicRange)e.getKey(), (MessageId)e.getValue()));
            this.restoredState.entrySet().stream().filter(e -> SourceSinkUtils.belongsTo((TopicRange)e.getKey(), this.numParallelTasks, this.taskIndex)).forEach(e -> {
                this.ownedTopicStarts.put((TopicRange)e.getKey(), (MessageId)e.getValue());
                this.excludeStartMessageIds.add((TopicRange)e.getKey());
            });
            Set goneTopics = Sets.difference(this.restoredState.keySet(), allTopics).stream().filter(k -> SourceSinkUtils.belongsTo(k, this.numParallelTasks, this.taskIndex)).collect(Collectors.toSet());
            for (TopicRange goneTopic : goneTopics) {
                log.warn(goneTopic + " is removed from subscription since it no longer matches with topics settings.");
                this.ownedTopicStarts.remove(goneTopic);
            }
            log.info("Source {} will start reading {} topics in restored state {}", new Object[]{this.taskIndex, this.ownedTopicStarts.size(), StringUtils.join((Object[])new Set[]{this.ownedTopicStarts.entrySet()})});
        } else {
            this.ownedTopicStarts.putAll(allTopicOffsets.entrySet().stream().filter(e -> SourceSinkUtils.belongsTo((TopicRange)e.getKey(), this.numParallelTasks, this.taskIndex)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
            if (this.ownedTopicStarts.isEmpty()) {
                log.info("Source {} initially has no topics to read from.", (Object)this.taskIndex);
            } else {
                log.info("Source {} will start reading {} topics from initialized positions: {}", new Object[]{this.taskIndex, this.ownedTopicStarts.size(), this.ownedTopicStarts});
            }
        }
    }

    protected String getSubscriptionName() {
        if (this.startupMode == StartupMode.EXTERNAL_SUBSCRIPTION) {
            Preconditions.checkNotNull((Object)this.externalSubscriptionName);
            return this.externalSubscriptionName;
        }
        return "flink-pulsar-" + this.uuid.toString();
    }

    protected PulsarMetadataReader createMetadataReader() throws PulsarClientException {
        return new PulsarMetadataReader(this.adminUrl, this.clientConfigurationData, this.getSubscriptionName(), this.caseInsensitiveParams, this.taskIndex, this.numParallelTasks, this.startupMode == StartupMode.EXTERNAL_SUBSCRIPTION);
    }

    public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
        if (this.ownedTopicStarts == null) {
            throw new Exception("The partitions were not set for the source");
        }
        this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
        this.failedCommits = this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
        this.offsetCommitCallback = new PulsarCommitCallback(){

            @Override
            public void onSuccess() {
                FlinkPulsarSource.this.successfulCommits.inc();
            }

            @Override
            public void onException(Throwable cause) {
                log.warn("source {} failed commit by {}", (Object)FlinkPulsarSource.this.taskIndex, (Object)cause.toString());
                FlinkPulsarSource.this.failedCommits.inc();
            }
        };
        if (this.ownedTopicStarts.isEmpty()) {
            ctx.markAsTemporarilyIdle();
        }
        log.info("Source {} creating fetcher with offsets {}", (Object)this.taskIndex, (Object)StringUtils.join((Object[])new Set[]{this.ownedTopicStarts.entrySet()}));
        StreamingRuntimeContext streamingRuntime = (StreamingRuntimeContext)this.getRuntimeContext();
        this.pulsarFetcher = this.createFetcher(ctx, this.ownedTopicStarts, this.watermarkStrategy, streamingRuntime.getProcessingTimeService(), streamingRuntime.getExecutionConfig().getAutoWatermarkInterval(), this.getRuntimeContext().getUserCodeClassLoader(), streamingRuntime, this.useMetrics, this.excludeStartMessageIds);
        if (!this.running) {
            return;
        }
        if (this.discoveryIntervalMillis < 0L) {
            this.pulsarFetcher.runFetchLoop();
        } else {
            this.runWithTopicsDiscovery();
        }
    }

    protected PulsarFetcher<T> createFetcher(SourceFunction.SourceContext<T> sourceContext, Map<TopicRange, MessageId> seedTopicsWithInitialOffsets, SerializedValue<WatermarkStrategy<T>> watermarkStrategy, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, StreamingRuntimeContext streamingRuntime, boolean useMetrics, Set<TopicRange> excludeStartMessageIds) throws Exception {
        return new PulsarFetcher<T>(sourceContext, seedTopicsWithInitialOffsets, excludeStartMessageIds, watermarkStrategy, processingTimeProvider, autoWatermarkInterval, userCodeClassLoader, streamingRuntime, this.clientConfigurationData, this.readerConf, this.pollTimeoutMs, this.commitMaxRetries, this.deserializer, this.metadataReader, streamingRuntime.getMetricGroup().addGroup("PulsarConsumer"), useMetrics, this.startupOffsetsTimestamp);
    }

    public void joinDiscoveryLoopThread() throws InterruptedException {
        if (this.discoveryLoopThread != null) {
            this.discoveryLoopThread.join();
        }
    }

    public void runWithTopicsDiscovery() throws Exception {
        AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<Exception>();
        this.createAndStartDiscoveryLoop(discoveryLoopErrorRef);
        this.pulsarFetcher.runFetchLoop();
        this.joinDiscoveryLoopThread();
        Exception discoveryLoopError = discoveryLoopErrorRef.get();
        if (discoveryLoopError != null) {
            throw new IllegalStateException(discoveryLoopError);
        }
    }

    private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
        this.discoveryLoopThread = new Thread(() -> {
            try {
                while (this.running) {
                    Set<TopicRange> added = this.metadataReader.discoverTopicChanges();
                    if (this.running && !added.isEmpty()) {
                        this.pulsarFetcher.addDiscoveredTopics(added);
                    }
                    if (!this.running || this.discoveryIntervalMillis == -1L) continue;
                    Thread.sleep(this.discoveryIntervalMillis);
                }
            }
            catch (PulsarMetadataReader.ClosedException added) {
            }
            catch (InterruptedException added) {
            }
            catch (Exception e) {
                discoveryLoopErrorRef.set(e);
            }
            finally {
                if (this.running) {
                    this.cancel();
                }
            }
        }, "Pulsar topic discovery for source " + this.taskIndex);
        this.discoveryLoopThread.start();
    }

    public void close() throws Exception {
        this.cancel();
        this.joinDiscoveryLoopThread();
        Exception exception = null;
        if (this.metadataReader != null) {
            try {
                this.metadataReader.close();
            }
            catch (Exception e) {
                exception = e;
            }
        }
        try {
            super.close();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        if (exception != null) {
            throw exception;
        }
    }

    public void cancel() {
        this.running = false;
        if (this.discoveryLoopThread != null) {
            this.discoveryLoopThread.interrupt();
        }
        if (this.pulsarFetcher != null) {
            try {
                this.pulsarFetcher.cancel();
            }
            catch (Exception e) {
                log.error("Failed to cancel the Pulsar Fetcher {}", (Object)ExceptionUtils.stringifyException((Throwable)e));
                throw new IllegalStateException(e);
            }
        }
    }

    public TypeInformation<T> getProducedType() {
        return this.deserializer.getProducedType();
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        OperatorStateStore stateStore = context.getOperatorStateStore();
        this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor(OFFSETS_STATE_NAME_V3, FlinkPulsarSource.createStateSerializer()));
        if (context.isRestored()) {
            this.restoredState = new TreeMap();
            Iterator<Object> iterator = ((Iterable)this.unionOffsetStates.get()).iterator();
            if (!iterator.hasNext()) {
                iterator = this.tryMigrateState(stateStore);
            }
            while (iterator.hasNext()) {
                Tuple2 tuple2 = (Tuple2)iterator.next();
                SerializableRange range = ((TopicSubscription)tuple2.f0).getRange() != null ? ((TopicSubscription)tuple2.f0).getRange() : SerializableRange.ofFullRange();
                TopicRange topicRange = new TopicRange(((TopicSubscription)tuple2.f0).getTopic(), range.getPulsarRange());
                this.restoredState.put(topicRange, (MessageId)tuple2.f1);
                String subscriptionName = ((TopicSubscription)tuple2.f0).getSubscriptionName();
                if (this.stateSubEqualexternalSub || !StringUtils.equals((CharSequence)subscriptionName, (CharSequence)this.externalSubscriptionName)) continue;
                this.stateSubEqualexternalSub = true;
                log.info("Source restored state with subscriptionName {}", (Object)subscriptionName);
            }
            log.info("Source subtask {} restored state {}", (Object)this.taskIndex, (Object)StringUtils.join((Object[])new Set[]{this.restoredState.entrySet()}));
        } else {
            log.info("Source subtask {} has no restore state", (Object)this.taskIndex);
        }
    }

    @VisibleForTesting
    static TupleSerializer<Tuple2<TopicSubscription, MessageId>> createStateSerializer() {
        TypeSerializer[] fieldSerializers = new TypeSerializer[]{TopicSubscriptionSerializer.INSTANCE, MessageIdSerializer.INSTANCE};
        Class<Tuple2> tupleClass = Tuple2.class;
        return new TupleSerializer(tupleClass, fieldSerializers);
    }

    private Iterator<Tuple2<TopicSubscription, MessageId>> tryMigrateState(OperatorStateStore stateStore) throws Exception {
        log.info("restore old state version {}", (Object)this.oldStateVersion);
        PulsarSourceStateSerializer stateSerializer = new PulsarSourceStateSerializer(this.getRuntimeContext().getExecutionConfig());
        ListState rawStates = stateStore.getUnionListState(new ListStateDescriptor(OFFSETS_STATE_NAME, stateSerializer.getSerializer(this.oldStateVersion)));
        ListState oldUnionSubscriptionNameStates = stateStore.getUnionListState(new ListStateDescriptor("topic-partition-offset-states_subName", TypeInformation.of((TypeHint)new TypeHint<String>(){})));
        Iterator subNameIterator = ((Iterable)oldUnionSubscriptionNameStates.get()).iterator();
        Iterator tuple2s = ((Iterable)rawStates.get()).iterator();
        log.info("restore old state has data {}", (Object)tuple2s.hasNext());
        ArrayList<Tuple2> records = new ArrayList<Tuple2>();
        while (tuple2s.hasNext()) {
            Object next = tuple2s.next();
            Tuple2<TopicSubscription, MessageId> tuple2 = stateSerializer.deserialize(this.oldStateVersion, next);
            String subName = ((TopicSubscription)tuple2.f0).getSubscriptionName();
            if (subNameIterator.hasNext()) {
                subName = (String)subNameIterator.next();
            }
            TopicSubscription topicSubscription = TopicSubscription.builder().topic(((TopicSubscription)tuple2.f0).getTopic()).range(((TopicSubscription)tuple2.f0).getRange()).subscriptionName(subName).build();
            Tuple2 record = Tuple2.of((Object)topicSubscription, (Object)tuple2.f1);
            log.info("migrationState {}", (Object)record);
            records.add(record);
        }
        rawStates.clear();
        oldUnionSubscriptionNameStates.clear();
        return records.listIterator();
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!this.running) {
            log.debug("snapshotState() called on closed source");
        } else {
            this.unionOffsetStates.clear();
            PulsarFetcher<T> fetcher = this.pulsarFetcher;
            if (fetcher == null) {
                for (Map.Entry<TopicRange, MessageId> entry : this.ownedTopicStarts.entrySet()) {
                    TopicSubscription topicSubscription = TopicSubscription.builder().topic(entry.getKey().getTopic()).range(entry.getKey().getRange()).subscriptionName(this.getSubscriptionName()).build();
                    this.unionOffsetStates.add((Object)Tuple2.of((Object)topicSubscription, (Object)entry.getValue()));
                }
                this.pendingOffsetsToCommit.put(context.getCheckpointId(), this.restoredState);
            } else {
                Map<TopicRange, MessageId> currentOffsets = fetcher.snapshotCurrentState();
                this.pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                for (Map.Entry<TopicRange, MessageId> entry : currentOffsets.entrySet()) {
                    TopicSubscription topicSubscription = TopicSubscription.builder().topic(entry.getKey().getTopic()).range(entry.getKey().getRange()).subscriptionName(this.getSubscriptionName()).build();
                    this.unionOffsetStates.add((Object)Tuple2.of((Object)topicSubscription, (Object)entry.getValue()));
                }
                int exceed = this.pendingOffsetsToCommit.size() - 100;
                Iterator<Long> iterator = this.pendingOffsetsToCommit.keySet().iterator();
                while (iterator.hasNext() && exceed > 0) {
                    iterator.next();
                    iterator.remove();
                }
            }
        }
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        block7: {
            if (!this.running) {
                log.info("notifyCheckpointComplete() called on closed source");
                return;
            }
            PulsarFetcher<T> fetcher = this.pulsarFetcher;
            if (fetcher == null) {
                log.info("notifyCheckpointComplete() called on uninitialized source");
                return;
            }
            log.debug("Source {} received confirmation for unknown checkpoint id {}", (Object)this.taskIndex, (Object)checkpointId);
            try {
                if (!this.pendingOffsetsToCommit.containsKey(checkpointId)) {
                    log.warn("Source {} received confirmation for unknown checkpoint id {}", (Object)this.taskIndex, (Object)checkpointId);
                    return;
                }
                Map<TopicRange, MessageId> offset = this.pendingOffsetsToCommit.get(checkpointId);
                Iterator<Long> iterator = this.pendingOffsetsToCommit.keySet().iterator();
                while (iterator.hasNext()) {
                    Long key = iterator.next();
                    iterator.remove();
                    if (!Objects.equals(key, checkpointId)) continue;
                    break;
                }
                if (offset == null || offset.size() == 0) {
                    log.debug("Source {} has empty checkpoint state", (Object)this.taskIndex);
                    return;
                }
                fetcher.commitOffsetToPulsar(offset, this.offsetCommitCallback);
            }
            catch (Exception e) {
                if (!this.running) break block7;
                throw e;
            }
        }
    }

    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        log.error("checkpoint aborted, checkpointId: {}", (Object)checkpointId);
    }

    public Map<TopicRange, MessageId> offsetForEachTopic(Set<TopicRange> topics, StartupMode mode, Map<TopicRange, MessageId> specificStartupOffsets) {
        switch (mode) {
            case LATEST: 
            case TIMESTAMP: {
                return topics.stream().collect(Collectors.toMap(k -> k, k -> MessageId.latest));
            }
            case EARLIEST: {
                return topics.stream().collect(Collectors.toMap(k -> k, k -> MessageId.earliest));
            }
            case SPECIFIC_OFFSETS: {
                Preconditions.checkArgument((boolean)topics.containsAll(specificStartupOffsets.keySet()), (Object)String.format("Topics designated in startingOffsets should appear in %s, topics:%s, topics in offsets: %s", StringUtils.join((Object[])new Set[]{PulsarOptions.TOPIC_OPTION_KEYS}), StringUtils.join((Object[])topics.toArray()), StringUtils.join((Object[])specificStartupOffsets.entrySet().toArray())));
                HashMap<TopicRange, MessageId> specificOffsets = new HashMap<TopicRange, MessageId>();
                for (TopicRange topic : topics) {
                    if (specificStartupOffsets.containsKey(topic)) {
                        specificOffsets.put(topic, specificStartupOffsets.get(topic));
                        continue;
                    }
                    specificOffsets.put(topic, MessageId.latest);
                }
                return specificOffsets;
            }
            case EXTERNAL_SUBSCRIPTION: {
                HashMap<TopicRange, MessageId> offsetsFromSubs = new HashMap<TopicRange, MessageId>();
                for (TopicRange topic : topics) {
                    offsetsFromSubs.put(topic, this.metadataReader.getPositionFromSubscription(topic, this.subscriptionPosition));
                }
                return offsetsFromSubs;
            }
        }
        return null;
    }

    public Map<Long, Map<TopicRange, MessageId>> getPendingOffsetsToCommit() {
        return this.pendingOffsetsToCommit;
    }

    public Map<TopicRange, MessageId> getOwnedTopicStarts() {
        return this.ownedTopicStarts;
    }
}

