package org.apache.beam.sdk.io.aws.sns;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.model.GetTopicAttributesResult;
import com.amazonaws.services.sns.model.InternalErrorException;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.function.Predicate;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.aws.sns.AutoValue_SnsIO_RetryConfiguration;
import org.apache.beam.sdk.io.aws.sns.AutoValue_SnsIO_Write;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
/* loaded from: input_file:org/apache/beam/sdk/io/aws/sns/SnsIO.class */
public final class SnsIO {

    /**
     * Retry settings consulted by {@link Write.SnsWriterFn} when a publish call fails.
     *
     * <p>An instance bundles the maximum number of attempts, the maximum cumulative backoff
     * duration, the initial backoff duration and the predicate that decides whether a given failure
     * may be retried. Instances are created through {@link #create(int, Duration)}.
     */
    @AutoValue
    @AutoValue.CopyAnnotations
    public abstract static class RetryConfiguration implements Serializable {
        /** Initial backoff applied by {@link #create(int, Duration)}: five seconds. */
        private static final Duration DEFAULT_INITIAL_DURATION = Duration.standardSeconds(5);

        /** Predicate installed by both {@code create} factories. */
        @VisibleForTesting
        static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();

        /** AutoValue builder; the concrete implementation is generated. */
        @AutoValue.Builder
        public abstract static class Builder {
            abstract Builder setMaxAttempts(int maxAttempts);

            abstract Builder setMaxDuration(Duration maxDuration);

            abstract Builder setInitialDuration(Duration initialDuration);

            abstract Builder setRetryPredicate(RetryPredicate retryPredicate);

            abstract RetryConfiguration build();
        }

        /**
         * Default retry decision: a failure is retryable when it is an {@link IOException} or an
         * {@link InternalErrorException}.
         */
        private static class DefaultRetryPredicate implements RetryPredicate {

            private DefaultRetryPredicate() {
            }

            /**
             * Tells whether the given failure should be retried.
             *
             * @param th the failure raised by the publish attempt
             * @return {@code true} for {@link IOException} and {@link InternalErrorException},
             *     {@code false} for everything else
             */
            @Override // java.util.function.Predicate
            public boolean test(Throwable th) {
                // NOTE(review): the previous expression carried a third clause,
                // "InternalErrorException && status code == 503". It could never change the result
                // because every InternalErrorException was already accepted by the clause before it,
                // so it has been removed as dead code. If only specific status codes were meant to
                // be retried, tighten the condition here.
                return th instanceof IOException || th instanceof InternalErrorException;
            }
        }

        /** Serializable predicate deciding whether a failure is eligible for a retry. */
        @FunctionalInterface
        public interface RetryPredicate extends Predicate<Throwable>, Serializable {
        }

        /** Returns the total number of publish attempts allowed (validated to be at least 1). */
        public abstract int getMaxAttempts();

        /** Returns the duration used as the cap on cumulative backoff. */
        public abstract Duration getMaxDuration();

        /** Returns the duration used as the first backoff interval. */
        public abstract Duration getInitialDuration();

        /** Returns the predicate that decides whether a failure may be retried. */
        public abstract RetryPredicate getRetryPredicate();

        // Implemented by the AutoValue-generated subclass; presumably returns a builder seeded with
        // this instance's values - confirm against the generated code.
        abstract Builder builder();

        /**
         * Creates a retry configuration using the default initial backoff of five seconds.
         *
         * @param maxAttempts total number of attempts; must be greater than 0
         * @param maxDuration cap on cumulative backoff; must be non-null and longer than zero
         * @return the validated configuration, using the default retry predicate
         * @throws IllegalArgumentException if an argument fails validation
         */
        public static RetryConfiguration create(int maxAttempts, Duration maxDuration) {
            return create(maxAttempts, maxDuration, DEFAULT_INITIAL_DURATION);
        }

