package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisException;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxySyncV2Interface;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/StreamConsumerRegistrar.class */
public class StreamConsumerRegistrar {
    private static final Logger LOG = LoggerFactory.getLogger(StreamConsumerRegistrar.class);
    private final KinesisProxySyncV2Interface kinesisProxyV2Interface;
    private final FanOutRecordPublisherConfiguration configuration;
    private final FullJitterBackoff backoff;

    public StreamConsumerRegistrar(KinesisProxySyncV2Interface kinesisProxySyncV2Interface, FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration, FullJitterBackoff fullJitterBackoff) {
        this.kinesisProxyV2Interface = (KinesisProxySyncV2Interface) Preconditions.checkNotNull(kinesisProxySyncV2Interface);
        this.configuration = (FanOutRecordPublisherConfiguration) Preconditions.checkNotNull(fanOutRecordPublisherConfiguration);
        this.backoff = (FullJitterBackoff) Preconditions.checkNotNull(fullJitterBackoff);
    }

    public String registerStreamConsumer(String str, String str2) throws Exception {
        LOG.debug("Registering stream consumer - {}::{}", str, str2);
        int i = 1;
        if (this.configuration.getEfoRegistrationType() == ConsumerConfigConstants.EFORegistrationType.LAZY) {
            i = 1 + 1;
            registrationBackoff(this.configuration, this.backoff, 1);
        }
        String streamARN = this.kinesisProxyV2Interface.describeStreamSummary(str).streamDescriptionSummary().streamARN();
        LOG.debug("Found stream ARN - {}", streamARN);
        Optional<DescribeStreamConsumerResponse> describeStreamConsumer = describeStreamConsumer(streamARN, str2);
        if (!describeStreamConsumer.isPresent()) {
            invokeIgnoringResourceInUse(() -> {
                return this.kinesisProxyV2Interface.registerStreamConsumer(streamARN, str2);
            });
        }
        String waitForConsumerToBecomeActive = waitForConsumerToBecomeActive(describeStreamConsumer.orElse(null), streamARN, str2, i);
        LOG.debug("Using stream consumer - {}", waitForConsumerToBecomeActive);
        return waitForConsumerToBecomeActive;
    }

    public void deregisterStreamConsumer(String str) throws Exception {
        LOG.debug("Deregistering stream consumer - {}", str);
        String streamConsumerArn = getStreamConsumerArn(str);
        int i = 1 + 1;
        deregistrationBackoff(this.configuration, this.backoff, 1);
        Optional<DescribeStreamConsumerResponse> describeStreamConsumer = describeStreamConsumer(streamConsumerArn);
        if (describeStreamConsumer.isPresent() && describeStreamConsumer.get().consumerDescription().consumerStatus() != ConsumerStatus.DELETING) {
            invokeIgnoringResourceInUse(() -> {
                return this.kinesisProxyV2Interface.deregisterStreamConsumer(streamConsumerArn);
            });
        }
        waitForConsumerToDeregister(describeStreamConsumer.orElse(null), streamConsumerArn, i);
        LOG.debug("Deregistered stream consumer - {}", streamConsumerArn);
    }

    public void close() {
        this.kinesisProxyV2Interface.close();
    }

    @VisibleForTesting
    void registrationBackoff(FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration, FullJitterBackoff fullJitterBackoff, int i) throws InterruptedException {
        fullJitterBackoff.sleep(fullJitterBackoff.calculateFullJitterBackoff(fanOutRecordPublisherConfiguration.getRegisterStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getRegisterStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getRegisterStreamExpConstant(), i));
    }

    @VisibleForTesting
    void deregistrationBackoff(FanOutRecordPublisherConfiguration fanOutRecordPublisherConfiguration, FullJitterBackoff fullJitterBackoff, int i) throws InterruptedException {
        fullJitterBackoff.sleep(fullJitterBackoff.calculateFullJitterBackoff(fanOutRecordPublisherConfiguration.getDeregisterStreamBaseBackoffMillis(), fanOutRecordPublisherConfiguration.getDeregisterStreamMaxBackoffMillis(), fanOutRecordPublisherConfiguration.getDeregisterStreamExpConstant(), i));
    }

