package org.apache.flink.connector.kinesis.source.enumerator;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigConstants;
import org.apache.flink.connector.kinesis.source.config.KinesisStreamsSourceConfigUtil;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.Shard;

@Internal
/* loaded from: input_file:org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.class */
public class KinesisStreamsSourceEnumerator implements SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
    private final SplitEnumeratorContext<KinesisShardSplit> context;
    private final String streamArn;
    private final Configuration sourceConfig;
    private final StreamProxy streamProxy;
    private final KinesisShardAssigner shardAssigner;
    private final ShardAssignerContext shardAssignerContext;
    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new HashMap();
    private final Set<String> assignedSplitIds = new HashSet();
    private final Set<KinesisShardSplit> unassignedSplits;
    private String lastSeenShardId;

    /* JADX INFO: Access modifiers changed from: private */
    @Internal
    /* loaded from: input_file:org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator$ShardAssignerContext.class */
    public static class ShardAssignerContext implements KinesisShardAssigner.Context {
        private final Map<Integer, Set<KinesisShardSplit>> splitAssignment;
        private final SplitEnumeratorContext<KinesisShardSplit> splitEnumeratorContext;
        private Map<Integer, List<KinesisShardSplit>> pendingSplitAssignments;

        private ShardAssignerContext(Map<Integer, Set<KinesisShardSplit>> map, SplitEnumeratorContext<KinesisShardSplit> splitEnumeratorContext) {
            this.pendingSplitAssignments = Collections.emptyMap();
            this.splitAssignment = map;
            this.splitEnumeratorContext = splitEnumeratorContext;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ShardAssignerContext withPendingSplitAssignments(Map<Integer, List<KinesisShardSplit>> map) {
            HashMap hashMap = new HashMap();
            for (Map.Entry<Integer, List<KinesisShardSplit>> entry : map.entrySet()) {
                hashMap.put(entry.getKey(), Collections.unmodifiableList(new ArrayList(entry.getValue())));
            }
            this.pendingSplitAssignments = Collections.unmodifiableMap(hashMap);
            return this;
        }

        @Override // org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner.Context
        public Map<Integer, Set<KinesisShardSplit>> getCurrentSplitAssignment() {
            HashMap hashMap = new HashMap();
            for (Map.Entry<Integer, Set<KinesisShardSplit>> entry : this.splitAssignment.entrySet()) {
                hashMap.put(entry.getKey(), Collections.unmodifiableSet(new HashSet(entry.getValue())));
            }
            return Collections.unmodifiableMap(hashMap);
        }

        @Override // org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner.Context
        public Map<Integer, List<KinesisShardSplit>> getPendingSplitAssignments() {
            return this.pendingSplitAssignments;
        }

        @Override // org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner.Context
        public Map<Integer, ReaderInfo> getRegisteredReaders() {
            return this.splitEnumeratorContext.registeredReaders();
        }
    }

    public KinesisStreamsSourceEnumerator(SplitEnumeratorContext<KinesisShardSplit> splitEnumeratorContext, String str, Configuration configuration, StreamProxy streamProxy, KinesisShardAssigner kinesisShardAssigner, KinesisStreamsSourceEnumeratorState kinesisStreamsSourceEnumeratorState) {
        this.context = splitEnumeratorContext;
        this.streamArn = str;
        this.sourceConfig = configuration;
        this.streamProxy = streamProxy;
        this.shardAssigner = kinesisShardAssigner;
        this.shardAssignerContext = new ShardAssignerContext(this.splitAssignment, splitEnumeratorContext);
        if (kinesisStreamsSourceEnumeratorState == null) {
            this.lastSeenShardId = null;
            this.unassignedSplits = new HashSet();
        } else {
            this.lastSeenShardId = kinesisStreamsSourceEnumeratorState.getLastSeenShardId();
            this.unassignedSplits = kinesisStreamsSourceEnumeratorState.getUnassignedSplits();
        }
    }

    public void start() {
        if (this.lastSeenShardId == null) {
            this.context.callAsync(this::initialDiscoverSplits, this::assignSplits);
        }
        long longValue = ((Long) this.sourceConfig.get(KinesisStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS)).longValue();
        this.context.callAsync(this::periodicallyDiscoverSplits, this::assignSplits, longValue, longValue);
    }

    public void handleSplitRequest(int i, @Nullable String str) {
    }

    public void addSplitsBack(List<KinesisShardSplit> list, int i) {
        if (!this.splitAssignment.containsKey(Integer.valueOf(i))) {
            LOG.warn("Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}", Integer.valueOf(i), list);
            return;
        }
        for (KinesisShardSplit kinesisShardSplit : list) {
            this.splitAssignment.get(Integer.valueOf(i)).remove(kinesisShardSplit);
            this.assignedSplitIds.remove(kinesisShardSplit.splitId());
            this.unassignedSplits.add(kinesisShardSplit);
        }
        assignSplits(Collections.emptyList(), null);
    }

    public void addReader(int i) {
        this.splitAssignment.putIfAbsent(Integer.valueOf(i), new HashSet());
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public KinesisStreamsSourceEnumeratorState m8snapshotState(long j) throws Exception {
        return new KinesisStreamsSourceEnumeratorState(this.unassignedSplits, this.lastSeenShardId);
    }

    public void close() throws IOException {
        this.streamProxy.close();
    }

    private List<KinesisShardSplit> initialDiscoverSplits() {
        return mapToSplits(this.streamProxy.listShards(this.streamArn, this.lastSeenShardId), (KinesisStreamsSourceConfigConstants.InitialPosition) this.sourceConfig.get(KinesisStreamsSourceConfigConstants.STREAM_INITIAL_POSITION));
    }

    private List<KinesisShardSplit> periodicallyDiscoverSplits() {
        return mapToSplits(this.streamProxy.listShards(this.streamArn, this.lastSeenShardId), KinesisStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON);
    }

    private List<KinesisShardSplit> mapToSplits(List<Shard> list, KinesisStreamsSourceConfigConstants.InitialPosition initialPosition) {
        StartingPosition fromStart;
        switch (initialPosition) {
            case LATEST:
                fromStart = StartingPosition.fromTimestamp(Instant.now());
                break;
            case AT_TIMESTAMP:
                fromStart = StartingPosition.fromTimestamp(KinesisStreamsSourceConfigUtil.parseStreamTimestampStartingPosition(this.sourceConfig).toInstant());
                break;
            case TRIM_HORIZON:
            default:
                fromStart = StartingPosition.fromStart();
                break;
        }
        ArrayList arrayList = new ArrayList();
        Iterator<Shard> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(new KinesisShardSplit(this.streamArn, it.next().shardId(), fromStart));
        }
        return arrayList;
    }

    private void assignSplits(List<KinesisShardSplit> list, Throwable th) {
        if (th != null) {
            throw new KinesisStreamsSourceException("Failed to list shards.", th);
        }
        if (this.context.registeredReaders().size() < this.context.currentParallelism()) {
            LOG.info("Insufficient registered readers, skipping assignment of discovered splits until all readers are registered. Required number of readers: {}, Registered readers: {}", Integer.valueOf(this.context.currentParallelism()), Integer.valueOf(this.context.registeredReaders().size()));
            this.unassignedSplits.addAll(list);
            return;
        }
        HashMap hashMap = new HashMap();
        Iterator<KinesisShardSplit> it = this.unassignedSplits.iterator();
        while (it.hasNext()) {
            assignSplitToSubtask(it.next(), hashMap);
        }
        this.unassignedSplits.clear();
        Iterator<KinesisShardSplit> it2 = list.iterator();
        while (it2.hasNext()) {
            assignSplitToSubtask(it2.next(), hashMap);
        }
        updateLastSeenShardId(list);
        updateSplitAssignment(hashMap);
        this.context.assignSplits(new SplitsAssignment(hashMap));
    }

    private void assignSplitToSubtask(KinesisShardSplit kinesisShardSplit, Map<Integer, List<KinesisShardSplit>> map) {
        if (this.assignedSplitIds.contains(kinesisShardSplit.splitId())) {
            LOG.info("Skipping assignment of shard {} from stream {} because it is already assigned.", kinesisShardSplit.getShardId(), kinesisShardSplit.getStreamArn());
            return;
        }
        int assign = this.shardAssigner.assign(kinesisShardSplit, this.shardAssignerContext.withPendingSplitAssignments(map));
        LOG.info("Assigning shard {} from stream {} to subtask {}.", new Object[]{kinesisShardSplit.getShardId(), kinesisShardSplit.getStreamArn(), Integer.valueOf(assign)});
        if (map.containsKey(Integer.valueOf(assign))) {
            map.get(Integer.valueOf(assign)).add(kinesisShardSplit);
        } else {
            ArrayList arrayList = new ArrayList();
            arrayList.add(kinesisShardSplit);
            map.put(Integer.valueOf(assign), arrayList);
        }
        this.assignedSplitIds.add(kinesisShardSplit.splitId());
    }

    private void updateLastSeenShardId(List<KinesisShardSplit> list) {
        if (list.isEmpty()) {
            return;
        }
        this.lastSeenShardId = list.get(list.size() - 1).getShardId();
    }

    private void updateSplitAssignment(Map<Integer, List<KinesisShardSplit>> map) {
        map.forEach((num, list) -> {
            if (this.splitAssignment.containsKey(num)) {
                this.splitAssignment.get(num).addAll(list);
            } else {
                this.splitAssignment.put(num, new HashSet(list));
            }
        });
    }
}
