package org.apache.beam.sdk.io.aws2.kinesis;

import io.netty.channel.ChannelException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StartingPosition;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscriber.class */
public class EFOShardSubscriber {
    private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscriber.class);
    private final EFOShardSubscribersPool pool;
    private final String consumerArn;
    private final String shardId;
    private final KinesisAsyncClient kinesis;
    private StartingPosition initialPosition;
    private final BiConsumer<Void, Throwable> reSubscriptionHandler;
    private volatile State state = State.INITIALIZED;
    private final CompletableFuture<Void> done = new CompletableFuture<>();
    private final ShardEventsSubscriber eventsSubscriber = new ShardEventsSubscriber();
    private final AtomicInteger inFlight = new AtomicInteger();

    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscriber$ShardEventsSubscriber.class */
    private class ShardEventsSubscriber implements Subscriber<SubscribeToShardEventStream>, SubscribeToShardResponseHandler.Visitor {
        String sequenceNumber;
        Subscription subscription;

        private ShardEventsSubscriber() {
        }

        void cancel() {
            if (this.subscription != null) {
                this.subscription.cancel();
            }
            this.subscription = null;
        }

        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            if (EFOShardSubscriber.this.state == State.STOPPED) {
                cancel();
            } else if (EFOShardSubscriber.this.inFlight.get() < EFOShardSubscriber.this.pool.getMaxCapacityPerShard()) {
                subscription.request(1L);
            }
        }

        public void visit(SubscribeToShardEvent subscribeToShardEvent) {
            EFOShardSubscriber.this.pool.enqueueEvent(EFOShardSubscriber.this.shardId, subscribeToShardEvent);
            this.sequenceNumber = subscribeToShardEvent.continuationSequenceNumber();
            int maxCapacityPerShard = EFOShardSubscriber.this.pool.getMaxCapacityPerShard() - EFOShardSubscriber.this.inFlight.incrementAndGet();
            Preconditions.checkState(maxCapacityPerShard >= 0, "Exceeded in-flight limit");
            if (maxCapacityPerShard <= 0 || this.subscription == null) {
                return;
            }
            this.subscription.request(1L);
        }

        public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
            subscribeToShardEventStream.accept(this);
        }

        public void onError(Throwable th) {
            EFOShardSubscriber.LOG.warn("Pool {} - shard {} subscriber got error", new Object[]{EFOShardSubscriber.this.pool.getPoolId(), EFOShardSubscriber.this.shardId, th});
        }

        public void onComplete() {
            this.subscription = null;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscriber$State.class */
    enum State {
        INITIALIZED,
        RUNNING,
        PAUSED,
        STOPPED
    }

    private static boolean isRetryable(Throwable th) {
        SdkException unwrapCompletionException = unwrapCompletionException(th);
        if ((unwrapCompletionException instanceof SdkException) && unwrapCompletionException.retryable()) {
            return true;
        }
        Throwable rootCause = Throwables.getRootCause(unwrapCompletionException);
        return (rootCause instanceof ClosedChannelException) || (rootCause instanceof TimeoutException) || (rootCause instanceof ChannelException);
    }

    private static Throwable unwrapCompletionException(Throwable th) {
        Throwable th2 = th;
        while (true) {
            Throwable th3 = th2;
            if (!(th3 instanceof CompletionException)) {
                return th3;
            }
            Throwable cause = th3.getCause();
            if (cause == null) {
                return th3;
            }
            th2 = cause;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EFOShardSubscriber(EFOShardSubscribersPool eFOShardSubscribersPool, String str, String str2, KinesisAsyncClient kinesisAsyncClient, int i) {
        this.pool = eFOShardSubscribersPool;
        this.consumerArn = str2;
        this.shardId = str;
        this.kinesis = kinesisAsyncClient;
        this.reSubscriptionHandler = (r9, th) -> {
            this.eventsSubscriber.cancel();
            if (th != null && !isRetryable(th)) {
                this.done.completeExceptionally(th);
                return;
            }
            if (th != null && isRetryable(th) && this.state != State.STOPPED) {
                String str3 = this.eventsSubscriber.sequenceNumber;
                if (this.inFlight.get() == eFOShardSubscribersPool.getMaxCapacityPerShard()) {
                    this.state = State.PAUSED;
                    return;
                } else if (str3 != null) {
                    eFOShardSubscribersPool.delayedTask(() -> {
                        return internalReSubscribe(str3);
                    }, i);
                    return;
                } else {
                    eFOShardSubscribersPool.delayedTask(() -> {
                        return internalSubscribe(this.initialPosition);
                    }, i);
                    return;
                }
            }
            String str4 = this.eventsSubscriber.sequenceNumber;
            if (th == null && this.state != State.STOPPED && str4 != null) {
                internalReSubscribe(str4);
                return;
            }
            if (th == null && this.state != State.STOPPED && str4 == null) {
                this.done.complete(null);
                return;
            }
            String format = String.format("Pool %s - unknown case which is likely a bug: state=%s seqnum=%s", eFOShardSubscribersPool.getPoolId(), this.state, str4);
            LOG.warn(format);
            this.done.completeExceptionally(new IllegalStateException(format));
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> subscribe(StartingPosition startingPosition) {
        Preconditions.checkState(this.state == State.INITIALIZED, "Subscriber was already started");
        this.initialPosition = startingPosition;
        return internalSubscribe(startingPosition);
    }

    private CompletableFuture<Void> internalReSubscribe(String str) {
        return internalSubscribe((StartingPosition) StartingPosition.builder().type(ShardIteratorType.AFTER_SEQUENCE_NUMBER).sequenceNumber(str).build());
    }

    private CompletableFuture<Void> internalSubscribe(StartingPosition startingPosition) {
        SubscribeToShardRequest subscribeRequest = subscribeRequest(startingPosition);
        LOG.info("Pool {} - shard {} starting subscribe request {}", new Object[]{this.pool.getPoolId(), this.shardId, subscribeRequest});
        try {
            this.kinesis.subscribeToShard(subscribeRequest, responseHandler()).whenComplete((BiConsumer) this.reSubscriptionHandler);
            return this.done;
        } catch (Exception e) {
            this.done.completeExceptionally(e);
            return this.done;
        }
    }

    private SubscribeToShardRequest subscribeRequest(StartingPosition startingPosition) {
        return (SubscribeToShardRequest) SubscribeToShardRequest.builder().consumerARN(this.consumerArn).shardId(this.shardId).startingPosition(startingPosition).build();
    }

    private SubscribeToShardResponseHandler responseHandler() {
        return ((SubscribeToShardResponseHandler.Builder) SubscribeToShardResponseHandler.builder().subscriber(() -> {
            return this.eventsSubscriber;
        })).build();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void cancel() {
        LOG.info("Pool {} - shard {} cancelling", this.pool.getPoolId(), this.shardId);
        if (this.state == State.STOPPED || this.eventsSubscriber == null) {
            return;
        }
        this.eventsSubscriber.cancel();
        this.state = State.STOPPED;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void ackEvent() {
        Subscription subscription;
        int andDecrement = this.inFlight.getAndDecrement();
        if (this.state == State.PAUSED) {
            this.state = State.RUNNING;
            internalReSubscribe((String) org.apache.beam.sdk.util.Preconditions.checkStateNotNull(this.eventsSubscriber.sequenceNumber));
        } else {
            if (andDecrement != this.pool.getMaxCapacityPerShard() || (subscription = this.eventsSubscriber.subscription) == null) {
                return;
            }
            subscription.request(1L);
        }
    }
}
