package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kinesis.shaded.io.netty.handler.timeout.ReadTimeoutException;
import org.apache.flink.kinesis.shaded.org.reactivestreams.Subscriber;
import org.apache.flink.kinesis.shaded.org.reactivestreams.Subscription;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.StartingPosition;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.class */
public class FanOutShardSubscriber {
    private static final int QUEUE_CAPACITY = 2;
    private final BlockingQueue<FanOutSubscriptionEvent> queue;
    private final AtomicReference<FanOutSubscriptionEvent> subscriptionErrorEvent;
    private final KinesisProxyV2Interface kinesis;
    private final String consumerArn;
    private final String shardId;
    private final Duration subscribeToShardTimeout;
    private final Duration queueWaitTimeout;
    private static final Logger LOG = LoggerFactory.getLogger(FanOutShardSubscriber.class);
    private static final Duration DEFAULT_QUEUE_TIMEOUT = Duration.ofSeconds(35);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber$FanOutShardSubscription.class */
    public class FanOutShardSubscription implements Subscriber<SubscribeToShardEventStream> {
        private Subscription subscription;
        private volatile boolean cancelled;
        private final CountDownLatch waitForSubscriptionLatch;

        private FanOutShardSubscription(CountDownLatch countDownLatch) {
            this.cancelled = false;
            this.waitForSubscriptionLatch = countDownLatch;
        }

        void requestRecord() {
            if (this.cancelled) {
                return;
            }
            FanOutShardSubscriber.LOG.debug("Requesting more records from EFO subscription - {} ({})", FanOutShardSubscriber.this.shardId, FanOutShardSubscriber.this.consumerArn);
            this.subscription.request(1L);
        }

        @Override // org.apache.flink.kinesis.shaded.org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            this.waitForSubscriptionLatch.countDown();
        }

