package org.apache.flink.connector.kinesis.source.proxy;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.KinesisClient;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Shard;

/**
 * {@link StreamProxy} implementation that talks to Kinesis through a synchronous
 * {@link KinesisClient}.
 *
 * <p>The proxy remembers the most recent shard iterator it obtained for each shard id, so that
 * successive {@link #getRecords(String, String, StartingPosition)} calls for the same shard reuse
 * the stored iterator instead of requesting a new one every time. Note that the store is keyed by
 * shard id only, not by stream ARN.
 */
@Internal
public class KinesisStreamProxy implements StreamProxy {
    private final KinesisClient kinesisClient;
    private final SdkHttpClient httpClient;

    /** Latest known shard iterator per shard id. */
    private final Map<String, String> shardIdToIteratorStore = new ConcurrentHashMap<>();

    /**
     * Creates a proxy around the given clients. Both clients are closed by {@link #close()}.
     *
     * @param kinesisClient client used for all Kinesis API calls
     * @param sdkHttpClient HTTP client that is closed together with this proxy
     */
    public KinesisStreamProxy(KinesisClient kinesisClient, SdkHttpClient sdkHttpClient) {
        this.kinesisClient = kinesisClient;
        this.httpClient = sdkHttpClient;
    }

    /**
     * Lists the shards of a stream, following pagination until no next token is returned.
     *
     * @param streamArn ARN of the stream to list
     * @param lastSeenShardId if non-null, passed as the exclusive start shard id of the first
     *     request
     * @return all shards returned across every page, in the order received
     */
    @Override // org.apache.flink.connector.kinesis.source.proxy.StreamProxy
    public List<Shard> listShards(String streamArn, @Nullable String lastSeenShardId) {
        List<Shard> shards = new ArrayList<>();
        String nextToken = null;
        do {
            // The stream ARN and the start shard id are only sent on the first request.
            // Follow-up pages are identified by the next token alone; per the ListShards API
            // contract the token already identifies the stream and must not be combined with
            // other stream identifiers or filters.
            boolean firstPage = nextToken == null;
            ListShardsRequest request =
                    (ListShardsRequest)
                            ListShardsRequest.builder()
                                    .streamARN(firstPage ? streamArn : null)
                                    .exclusiveStartShardId(firstPage ? lastSeenShardId : null)
                                    .nextToken(nextToken)
                                    .mo3914build();
            ListShardsResponse response = this.kinesisClient.listShards(request);
            shards.addAll(response.shards());
            nextToken = response.nextToken();
        } while (nextToken != null);
        return shards;
    }

    /**
     * Reads the next batch of records from a shard.
     *
     * <p>Uses the stored iterator for the shard if there is one; otherwise a new iterator is
     * requested for {@code startingPosition}. If Kinesis reports the iterator as expired, a fresh
     * iterator is requested for {@code startingPosition} and the read is retried once. Whenever the
     * response carries a next shard iterator, it replaces the stored one.
     *
     * @param streamArn ARN of the stream that owns the shard
     * @param shardId id of the shard to read
     * @param startingPosition position used whenever a new shard iterator has to be requested
     * @return the response of the GetRecords call
     */
    @Override // org.apache.flink.connector.kinesis.source.proxy.StreamProxy
    public GetRecordsResponse getRecords(String streamArn, String shardId, StartingPosition startingPosition) {
        GetRecordsResponse response;
        try {
            String shardIterator =
                    this.shardIdToIteratorStore.computeIfAbsent(
                            shardId, id -> getShardIterator(streamArn, id, startingPosition));
            response = getRecords(streamArn, shardIterator);
        } catch (ExpiredIteratorException expired) {
            // Eagerly retry once with a freshly requested iterator.
            response = getRecords(streamArn, getShardIterator(streamArn, shardId, startingPosition));
        }
        storeNextShardIterator(shardId, response);
        return response;
    }

    /** Stores the response's next shard iterator for the shard, if the response has one. */
    private void storeNextShardIterator(String shardId, GetRecordsResponse response) {
        String nextShardIterator = response.nextShardIterator();
        if (nextShardIterator != null) {
            this.shardIdToIteratorStore.put(shardId, nextShardIterator);
        }
    }

    /**
     * Requests a shard iterator for the given shard at the given starting position.
     *
     * @param streamArn ARN of the stream that owns the shard
     * @param shardId id of the shard
     * @param startingPosition iterator type plus, for timestamp and sequence-number based types,
     *     the marker to start from
     * @return the shard iterator returned by Kinesis
     * @throws IllegalArgumentException if the starting marker does not have the type required by
     *     the iterator type ({@link Instant} for AT_TIMESTAMP, {@link String} for
     *     AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER)
     */
    private String getShardIterator(String streamArn, String shardId, StartingPosition startingPosition) {
        GetShardIteratorRequest.Builder requestBuilder =
                GetShardIteratorRequest.builder()
                        .streamARN(streamArn)
                        .shardId(shardId)
                        .shardIteratorType(startingPosition.getShardIteratorType());
        switch (startingPosition.getShardIteratorType()) {
            case AT_TIMESTAMP:
                if (!(startingPosition.getStartingMarker() instanceof Instant)) {
                    throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be an Instant object.");
                }
                requestBuilder = requestBuilder.timestamp((Instant) startingPosition.getStartingMarker());
                break;
            case AT_SEQUENCE_NUMBER:
            case AFTER_SEQUENCE_NUMBER:
                if (!(startingPosition.getStartingMarker() instanceof String)) {
                    throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String.");
                }
                requestBuilder = requestBuilder.startingSequenceNumber((String) startingPosition.getStartingMarker());
                break;
            default:
                // All other iterator types are sent without an additional marker.
                break;
        }
        return this.kinesisClient
                .getShardIterator((GetShardIteratorRequest) requestBuilder.mo3914build())
                .shardIterator();
    }

    /** Issues a single GetRecords call for the given stream ARN and shard iterator. */
    private GetRecordsResponse getRecords(String streamArn, String shardIterator) {
        GetRecordsRequest request =
                (GetRecordsRequest)
                        GetRecordsRequest.builder()
                                .streamARN(streamArn)
                                .shardIterator(shardIterator)
                                .mo3914build();
        return this.kinesisClient.getRecords(request);
    }

    /**
     * Closes the Kinesis client and then the HTTP client.
     *
     * <p>The HTTP client is closed even if closing the Kinesis client throws.
     *
     * @throws IOException declared by the close contract this method overrides
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.kinesisClient.close();
        } finally {
            this.httpClient.close();
        }
    }
}
