package org.apache.beam.sdk.io.aws2.sns;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.function.BiConsumer;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.apache.beam.sdk.io.aws2.sns.AutoValue_SnsIO_Write;
import org.apache.beam.sdk.io.aws2.sns.AutoValue_SnsIO_WriteAsync;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsAsyncClient;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.InvalidParameterException;
import software.amazon.awssdk.services.sns.model.NotFoundException;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/sns/SnsIO.class */
public final class SnsIO {

    @AutoValue
    @Deprecated
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/sns/SnsIO$RetryConfiguration.class */
    public static abstract class RetryConfiguration implements Serializable {
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract int getMaxAttempts();

        @Deprecated
        public static RetryConfiguration create(int i, Duration duration) {
            Preconditions.checkArgument(i > 0, "maxAttempts should be greater than 0");
            return new AutoValue_SnsIO_RetryConfiguration(i);
        }

        org.apache.beam.sdk.io.aws2.common.RetryConfiguration convertLegacyConfig() {
            return org.apache.beam.sdk.io.aws2.common.RetryConfiguration.builder().numRetries((getMaxAttempts() * 3) - 1).build();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/sns/SnsIO$SnsWriteException.class */
    public static class SnsWriteException extends RuntimeException {
        SnsWriteException(String str, Throwable th) {
            super(str, th);
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/sns/SnsIO$Write.class */
    public static abstract class Write<T> extends PTransform<PCollection<T>, PCollection<PublishResponse>> {

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/sns/SnsIO$Write$Builder.class */
        public static abstract class Builder<T> {
            abstract Builder<T> setClientConfiguration(ClientConfiguration clientConfiguration);

            abstract Builder<T> setTopicArn(String str);

            abstract Builder<T> setPublishRequestBuilder(SerializableFunction<T, PublishRequest.Builder> serializableFunction);

            abstract Builder<T> setSnsClientProvider(SnsClientProvider snsClientProvider);

            abstract Builder<T> setCoder(Coder<PublishResponse> coder);

            abstract Write<T> build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/sns/SnsIO$Write$SnsWriterFn.class */
        public static class SnsWriterFn<T> extends DoFn<T, PublishResponse> {
            private static final Logger LOG = LoggerFactory.getLogger(SnsWriterFn.class);
            private static final Counter SNS_WRITE_FAILURES = Metrics.counter(SnsWriterFn.class, "SNS_Write_Failures");
            private final Write<T> spec;
            private transient SnsClient producer;

            SnsWriterFn(Write<T> write) {
                this.spec = write;
            }

            @DoFn.Setup
            public void setup(PipelineOptions pipelineOptions) throws Exception {
                this.producer = this.spec.buildClient((AwsOptions) pipelineOptions.as(AwsOptions.class));
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<T, PublishResponse>.ProcessContext processContext) throws Exception {
                PublishRequest.Builder builder = (PublishRequest.Builder) this.spec.getPublishRequestBuilder().apply(processContext.element());
                if (this.spec.getTopicArn() != null) {
                    builder.topicArn(this.spec.getTopicArn());
                }
                PublishRequest publishRequest = (PublishRequest) builder.build();
                try {
                    processContext.output(this.producer.publish(publishRequest));
                } catch (SdkException e) {
                    SNS_WRITE_FAILURES.inc();
                    LOG.error("Unable to publish message {}.", publishRequest.message());
                    throw e;
                }
            }

            @DoFn.Teardown
            public void tearDown() {
                if (this.producer != null) {
                    this.producer.close();
                    this.producer = null;
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ClientConfiguration getClientConfiguration();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getTopicArn();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SerializableFunction<T, PublishRequest.Builder> getPublishRequestBuilder();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SnsClientProvider getSnsClientProvider();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Coder<PublishResponse> getCoder();

        abstract Builder<T> builder();

        public Write<T> withTopicArn(String str) {
            return builder().setTopicArn(str).build();
        }

        public Write<T> withPublishRequestBuilder(SerializableFunction<T, PublishRequest.Builder> serializableFunction) {
            return builder().setPublishRequestBuilder(serializableFunction).build();
        }

        @Deprecated
        public Write<T> withPublishRequestFn(SerializableFunction<T, PublishRequest> serializableFunction) {
            return builder().setPublishRequestBuilder(obj -> {
                return ((PublishRequest) serializableFunction.apply(obj)).toBuilder();
            }).build();
        }

        @Deprecated
        public Write<T> withSnsClientProvider(SnsClientProvider snsClientProvider) {
            Preconditions.checkArgument(snsClientProvider != null, "SnsClientProvider cannot be null");
            return builder().setClientConfiguration(null).setSnsClientProvider(snsClientProvider).build();
        }

        @Deprecated
        public Write<T> withSnsClientProvider(AwsCredentialsProvider awsCredentialsProvider, String str) {
            return updateClientConfig(builder -> {
                return builder.credentialsProvider(awsCredentialsProvider).region(Region.of(str)).build();
            });
        }

        @Deprecated
        public Write<T> withSnsClientProvider(AwsCredentialsProvider awsCredentialsProvider, String str, URI uri) {
            return updateClientConfig(builder -> {
                return builder.credentialsProvider(awsCredentialsProvider).region(Region.of(str)).endpoint(uri).build();
            });
        }

        public Write<T> withClientConfiguration(ClientConfiguration clientConfiguration) {
            return updateClientConfig(builder -> {
                return clientConfiguration;
            });
        }

        private Write<T> updateClientConfig(Function<ClientConfiguration.Builder, ClientConfiguration> function) {
            Preconditions.checkState(getSnsClientProvider() == null, "Legacy SnsClientProvider is set, but incompatible with ClientConfiguration.");
            ClientConfiguration clientConfiguration = (ClientConfiguration) function.apply(getClientConfiguration().toBuilder());
            Preconditions.checkArgument(clientConfiguration != null, "ClientConfiguration cannot be null");
            return builder().setClientConfiguration(clientConfiguration).build();
        }

        @Deprecated
        public Write<T> withRetryConfiguration(RetryConfiguration retryConfiguration) {
            return updateClientConfig(builder -> {
                return builder.retry(retryConfiguration.convertLegacyConfig()).build();
            });
        }

        public Write<T> withFullPublishResponse() {
            return withCoder(PublishResponseCoders.fullPublishResponse());
        }

        public Write<T> withFullPublishResponseWithoutHeaders() {
            return withCoder(PublishResponseCoders.fullPublishResponseWithoutHeaders());
        }

        public Write<T> withCoder(Coder<PublishResponse> coder) {
            return builder().setCoder(coder).build();
        }

        public PCollection<PublishResponse> expand(PCollection<T> pCollection) {
            Preconditions.checkArgument(getPublishRequestBuilder() != null, "withPublishRequestBuilder() is required");
            AwsOptions awsOptions = (AwsOptions) pCollection.getPipeline().getOptions().as(AwsOptions.class);
            if (getSnsClientProvider() == null) {
                Preconditions.checkArgument(getClientConfiguration() != null, "withClientConfiguration() is required");
                ClientBuilderFactory.validate(awsOptions, getClientConfiguration());
            }
            if (getTopicArn() != null) {
                Preconditions.checkArgument(checkTopicExists(awsOptions), "Topic arn %s does not exist", getTopicArn());
            }
            PCollection<PublishResponse> apply = pCollection.apply(ParDo.of(new SnsWriterFn(this)));
            if (getCoder() != null) {
                apply.setCoder(getCoder());
            }
            return apply;
        }

        private boolean checkTopicExists(AwsOptions awsOptions) {
            try {
                SnsClient buildClient = buildClient(awsOptions);
                Throwable th = null;
                try {
                    try {
                        buildClient.getTopicAttributes(builder -> {
                            builder.topicArn(getTopicArn());
                        });
                        if (buildClient != null) {
                            if (0 != 0) {
                                try {
                                    buildClient.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                buildClient.close();
                            }
                        }
                        return true;
                    } finally {
                    }
                } finally {
                }
            } catch (NotFoundException | InvalidParameterException e) {
                LoggerFactory.getLogger(Write.class).warn("Configured topic ARN '" + getTopicArn() + "' does not exist.", e);
                return false;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public SnsClient buildClient(AwsOptions awsOptions) {
            return getSnsClientProvider() != null ? getSnsClientProvider().getSnsClient() : (SnsClient) ClientBuilderFactory.buildClient((AwsOptions) awsOptions.as(AwsOptions.class), SnsClient.builder(), getClientConfiguration());
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 675266853:
                    if (implMethodName.equals("lambda$withPublishRequestFn$52c7b5fa$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/sns/SnsIO$Write") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/transforms/SerializableFunction;Ljava/lang/Object;)Lsoftware/amazon/awssdk/services/sns/model/PublishRequest$Builder;")) {
                        SerializableFunction serializableFunction = (SerializableFunction) serializedLambda.getCapturedArg(0);
                        return obj -> {
                            return ((PublishRequest) serializableFunction.apply(obj)).toBuilder();
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    @AutoValue
    @Deprecated
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/sns/SnsIO$WriteAsync.class */
    public static abstract class WriteAsync<T> extends PTransform<PCollection<T>, PCollection<SnsResponse<T>>> {

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/sns/SnsIO$WriteAsync$Builder.class */
        public static abstract class Builder<T> {
            abstract Builder<T> setSnsClientProvider(SnsAsyncClientProvider snsAsyncClientProvider);

            abstract Builder<T> setCoder(Coder<T> coder);

            abstract Builder<T> setPublishRequestFn(SerializableFunction<T, PublishRequest> serializableFunction);

            abstract WriteAsync<T> build();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/sns/SnsIO$WriteAsync$SnsWriteAsyncFn.class */
        public static class SnsWriteAsyncFn<T> extends DoFn<T, SnsResponse<T>> {
            private static final Logger LOG = LoggerFactory.getLogger(SnsWriteAsyncFn.class);
            private final WriteAsync<T> spec;
            private transient SnsAsyncClient client;

            SnsWriteAsyncFn(WriteAsync<T> writeAsync) {
                this.spec = writeAsync;
            }

            @DoFn.Setup
            public void setup() {
                this.client = this.spec.getSnsClientProvider().getSnsAsyncClient();
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<T, SnsResponse<T>>.ProcessContext processContext) {
                this.client.publish((PublishRequest) this.spec.getPublishRequestFn().apply(processContext.element())).whenComplete((BiConsumer) getPublishResponse(processContext));
            }

            private BiConsumer<? super PublishResponse, ? super Throwable> getPublishResponse(DoFn<T, SnsResponse<T>>.ProcessContext processContext) {
                return (publishResponse, th) -> {
                    if (th == null) {
                        processContext.output(SnsResponse.of(processContext.element(), publishResponse));
                    } else {
                        LOG.error("Error while publishing request to SNS", th);
                        throw new SnsWriteException("Error while publishing request to SNS", th);
                    }
                };
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SnsAsyncClientProvider getSnsClientProvider();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SerializableFunction<T, PublishRequest> getPublishRequestFn();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Coder<T> getCoder();

        abstract Builder<T> builder();

        public WriteAsync<T> withCoder(Coder<T> coder) {
            Preconditions.checkNotNull(coder, "elementCoder cannot be null");
            return builder().setCoder(coder).build();
        }

        public WriteAsync<T> withPublishRequestFn(SerializableFunction<T, PublishRequest> serializableFunction) {
            Preconditions.checkNotNull(serializableFunction, "publishRequestFn cannot be null");
            return builder().setPublishRequestFn(serializableFunction).build();
        }

        public WriteAsync<T> withSnsClientProvider(SnsAsyncClientProvider snsAsyncClientProvider) {
            Preconditions.checkNotNull(snsAsyncClientProvider, "asyncClientProvider cannot be null");
            return builder().setSnsClientProvider(snsAsyncClientProvider).build();
        }

        public WriteAsync<T> withSnsClientProvider(AwsCredentialsProvider awsCredentialsProvider, String str) {
            Preconditions.checkNotNull(awsCredentialsProvider, "credentialsProvider cannot be null");
            Preconditions.checkNotNull(str, "region cannot be null");
            return withSnsClientProvider(awsCredentialsProvider, str, null);
        }

        public WriteAsync<T> withSnsClientProvider(AwsCredentialsProvider awsCredentialsProvider, String str, URI uri) {
            Preconditions.checkNotNull(awsCredentialsProvider, "credentialsProvider cannot be null");
            Preconditions.checkNotNull(str, "region cannot be null");
            return withSnsClientProvider(new BasicSnsAsyncClientProvider(awsCredentialsProvider, str, uri));
        }

        public PCollection<SnsResponse<T>> expand(PCollection<T> pCollection) {
            Preconditions.checkArgument(getSnsClientProvider() != null, "withSnsClientProvider() needs to called");
            Preconditions.checkArgument(getPublishRequestFn() != null, "withPublishRequestFn() needs to called");
            Preconditions.checkArgument(getCoder() != null, "withElementCoder() needs to called");
            return pCollection.apply(ParDo.of(new SnsWriteAsyncFn(this))).setCoder(SnsResponseCoder.of(getCoder()));
        }
    }

    public static <T> Write<T> write() {
        return new AutoValue_SnsIO_Write.Builder().setClientConfiguration(ClientConfiguration.builder().build()).build();
    }

    @Deprecated
    public static <T> WriteAsync<T> writeAsync() {
        return new AutoValue_SnsIO_WriteAsync.Builder().build();
    }
}
