package org.apache.flink.connector.kinesis.source;

import java.util.Properties;
import java.util.function.Supplier;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer;
import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy;
import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter;
import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader;
import org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader;
import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSClientUtil;
import org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.KinesisClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.AttributeMap;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;

@Experimental
/* loaded from: input_file:org/apache/flink/connector/kinesis/source/KinesisStreamsSource.class */
public class KinesisStreamsSource<T> implements Source<T, KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
    private final String streamArn;
    private final Configuration sourceConfig;
    private final KinesisDeserializationSchema<T> deserializationSchema;
    private final KinesisShardAssigner kinesisShardAssigner;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KinesisStreamsSource(String str, Configuration configuration, KinesisDeserializationSchema<T> kinesisDeserializationSchema, KinesisShardAssigner kinesisShardAssigner) {
        Preconditions.checkNotNull(str);
        Preconditions.checkArgument(!str.isEmpty(), "stream ARN cannot be empty string");
        Preconditions.checkNotNull(configuration);
        Preconditions.checkNotNull(kinesisDeserializationSchema);
        Preconditions.checkNotNull(kinesisShardAssigner);
        this.streamArn = str;
        this.sourceConfig = configuration;
        this.deserializationSchema = kinesisDeserializationSchema;
        this.kinesisShardAssigner = kinesisShardAssigner;
    }

    public static <T> KinesisStreamsSourceBuilder<T> builder() {
        return new KinesisStreamsSourceBuilder<>();
    }

    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    public SourceReader<T, KinesisShardSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        setUpDeserializationSchema(sourceReaderContext);
        FutureCompletingBlockingQueue futureCompletingBlockingQueue = new FutureCompletingBlockingQueue();
        Supplier supplier = () -> {
            return new PollingKinesisShardSplitReader(createKinesisStreamProxy(this.sourceConfig));
        };
        KinesisStreamsRecordEmitter kinesisStreamsRecordEmitter = new KinesisStreamsRecordEmitter(this.deserializationSchema);
        supplier.getClass();
        return new KinesisStreamsSourceReader(futureCompletingBlockingQueue, new SingleThreadFetcherManager(futureCompletingBlockingQueue, supplier::get), kinesisStreamsRecordEmitter, this.sourceConfig, sourceReaderContext);
    }

    public SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> createEnumerator(SplitEnumeratorContext<KinesisShardSplit> splitEnumeratorContext) throws Exception {
        return restoreEnumerator(splitEnumeratorContext, (KinesisStreamsSourceEnumeratorState) null);
    }

    public SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> restoreEnumerator(SplitEnumeratorContext<KinesisShardSplit> splitEnumeratorContext, KinesisStreamsSourceEnumeratorState kinesisStreamsSourceEnumeratorState) throws Exception {
        return new KinesisStreamsSourceEnumerator(splitEnumeratorContext, this.streamArn, this.sourceConfig, createKinesisStreamProxy(this.sourceConfig), this.kinesisShardAssigner, kinesisStreamsSourceEnumeratorState);
    }

    public SimpleVersionedSerializer<KinesisShardSplit> getSplitSerializer() {
        return new KinesisShardSplitSerializer();
    }

    public SimpleVersionedSerializer<KinesisStreamsSourceEnumeratorState> getEnumeratorCheckpointSerializer() {
        return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer());
    }

    private KinesisStreamProxy createKinesisStreamProxy(Configuration configuration) {
        SdkHttpClient createSyncHttpClient = AWSGeneralUtil.createSyncHttpClient(AttributeMap.builder().mo3335build(), ApacheHttpClient.builder());
        Properties properties = new Properties();
        configuration.addAllToProperties(properties);
        AWSGeneralUtil.validateAwsCredentials(properties);
        return new KinesisStreamProxy((KinesisClient) AWSClientUtil.createAwsSyncClient(properties, createSyncHttpClient, KinesisClient.builder(), KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX), createSyncHttpClient);
    }

    private void setUpDeserializationSchema(final SourceReaderContext sourceReaderContext) throws Exception {
        this.deserializationSchema.open(new DeserializationSchema.InitializationContext() { // from class: org.apache.flink.connector.kinesis.source.KinesisStreamsSource.1
            public MetricGroup getMetricGroup() {
                return sourceReaderContext.metricGroup().addGroup("deserializer");
            }

            public UserCodeClassLoader getUserCodeClassLoader() {
                return sourceReaderContext.getUserCodeClassLoader();
            }
        });
    }

    public /* bridge */ /* synthetic */ SplitEnumerator restoreEnumerator(SplitEnumeratorContext splitEnumeratorContext, Object obj) throws Exception {
        return restoreEnumerator((SplitEnumeratorContext<KinesisShardSplit>) splitEnumeratorContext, (KinesisStreamsSourceEnumeratorState) obj);
    }
}
