package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.RandomStringUtils;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ForwardingIterator;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StartingPosition;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool.class */
public class EFOShardSubscribersPool {
    private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscribersPool.class);
    // Cool-down in milliseconds that the three-argument constructor passes on as onErrorCoolDownMs.
    private static final int ON_ERROR_COOL_DOWN_MS_DEFAULT = 1000;
    // Default per-shard capacity; same value as the fallback the constructor applies when the
    // read configuration returns no max capacity.
    private static final int DEFAULT_MAX_CAPACITY_PER_SHARD = 10;
    // Handed to every EFOShardSubscriber created by this pool
    // (presumably its back-off after a subscription error - confirm in EFOShardSubscriber).
    private final int onErrorCoolDownMs;
    // Random 8-character lower-case id; only used to tag log messages of this pool.
    private final String poolId;
    // Read configuration: source of the stream name, watermark policy factory and max capacity.
    private final KinesisIO.Read read;
    // Consumer ARN passed to every shard subscriber.
    private final String consumerArn;
    // Async client passed to every shard subscriber.
    private final KinesisAsyncClient kinesis;
    // Events added through enqueueEvent() and drained by getNextRecord().
    private final ConcurrentLinkedQueue<EventRecords> eventQueue;
    // Resolved per-shard capacity, exposed through getMaxCapacityPerShard().
    private final int maxCapacityPerShard;
    // Shard id -> per-shard state. This is a plain HashMap (not a concurrent map).
    private final Map<String, ShardState> state;
    // First error reported by a subscription future (see errorHandler); checked by getNextRecord().
    private volatile Throwable subscriptionError;
    // Set to true by stop().
    private boolean isStopped;
    // Completion callback attached to each subscribe() future; records only the first non-null error.
    private final BiConsumer<Void, Throwable> errorHandler;
    // Single-threaded scheduler backing delayedTask(); shut down by stop().
    private final ScheduledExecutorService scheduler;
    // Event whose records getNextRecord() is currently iterating, or null if none is in progress.
    EventRecords current;
    // Factory used to create one WatermarkPolicy per ShardState.
    private final WatermarkPolicyFactory watermarkPolicyFactory;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool$EventRecords.class */
    public static class EventRecords extends ForwardingIterator<KinesisClientRecord> {
        private static final AggregatorUtil AGG_UTIL = new AggregatorUtil();
        String shardId;
        SubscribeToShardEvent event;
        Iterator<KinesisClientRecord> delegate = null;

        EventRecords(String str, SubscribeToShardEvent subscribeToShardEvent) {
            this.shardId = str;
            this.event = subscribeToShardEvent;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: delegate, reason: merged with bridge method [inline-methods] */
        public Iterator<KinesisClientRecord> m18delegate() {
            if (this.delegate == null) {
                if (!this.event.hasRecords() || this.event.records().isEmpty()) {
                    this.delegate = Collections.emptyIterator();
                } else {
                    this.delegate = AGG_UTIL.deaggregate(Lists.transform(this.event.records(), KinesisClientRecord::fromRecord)).iterator();
                }
            }
            return this.delegate;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPool$ShardState.class */
    /**
     * Mutable per-shard bookkeeping: the subscriber, the checkpoint the shard was started from, the
     * position of the most recent update and the shard's watermark policy.
     */
    public static class ShardState {
        final EFOShardSubscriber subscriber;
        final ShardCheckpoint initCheckpoint;
        final WatermarkPolicy watermarkPolicy;

        /** Sequence number of the latest update; {@code null} until the first update. */
        String sequenceNumber = null;

        /** Sub-sequence number of the latest record update. */
        long subSequenceNumber = 0;

        ShardState(
                EFOShardSubscriber shardSubscriber,
                ShardCheckpoint startCheckpoint,
                WatermarkPolicyFactory policyFactory) {
            this.subscriber = shardSubscriber;
            this.initCheckpoint = startCheckpoint;
            this.watermarkPolicy = policyFactory.createWatermarkPolicy();
        }

        /**
         * Records the position of {@code record} and feeds it to the watermark policy.
         *
         * @throws NullPointerException if the record has no sequence number
         */
        void update(KinesisRecord record) {
            String recordSequenceNumber = Preconditions.checkNotNull(record.getSequenceNumber());
            this.sequenceNumber = recordSequenceNumber;
            this.subSequenceNumber = record.getSubSequenceNumber();
            this.watermarkPolicy.update(record);
        }

        /**
         * Moves the position to the continuation sequence number of a finished event and
         * acknowledges the event on the subscriber.
         *
         * <p>NOTE(review): {@code subSequenceNumber} is left unchanged here - confirm this is intended.
         *
         * @throws NullPointerException if the event has no continuation sequence number
         */
        void update(EventRecords finishedEvent) {
            String continuation =
                Preconditions.checkNotNull(finishedEvent.event.continuationSequenceNumber());
            this.sequenceNumber = continuation;
            this.subscriber.ackEvent();
        }

        /**
         * Returns the initial checkpoint while nothing has been recorded yet, otherwise an
         * {@code AFTER_SEQUENCE_NUMBER} checkpoint at the current position.
         */
        ShardCheckpoint toCheckpoint() {
            if (sequenceNumber == null) {
                return initCheckpoint;
            }
            return new ShardCheckpoint(
                initCheckpoint.getStreamName(),
                initCheckpoint.getShardId(),
                ShardIteratorType.AFTER_SEQUENCE_NUMBER,
                sequenceNumber,
                Long.valueOf(subSequenceNumber));
        }

        /** Returns the current watermark of this shard's watermark policy. */
        Instant getWatermark() {
            return watermarkPolicy.getWatermark();
        }

        /** Returns the result of {@code initCheckpoint.isBeforeOrAt(record)}. */
        boolean isAfterInitialCheckpoint(KinesisRecord record) {
            return initCheckpoint.isBeforeOrAt(record);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a pool that uses the default on-error cool-down
     * ({@code ON_ERROR_COOL_DOWN_MS_DEFAULT} milliseconds).
     *
     * @param readSpec read configuration
     * @param consumerArn consumer ARN handed to the shard subscribers
     * @param kinesisClient async client handed to the shard subscribers
     */
    public EFOShardSubscribersPool(
            KinesisIO.Read readSpec, String consumerArn, KinesisAsyncClient kinesisClient) {
        this(readSpec, consumerArn, kinesisClient, ON_ERROR_COOL_DOWN_MS_DEFAULT);
    }

    /**
     * Creates a pool with an explicit on-error cool-down.
     *
     * <p>No subscription is started here; see {@code start}.
     *
     * @param read read configuration (stream name, watermark policy factory, max capacity per shard)
     * @param consumerArn consumer ARN handed to the shard subscribers
     * @param kinesis async client handed to the shard subscribers
     * @param onErrorCoolDownMs cool-down in milliseconds handed to the shard subscribers
     */
    EFOShardSubscribersPool(
            KinesisIO.Read read, String consumerArn, KinesisAsyncClient kinesis, int onErrorCoolDownMs) {
        this.eventQueue = new ConcurrentLinkedQueue<>();
        this.state = new HashMap<>();
        this.isStopped = false;
        // Keep only the first failure: later errors must not overwrite the root cause.
        this.errorHandler =
            (ignored, error) -> {
                if (error != null && this.subscriptionError == null) {
                    this.subscriptionError = error;
                }
            };
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        this.current = null;
        this.poolId = generatePoolId();
        this.read = read;
        this.consumerArn = consumerArn;
        this.kinesis = kinesis;
        this.watermarkPolicyFactory = read.getWatermarkPolicyFactory();
        this.onErrorCoolDownMs = onErrorCoolDownMs;
        // Fall back to the documented default when the read configuration does not set a capacity.
        this.maxCapacityPerShard =
            read.getMaxCapacityPerShard() != null
                ? read.getMaxCapacityPerShard().intValue()
                : DEFAULT_MAX_CAPACITY_PER_SHARD;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers one shard state per checkpoint and initialises a subscriber for each of them.
     *
     * @param iterable checkpoints to start from, one per shard
     * @throws IllegalStateException if a shard id is already registered in this pool
     */
    public void start(Iterable<ShardCheckpoint> iterable) {
        LOG.info(
            "Pool {} - starting for stream {} consumer {}. Checkpoints = {}",
            poolId,
            read.getStreamName(),
            consumerArn,
            iterable);
        for (ShardCheckpoint checkpoint : iterable) {
            String shardId = checkpoint.getShardId();
            Preconditions.checkState(!state.containsKey(shardId), "Duplicate shard id %s", shardId);
            ShardState shardState =
                new ShardState(initShardSubscriber(checkpoint), checkpoint, watermarkPolicyFactory);
            state.put(shardId, shardState);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the next record from the queued events, skipping records for which
     * {@code ShardState.isAfterInitialCheckpoint} is false.
     *
     * <p>If a subscription error has been recorded the pool is stopped, but events that are
     * already queued are still drained before the error is raised.
     *
     * @return the next record, or {@code null} if no event is queued and no error was recorded
     * @throws IOException wrapping the recorded subscription error once the queue is empty
     */
    public KinesisRecord getNextRecord() throws IOException {
        for (;;) {
            if (!isStopped && subscriptionError != null) {
                stop();
            }
            if (current == null) {
                current = eventQueue.poll();
            }
            if (current == null) {
                // Nothing left to drain: either surface the failure or report "no data right now".
                if (subscriptionError != null) {
                    stop();
                    throw new IOException(subscriptionError);
                }
                return null;
            }

            String shardId = current.shardId;
            ShardState shardState =
                org.apache.beam.sdk.util.Preconditions.checkStateNotNull(state.get(shardId));

            if (!current.hasNext()) {
                // Event without (remaining) records: finish it and move on to the next event.
                onEventDone(shardState, current);
                current = null;
                continue;
            }

            KinesisClientRecord clientRecord = current.next();
            if (!current.hasNext()) {
                // That was the last record of the event; finish the event before the record
                // itself is applied to the shard state below.
                onEventDone(shardState, current);
                current = null;
            }
            KinesisRecord record = new KinesisRecord(clientRecord, read.getStreamName(), shardId);
            if (shardState.isAfterInitialCheckpoint(record)) {
                shardState.update(record);
                return record;
            }
        }
    }

    /**
     * Finishes an event whose records have all been consumed.
     *
     * <p>An event with a continuation sequence number, or without child shards, just advances the
     * shard state (which also acknowledges the event). An event with no continuation sequence
     * number but with child shards is handled as a re-shard: every successor shard not yet known is
     * registered with a subscriber starting at {@code TRIM_HORIZON}, and the finished shard is
     * removed from the state.
     */
    private void onEventDone(ShardState shardState, EventRecords eventRecords) {
        SubscribeToShardEvent event = eventRecords.event;
        boolean reShard = event.continuationSequenceNumber() == null && event.hasChildShards();
        if (!reShard) {
            shardState.update(eventRecords);
            return;
        }

        LOG.info("Pool {} - processing re-shard signal {}", poolId, event);
        List<String> successorIds = computeSuccessorShardsIds(eventRecords);
        for (String successorId : successorIds) {
            ShardCheckpoint successorCheckpoint =
                new ShardCheckpoint(
                    read.getStreamName(),
                    successorId,
                    new StartingPoint(InitialPositionInStream.TRIM_HORIZON));
            state.computeIfAbsent(
                successorId,
                id ->
                    new ShardState(
                        initShardSubscriber(successorCheckpoint),
                        successorCheckpoint,
                        watermarkPolicyFactory));
        }
        state.remove(eventRecords.shardId);
    }

    /**
     * Creates the subscriber for the checkpoint's shard and, unless a subscription error has
     * already been recorded, subscribes it at the checkpoint's starting position.
     *
     * @return the new subscriber (not subscribed if an error was already recorded)
     */
    private EFOShardSubscriber initShardSubscriber(ShardCheckpoint shardCheckpoint) {
        EFOShardSubscriber subscriber =
            new EFOShardSubscriber(
                this, shardCheckpoint.getShardId(), consumerArn, kinesis, onErrorCoolDownMs);
        StartingPosition startingPosition = shardCheckpoint.toEFOStartingPosition();
        if (subscriptionError != null) {
            return subscriber;
        }
        // Failures of the subscription are reported through errorHandler.
        subscriber.subscribe(startingPosition).whenCompleteAsync(errorHandler);
        return subscriber;
    }

    /**
     * Determines which child shards of a finished shard this shard is responsible for starting.
     *
     * <p>A child is selected if the finished shard is one of its parents and either it is the only
     * parent, or it is the lexicographically greatest of the parents. The second rule ensures that
     * a child with several parents is selected for exactly one of them.
     *
     * @param eventRecords the finished event carrying the child shards
     * @return ids of the selected child shards, possibly empty
     */
    private List<String> computeSuccessorShardsIds(EventRecords eventRecords) {
        String shardId = eventRecords.shardId;
        List<String> successors = new ArrayList<>();
        for (ChildShard child : eventRecords.event.childShards()) {
            List<String> parents = child.parentShards();
            if (!parents.contains(shardId)) {
                continue;
            }
            // parents is non-empty here, so Collections.max cannot fail.
            if (parents.size() <= 1 || shardId.equals(Collections.max(parents))) {
                successors.add(child.shardId());
            }
        }
        if (successors.isEmpty()) {
            LOG.info("Pool {} - found no successors for shard {}", poolId, shardId);
        } else {
            LOG.info("Pool {} - found successors for shard {}: {}", poolId, shardId, successors);
        }
        return successors;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Queues an event for later consumption by {@code getNextRecord()}.
     *
     * @param shardId id of the shard the event belongs to
     * @param event the event to queue
     */
    public void enqueueEvent(String shardId, SubscribeToShardEvent event) {
        EventRecords records = new EventRecords(shardId, event);
        eventQueue.offer(records);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the result of {@code TimeUtil.minTimestamp} over the watermarks of all shards
     * currently held in the state.
     */
    public Instant getWatermark() {
        return TimeUtil.minTimestamp(state.values().stream().map(ShardState::getWatermark));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a reader checkpoint containing one shard checkpoint per shard currently held in the
     * state.
     *
     * @return a new checkpoint reflecting the current position of every shard
     */
    public KinesisReaderCheckpoint getCheckpointMark() {
        List<ShardCheckpoint> checkpoints = new ArrayList<>(state.size());
        for (ShardState shardState : state.values()) {
            checkpoints.add(shardState.toCheckpoint());
        }
        return new KinesisReaderCheckpoint(checkpoints);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks the pool as stopped, cancels the subscriber of every shard held in the state and shuts
     * the scheduler down immediately.
     */
    public void stop() {
        LOG.info("Pool {} - stopping", poolId);
        isStopped = true;
        for (ShardState shardState : state.values()) {
            shardState.subscriber.cancel();
        }
        scheduler.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the randomly generated id of this pool. */
    public String getPoolId() {
        return poolId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the per-shard capacity resolved when this pool was constructed. */
    public int getMaxCapacityPerShard() {
        return maxCapacityPerShard;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs an asynchronous task after a delay.
     *
     * <p>With a non-positive delay the task is started immediately on the calling thread and its
     * own future is returned. Otherwise the task is started on the pool's scheduler after
     * {@code delayMs} milliseconds and the returned future mirrors the task's outcome.
     *
     * <p>The returned future always completes: it completes exceptionally if the scheduler rejects
     * the task (for example after {@code stop()}), and also if the task itself throws or returns
     * {@code null} instead of a future when it is eventually started.
     *
     * @param task supplier that starts the asynchronous work
     * @param delayMs delay in milliseconds; values {@code <= 0} mean "run now"
     * @param <T> result type of the task
     * @return a future completing with the task's result or failure
     */
    public <T> CompletableFuture<T> delayedTask(Supplier<CompletableFuture<T>> task, long delayMs) {
        if (delayMs <= 0) {
            return task.get();
        }
        CompletableFuture<T> result = new CompletableFuture<>();
        try {
            scheduler.schedule(
                () -> {
                    try {
                        task.get()
                            .whenComplete(
                                (value, error) -> {
                                    if (error == null) {
                                        result.complete(value);
                                    } else {
                                        result.completeExceptionally(error);
                                    }
                                });
                    } catch (Throwable t) {
                        // Without this, the failure would only be stored in the discarded
                        // ScheduledFuture and the caller's future would never complete.
                        result.completeExceptionally(t);
                    }
                },
                delayMs,
                TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    /**
     * Generates a random 8-character, lower-case alphanumeric id for a pool.
     *
     * <p>Lower-casing uses {@link Locale#ROOT} so the result does not depend on the JVM's default
     * locale.
     */
    private static String generatePoolId() {
        return RandomStringUtils.randomAlphanumeric(8).toLowerCase(Locale.ROOT);
    }
}