        /**
         * Creates a retry configuration with an explicit initial backoff.
         *
         * @param maxAttempts total number of attempts; must be greater than 0
         * @param maxDuration cap on cumulative backoff; must be non-null and longer than zero
         * @param initialDuration first backoff interval; must be non-null and longer than zero
         * @return the validated configuration, using the default retry predicate
         * @throws IllegalArgumentException if an argument fails validation
         */
        @VisibleForTesting
        static RetryConfiguration create(int maxAttempts, Duration maxDuration, Duration initialDuration) {
            Preconditions.checkArgument(maxAttempts > 0, "maxAttempts should be greater than 0");
            Preconditions.checkArgument(
                    maxDuration != null && maxDuration.isLongerThan(Duration.ZERO),
                    "maxDuration should be greater than 0");
            Preconditions.checkArgument(
                    initialDuration != null && initialDuration.isLongerThan(Duration.ZERO),
                    "initialDuration should be greater than 0");
            return new AutoValue_SnsIO_RetryConfiguration.Builder()
                    .setMaxAttempts(maxAttempts)
                    .setMaxDuration(maxDuration)
                    .setInitialDuration(initialDuration)
                    .setRetryPredicate(DEFAULT_RETRY_PREDICATE)
                    .build();
        }
    }

    /**
     * Transform that publishes every incoming {@link PublishRequest} to SNS and emits the
     * {@link PublishResult} of each successful call under the configured result output tag.
     *
     * <p>A topic name, an {@link AwsClientsProvider} and a result output tag must be configured
     * before the transform is applied; a retry configuration and a result coder are optional.
     */
    @AutoValue
    @AutoValue.CopyAnnotations
    public abstract static class Write extends PTransform<PCollection<PublishRequest>, PCollectionTuple> {

        /** AutoValue builder; the concrete implementation is generated. */
        @AutoValue.Builder
        public abstract static class Builder {
            abstract Builder setTopicName(String topicName);

            abstract Builder setAWSClientsProvider(AwsClientsProvider clientsProvider);

            abstract Builder setRetryConfiguration(RetryConfiguration retryConfiguration);

            abstract Builder setResultOutputTag(TupleTag<PublishResult> resultOutputTag);

            abstract Builder setCoder(Coder<PublishResult> coder);

            abstract Write build();
        }

        /**
         * {@link DoFn} that performs the publish calls, retrying failures according to the
         * {@link RetryConfiguration} of the owning {@link Write}, if one is set.
         */
        public static class SnsWriterFn extends DoFn<PublishRequest, PublishResult> {

            /** Log pattern emitted before each retry. */
            @VisibleForTesting
            static final String RETRY_ATTEMPT_LOG = "Error writing to SNS. Retry attempt[{}]";
            private transient FluentBackoff retryBackoff;
            private static final Logger LOG = LoggerFactory.getLogger(SnsWriterFn.class);
            private static final Counter SNS_WRITE_FAILURES = Metrics.counter(SnsWriterFn.class, "SNS_Write_Failures");
            private final Write spec;
            private transient AmazonSNS producer;

            SnsWriterFn(Write write) {
                this.spec = write;
            }

            /**
             * Creates the SNS client, verifies that the configured topic exists and prepares the
             * backoff policy.
             *
             * @throws IllegalArgumentException if the topic check reports that the topic is missing
             * @throws Exception if creating the client or querying the topic fails
             */
            @DoFn.Setup
            public void setup() throws Exception {
                this.producer = this.spec.getAWSClientsProvider().createSnsPublisher();
                String topicName = this.spec.getTopicName();
                Preconditions.checkArgument(topicExists(this.producer, topicName), "Topic %s does not exist", topicName);

                // Without a retry configuration the backoff allows no retries at all.
                FluentBackoff backoffPolicy = FluentBackoff.DEFAULT.withMaxRetries(0);
                RetryConfiguration retryConfiguration = this.spec.getRetryConfiguration();
                if (retryConfiguration != null) {
                    // maxAttempts counts the first try, hence one retry fewer.
                    backoffPolicy = backoffPolicy
                            .withMaxRetries(retryConfiguration.getMaxAttempts() - 1)
                            .withInitialBackoff(retryConfiguration.getInitialDuration())
                            .withMaxCumulativeBackoff(retryConfiguration.getMaxDuration());
                }
                this.retryBackoff = backoffPolicy;
            }

