package org.apache.flink.streaming.connectors.kinesis;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kinesis.table.DefaultShardAssignerFactory;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisStateUtil;
import org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil;
import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Parallel source function that consumes records from one or more Kinesis streams.
 *
 * <p>Each subtask creates a {@link KinesisDataFetcher}, seeds it with the shards it discovers
 * and then lets the fetcher run until it terminates or the source is cancelled. The last
 * processed sequence number of every shard is checkpointed in union list operator state under
 * the name {@code "Kinesis-Stream-Shard-State"} and is used to seed the fetcher after a restore.
 *
 * @param <T> the type of the records produced by this source
 */
@PublicEvolving
public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
        implements ResultTypeQueryable<T>, CheckpointedFunction {

    private static final long serialVersionUID = 4724006128720664870L;

    private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);

    /** Name under which the per-shard sequence numbers are registered in the operator state store. */
    private static final String sequenceNumsStateStoreName = "Kinesis-Stream-Shard-State";

    /** Names of the Kinesis streams this consumer reads from. */
    private final List<String> streams;

    /** Consumer configuration; validated in the constructor and handed to the fetcher. */
    private final Properties configProps;

    /** Schema used to turn Kinesis records into elements of type {@code T}. */
    private final KinesisDeserializationSchema<T> deserializer;

    /** Maps shards to subtasks; starts out as the assigner provided by the default factory. */
    private KinesisShardAssigner shardAssigner = new DefaultShardAssignerFactory().getShardAssigner();

    /** Optional periodic watermark assigner that is passed on to the fetcher; may be null. */
    private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;

    /** Optional watermark tracker that is passed on to the fetcher; may be null. */
    private WatermarkTracker watermarkTracker;

    /** The fetcher of the running subtask; null until {@link #run} exposes it and after close. */
    private transient KinesisDataFetcher<T> fetcher;

    /** Sequence numbers read from restored state; null when the job was not restored. */
    private transient HashMap<StreamShardMetadata.EquivalenceWrapper, SequenceNumber> sequenceNumsToRestore;

    /** Cleared by {@link #cancel()}; checked before the fetcher is started and before snapshots. */
    private volatile boolean running = true;

    /** Operator state handle that receives the sequence numbers on every snapshot. */
    private transient ListState<Tuple2<StreamShardMetadata, SequenceNumber>> sequenceNumsStateForCheckpoint;

    /**
     * Creates a consumer for a single stream using a plain {@link DeserializationSchema},
     * which is wrapped in a {@link KinesisDeserializationSchemaWrapper}.
     *
     * @param str name of the Kinesis stream to read
     * @param deserializationSchema schema used to deserialize the record payload
     * @param properties consumer configuration
     */
    public FlinkKinesisConsumer(String str, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this(str, new KinesisDeserializationSchemaWrapper<>(deserializationSchema), properties);
    }

    /**
     * Creates a consumer for a single stream.
     *
     * @param str name of the Kinesis stream to read
     * @param kinesisDeserializationSchema schema used to deserialize Kinesis records
     * @param properties consumer configuration
     */
    public FlinkKinesisConsumer(String str, KinesisDeserializationSchema<T> kinesisDeserializationSchema, Properties properties) {
        this(Collections.singletonList(str), kinesisDeserializationSchema, properties);
    }

    /**
     * Creates a consumer for several streams.
     *
     * <p>The arguments are validated, the configuration is checked with
     * {@link KinesisConfigUtil#validateConsumerConfiguration}, and
     * {@link StreamConsumerRegistrarUtil#eagerlyRegisterStreamConsumers} is invoked.
     *
     * @param list names of the Kinesis streams to read; must be non-null, non-empty and must
     *     not contain the empty string
     * @param kinesisDeserializationSchema schema used to deserialize Kinesis records; must be
     *     non-null and serializable
     * @param properties consumer configuration; must be non-null
     * @throws NullPointerException if any argument is null
     * @throws IllegalArgumentException if the stream list or the schema is rejected
     */
    public FlinkKinesisConsumer(List<String> list, KinesisDeserializationSchema<T> kinesisDeserializationSchema, Properties properties) {
        Preconditions.checkNotNull(list, "streams can not be null");
        Preconditions.checkArgument(!list.isEmpty(), "must be consuming at least 1 stream");
        Preconditions.checkArgument(!list.contains(""), "stream names cannot be empty Strings");
        this.streams = list;

        this.configProps = Preconditions.checkNotNull(properties, "configProps can not be null");
        KinesisConfigUtil.validateConsumerConfiguration(this.configProps, list);

        Preconditions.checkNotNull(kinesisDeserializationSchema, "deserializer can not be null");
        Preconditions.checkArgument(
                InstantiationUtil.isSerializable(kinesisDeserializationSchema),
                "The provided deserialization schema is not serializable: "
                        + kinesisDeserializationSchema.getClass().getName()
                        + ". Please check that it does not contain references to non-serializable instances.");
        this.deserializer = kinesisDeserializationSchema;

        StreamConsumerRegistrarUtil.eagerlyRegisterStreamConsumers(properties, list);

        if (LOG.isInfoEnabled()) {
            // String.join avoids the dangling ", " a manual append loop leaves after the last name.
            LOG.info("Flink Kinesis Consumer is going to read the following streams: {}", String.join(", ", list));
        }
    }

    /** Returns the assigner that decides which subtask a shard belongs to. */
    public KinesisShardAssigner getShardAssigner() {
        return this.shardAssigner;
    }

    /**
     * Replaces the shard assigner and runs the closure cleaner on it.
     *
     * @param kinesisShardAssigner the new assigner; must not be null
     * @throws NullPointerException if {@code kinesisShardAssigner} is null
     */
    public void setShardAssigner(KinesisShardAssigner kinesisShardAssigner) {
        this.shardAssigner = Preconditions.checkNotNull(kinesisShardAssigner, "function can not be null");
        ClosureCleaner.clean(kinesisShardAssigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
    }

    /** Returns the configured periodic watermark assigner, or null if none was set. */
    public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner() {
        return this.periodicWatermarkAssigner;
    }

    /**
     * Sets the periodic watermark assigner and runs the closure cleaner on it.
     *
     * @param assignerWithPeriodicWatermarks the assigner handed to the fetcher
     */
    public void setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks<T> assignerWithPeriodicWatermarks) {
        this.periodicWatermarkAssigner = assignerWithPeriodicWatermarks;
        ClosureCleaner.clean(this.periodicWatermarkAssigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
    }

    /** Returns the configured watermark tracker, or null if none was set. */
    public WatermarkTracker getWatermarkTracker() {
        return this.watermarkTracker;
    }

    /**
     * Sets the watermark tracker and runs the closure cleaner on it.
     *
     * @param watermarkTracker the tracker handed to the fetcher
     */
    public void setWatermarkTracker(WatermarkTracker watermarkTracker) {
        this.watermarkTracker = watermarkTracker;
        ClosureCleaner.clean(this.watermarkTracker, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
    }

    /**
     * Creates the fetcher, seeds it with the discovered shards and runs it until termination.
     *
     * <p>The starting position of each discovered shard is chosen as follows:
     * <ul>
     *   <li>no restored state: the configured initial position
     *       ({@link ConsumerConfigConstants#STREAM_INITIAL_POSITION});</li>
     *   <li>restored state contains the shard: the restored sequence number;</li>
     *   <li>restored state does not contain the shard:
     *       {@link SentinelSequenceNumber#SENTINEL_EARLIEST_SEQUENCE_NUM}.</li>
     * </ul>
     *
     * @param sourceContext context used by the fetcher to emit records; closed once the fetcher
     *     has terminated
     * @throws Exception if shard discovery or the fetcher fails
     */
    @Override
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        KinesisDataFetcher<T> newFetcher =
                createFetcher(this.streams, sourceContext, getRuntimeContext(), this.configProps, this.deserializer);

        for (StreamShardHandle shard : newFetcher.discoverNewShardsToSubscribe()) {
            StreamShardMetadata.EquivalenceWrapper shardKey =
                    new StreamShardMetadata.EquivalenceWrapper(KinesisDataFetcher.convertToStreamShardMetadata(shard));

            if (this.sequenceNumsToRestore == null) {
                // Fresh start: every shard begins at the configured initial position.
                SentinelSequenceNumber startPosition =
                        ConsumerConfigConstants.InitialPosition.valueOf(
                                        this.configProps.getProperty(
                                                ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                                                ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))
                                .toSentinelSequenceNumber();
                newFetcher.registerNewSubscribedShardState(
                        new KinesisStreamShardState(shardKey.getShardMetadata(), shard, startPosition.get()));
                if (LOG.isInfoEnabled()) {
                    LOG.info(
                            "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            shard.toString(),
                            startPosition.get());
                }
            } else if (this.sequenceNumsToRestore.containsKey(shardKey)) {
                // Known shard: continue from the checkpointed sequence number.
                SequenceNumber restoredPosition = this.sequenceNumsToRestore.get(shardKey);
                newFetcher.registerNewSubscribedShardState(
                        new KinesisStreamShardState(shardKey.getShardMetadata(), shard, restoredPosition));
                if (LOG.isInfoEnabled()) {
                    LOG.info(
                            "Subtask {} is seeding the fetcher with restored shard {}, starting state set to the restored sequence number {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            shard.toString(),
                            restoredPosition);
                }
            } else {
                // Restored job, but this shard is not in the restored state: read it from the earliest record.
                newFetcher.registerNewSubscribedShardState(
                        new KinesisStreamShardState(
                                shardKey.getShardMetadata(),
                                shard,
                                SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));
                if (LOG.isInfoEnabled()) {
                    LOG.info(
                            "Subtask {} is seeding the fetcher with new discovered shard {}, starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            shard.toString());
                }
            }
        }

        // cancel() may have been called while the shards were being discovered.
        if (!this.running) {
            return;
        }

        // Expose the fetcher only now, so snapshotState() and cancel() see a fully seeded instance.
        this.fetcher = newFetcher;
        newFetcher.runFetcher();
        newFetcher.awaitTermination();
        sourceContext.close();
    }

    /**
     * Marks the source as no longer running and shuts the fetcher down if one has been exposed.
     * Failures during shutdown are logged and not propagated.
     */
    @Override
    public void cancel() {
        this.running = false;

        // The field may still be null if cancel() arrives before run() has exposed the fetcher.
        KinesisDataFetcher<T> activeFetcher = this.fetcher;
        if (activeFetcher != null) {
            try {
                activeFetcher.shutdownFetcher();
            } catch (Exception e) {
                LOG.warn("Error while closing Kinesis data fetcher", e);
            }
        }
    }

    /**
     * Cancels the source, waits for the fetcher to terminate and releases the reference to it.
     *
     * @throws Exception if waiting for the fetcher or the superclass close fails
     */
    @Override
    public void close() throws Exception {
        cancel();
        try {
            KinesisDataFetcher<T> activeFetcher = this.fetcher;
            if (activeFetcher != null) {
                activeFetcher.awaitTermination();
            }
        } finally {
            // Release the fetcher and close the superclass even when awaitTermination() throws.
            this.fetcher = null;
            super.close();
        }
    }

    /** Returns the type information reported by the deserialization schema. */
    @Override
    public TypeInformation<T> getProducedType() {
        return this.deserializer.getProducedType();
    }

    /**
     * Registers the union list state holding the shard sequence numbers and, when the context
     * reports a restore, copies its content into the restore map (only if that map has not
     * been populated yet).
     *
     * @param functionInitializationContext context giving access to the operator state store
     * @throws Exception if the state cannot be obtained or read
     */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        ListStateDescriptor<Tuple2<StreamShardMetadata, SequenceNumber>> shardStateDescriptor =
                new ListStateDescriptor<>(
                        sequenceNumsStateStoreName,
                        KinesisStateUtil.createShardsStateSerializer(getRuntimeContext().getExecutionConfig()));
        this.sequenceNumsStateForCheckpoint =
                functionInitializationContext.getOperatorStateStore().getUnionListState(shardStateDescriptor);

        if (!functionInitializationContext.isRestored()) {
            LOG.info("No restore state for FlinkKinesisConsumer.");
            return;
        }

        if (this.sequenceNumsToRestore == null) {
            this.sequenceNumsToRestore = new HashMap<>();
            for (Tuple2<StreamShardMetadata, SequenceNumber> shardState : this.sequenceNumsStateForCheckpoint.get()) {
                this.sequenceNumsToRestore.put(
                        new StreamShardMetadata.EquivalenceWrapper(shardState.f0), shardState.f1);
            }
            LOG.info(
                    "Setting restore state in the FlinkKinesisConsumer. Using the following offsets: {}",
                    this.sequenceNumsToRestore);
        }
    }

    /**
     * Writes the current shard sequence numbers into the checkpointed list state.
     *
     * <p>Nothing is written once the source has been cancelled. While no fetcher is exposed
     * yet, the restored sequence numbers of the shards assigned to this subtask are written
     * back; otherwise the fetcher's own snapshot is stored.
     *
     * @param functionSnapshotContext context providing the checkpoint id and timestamp
     * @throws Exception if the state cannot be updated
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (!this.running) {
            LOG.debug("snapshotState() called on closed source; returning null.");
            return;
        }
        LOG.debug("Snapshotting state ...");

        this.sequenceNumsStateForCheckpoint.clear();

        // Read the field once so a concurrent close() cannot null it between the check and the use.
        KinesisDataFetcher<T> activeFetcher = this.fetcher;
        if (activeFetcher == null) {
            if (this.sequenceNumsToRestore != null) {
                int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
                int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
                for (Map.Entry<StreamShardMetadata.EquivalenceWrapper, SequenceNumber> restored :
                        this.sequenceNumsToRestore.entrySet()) {
                    StreamShardMetadata shardMetadata = restored.getKey().getShardMetadata();
                    // The state is a union list, so keep only the shards this subtask is assigned to.
                    if (KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(
                            this.shardAssigner.assign(
                                    KinesisDataFetcher.convertToStreamShardHandle(shardMetadata), parallelism),
                            parallelism,
                            subtaskIndex)) {
                        this.sequenceNumsStateForCheckpoint.add(Tuple2.of(shardMetadata, restored.getValue()));
                    }
                }
            }
            return;
        }

        HashMap<StreamShardMetadata, SequenceNumber> lastProcessed = activeFetcher.snapshotState();
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
                    lastProcessed,
                    functionSnapshotContext.getCheckpointId(),
                    functionSnapshotContext.getCheckpointTimestamp());
        }
        for (Map.Entry<StreamShardMetadata, SequenceNumber> processed : lastProcessed.entrySet()) {
            this.sequenceNumsStateForCheckpoint.add(Tuple2.of(processed.getKey(), processed.getValue()));
        }
    }

    /**
     * Factory method for the fetcher; protected so that subclasses can supply their own.
     *
     * @param list names of the streams to read
     * @param sourceContext context used to emit records
     * @param runtimeContext runtime context of this subtask
     * @param properties consumer configuration
     * @param kinesisDeserializationSchema schema used to deserialize Kinesis records
     * @return a new fetcher wired with this consumer's shard assigner, periodic watermark
     *     assigner and watermark tracker
     */
    protected KinesisDataFetcher<T> createFetcher(List<String> list, SourceFunction.SourceContext<T> sourceContext, RuntimeContext runtimeContext, Properties properties, KinesisDeserializationSchema<T> kinesisDeserializationSchema) {
        return new KinesisDataFetcher<>(
                list,
                sourceContext,
                runtimeContext,
                properties,
                kinesisDeserializationSchema,
                this.shardAssigner,
                this.periodicWatermarkAssigner,
                this.watermarkTracker);
    }

    /** Returns the restored sequence numbers, or null if no state was restored. */
    @VisibleForTesting
    HashMap<StreamShardMetadata.EquivalenceWrapper, SequenceNumber> getRestoredState() {
        return this.sequenceNumsToRestore;
    }
}
