/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.beam.sdk.io.aws2.kinesis.GetKinesisRecordsResult;
import org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient;
import org.apache.beam.sdk.io.aws2.kinesis.StartingPoint;
import org.apache.beam.sdk.io.aws2.kinesis.TransientKinesisException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

class StartingPointShardsFinder
implements Serializable {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(StartingPointShardsFinder.class);

    StartingPointShardsFinder() {
    }

    @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized Shard> findShardsAtStartingPoint(@UnknownKeyFor @NonNull @Initialized SimplifiedKinesisClient kinesis, @UnknownKeyFor @NonNull @Initialized String streamName, @UnknownKeyFor @NonNull @Initialized StartingPoint startingPoint) throws @UnknownKeyFor @NonNull @Initialized TransientKinesisException {
        Sets.SetView expiredShards;
        List<Shard> allShards = kinesis.listShards(streamName);
        Set<Shard> initialShards = this.findInitialShardsWithoutParents(streamName, allShards);
        HashSet<Shard> startingPointShards = new HashSet<Shard>();
        do {
            Set<Shard> validShards = this.validateShards(kinesis, initialShards, streamName, startingPoint);
            startingPointShards.addAll(validShards);
            expiredShards = Sets.difference(initialShards, validShards);
            if (!expiredShards.isEmpty()) {
                LOG.info("Following shards expired for {} stream at '{}' starting point: {}", new Object[]{streamName, startingPoint, expiredShards});
            }
            initialShards = this.findNextShards(allShards, (Set<Shard>)expiredShards);
        } while (!expiredShards.isEmpty());
        return startingPointShards;
    }

    private @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized Shard> findNextShards(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Shard> allShards, @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized Shard> expiredShards) {
        HashSet<Shard> nextShards = new HashSet<Shard>();
        for (Shard expiredShard : expiredShards) {
            boolean successorFound = false;
            for (Shard shard : allShards) {
                if (Objects.equals(expiredShard.shardId(), shard.parentShardId())) {
                    nextShards.add(shard);
                    successorFound = true;
                    continue;
                }
                if (!Objects.equals(expiredShard.shardId(), shard.adjacentParentShardId())) continue;
                successorFound = true;
            }
            if (successorFound) continue;
            throw new IllegalStateException("No successors were found for shard: " + expiredShard);
        }
        return nextShards;
    }

    private @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized Shard> findInitialShardsWithoutParents(@UnknownKeyFor @NonNull @Initialized String streamName, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Shard> allShards) {
        HashSet<String> shardIds = new HashSet<String>();
        for (Shard shard : allShards) {
            shardIds.add(shard.shardId());
        }
        LOG.info("Stream {} has following shards: {}", (Object)streamName, shardIds);
        HashSet<Shard> shardsWithoutParents = new HashSet<Shard>();
        for (Shard shard : allShards) {
            if (shardIds.contains(shard.parentShardId())) continue;
            shardsWithoutParents.add(shard);
        }
        return shardsWithoutParents;
    }

    private @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized Shard> validateShards(@UnknownKeyFor @NonNull @Initialized SimplifiedKinesisClient kinesis, @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized Shard> rootShards, @UnknownKeyFor @NonNull @Initialized String streamName, @UnknownKeyFor @NonNull @Initialized StartingPoint startingPoint) throws @UnknownKeyFor @NonNull @Initialized TransientKinesisException {
        HashSet<Shard> validShards = new HashSet<Shard>();
        ShardIteratorType shardIteratorType = ShardIteratorType.fromValue((String)startingPoint.getPositionName());
        for (Shard shard : rootShards) {
            String shardIterator = kinesis.getShardIterator(streamName, shard.shardId(), shardIteratorType, null, startingPoint.getTimestamp());
            GetKinesisRecordsResult records = kinesis.getRecords(shardIterator, streamName, shard.shardId());
            if (records.getNextShardIterator() == null && records.getRecords().isEmpty()) continue;
            validShards.add(shard);
        }
        return validShards;
    }
}

