/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kinesis.shaded.io.netty.handler.timeout.ReadTimeoutException;
import org.apache.flink.kinesis.shaded.org.reactivestreams.Subscriber;
import org.apache.flink.kinesis.shaded.org.reactivestreams.Subscription;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.StartingPosition;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Opens a {@code SubscribeToShard} subscription for one shard/consumer pair through a
 * {@link KinesisProxyV2Interface} and hands the received {@link SubscribeToShardEvent}s to a
 * caller supplied {@link Consumer}.
 *
 * <p>Events are passed from the {@link Subscriber} callbacks to the consuming loop through a small
 * bounded {@link BlockingQueue}. The first error reported by the subscription is additionally
 * stored in {@link #subscriptionErrorEvent} so that the consuming loop sees it even when the queue
 * has no room for it.
 *
 * <p>The queue and the stored error are instance state and are never reset, so an instance is
 * presumably meant to drive a single subscription - confirm against the caller.
 */
@Internal
public class FanOutShardSubscriber {
    private static final Logger LOG = LoggerFactory.getLogger(FanOutShardSubscriber.class);

    /** Maximum number of events buffered between the subscriber callbacks and the consuming loop. */
    private static final int QUEUE_CAPACITY = 2;

    /** Queue offer/poll timeout used when the caller does not supply one. */
    private static final Duration DEFAULT_QUEUE_TIMEOUT = Duration.ofSeconds(35L);

    /** Hand-off buffer for events produced by {@link FanOutShardSubscription}. */
    private final BlockingQueue<FanOutSubscriptionEvent> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

    /** First error raised by the subscription, or {@code null} while none has occurred. */
    private final AtomicReference<FanOutSubscriptionEvent> subscriptionErrorEvent = new AtomicReference<>();

    private final KinesisProxyV2Interface kinesis;
    private final String consumerArn;
    private final String shardId;

    /** Maximum time to wait for the subscription to be established. */
    private final Duration subscribeToShardTimeout;

    /** Maximum time to wait when offering to, or polling from, {@link #queue}. */
    private final Duration queueWaitTimeout;

    /**
     * Creates a subscriber that uses {@link #DEFAULT_QUEUE_TIMEOUT} for queue operations.
     *
     * @param consumerArn ARN of the consumer to subscribe with, must not be {@code null}
     * @param shardId id of the shard to subscribe to, must not be {@code null}
     * @param kinesis proxy used to issue the subscribe call, must not be {@code null}
     * @param subscribeToShardTimeout maximum time to wait for the subscription to be established
     */
    FanOutShardSubscriber(String consumerArn, String shardId, KinesisProxyV2Interface kinesis, Duration subscribeToShardTimeout) {
        this(consumerArn, shardId, kinesis, subscribeToShardTimeout, DEFAULT_QUEUE_TIMEOUT);
    }

    /**
     * Creates a subscriber with an explicit queue timeout.
     *
     * @param consumerArn ARN of the consumer to subscribe with, must not be {@code null}
     * @param shardId id of the shard to subscribe to, must not be {@code null}
     * @param kinesis proxy used to issue the subscribe call, must not be {@code null}
     * @param subscribeToShardTimeout maximum time to wait for the subscription to be established
     * @param queueWaitTimeout maximum time to wait when offering to, or polling from, the queue
     */
    @VisibleForTesting
    FanOutShardSubscriber(String consumerArn, String shardId, KinesisProxyV2Interface kinesis, Duration subscribeToShardTimeout, Duration queueWaitTimeout) {
        this.kinesis = Preconditions.checkNotNull(kinesis);
        this.consumerArn = Preconditions.checkNotNull(consumerArn);
        this.shardId = Preconditions.checkNotNull(shardId);
        this.subscribeToShardTimeout = subscribeToShardTimeout;
        this.queueWaitTimeout = queueWaitTimeout;
    }

    /**
     * Subscribes to the shard at {@code startingPosition} and passes every received event that
     * contains at least one record to {@code eventConsumer}, blocking the calling thread until the
     * subscription ends.
     *
     * @param startingPosition position in the shard to start reading from
     * @param eventConsumer callback invoked on the calling thread for each non-empty event
     * @return {@code true} when consumption stopped because an event carried a {@code null}
     *     continuation sequence number; {@code false} when the subscription completed or polling
     *     for the next event timed out
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws FanOutSubscriberException if acquiring or running the subscription fails
     * @throws ResourceNotFoundException (unchecked) rethrown unwrapped when it is the cause of a
     *     failure to open the subscription
     */
    boolean subscribeToShardAndConsumeRecords(StartingPosition startingPosition, Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException, FanOutSubscriberException {
        LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn);

        final FanOutShardSubscription subscription;
        try {
            subscription = openSubscriptionToShard(startingPosition);
        } catch (FanOutSubscriberException ex) {
            // Surface a missing resource as-is instead of as a subscriber exception.
            if (ex.getCause() instanceof ResourceNotFoundException) {
                throw (ResourceNotFoundException) ex.getCause();
            }
            throw ex;
        }

        return consumeAllRecordsFromKinesisShard(eventConsumer, subscription);
    }

    /**
     * Issues the subscribe call and blocks until either the subscription is handed to the
     * subscriber, an error is reported by the response handler, or
     * {@link #subscribeToShardTimeout} elapses. On success the first record batch is requested.
     *
     * @param startingPosition position in the shard to start reading from
     * @return the established subscription
     * @throws FanOutSubscriberException on timeout or when the response handler reported an error
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private FanOutShardSubscription openSubscriptionToShard(StartingPosition startingPosition) throws FanOutSubscriberException, InterruptedException {
        SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                .consumerARN(consumerArn)
                .shardId(shardId)
                .startingPosition(startingPosition)
                .build();

        AtomicReference<Throwable> exception = new AtomicReference<>();
        CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
        FanOutShardSubscription subscription = new FanOutShardSubscription(waitForSubscriptionLatch);

        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler.builder()
                .onError(e -> {
                    // Only errors that arrive while we are still waiting for the subscription are
                    // recorded here; once the latch has been released they are ignored by this
                    // handler.
                    if (waitForSubscriptionLatch.getCount() > 0L) {
                        exception.set(e);
                        waitForSubscriptionLatch.countDown();
                    }
                })
                .subscriber(() -> subscription)
                .build();

        kinesis.subscribeToShard(request, responseHandler);

        boolean subscriptionEstablished = waitForSubscriptionLatch.await(subscribeToShardTimeout.toMillis(), TimeUnit.MILLISECONDS);
        if (!subscriptionEstablished) {
            String errorMessage = "Timed out acquiring subscription - " + shardId + " (" + consumerArn + ")";
            LOG.error(errorMessage);
            subscription.cancelSubscription();
            // handleError always throws, so execution does not continue past this point.
            handleError(new RecoverableFanOutSubscriberException(new TimeoutException(errorMessage)));
        }

        Throwable throwable = exception.get();
        if (throwable != null) {
            handleError(throwable);
        }

        LOG.debug("Acquired subscription - {} ({})", shardId, consumerArn);
        subscription.requestRecord();
        return subscription;
    }

    /**
     * Logs {@code throwable} and rethrows it as a {@link FanOutSubscriberException}. This method
     * never returns normally.
     *
     * <ul>
     *   <li>an {@link InterruptedException} anywhere in the cause chain yields a
     *       {@link FanOutSubscriberInterruptedException};
     *   <li>an existing {@link FanOutSubscriberException} is rethrown unchanged;
     *   <li>a {@link ReadTimeoutException} yields a {@link RecoverableFanOutSubscriberException};
     *   <li>anything else yields a {@link RetryableFanOutSubscriberException}.
     * </ul>
     *
     * @param throwable the failure to translate
     * @throws FanOutSubscriberException always
     */
    private void handleError(Throwable throwable) throws FanOutSubscriberException {
        Throwable cause = throwable;
        boolean isWrapper = throwable instanceof CompletionException || throwable instanceof ExecutionException;
        // Unwrap async wrappers, but keep the wrapper itself when it carries no cause so that the
        // exception thrown below never ends up with a null cause.
        if (isWrapper && throwable.getCause() != null) {
            cause = throwable.getCause();
        }

        // The trailing argument has no placeholder, so SLF4J treats it as the exception to log.
        LOG.warn("Error occurred on EFO subscription: {} - ({}).  {} ({})", throwable.getClass().getName(), throwable.getMessage(), shardId, consumerArn, cause);

        if (isInterrupted(throwable)) {
            throw new FanOutSubscriberInterruptedException(throwable);
        }
        if (cause instanceof FanOutSubscriberException) {
            throw (FanOutSubscriberException) cause;
        }
        if (cause instanceof ReadTimeoutException) {
            throw new RecoverableFanOutSubscriberException(cause);
        }
        throw new RetryableFanOutSubscriberException(cause);
    }

    /**
     * Returns whether {@code throwable} or any exception in its cause chain is an
     * {@link InterruptedException}.
     */
    private boolean isInterrupted(Throwable throwable) {
        Throwable current = throwable;
        while (current != null) {
            if (current instanceof InterruptedException) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    /**
     * Drains events from the subscription until an event carries a {@code null} continuation
     * sequence number, the subscription completes, an error is reported, or no event arrives
     * within {@link #queueWaitTimeout}. The subscription is cancelled on every exit path.
     *
     * @param eventConsumer callback invoked for each event that contains at least one record
     * @param subscription the subscription to request further events from
     * @return {@code true} if the loop ended on a {@code null} continuation sequence number,
     *     {@code false} if it ended on completion or a poll timeout
     * @throws InterruptedException if the calling thread is interrupted while polling
     * @throws FanOutSubscriberException if the subscription reported an error
     */
    private boolean consumeAllRecordsFromKinesisShard(Consumer<SubscribeToShardEvent> eventConsumer, FanOutShardSubscription subscription) throws InterruptedException, FanOutSubscriberException {
        try {
            String continuationSequenceNumber;
            do {
                // A stored error takes precedence over anything still waiting in the queue.
                FanOutSubscriptionEvent subscriptionEvent = subscriptionErrorEvent.get();
                if (subscriptionEvent == null) {
                    subscriptionEvent = queue.poll(queueWaitTimeout.toMillis(), TimeUnit.MILLISECONDS);
                }

                if (subscriptionEvent == null) {
                    LOG.info("Timed out polling events from network, reacquiring subscription - {} ({})", shardId, consumerArn);
                    return false;
                }

                if (subscriptionEvent.isSubscribeToShardEvent()) {
                    // Ask for the next batch before processing the current one.
                    subscription.requestRecord();

                    SubscribeToShardEvent event = subscriptionEvent.getSubscribeToShardEvent();
                    continuationSequenceNumber = event.continuationSequenceNumber();
                    if (!event.records().isEmpty()) {
                        eventConsumer.accept(event);
                    }
                } else if (subscriptionEvent.isSubscriptionComplete()) {
                    return false;
                } else {
                    // handleError always throws; the return only satisfies the compiler.
                    handleError(subscriptionEvent.getThrowable());
                    return false;
                }
            } while (continuationSequenceNumber != null);

            return true;
        } finally {
            // Also runs when the loop is left through an exception (interrupt, consumer failure,
            // subscription error) so the subscription is never left open. Cancelling twice is a
            // no-op.
            subscription.cancelSubscription();
        }
    }

    /** Queue event wrapping a received {@link SubscribeToShardEvent}. */
    private static class SubscriptionNextEvent
    implements FanOutSubscriptionEvent {
        private final SubscribeToShardEvent subscribeToShardEvent;

        private SubscriptionNextEvent(SubscribeToShardEvent subscribeToShardEvent) {
            this.subscribeToShardEvent = subscribeToShardEvent;
        }

        @Override
        public boolean isSubscribeToShardEvent() {
            return true;
        }

        @Override
        public SubscribeToShardEvent getSubscribeToShardEvent() {
            return subscribeToShardEvent;
        }
    }

    /** Queue event carrying an error raised by the subscription. */
    private static class SubscriptionErrorEvent
    implements FanOutSubscriptionEvent {
        private final Throwable throwable;

        private SubscriptionErrorEvent(Throwable throwable) {
            this.throwable = throwable;
        }

        @Override
        public Throwable getThrowable() {
            return throwable;
        }
    }

    /** Queue event signalling that the subscription completed. */
    private static class SubscriptionCompleteEvent
    implements FanOutSubscriptionEvent {
        private SubscriptionCompleteEvent() {
        }

        @Override
        public boolean isSubscriptionComplete() {
            return true;
        }
    }

    /**
     * Common type of everything placed on the queue. The defaults describe an event that is
     * neither a record event nor a completion event and that exposes no payload; implementations
     * override only what applies to them.
     */
    private static interface FanOutSubscriptionEvent {
        /** Returns {@code true} if {@link #getSubscribeToShardEvent()} is supported. */
        default public boolean isSubscribeToShardEvent() {
            return false;
        }

        /** Returns {@code true} if this event marks completion of the subscription. */
        default public boolean isSubscriptionComplete() {
            return false;
        }

        /**
         * Returns the wrapped record event.
         *
         * @throws UnsupportedOperationException if this event does not carry one
         */
        default public SubscribeToShardEvent getSubscribeToShardEvent() {
            throw new UnsupportedOperationException("This event does not support getSubscribeToShardEvent()");
        }

        /**
         * Returns the wrapped error.
         *
         * @throws UnsupportedOperationException if this event does not carry one
         */
        default public Throwable getThrowable() {
            throw new UnsupportedOperationException("This event does not support getThrowable()");
        }
    }

    /** Thrown when an {@link InterruptedException} is found in the cause chain of a failure. */
    static class FanOutSubscriberInterruptedException
    extends FanOutSubscriberException {
        private static final long serialVersionUID = -2783477408630427189L;

        public FanOutSubscriberInterruptedException(Throwable cause) {
            super(cause);
        }
    }

    /**
     * Thrown for read timeouts and for timeouts acquiring the subscription or enqueuing an event.
     * How callers react to this type is decided outside this class.
     */
    static class RecoverableFanOutSubscriberException
    extends FanOutSubscriberException {
        private static final long serialVersionUID = -3223347557038294482L;

        public RecoverableFanOutSubscriberException(Throwable cause) {
            super(cause);
        }
    }

    /**
     * Thrown for any failure that is not classified as interrupted or recoverable. How callers
     * react to this type is decided outside this class.
     */
    static class RetryableFanOutSubscriberException
    extends FanOutSubscriberException {
        private static final long serialVersionUID = -2967281117554404883L;

        public RetryableFanOutSubscriberException(Throwable cause) {
            super(cause);
        }
    }

    /** Base type of all checked failures raised by {@link FanOutShardSubscriber}. */
    static abstract class FanOutSubscriberException
    extends Exception {
        private static final long serialVersionUID = -3899472233945299730L;

        public FanOutSubscriberException(Throwable cause) {
            super(cause);
        }
    }

    /**
     * {@link Subscriber} that forwards subscription signals to the enclosing subscriber's queue.
     * Records are requested one at a time through {@link #requestRecord()}.
     */
    private class FanOutShardSubscription
    implements Subscriber<SubscribeToShardEventStream> {
        /**
         * Set in {@link #onSubscribe(Subscription)}. Volatile so that the cancelled/subscription
         * hand-shake between {@link #onSubscribe(Subscription)} and {@link #cancelSubscription()}
         * works when they run on different threads.
         */
        private volatile Subscription subscription;
        private volatile boolean cancelled = false;

        /** Released once the subscription has been handed to this subscriber. */
        private final CountDownLatch waitForSubscriptionLatch;

        private FanOutShardSubscription(CountDownLatch waitForSubscriptionLatch) {
            this.waitForSubscriptionLatch = waitForSubscriptionLatch;
        }

        /** Requests one more event from the subscription unless it has been cancelled. */
        void requestRecord() {
            if (!cancelled) {
                LOG.debug("Requesting more records from EFO subscription - {} ({})", shardId, consumerArn);
                subscription.request(1L);
            }
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            if (cancelled) {
                // cancelSubscription() already ran (e.g. the acquire timed out) and may not have
                // seen this subscription yet, so cancel it here to avoid leaving it open.
                subscription.cancel();
            }
            waitForSubscriptionLatch.countDown();
        }

        @Override
        public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
            subscribeToShardEventStream.accept(new SubscribeToShardResponseHandler.Visitor() {

                @Override
                public void visit(SubscribeToShardEvent event) {
                    FanOutShardSubscription.this.enqueueEvent(new SubscriptionNextEvent(event));
                }
            });
        }

        @Override
        public void onError(Throwable throwable) {
            LOG.debug("Error occurred on EFO subscription: {} - ({}).  {} ({})", throwable.getClass().getName(), throwable.getMessage(), shardId, consumerArn, throwable);

            SubscriptionErrorEvent errorEvent = new SubscriptionErrorEvent(throwable);
            // Atomically keep only the first error; a separate get()/set() pair could let two
            // concurrent callers both believe they were first.
            if (!FanOutShardSubscriber.this.subscriptionErrorEvent.compareAndSet(null, errorEvent)) {
                LOG.warn("Error already queued. Ignoring subsequent exception.", throwable);
            }

            // Stop further events from being enqueued.
            cancelSubscription();

            // An empty queue means the consuming loop may be blocked in poll(); offer the error
            // so it wakes up. When the queue is non-empty the loop picks the stored error up on
            // its next iteration.
            if (queue.isEmpty()) {
                queue.offer(errorEvent);
            }
        }

        @Override
        public void onComplete() {
            LOG.debug("EFO subscription complete - {} ({})", shardId, consumerArn);
            enqueueEvent(new SubscriptionCompleteEvent());
        }

        /** Marks this subscription cancelled and cancels the underlying one if present. */
        private void cancelSubscription() {
            if (cancelled) {
                return;
            }
            cancelled = true;

            Subscription current = subscription;
            if (current != null) {
                current.cancel();
            }
        }

        /**
         * Offers {@code event} to the queue, waiting up to the queue timeout for space. Events are
         * dropped once the subscription is cancelled; a timeout is reported through
         * {@link #onError(Throwable)}.
         */
        private void enqueueEvent(FanOutSubscriptionEvent event) {
            if (cancelled) {
                return;
            }
            try {
                boolean accepted = queue.offer(event, queueWaitTimeout.toMillis(), TimeUnit.MILLISECONDS);
                if (!accepted) {
                    String errorMessage = "Timed out enqueuing event " + event.getClass().getSimpleName() + " - " + shardId + " (" + consumerArn + ")";
                    LOG.error(errorMessage);
                    onError(new RecoverableFanOutSubscriberException(new TimeoutException(errorMessage)));
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }
}