            /**
             * Publishes the current element and outputs the publish result.
             *
             * <p>On failure the call is retried while a retry configuration is present, its
             * predicate accepts the exception and the backoff still permits another attempt.
             *
             * @param processContext context providing the request and receiving the result
             * @throws IOException if the failure is not retryable or the attempts are exhausted;
             *     the original exception is attached as the cause
             * @throws Exception if sleeping between attempts fails
             */
            @DoFn.ProcessElement
            public void processElement(DoFn<PublishRequest, PublishResult>.ProcessContext processContext) throws Exception {
                PublishRequest publishRequest = processContext.element();
                Sleeper sleeper = Sleeper.DEFAULT;
                BackOff backoff = this.retryBackoff.backoff();
                for (int attempt = 1; ; attempt++) {
                    try {
                        processContext.output(this.producer.publish(publishRequest));
                        return;
                    } catch (Exception e) {
                        RetryConfiguration retryConfiguration = this.spec.getRetryConfiguration();
                        // Fail right away when retries are not configured or not applicable.
                        if (retryConfiguration == null || !retryConfiguration.getRetryPredicate().test(e)) {
                            SNS_WRITE_FAILURES.inc();
                            LOG.info("Unable to publish message {}.", publishRequest.getMessage(), e);
                            throw new IOException("Error writing to SNS (no attempt made to retry)", e);
                        }
                        if (!BackOffUtils.next(sleeper, backoff)) {
                            // NOTE(review): SNS_WRITE_FAILURES is not incremented on this path -
                            // confirm whether exhausted retries should count as write failures.
                            throw new IOException(
                                    String.format("Error writing to SNS after %d attempt(s). No more attempts allowed", attempt),
                                    e);
                        }
                        LOG.warn(RETRY_ATTEMPT_LOG, attempt, e);
                    }
                }
            }

            /** Shuts down the SNS client, if one was created, and clears the reference. */
            @DoFn.Teardown
            public void tearDown() {
                if (this.producer != null) {
                    this.producer.shutdown();
                    this.producer = null;
                }
            }

            /**
             * Checks for the topic by requesting its attributes.
             *
             * @param client SNS client used for the lookup
             * @param topicName value passed to {@code getTopicAttributes}
             * @return {@code true} if a result is returned with HTTP status 200, otherwise
             *     {@code false}
             */
            private static boolean topicExists(AmazonSNS client, String topicName) {
                try {
                    GetTopicAttributesResult attributes = client.getTopicAttributes(topicName);
                    return attributes != null && attributes.getSdkHttpMetadata().getHttpStatusCode() == 200;
                } catch (Exception e) {
                    // Log for diagnosis, then let the failure propagate unchanged.
                    LOG.warn("Error checking whether topic {} exists.", topicName, e);
                    throw e;
                }
            }
        }

        /** Returns the configured topic name, or {@code null} if none has been set. */
        public abstract String getTopicName();

        /** Returns the provider used to create the SNS client, or {@code null} if none has been set. */
        public abstract AwsClientsProvider getAWSClientsProvider();

        /** Returns the retry configuration, or {@code null} when failures must not be retried. */
        public abstract RetryConfiguration getRetryConfiguration();

        /** Returns the tag under which publish results are emitted, or {@code null} if unset. */
        public abstract TupleTag<PublishResult> getResultOutputTag();

        /** Returns the coder applied to the result collection, or {@code null} if unset. */
        public abstract Coder<PublishResult> getCoder();

        // Implemented by the AutoValue-generated subclass; presumably returns a builder seeded with
        // this instance's values - confirm against the generated code.
        abstract Builder builder();

        /** Returns a transform configured with the given topic name. */
        public Write withTopicName(String topicName) {
            return builder().setTopicName(topicName).build();
        }