    private String waitForConsumerToBecomeActive(@Nullable DescribeStreamConsumerResponse describeStreamConsumerResponse, String str, String str2, int i) throws InterruptedException, ExecutionException {
        int i2 = i;
        Instant now = Instant.now();
        Duration registerStreamConsumerTimeout = this.configuration.getRegisterStreamConsumerTimeout();
        DescribeStreamConsumerResponse describeStreamConsumerResponse2 = describeStreamConsumerResponse;
        do {
            if (describeStreamConsumerResponse2 != null && describeStreamConsumerResponse2.consumerDescription().consumerStatus() == ConsumerStatus.ACTIVE) {
                return describeStreamConsumerResponse2.consumerDescription().consumerARN();
            }
            LOG.debug("Waiting for stream consumer to become active, attempt {} - {} on {}", new Object[]{Integer.valueOf(i2), str2, str});
            int i3 = i2;
            i2++;
            registrationBackoff(this.configuration, this.backoff, i3);
            describeStreamConsumerResponse2 = this.kinesisProxyV2Interface.describeStreamConsumer(str, str2);
        } while (Duration.between(now, Instant.now()).compareTo(registerStreamConsumerTimeout) <= 0);
        throw new FlinkKinesisException.FlinkKinesisTimeoutException("Timeout waiting for stream consumer to become active: " + str2 + " on " + str);
    }

    private void waitForConsumerToDeregister(@Nullable DescribeStreamConsumerResponse describeStreamConsumerResponse, String str, int i) throws Exception {
        int i2 = i;
        Instant now = Instant.now();
        Duration deregisterStreamConsumerTimeout = this.configuration.getDeregisterStreamConsumerTimeout();
        Optional<DescribeStreamConsumerResponse> ofNullable = Optional.ofNullable(describeStreamConsumerResponse);
        while (ofNullable.isPresent() && ofNullable.get().consumerDescription().consumerStatus() != ConsumerStatus.DELETING) {
            LOG.debug("Waiting for stream consumer to deregister, attempt {} - {}", Integer.valueOf(i2), str);
            int i3 = i2;
            i2++;
            deregistrationBackoff(this.configuration, this.backoff, i3);
            ofNullable = describeStreamConsumer(str);
            if (Duration.between(now, Instant.now()).compareTo(deregisterStreamConsumerTimeout) > 0) {
                throw new FlinkKinesisException.FlinkKinesisTimeoutException("Timeout waiting for stream consumer to deregister: " + str);
            }
        }
    }

    private Optional<DescribeStreamConsumerResponse> describeStreamConsumer(String str, String str2) throws Exception {
        return describeStreamConsumer(() -> {
            return this.kinesisProxyV2Interface.describeStreamConsumer(str, str2);
        });
    }

    private Optional<DescribeStreamConsumerResponse> describeStreamConsumer(String str) throws Exception {
        return describeStreamConsumer(() -> {
            return this.kinesisProxyV2Interface.describeStreamConsumer(str);
        });
    }

    private Optional<DescribeStreamConsumerResponse> describeStreamConsumer(Callable<DescribeStreamConsumerResponse> callable) throws Exception {
        try {
            return Optional.ofNullable(callable.call());
        } catch (Exception e) {
            if (isResourceNotFound(e)) {
                return Optional.empty();
            }
            throw e;
        }
    }

    private <T> void invokeIgnoringResourceInUse(Callable<T> callable) throws Exception {
        try {
            callable.call();
        } catch (Exception e) {
            if (!isResourceInUse(e)) {
                throw e;
            }
        }
    }

    private boolean isResourceNotFound(Exception exc) {
        return ExceptionUtils.findThrowable(exc, ResourceNotFoundException.class).isPresent();
    }

    private boolean isResourceInUse(Exception exc) {
        return ExceptionUtils.findThrowable(exc, ResourceInUseException.class).isPresent();
    }

    private String getStreamConsumerArn(String str) {
        Optional<String> streamConsumerArn = this.configuration.getStreamConsumerArn(str);
        if (streamConsumerArn.isPresent()) {
            return streamConsumerArn.get();
        }
        throw new IllegalArgumentException("Stream consumer ARN not found for stream: " + str);
    }
}