        @Override // org.apache.flink.kinesis.shaded.org.reactivestreams.Subscriber
        public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
            subscribeToShardEventStream.accept(new SubscribeToShardResponseHandler.Visitor() { // from class: org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutShardSubscription.1
                @Override // org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler.Visitor
                public void visit(SubscribeToShardEvent subscribeToShardEvent) {
                    FanOutShardSubscription.this.enqueueEvent(new SubscriptionNextEvent(subscribeToShardEvent));
                }
            });
        }

        @Override // org.apache.flink.kinesis.shaded.org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            FanOutShardSubscriber.LOG.debug("Error occurred on EFO subscription: {} - ({}).  {} ({})", new Object[]{th.getClass().getName(), th.getMessage(), FanOutShardSubscriber.this.shardId, FanOutShardSubscriber.this.consumerArn, th});
            SubscriptionErrorEvent subscriptionErrorEvent = new SubscriptionErrorEvent(th);
            if (FanOutShardSubscriber.this.subscriptionErrorEvent.get() == null) {
                FanOutShardSubscriber.this.subscriptionErrorEvent.set(subscriptionErrorEvent);
            } else {
                FanOutShardSubscriber.LOG.warn("Error already queued. Ignoring subsequent exception.", th);
            }
            cancelSubscription();
            if (FanOutShardSubscriber.this.queue.isEmpty()) {
                FanOutShardSubscriber.this.queue.offer(subscriptionErrorEvent);
            }
        }

        @Override // org.apache.flink.kinesis.shaded.org.reactivestreams.Subscriber
        public void onComplete() {
            FanOutShardSubscriber.LOG.debug("EFO subscription complete - {} ({})", FanOutShardSubscriber.this.shardId, FanOutShardSubscriber.this.consumerArn);
            enqueueEvent(new SubscriptionCompleteEvent());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void cancelSubscription() {
            if (this.cancelled) {
                return;
            }
            this.cancelled = true;
            if (this.subscription != null) {
                this.subscription.cancel();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void enqueueEvent(FanOutSubscriptionEvent fanOutSubscriptionEvent) {
            if (this.cancelled) {
                return;
            }
            try {
                if (!FanOutShardSubscriber.this.queue.offer(fanOutSubscriptionEvent, FanOutShardSubscriber.this.queueWaitTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                    String str = "Timed out enqueuing event " + fanOutSubscriptionEvent.getClass().getSimpleName() + " - " + FanOutShardSubscriber.this.shardId + " (" + FanOutShardSubscriber.this.consumerArn + ")";
                    FanOutShardSubscriber.LOG.error(str);
                    onError(new RecoverableFanOutSubscriberException(new TimeoutException(str)));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber$FanOutSubscriberException.class */
    public static abstract class FanOutSubscriberException extends Exception {
        private static final long serialVersionUID = -3899472233945299730L;

        public FanOutSubscriberException(Throwable th) {
            super(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber$FanOutSubscriberInterruptedException.class */
    public static class FanOutSubscriberInterruptedException extends FanOutSubscriberException {
        private static final long serialVersionUID = -2783477408630427189L;

        public FanOutSubscriberInterruptedException(Throwable th) {
            super(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber$FanOutSubscriptionEvent.class */
    public interface FanOutSubscriptionEvent {
        default boolean isSubscribeToShardEvent() {
            return false;
        }

        default boolean isSubscriptionComplete() {
            return false;
        }

        default SubscribeToShardEvent getSubscribeToShardEvent() {
            throw new UnsupportedOperationException("This event does not support getSubscribeToShardEvent()");
        }

        default Throwable getThrowable() {
            throw new UnsupportedOperationException("This event does not support getThrowable()");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber$RecoverableFanOutSubscriberException.class */
    public static class RecoverableFanOutSubscriberException extends FanOutSubscriberException {
        private static final long serialVersionUID = -3223347557038294482L;

        public RecoverableFanOutSubscriberException(Throwable th) {
            super(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber$RetryableFanOutSubscriberException.class */
    public static class RetryableFanOutSubscriberException extends FanOutSubscriberException {
        private static final long serialVersionUID = -2967281117554404883L;

        public RetryableFanOutSubscriberException(Throwable th) {
            super(th);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber$SubscriptionCompleteEvent.class */
    private static class SubscriptionCompleteEvent implements FanOutSubscriptionEvent {
        private SubscriptionCompleteEvent() {
        }

        @Override // org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriptionEvent
        public boolean isSubscriptionComplete() {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber$SubscriptionErrorEvent.class */
    public static class SubscriptionErrorEvent implements FanOutSubscriptionEvent {
        private final Throwable throwable;

        private SubscriptionErrorEvent(Throwable th) {
            this.throwable = th;
        }

        @Override // org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriptionEvent
        public Throwable getThrowable() {
            return this.throwable;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber$SubscriptionNextEvent.class */
    private static class SubscriptionNextEvent implements FanOutSubscriptionEvent {
        private final SubscribeToShardEvent subscribeToShardEvent;

        private SubscriptionNextEvent(SubscribeToShardEvent subscribeToShardEvent) {
            this.subscribeToShardEvent = subscribeToShardEvent;
        }

        @Override // org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriptionEvent
        public boolean isSubscribeToShardEvent() {
            return true;
        }

        @Override // org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriptionEvent
        public SubscribeToShardEvent getSubscribeToShardEvent() {
            return this.subscribeToShardEvent;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FanOutShardSubscriber(String str, String str2, KinesisProxyV2Interface kinesisProxyV2Interface, Duration duration) {
        this(str, str2, kinesisProxyV2Interface, duration, DEFAULT_QUEUE_TIMEOUT);
    }

    @VisibleForTesting
    FanOutShardSubscriber(String str, String str2, KinesisProxyV2Interface kinesisProxyV2Interface, Duration duration, Duration duration2) {
        this.queue = new LinkedBlockingQueue(2);
        this.subscriptionErrorEvent = new AtomicReference<>();
        this.kinesis = (KinesisProxyV2Interface) Preconditions.checkNotNull(kinesisProxyV2Interface);
        this.consumerArn = (String) Preconditions.checkNotNull(str);
        this.shardId = (String) Preconditions.checkNotNull(str2);
        this.subscribeToShardTimeout = duration;
        this.queueWaitTimeout = duration2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean subscribeToShardAndConsumeRecords(StartingPosition startingPosition, Consumer<SubscribeToShardEvent> consumer) throws InterruptedException, FanOutSubscriberException {
        LOG.debug("Subscribing to shard {} ({})", this.shardId, this.consumerArn);
        try {
            return consumeAllRecordsFromKinesisShard(consumer, openSubscriptionToShard(startingPosition));
        } catch (FanOutSubscriberException e) {
            if (e.getCause() instanceof ResourceNotFoundException) {
                throw ((ResourceNotFoundException) e.getCause());
            }
            throw e;
        }
    }

    private FanOutShardSubscription openSubscriptionToShard(StartingPosition startingPosition) throws FanOutSubscriberException, InterruptedException {
        SubscribeToShardRequest subscribeToShardRequest = (SubscribeToShardRequest) SubscribeToShardRequest.builder().consumerARN(this.consumerArn).shardId(this.shardId).startingPosition(startingPosition).mo2944build();
        AtomicReference atomicReference = new AtomicReference();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        FanOutShardSubscription fanOutShardSubscription = new FanOutShardSubscription(countDownLatch);
        this.kinesis.subscribeToShard(subscribeToShardRequest, SubscribeToShardResponseHandler.builder().onError(th -> {
            if (countDownLatch.getCount() > 0) {
                atomicReference.set(th);
                countDownLatch.countDown();
            }
        }).subscriber(() -> {
            return fanOutShardSubscription;
        }).build());
        if (!countDownLatch.await(this.subscribeToShardTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
            String str = "Timed out acquiring subscription - " + this.shardId + " (" + this.consumerArn + ")";
            LOG.error(str);
            fanOutShardSubscription.cancelSubscription();
            handleError(new RecoverableFanOutSubscriberException(new TimeoutException(str)));
        }
        Throwable th2 = (Throwable) atomicReference.get();
        if (th2 != null) {
            handleError(th2);
        }
        LOG.debug("Acquired subscription - {} ({})", this.shardId, this.consumerArn);
        fanOutShardSubscription.requestRecord();
        return fanOutShardSubscription;
    }

    private void handleError(Throwable th) throws FanOutSubscriberException {
        Throwable cause = ((th instanceof CompletionException) || (th instanceof ExecutionException)) ? th.getCause() : th;
        LOG.warn("Error occurred on EFO subscription: {} - ({}).  {} ({})", new Object[]{th.getClass().getName(), th.getMessage(), this.shardId, this.consumerArn, cause});
        if (isInterrupted(th)) {
            throw new FanOutSubscriberInterruptedException(th);
        }
        if (cause instanceof FanOutSubscriberException) {
            throw ((FanOutSubscriberException) cause);
        }
        if (!(cause instanceof ReadTimeoutException)) {
            throw new RetryableFanOutSubscriberException(cause);
        }
        throw new RecoverableFanOutSubscriberException(cause);
    }

    private boolean isInterrupted(Throwable th) {
        Throwable th2 = th;
        while (true) {
            Throwable th3 = th2;
            if (th3 == null) {
                return false;
            }
            if (th3 instanceof InterruptedException) {
                return true;
            }
            th2 = th3.getCause();
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:15:0x00a8, code lost:
    
        r7.cancelSubscription();
     */
    /* JADX WARN: Code restructure failed: missing block: B:16:0x00ae, code lost:
    
        return r9;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private boolean consumeAllRecordsFromKinesisShard(java.util.function.Consumer<org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent> r6, org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutShardSubscription r7) throws java.lang.InterruptedException, org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriberException {
        /*
            r5 = this;
            r0 = 1
            r9 = r0
        L3:
            r0 = r5
            java.util.concurrent.atomic.AtomicReference<org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber$FanOutSubscriptionEvent> r0 = r0.subscriptionErrorEvent
            java.lang.Object r0 = r0.get()
            if (r0 == 0) goto L1c
            r0 = r5
            java.util.concurrent.atomic.AtomicReference<org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber$FanOutSubscriptionEvent> r0 = r0.subscriptionErrorEvent
            java.lang.Object r0 = r0.get()
            org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber$FanOutSubscriptionEvent r0 = (org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriptionEvent) r0
            r10 = r0
            goto L34
        L1c:
            r0 = r5
            java.util.concurrent.BlockingQueue<org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber$FanOutSubscriptionEvent> r0 = r0.queue
            r1 = r5
            java.time.Duration r1 = r1.queueWaitTimeout
            long r1 = r1.toMillis()
            java.util.concurrent.TimeUnit r2 = java.util.concurrent.TimeUnit.MILLISECONDS
            java.lang.Object r0 = r0.poll(r1, r2)
            org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber$FanOutSubscriptionEvent r0 = (org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriptionEvent) r0
            r10 = r0
        L34:
            r0 = r10
            if (r0 != 0) goto L52
            org.slf4j.Logger r0 = org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.LOG
            java.lang.String r1 = "Timed out polling events from network, reacquiring subscription - {} ({})"
            r2 = r5
            java.lang.String r2 = r2.shardId
            r3 = r5
            java.lang.String r3 = r3.consumerArn
            r0.info(r1, r2, r3)
            r0 = 0
            r9 = r0
            goto La8
        L52:
            r0 = r10
            boolean r0 = r0.isSubscribeToShardEvent()
            if (r0 == 0) goto L87
            r0 = r7
            r0.requestRecord()
            r0 = r10
            org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent r0 = r0.getSubscribeToShardEvent()
            r11 = r0
            r0 = r11
            java.lang.String r0 = r0.continuationSequenceNumber()
            r8 = r0
            r0 = r11
            java.util.List r0 = r0.records()
            boolean r0 = r0.isEmpty()
            if (r0 != 0) goto L84
            r0 = r6
            r1 = r11
            r0.accept(r1)
        L84:
            goto La4
        L87:
            r0 = r10
            boolean r0 = r0.isSubscriptionComplete()
            if (r0 == 0) goto L93
            r0 = 0
            return r0
        L93:
            r0 = r5
            r1 = r10
            java.lang.Throwable r1 = r1.getThrowable()
            r0.handleError(r1)
            r0 = 0
            r9 = r0
            goto La8
        La4:
            r0 = r8
            if (r0 != 0) goto L3
        La8:
            r0 = r7
            org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutShardSubscription.access$100(r0)
            r0 = r9
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(java.util.function.Consumer, org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber$FanOutShardSubscription):boolean");
    }
}