        /** Returns a transform that obtains its SNS client from the given provider. */
        public Write withAWSClientsProvider(AwsClientsProvider clientsProvider) {
            return builder().setAWSClientsProvider(clientsProvider).build();
        }

        /**
         * Returns a transform backed by a {@link BasicSnsProvider} built from the given values,
         * passing {@code null} as the provider's fourth argument.
         *
         * <p>Parameter names reflect the presumed meaning of the {@link BasicSnsProvider}
         * constructor arguments - confirm against that class.
         */
        public Write withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Regions region) {
            return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, null);
        }

        /**
         * Returns a transform backed by a {@link BasicSnsProvider} built from the given values.
         *
         * <p>Parameter names reflect the presumed meaning of the {@link BasicSnsProvider}
         * constructor arguments - confirm against that class.
         */
        public Write withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Regions region, String serviceUrl) {
            return withAWSClientsProvider(new BasicSnsProvider(awsAccessKey, awsSecretKey, region, serviceUrl));
        }

        /**
         * Returns a transform that retries failed publish calls according to the given settings.
         *
         * @throws IllegalArgumentException if {@code retryConfiguration} is {@code null}
         */
        public Write withRetryConfiguration(RetryConfiguration retryConfiguration) {
            Preconditions.checkArgument(retryConfiguration != null, "retryConfiguration is required");
            return builder().setRetryConfiguration(retryConfiguration).build();
        }

        /** Returns a transform that emits publish results under the given tag. */
        public Write withResultOutputTag(TupleTag<PublishResult> resultOutputTag) {
            return builder().setResultOutputTag(resultOutputTag).build();
        }

        /** Returns a transform whose result coder is {@code PublishResultCoders.fullPublishResult()}. */
        public Write withFullPublishResult() {
            return withCoder(PublishResultCoders.fullPublishResult());
        }

        /**
         * Returns a transform whose result coder is
         * {@code PublishResultCoders.fullPublishResultWithoutHeaders()}.
         */
        public Write withFullPublishResultWithoutHeaders() {
            return withCoder(PublishResultCoders.fullPublishResultWithoutHeaders());
        }

        /** Returns a transform that sets the given coder on the result collection. */
        public Write withCoder(Coder<PublishResult> coder) {
            return builder().setCoder(coder).build();
        }

        /**
         * Applies the publishing {@link DoFn} to the input and returns its outputs.
         *
         * @param pCollection the publish requests to send
         * @return a tuple holding the publish results under {@link #getResultOutputTag()}
         * @throws IllegalArgumentException if the topic name, the clients provider or the result
         *     output tag has not been configured
         */
        public PCollectionTuple expand(PCollection<PublishRequest> pCollection) {
            LoggerFactory.getLogger(SnsIO.class).warn("You are using a deprecated IO for Sns. Please migrate to module 'org.apache.beam:beam-sdks-java-io-amazon-web-services2'.");
            Preconditions.checkArgument(getTopicName() != null, "withTopicName() is required");
            // Fail at construction time instead of with a NullPointerException later on: setup()
            // dereferences the provider, and the tag is needed to build and look up the output.
            Preconditions.checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() is required");
            Preconditions.checkArgument(getResultOutputTag() != null, "withResultOutputTag() is required");

            PCollectionTuple outputs = pCollection.apply(
                    ParDo.of(new SnsWriterFn(this)).withOutputTags(getResultOutputTag(), TupleTagList.empty()));
            Coder<PublishResult> resultCoder = getCoder();
            if (resultCoder != null) {
                outputs.get(getResultOutputTag()).setCoder(resultCoder);
            }
            return outputs;
        }
    }

    /**
     * Entry point for configuring an SNS write.
     *
     * @return a {@link Write} built from a fresh builder on which no property has been set
     */
    public static Write write() {
        AutoValue_SnsIO_Write.Builder freshBuilder = new AutoValue_SnsIO_Write.Builder();
        return freshBuilder.build();
    }
}
