package org.apache.beam.sdk.io.aws.sns;

import com.amazonaws.http.SdkHttpMetadata;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.model.GetTopicAttributesResult;
import com.amazonaws.services.sns.model.InternalErrorException;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.UUID;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.aws.sns.SnsIO;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/aws/sns/SnsIOTest.class */
public class SnsIOTest implements Serializable {
    private static final String topicName = "arn:aws:sns:us-west-2:5880:topic-FMFEHJ47NRFO";

    @Rule
    public TestPipeline p = TestPipeline.create();

    @Rule
    public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(SnsIO.class);

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /* loaded from: input_file:org/apache/beam/sdk/io/aws/sns/SnsIOTest$MockCoder.class */
    private static class MockCoder extends AtomicCoder<PublishResult> {
        private PublishResult captured;

        private MockCoder() {
        }

        public void encode(PublishResult publishResult, OutputStream outputStream) throws CoderException, IOException {
            this.captured = publishResult;
            PublishResultCoders.defaultPublishResult().encode(publishResult, outputStream);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public PublishResult m3decode(InputStream inputStream) throws CoderException, IOException {
            return (PublishResult) PublishResultCoders.defaultPublishResult().decode(inputStream);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/aws/sns/SnsIOTest$Provider.class */
    private static class Provider implements AwsClientsProvider {
        private static AmazonSNS publisher;

        public Provider(AmazonSNS amazonSNS) {
            publisher = amazonSNS;
        }

        public AmazonCloudWatch getCloudWatchClient() {
            return (AmazonCloudWatch) Mockito.mock(AmazonCloudWatch.class);
        }

        public AmazonSNS createSnsPublisher() {
            return publisher;
        }
    }

    private static PublishRequest createSampleMessage(String str) {
        return new PublishRequest().withTopicArn(topicName).withMessage(str);
    }

    @Test
    public void testDataWritesToSNS() {
        PublishRequest createSampleMessage = createSampleMessage("my_first_message");
        PublishRequest createSampleMessage2 = createSampleMessage("my_second_message");
        TupleTag tupleTag = new TupleTag();
        PAssert.that(this.p.apply(Create.of(createSampleMessage, new PublishRequest[]{createSampleMessage2})).apply(SnsIO.write().withTopicName(topicName).withRetryConfiguration(SnsIO.RetryConfiguration.create(5, Duration.standardMinutes(1L))).withAWSClientsProvider(new Provider(getAmazonSnsMockSuccess())).withResultOutputTag(tupleTag)).get(tupleTag).apply(Count.globally())).containsInAnyOrder(ImmutableList.of(2L));
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testRetries() throws Throwable {
        this.thrown.expectMessage("Error writing to SNS");
        this.p.apply(Create.of(createSampleMessage("my message that will not be published"), new PublishRequest[0])).apply(SnsIO.write().withTopicName(topicName).withRetryConfiguration(SnsIO.RetryConfiguration.create(4, Duration.standardSeconds(10L))).withAWSClientsProvider(new Provider(getAmazonSnsMockErrors())).withResultOutputTag(new TupleTag()));
        try {
            this.p.run();
            Assert.fail("Pipeline is expected to fail because we were unable to write to SNS.");
        } catch (Pipeline.PipelineExecutionException e) {
            this.expectedLogs.verifyWarn(String.format("Error writing to SNS. Retry attempt[%d]", 1));
            this.expectedLogs.verifyWarn(String.format("Error writing to SNS. Retry attempt[%d]", 2));
            this.expectedLogs.verifyWarn(String.format("Error writing to SNS. Retry attempt[%d]", 3));
            throw e.getCause();
        }
    }

    @Test
    public void testCustomCoder() throws Exception {
        PublishRequest createSampleMessage = createSampleMessage("my_first_message");
        TupleTag tupleTag = new TupleTag();
        AmazonSNS amazonSnsMockSuccess = getAmazonSnsMockSuccess();
        MockCoder mockCoder = new MockCoder();
        PAssert.that(this.p.apply(Create.of(createSampleMessage, new PublishRequest[0])).apply(SnsIO.write().withTopicName(topicName).withAWSClientsProvider(new Provider(amazonSnsMockSuccess)).withResultOutputTag(tupleTag).withCoder(mockCoder)).get(tupleTag).apply(MapElements.into(TypeDescriptors.strings()).via(publishResult -> {
            return publishResult.getMessageId();
        })).apply(Count.globally())).containsInAnyOrder(ImmutableList.of(1L));
        this.p.run().waitUntilFinish();
        Assertions.assertThat(mockCoder.captured).isNotNull();
    }

    private static AmazonSNS getAmazonSnsMockSuccess() {
        AmazonSNS amazonSNS = (AmazonSNS) Mockito.mock(AmazonSNS.class);
        configureAmazonSnsMock(amazonSNS);
        PublishResult publishResult = (PublishResult) Mockito.mock(PublishResult.class);
        SdkHttpMetadata sdkHttpMetadata = (SdkHttpMetadata) Mockito.mock(SdkHttpMetadata.class);
        Mockito.when(sdkHttpMetadata.getHttpHeaders()).thenReturn(new HashMap());
        Mockito.when(Integer.valueOf(sdkHttpMetadata.getHttpStatusCode())).thenReturn(200);
        Mockito.when(publishResult.getSdkHttpMetadata()).thenReturn(sdkHttpMetadata);
        Mockito.when(publishResult.getMessageId()).thenReturn(UUID.randomUUID().toString());
        Mockito.when(amazonSNS.publish((PublishRequest) Mockito.any())).thenReturn(publishResult);
        return amazonSNS;
    }

    private static AmazonSNS getAmazonSnsMockErrors() {
        AmazonSNS amazonSNS = (AmazonSNS) Mockito.mock(AmazonSNS.class);
        configureAmazonSnsMock(amazonSNS);
        Mockito.when(amazonSNS.publish((PublishRequest) Mockito.any())).thenThrow(new Throwable[]{new InternalErrorException("Service unavailable")});
        return amazonSNS;
    }

    private static void configureAmazonSnsMock(AmazonSNS amazonSNS) {
        GetTopicAttributesResult getTopicAttributesResult = (GetTopicAttributesResult) Mockito.mock(GetTopicAttributesResult.class);
        SdkHttpMetadata sdkHttpMetadata = (SdkHttpMetadata) Mockito.mock(SdkHttpMetadata.class);
        Mockito.when(sdkHttpMetadata.getHttpHeaders()).thenReturn(new HashMap());
        Mockito.when(Integer.valueOf(sdkHttpMetadata.getHttpStatusCode())).thenReturn(200);
        Mockito.when(getTopicAttributesResult.getSdkHttpMetadata()).thenReturn(sdkHttpMetadata);
        Mockito.when(amazonSNS.getTopicAttributes(Mockito.anyString())).thenReturn(getTopicAttributesResult);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1057847491:
                if (implMethodName.equals("lambda$testCustomCoder$d29ce6b0$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws/sns/SnsIOTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/amazonaws/services/sns/model/PublishResult;)Ljava/lang/String;")) {
                    return publishResult -> {
                        return publishResult.getMessageId();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
