/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.sns;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.aws2.sns.AutoValue_SnsIO_RetryConfiguration;
import org.apache.beam.sdk.io.aws2.sns.AutoValue_SnsIO_Write;
import org.apache.beam.sdk.io.aws2.sns.AutoValue_SnsIO_WriteAsync;
import org.apache.beam.sdk.io.aws2.sns.BasicSnsAsyncClientProvider;
import org.apache.beam.sdk.io.aws2.sns.BasicSnsClientProvider;
import org.apache.beam.sdk.io.aws2.sns.SnsAsyncClientProvider;
import org.apache.beam.sdk.io.aws2.sns.SnsClientProvider;
import org.apache.beam.sdk.io.aws2.sns.SnsResponse;
import org.apache.beam.sdk.io.aws2.sns.SnsResponseCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.sns.SnsAsyncClient;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.GetTopicAttributesRequest;
import software.amazon.awssdk.services.sns.model.GetTopicAttributesResponse;
import software.amazon.awssdk.services.sns.model.InternalErrorException;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public final class SnsIO {
    public static <T> @UnknownKeyFor @NonNull @Initialized Write<T> write() {
        return new AutoValue_SnsIO_Write.Builder().build();
    }

    public static <T> @UnknownKeyFor @NonNull @Initialized WriteAsync<T> writeAsync() {
        return new AutoValue_SnsIO_WriteAsync.Builder().build();
    }

    protected static class SnsWriteException
    extends RuntimeException {
        SnsWriteException(@UnknownKeyFor @NonNull @Initialized String message, @UnknownKeyFor @NonNull @Initialized Throwable error) {
            super(message, error);
        }
    }

    @AutoValue
    public static abstract class WriteAsync<@UnknownKeyFor T>
    extends PTransform<PCollection<T>, PCollection<SnsResponse<T>>> {
        abstract @Nullable @UnknownKeyFor @Initialized SnsAsyncClientProvider getSnsClientProvider();

        abstract @Nullable @UnknownKeyFor @Initialized SerializableFunction<T, @UnknownKeyFor @NonNull @Initialized PublishRequest> getPublishRequestFn();

        abstract @Nullable @UnknownKeyFor @Initialized Coder<T> getCoder();

        abstract @UnknownKeyFor @NonNull @Initialized Builder<T> builder();

        public @UnknownKeyFor @NonNull @Initialized WriteAsync<T> withCoder(@UnknownKeyFor @NonNull @Initialized Coder<T> elementCoder) {
            Preconditions.checkNotNull(elementCoder, (Object)"elementCoder cannot be null");
            return this.builder().setCoder(elementCoder).build();
        }

        public @UnknownKeyFor @NonNull @Initialized WriteAsync<T> withPublishRequestFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<T, @UnknownKeyFor @NonNull @Initialized PublishRequest> publishRequestFn) {
            Preconditions.checkNotNull(publishRequestFn, (Object)"publishRequestFn cannot be null");
            return this.builder().setPublishRequestFn(publishRequestFn).build();
        }

        public @UnknownKeyFor @NonNull @Initialized WriteAsync<T> withSnsClientProvider(@UnknownKeyFor @NonNull @Initialized SnsAsyncClientProvider asyncClientProvider) {
            Preconditions.checkNotNull((Object)asyncClientProvider, (Object)"asyncClientProvider cannot be null");
            return this.builder().setSnsClientProvider(asyncClientProvider).build();
        }

        public @UnknownKeyFor @NonNull @Initialized WriteAsync<T> withSnsClientProvider(@UnknownKeyFor @NonNull @Initialized AwsCredentialsProvider credentialsProvider, @UnknownKeyFor @NonNull @Initialized String region) {
            Preconditions.checkNotNull((Object)credentialsProvider, (Object)"credentialsProvider cannot be null");
            Preconditions.checkNotNull((Object)region, (Object)"region cannot be null");
            return this.withSnsClientProvider(credentialsProvider, region, null);
        }

        public @UnknownKeyFor @NonNull @Initialized WriteAsync<T> withSnsClientProvider(@UnknownKeyFor @NonNull @Initialized AwsCredentialsProvider credentialsProvider, @UnknownKeyFor @NonNull @Initialized String region, @UnknownKeyFor @NonNull @Initialized URI serviceEndpoint) {
            Preconditions.checkNotNull((Object)credentialsProvider, (Object)"credentialsProvider cannot be null");
            Preconditions.checkNotNull((Object)region, (Object)"region cannot be null");
            return this.withSnsClientProvider(new BasicSnsAsyncClientProvider(credentialsProvider, region, serviceEndpoint));
        }

        public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized SnsResponse<T>> expand(@UnknownKeyFor @NonNull @Initialized PCollection<T> input) {
            Preconditions.checkArgument((this.getSnsClientProvider() != null ? 1 : 0) != 0, (Object)"withSnsClientProvider() needs to called");
            Preconditions.checkArgument((this.getPublishRequestFn() != null ? 1 : 0) != 0, (Object)"withPublishRequestFn() needs to called");
            Preconditions.checkArgument((this.getCoder() != null ? 1 : 0) != 0, (Object)"withElementCoder() needs to called");
            return ((PCollection)input.apply((PTransform)ParDo.of(new SnsWriteAsyncFn(this)))).setCoder(SnsResponseCoder.of(this.getCoder()));
        }

        private static class SnsWriteAsyncFn<@UnknownKeyFor T>
        extends DoFn<T, SnsResponse<T>> {
            private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(SnsWriteAsyncFn.class);
            private final @UnknownKeyFor @NonNull @Initialized WriteAsync<T> spec;
            private transient @UnknownKeyFor @NonNull @Initialized SnsAsyncClient client;

            SnsWriteAsyncFn(@UnknownKeyFor @NonNull @Initialized WriteAsync<T> spec) {
                this.spec = spec;
            }

            @DoFn.Setup
            public void setup() {
                this.client = this.spec.getSnsClientProvider().getSnsAsyncClient();
            }

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context) {
                PublishRequest publishRequest = (PublishRequest)this.spec.getPublishRequestFn().apply(context.element());
                this.client.publish(publishRequest).whenComplete(this.getPublishResponse(context));
            }

            private @UnknownKeyFor @NonNull @Initialized BiConsumer<@UnknownKeyFor @Nullable @Initialized ? super @UnknownKeyFor @NonNull @Initialized PublishResponse, @UnknownKeyFor @Nullable @Initialized ? super @UnknownKeyFor @NonNull @Initialized Throwable> getPublishResponse(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context) {
                return (response, ex) -> {
                    if (ex != null) {
                        LOG.error("Error while publishing request to SNS", ex);
                        throw new SnsWriteException("Error while publishing request to SNS", (Throwable)ex);
                    }
                    SnsResponse<Object> snsResponse = SnsResponse.of(context.element(), response);
                    context.output(snsResponse);
                };
            }
        }

        @AutoValue.Builder
        static abstract class Builder<@UnknownKeyFor T> {
            Builder() {
            }

            abstract @UnknownKeyFor @NonNull @Initialized Builder<T> setSnsClientProvider(@UnknownKeyFor @NonNull @Initialized SnsAsyncClientProvider var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<T> setCoder(@UnknownKeyFor @NonNull @Initialized Coder<T> var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<T> setPublishRequestFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<T, @UnknownKeyFor @NonNull @Initialized PublishRequest> var1);

            abstract @UnknownKeyFor @NonNull @Initialized WriteAsync<T> build();
        }
    }

    @AutoValue
    public static abstract class Write<@UnknownKeyFor T>
    extends PTransform<PCollection<T>, PCollection<PublishResponse>> {
        abstract @Nullable @UnknownKeyFor @Initialized String getTopicArn();

        abstract @Nullable @UnknownKeyFor @Initialized SerializableFunction<T, @UnknownKeyFor @NonNull @Initialized PublishRequest> getPublishRequestFn();

        abstract @Nullable @UnknownKeyFor @Initialized SnsClientProvider getSnsClientProvider();

        abstract @Nullable @UnknownKeyFor @Initialized RetryConfiguration getRetryConfiguration();

        abstract @UnknownKeyFor @NonNull @Initialized Builder<T> builder();

        public @UnknownKeyFor @NonNull @Initialized Write<T> withTopicArn(@UnknownKeyFor @NonNull @Initialized String topicArn) {
            return this.builder().setTopicArn(topicArn).build();
        }

        public @UnknownKeyFor @NonNull @Initialized Write<T> withPublishRequestFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<T, @UnknownKeyFor @NonNull @Initialized PublishRequest> publishRequestFn) {
            return this.builder().setPublishRequestFn(publishRequestFn).build();
        }

        public @UnknownKeyFor @NonNull @Initialized Write<T> withSnsClientProvider(@UnknownKeyFor @NonNull @Initialized SnsClientProvider awsClientsProvider) {
            return this.builder().setSnsClientProvider(awsClientsProvider).build();
        }

        public @UnknownKeyFor @NonNull @Initialized Write<T> withSnsClientProvider(@UnknownKeyFor @NonNull @Initialized AwsCredentialsProvider credentialsProvider, @UnknownKeyFor @NonNull @Initialized String region) {
            return this.withSnsClientProvider(credentialsProvider, region, null);
        }

        public @UnknownKeyFor @NonNull @Initialized Write<T> withSnsClientProvider(@UnknownKeyFor @NonNull @Initialized AwsCredentialsProvider credentialsProvider, @UnknownKeyFor @NonNull @Initialized String region, @UnknownKeyFor @NonNull @Initialized URI serviceEndpoint) {
            return this.withSnsClientProvider(new BasicSnsClientProvider(credentialsProvider, region, serviceEndpoint));
        }

        public @UnknownKeyFor @NonNull @Initialized Write<T> withRetryConfiguration(@UnknownKeyFor @NonNull @Initialized RetryConfiguration retryConfiguration) {
            Preconditions.checkArgument((retryConfiguration != null ? 1 : 0) != 0, (Object)"retryConfiguration is required");
            return this.builder().setRetryConfiguration(retryConfiguration).build();
        }

        private static @UnknownKeyFor @NonNull @Initialized boolean isTopicExists(@UnknownKeyFor @NonNull @Initialized SnsClient client, @UnknownKeyFor @NonNull @Initialized String topicArn) {
            GetTopicAttributesRequest getTopicAttributesRequest = (GetTopicAttributesRequest)GetTopicAttributesRequest.builder().topicArn(topicArn).build();
            GetTopicAttributesResponse topicAttributesResponse = client.getTopicAttributes(getTopicAttributesRequest);
            return topicAttributesResponse != null && topicAttributesResponse.sdkHttpResponse().statusCode() == 200;
        }

        public @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized PublishResponse> expand(@UnknownKeyFor @NonNull @Initialized PCollection<T> input) {
            Preconditions.checkArgument((this.getTopicArn() != null ? 1 : 0) != 0, (Object)"withTopicArn() is required");
            Preconditions.checkArgument((this.getPublishRequestFn() != null ? 1 : 0) != 0, (Object)"withPublishRequestFn() is required");
            Preconditions.checkArgument((this.getSnsClientProvider() != null ? 1 : 0) != 0, (Object)"withSnsClientProvider() is required");
            Preconditions.checkArgument((boolean)Write.isTopicExists(this.getSnsClientProvider().getSnsClient(), this.getTopicArn()), (String)"Topic arn %s does not exist", (Object)this.getTopicArn());
            return (PCollection)input.apply((PTransform)ParDo.of(new SnsWriterFn(this)));
        }

        static class SnsWriterFn<@UnknownKeyFor T>
        extends DoFn<T, PublishResponse> {
            @VisibleForTesting
            static final @UnknownKeyFor @NonNull @Initialized String RETRY_ATTEMPT_LOG = "Error writing to SNS. Retry attempt[%d]";
            private static final @UnknownKeyFor @NonNull @Initialized Duration RETRY_INITIAL_BACKOFF = Duration.standardSeconds((long)5L);
            private transient @UnknownKeyFor @NonNull @Initialized FluentBackoff retryBackoff;
            private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(SnsWriterFn.class);
            private static final @UnknownKeyFor @NonNull @Initialized Counter SNS_WRITE_FAILURES = Metrics.counter(SnsWriterFn.class, (String)"SNS_Write_Failures");
            private final @UnknownKeyFor @NonNull @Initialized Write spec;
            private transient @UnknownKeyFor @NonNull @Initialized SnsClient producer;

            SnsWriterFn(@UnknownKeyFor @NonNull @Initialized Write spec) {
                this.spec = spec;
            }

            @DoFn.Setup
            public void setup() throws @UnknownKeyFor @NonNull @Initialized Exception {
                this.producer = this.spec.getSnsClientProvider().getSnsClient();
                this.retryBackoff = FluentBackoff.DEFAULT.withMaxRetries(0).withInitialBackoff(RETRY_INITIAL_BACKOFF);
                if (this.spec.getRetryConfiguration() != null) {
                    this.retryBackoff = this.retryBackoff.withMaxRetries(this.spec.getRetryConfiguration().getMaxAttempts() - 1).withMaxCumulativeBackoff(this.spec.getRetryConfiguration().getMaxDuration());
                }
            }

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context) throws @UnknownKeyFor @NonNull @Initialized Exception {
                PublishRequest request = (PublishRequest)this.spec.getPublishRequestFn().apply(context.element());
                Sleeper sleeper = Sleeper.DEFAULT;
                BackOff backoff = this.retryBackoff.backoff();
                int attempt = 0;
                while (true) {
                    ++attempt;
                    try {
                        PublishResponse pr = this.producer.publish(request);
                        context.output((Object)pr);
                    }
                    catch (Exception ex) {
                        if (this.spec.getRetryConfiguration() == null || !this.spec.getRetryConfiguration().getRetryPredicate().test(ex)) {
                            SNS_WRITE_FAILURES.inc();
                            LOG.info("Unable to publish message {}.", (Object)request.message(), (Object)ex);
                            throw new IOException("Error writing to SNS (no attempt made to retry)", ex);
                        }
                        if (!BackOffUtils.next((Sleeper)sleeper, (BackOff)backoff)) {
                            throw new IOException(String.format("Error writing to SNS after %d attempt(s). No more attempts allowed", attempt), ex);
                        }
                        LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), (Throwable)ex);
                        continue;
                    }
                    break;
                }
            }

            @DoFn.Teardown
            public void tearDown() {
                if (this.producer != null) {
                    this.producer.close();
                    this.producer = null;
                }
            }
        }

        @AutoValue.Builder
        static abstract class Builder<@UnknownKeyFor T> {
            Builder() {
            }

            abstract @UnknownKeyFor @NonNull @Initialized Builder<T> setTopicArn(@UnknownKeyFor @NonNull @Initialized String var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<T> setPublishRequestFn(@UnknownKeyFor @NonNull @Initialized SerializableFunction<T, @UnknownKeyFor @NonNull @Initialized PublishRequest> var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<T> setSnsClientProvider(@UnknownKeyFor @NonNull @Initialized SnsClientProvider var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder<T> setRetryConfiguration(@UnknownKeyFor @NonNull @Initialized RetryConfiguration var1);

            abstract @UnknownKeyFor @NonNull @Initialized Write<T> build();
        }
    }

    @AutoValue
    public static abstract class RetryConfiguration
    implements Serializable {
        @VisibleForTesting
        static final @UnknownKeyFor @NonNull @Initialized RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();

        abstract @UnknownKeyFor @NonNull @Initialized int getMaxAttempts();

        abstract @UnknownKeyFor @NonNull @Initialized Duration getMaxDuration();

        abstract @UnknownKeyFor @NonNull @Initialized RetryPredicate getRetryPredicate();

        abstract @UnknownKeyFor @NonNull @Initialized Builder builder();

        public static @UnknownKeyFor @NonNull @Initialized RetryConfiguration create(@UnknownKeyFor @NonNull @Initialized int maxAttempts, @UnknownKeyFor @NonNull @Initialized Duration maxDuration) {
            Preconditions.checkArgument((maxAttempts > 0 ? 1 : 0) != 0, (Object)"maxAttempts should be greater than 0");
            Preconditions.checkArgument((maxDuration != null && maxDuration.isLongerThan((ReadableDuration)Duration.ZERO) ? 1 : 0) != 0, (Object)"maxDuration should be greater than 0");
            return new AutoValue_SnsIO_RetryConfiguration.Builder().setMaxAttempts(maxAttempts).setMaxDuration(maxDuration).setRetryPredicate(DEFAULT_RETRY_PREDICATE).build();
        }

        private static class DefaultRetryPredicate
        implements RetryPredicate {
            private static final @UnknownKeyFor @NonNull @Initialized ImmutableSet<@UnknownKeyFor @NonNull @Initialized Integer> ELIGIBLE_CODES = ImmutableSet.of((Object)503);

            private DefaultRetryPredicate() {
            }

            @Override
            public @UnknownKeyFor @NonNull @Initialized boolean test(@UnknownKeyFor @NonNull @Initialized Throwable throwable) {
                return throwable instanceof IOException || throwable instanceof InternalErrorException || throwable instanceof InternalErrorException && ELIGIBLE_CODES.contains((Object)((InternalErrorException)throwable).statusCode());
            }
        }

        @FunctionalInterface
        static interface RetryPredicate
        extends Predicate<Throwable>,
        Serializable {
        }

        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract @UnknownKeyFor @NonNull @Initialized Builder setMaxAttempts(@UnknownKeyFor @NonNull @Initialized int var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder setMaxDuration(@UnknownKeyFor @NonNull @Initialized Duration var1);

            abstract @UnknownKeyFor @NonNull @Initialized Builder setRetryPredicate(@UnknownKeyFor @NonNull @Initialized RetryPredicate var1);

            abstract @UnknownKeyFor @NonNull @Initialized RetryConfiguration build();
        }
    }
}

