/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws.sns;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.model.GetTopicAttributesResult;
import com.amazonaws.services.sns.model.InternalErrorException;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.aws.sns.AutoValue_SnsIO_RetryConfiguration;
import org.apache.beam.sdk.io.aws.sns.AutoValue_SnsIO_Write;
import org.apache.beam.sdk.io.aws.sns.AwsClientsProvider;
import org.apache.beam.sdk.io.aws.sns.BasicSnsProvider;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public final class SnsIO {
    /**
     * Creates a {@link Write} transform from an empty builder.
     *
     * <p>No property is set on the returned instance; configure it through the
     * {@code with*} methods of {@link Write}.
     *
     * @return a new {@link Write} with nothing configured
     */
    public static Write write() {
        final Write unconfigured = new AutoValue_SnsIO_Write.Builder().build();
        return unconfigured;
    }

    @AutoValue
    public static abstract class Write
    extends PTransform<PCollection<PublishRequest>, PCollectionTuple> {
        /**
         * Topic name handed to {@code AmazonSNS#getTopicAttributes} when the writer is set up.
         *
         * <p>May be {@code null}; {@link #expand} rejects a {@code null} value.
         */
        @Nullable
        abstract String getTopicName();

        /**
         * Provider the writer uses to create its {@link AmazonSNS} client.
         *
         * <p>May be {@code null}.
         */
        @Nullable
        abstract AwsClientsProvider getAWSClientsProvider();

        /**
         * Retry settings for failed publishes.
         *
         * <p>May be {@code null}, in which case the writer fails on the first publish error
         * without retrying.
         */
        @Nullable
        abstract RetryConfiguration getRetryConfiguration();

        /**
         * Tag passed as the main output tag of the writing {@link ParDo} in {@link #expand}.
         *
         * <p>May be {@code null}.
         */
        @Nullable
        abstract TupleTag<PublishResult> getResultOutputTag();

        /**
         * Returns the builder the {@code with*} methods use to derive a new {@link Write}.
         *
         * <p>NOTE(review): presumably pre-populated with this instance's values (AutoValue
         * "toBuilder" convention) - confirm against the generated class.
         */
        abstract Builder builder();

        /**
         * Returns a {@link Write} built from this transform's builder with the topic name set.
         *
         * @param topicName value stored as the topic name
         * @return the {@link Write} produced by the builder
         */
        public Write withTopicName(String topicName) {
            Builder withTopic = this.builder().setTopicName(topicName);
            return withTopic.build();
        }

        /**
         * Returns a {@link Write} built from this transform's builder with the given clients
         * provider set.
         *
         * @param awsClientsProvider provider stored on the resulting transform
         * @return the {@link Write} produced by the builder
         */
        public Write withAWSClientsProvider(AwsClientsProvider awsClientsProvider) {
            Builder withProvider = this.builder().setAWSClientsProvider(awsClientsProvider);
            return withProvider.build();
        }

        /**
         * Convenience overload that delegates to the four-argument variant with a {@code null}
         * service endpoint.
         *
         * @param awsAccessKey access key forwarded to the four-argument overload
         * @param awsSecretKey secret key forwarded to the four-argument overload
         * @param region region forwarded to the four-argument overload
         * @return the {@link Write} returned by the four-argument overload
         */
        public Write withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Regions region) {
            final String noServiceEndpoint = null;
            return this.withAWSClientsProvider(awsAccessKey, awsSecretKey, region, noServiceEndpoint);
        }

        /**
         * Wraps the given values in a {@link BasicSnsProvider} and sets it as the clients provider.
         *
         * @param awsAccessKey access key passed to {@link BasicSnsProvider}
         * @param awsSecretKey secret key passed to {@link BasicSnsProvider}
         * @param region region passed to {@link BasicSnsProvider}
         * @param serviceEndpoint service endpoint passed to {@link BasicSnsProvider}; the
         *     three-argument overload passes {@code null} here
         * @return the {@link Write} returned by {@link #withAWSClientsProvider(AwsClientsProvider)}
         */
        public Write withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Regions region, String serviceEndpoint) {
            AwsClientsProvider basicProvider = new BasicSnsProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint);
            return this.withAWSClientsProvider(basicProvider);
        }

        /**
         * Returns a {@link Write} built from this transform's builder with the retry
         * configuration set.
         *
         * @param retryConfiguration retry settings; must not be {@code null}
         * @return the {@link Write} produced by the builder
         * @throws IllegalArgumentException if {@code retryConfiguration} is {@code null}
         */
        public Write withRetryConfiguration(RetryConfiguration retryConfiguration) {
            boolean configurationPresent = retryConfiguration != null;
            Preconditions.checkArgument(configurationPresent, "retryConfiguration is required");
            Builder withRetries = this.builder().setRetryConfiguration(retryConfiguration);
            return withRetries.build();
        }

        /**
         * Returns a {@link Write} built from this transform's builder with the result output
         * tag set.
         *
         * @param results tag stored as the result output tag
         * @return the {@link Write} produced by the builder
         */
        public Write withResultOutputTag(TupleTag<PublishResult> results) {
            Builder withTag = this.builder().setResultOutputTag(results);
            return withTag.build();
        }

        /**
         * Applies a multi-output {@link ParDo} that runs {@link SnsWriterFn} over the input,
         * using the configured result output tag as the main output tag and no additional tags.
         *
         * <p>All properties the writer dereferences are validated here so that a misconfigured
         * transform fails with a descriptive message instead of a bare
         * {@link NullPointerException} later on.
         *
         * @param input the publish requests to write
         * @return the tuple produced by the {@link ParDo}
         * @throws IllegalArgumentException if the topic name, the clients provider or the
         *     result output tag has not been set
         */
        @Override
        public PCollectionTuple expand(PCollection<PublishRequest> input) {
            Preconditions.checkArgument(getTopicName() != null, "withTopicName() is required");
            // The writer calls createSnsPublisher() on the provider during setup.
            Preconditions.checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() is required");
            // The tag is used as the main output tag of the ParDo below.
            Preconditions.checkArgument(getResultOutputTag() != null, "withResultOutputTag() is required");
            return input.apply(
                ParDo.of(new SnsWriterFn(this))
                    .withOutputTags(getResultOutputTag(), TupleTagList.empty()));
        }

        /**
         * {@link DoFn} that publishes every incoming {@link PublishRequest} through an
         * {@link AmazonSNS} client and outputs the {@link PublishResult} it gets back.
         *
         * <p>A failed publish is retried only when the owning {@link Write} carries a
         * {@link RetryConfiguration} whose predicate accepts the thrown exception; otherwise the
         * element fails immediately with an {@link IOException}.
         */
        static class SnsWriterFn
        extends DoFn<PublishRequest, PublishResult> {
            /** Format of the warning logged before each retry; {@code %d} is the attempt number. */
            @VisibleForTesting
            static final String RETRY_ATTEMPT_LOG = "Error writing to SNS. Retry attempt[%d]";
            /** Initial backoff handed to {@link FluentBackoff} in {@link #setup()}. */
            private static final Duration RETRY_INITIAL_BACKOFF = Duration.standardSeconds(5L);
            private static final Logger LOG = LoggerFactory.getLogger(SnsWriterFn.class);
            /** Incremented once for every element whose publish ultimately fails. */
            private static final Counter SNS_WRITE_FAILURES = Metrics.counter(SnsWriterFn.class, "SNS_Write_Failures");
            private final Write spec;
            /** Backoff policy built in {@link #setup()}; transient, so it is not serialized. */
            private transient FluentBackoff retryBackoff;
            /** SNS client created in {@link #setup()} and shut down in {@link #tearDown()}. */
            private transient AmazonSNS producer;

            /**
             * @param spec the transform whose configuration this function reads
             */
            SnsWriterFn(Write spec) {
                this.spec = spec;
            }

            /**
             * Creates the SNS client, verifies that the configured topic exists and builds the
             * backoff policy used by {@link #processElement}.
             *
             * <p>Without a retry configuration the policy allows zero retries; with one it
             * allows {@code maxAttempts - 1} retries bounded by the configured maximum duration.
             *
             * @throws IllegalArgumentException if the topic existence check returns {@code false}
             * @throws Exception anything thrown while creating the client or querying the topic
             */
            @DoFn.Setup
            public void setup() throws Exception {
                producer = spec.getAWSClientsProvider().createSnsPublisher();
                Preconditions.checkArgument(topicExists(producer, spec.getTopicName()), "Topic %s does not exist", spec.getTopicName());

                FluentBackoff backoffPolicy = FluentBackoff.DEFAULT.withMaxRetries(0).withInitialBackoff(RETRY_INITIAL_BACKOFF);
                RetryConfiguration retryConfiguration = spec.getRetryConfiguration();
                if (retryConfiguration != null) {
                    // The first attempt is not a retry, hence maxAttempts - 1.
                    backoffPolicy = backoffPolicy
                        .withMaxRetries(retryConfiguration.getMaxAttempts() - 1)
                        .withMaxCumulativeBackoff(retryConfiguration.getMaxDuration());
                }
                retryBackoff = backoffPolicy;
            }

            /**
             * Publishes the current element and outputs the result, retrying on eligible errors.
             *
             * <p>Every element that ends in failure increments the {@code SNS_Write_Failures}
             * counter, whether it failed without a retry or after the backoff was exhausted.
             *
             * @param context Beam process context supplying the element and receiving the output
             * @throws IOException if the publish fails and is not retried, or if no more
             *     attempts are allowed; the original exception is kept as the cause
             * @throws Exception anything else thrown while waiting between attempts
             */
            @DoFn.ProcessElement
            public void processElement(ProcessContext context) throws Exception {
                PublishRequest request = context.element();
                RetryConfiguration retryConfiguration = spec.getRetryConfiguration();
                Sleeper sleeper = Sleeper.DEFAULT;
                BackOff backoff = retryBackoff.backoff();
                int attempt = 0;
                while (true) {
                    attempt++;
                    try {
                        // output() stays inside the try so that its failures take the same path
                        // as publish failures.
                        context.output(producer.publish(request));
                        return;
                    } catch (Exception ex) {
                        // Fail right away when retrying is not configured or not applicable.
                        if (retryConfiguration == null || !retryConfiguration.getRetryPredicate().test(ex)) {
                            SNS_WRITE_FAILURES.inc();
                            LOG.info("Unable to publish message {} due to {} ", request.getMessage(), ex);
                            throw new IOException("Error writing to SNS (no attempt made to retry)", ex);
                        }
                        if (!BackOffUtils.next(sleeper, backoff)) {
                            // Exhausted retries are write failures too; count them as well.
                            SNS_WRITE_FAILURES.inc();
                            throw new IOException(String.format("Error writing to SNS after %d attempt(s). No more attempts allowed", attempt), ex);
                        }
                        // Note: tests reference RETRY_ATTEMPT_LOG (it is @VisibleForTesting).
                        LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
                    }
                }
            }

            /** Shuts down the SNS client, if one was created, and clears the reference. */
            @DoFn.Teardown
            public void tearDown() {
                if (producer != null) {
                    producer.shutdown();
                    producer = null;
                }
            }

            /**
             * Checks that a topic-attributes lookup for {@code topicName} answers with HTTP 200.
             *
             * @param client SNS client used for the lookup
             * @param topicName value passed to {@code getTopicAttributes}
             * @return {@code true} if a non-null result with HTTP status 200 was returned
             * @throws RuntimeException any exception raised by the lookup, rethrown after logging
             */
            private static boolean topicExists(AmazonSNS client, String topicName) {
                try {
                    GetTopicAttributesResult attributes = client.getTopicAttributes(topicName);
                    return attributes != null && attributes.getSdkHttpMetadata().getHttpStatusCode() == 200;
                } catch (Exception e) {
                    LOG.warn("Error checking whether topic {} exists.", topicName, e);
                    throw e;
                }
            }
        }

        /**
         * AutoValue builder for {@link Write}; one setter per property plus {@link #build()}.
         */
        @AutoValue.Builder
        abstract static class Builder {
            Builder() {
            }

            /** Sets the value returned by {@link Write#getTopicName()}. */
            abstract Builder setTopicName(String topicName);

            /** Sets the value returned by {@link Write#getAWSClientsProvider()}. */
            abstract Builder setAWSClientsProvider(AwsClientsProvider awsClientsProvider);

            /** Sets the value returned by {@link Write#getRetryConfiguration()}. */
            abstract Builder setRetryConfiguration(RetryConfiguration retryConfiguration);

            /** Sets the value returned by {@link Write#getResultOutputTag()}. */
            abstract Builder setResultOutputTag(TupleTag<PublishResult> resultOutputTag);

            /** Builds the {@link Write} from the values set so far. */
            abstract Write build();
        }
    }

    @AutoValue
    public static abstract class RetryConfiguration
    implements Serializable {
        /** Predicate that {@link #create(int, Duration)} installs on every configuration it builds. */
        @VisibleForTesting
        static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();

        /**
         * Total number of publish attempts; the writer configures its backoff with
         * {@code getMaxAttempts() - 1} retries.
         */
        abstract int getMaxAttempts();

        /** Duration the writer passes to its backoff as the maximum cumulative backoff. */
        abstract Duration getMaxDuration();

        /**
         * Predicate the writer tests against an exception thrown while publishing; when it
         * returns {@code false} the writer fails without retrying.
         */
        abstract RetryPredicate getRetryPredicate();

        /**
         * Returns a builder for this configuration.
         *
         * <p>NOTE(review): presumably pre-populated with this instance's values (AutoValue
         * "toBuilder" convention) - confirm against the generated class.
         */
        abstract Builder builder();

        /**
         * Creates a retry configuration that uses {@link #DEFAULT_RETRY_PREDICATE}.
         *
         * @param maxAttempts number of attempts; must be greater than 0
         * @param maxDuration duration bound; must be non-null and longer than zero
         * @return the configuration built from the given values
         * @throws IllegalArgumentException if either argument violates its constraint
         */
        public static RetryConfiguration create(int maxAttempts, Duration maxDuration) {
            Preconditions.checkArgument(maxAttempts > 0, "maxAttempts should be greater than 0");
            boolean positiveDuration = maxDuration != null && maxDuration.isLongerThan(Duration.ZERO);
            Preconditions.checkArgument(positiveDuration, "maxDuration should be greater than 0");
            RetryConfiguration configuration = new AutoValue_SnsIO_RetryConfiguration.Builder()
                .setMaxAttempts(maxAttempts)
                .setMaxDuration(maxDuration)
                .setRetryPredicate(DEFAULT_RETRY_PREDICATE)
                .build();
            return configuration;
        }

        /**
         * Default {@link RetryPredicate}: a failure is retryable when it is an
         * {@link IOException} or an {@link InternalErrorException}.
         *
         * <p>The earlier form of this predicate carried a third clause that checked the HTTP
         * status of an {@link InternalErrorException} against a set of eligible codes. That
         * clause could never change the result, because every {@link InternalErrorException}
         * was already accepted by the clause before it. It has been removed together with the
         * constant it used; the set of accepted throwables is unchanged.
         */
        private static class DefaultRetryPredicate
        implements RetryPredicate {
            private DefaultRetryPredicate() {
            }

            /**
             * @param throwable the failure raised while publishing
             * @return {@code true} if {@code throwable} is an {@link IOException} or an
             *     {@link InternalErrorException}, {@code false} otherwise (including {@code null})
             */
            @Override
            public boolean test(Throwable throwable) {
                return throwable instanceof IOException || throwable instanceof InternalErrorException;
            }
        }

        /**
         * Serializable {@link Predicate} over {@link Throwable}.
         *
         * <p>The writer tests it against the exception thrown by a failed publish; a
         * {@code false} result makes the writer fail without retrying.
         */
        @FunctionalInterface
        static interface RetryPredicate
        extends Predicate<Throwable>,
        Serializable {
        }

        /**
         * AutoValue builder for {@link RetryConfiguration}; one setter per property plus
         * {@link #build()}.
         */
        @AutoValue.Builder
        abstract static class Builder {
            Builder() {
            }

            /** Sets the value returned by {@link RetryConfiguration#getMaxAttempts()}. */
            abstract Builder setMaxAttempts(int maxAttempts);

            /** Sets the value returned by {@link RetryConfiguration#getMaxDuration()}. */
            abstract Builder setMaxDuration(Duration maxDuration);

            /** Sets the value returned by {@link RetryConfiguration#getRetryPredicate()}. */
            abstract Builder setRetryPredicate(RetryPredicate retryPredicate);

            /** Builds the {@link RetryConfiguration} from the values set so far. */
            abstract RetryConfiguration build();
        }
    }
}

