/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.sns;

import java.io.Serializable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws2.sns.SnsClientMockErrors;
import org.apache.beam.sdk.io.aws2.sns.SnsClientMockSuccess;
import org.apache.beam.sdk.io.aws2.sns.SnsIO;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import software.amazon.awssdk.services.sns.model.GetTopicAttributesResponse;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;

@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={PublishResponse.class, GetTopicAttributesResponse.class})
public class SnsIOTest
implements Serializable {
    private static final String topicArn = "arn:aws:sns:us-west-2:5880:topic-FMFEHJ47NRFO";
    @Rule
    public TestPipeline p = TestPipeline.create();
    @Rule
    public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(SnsIO.class);
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    private static PublishRequest createSampleMessage(String message) {
        return (PublishRequest)PublishRequest.builder().topicArn(topicArn).message(message).build();
    }

    @Test
    public void testDataWritesToSNS() {
        ImmutableList input = ImmutableList.of((Object)"message1", (Object)"message2");
        PCollection results = (PCollection)((PCollection)this.p.apply((PTransform)Create.of((Iterable)input))).apply((PTransform)SnsIO.write().withPublishRequestFn(SnsIOTest::createSampleMessage).withTopicArn(topicArn).withRetryConfiguration(SnsIO.RetryConfiguration.create((int)5, (Duration)Duration.standardMinutes((long)1L))).withSnsClientProvider(SnsClientMockSuccess::new));
        PCollection publishedResultsSize = (PCollection)results.apply(Count.globally());
        PAssert.that((PCollection)publishedResultsSize).containsInAnyOrder((Iterable)ImmutableList.of((Object)2L));
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testRetries() throws Throwable {
        this.thrown.expectMessage("Error writing to SNS");
        ImmutableList input = ImmutableList.of((Object)"message1", (Object)"message2");
        ((PCollection)this.p.apply((PTransform)Create.of((Iterable)input))).apply((PTransform)SnsIO.write().withPublishRequestFn(SnsIOTest::createSampleMessage).withTopicArn(topicArn).withRetryConfiguration(SnsIO.RetryConfiguration.create((int)4, (Duration)Duration.standardSeconds((long)10L))).withSnsClientProvider(SnsClientMockErrors::new));
        try {
            this.p.run();
        }
        catch (Pipeline.PipelineExecutionException e) {
            this.expectedLogs.verifyWarn(String.format("Error writing to SNS. Retry attempt[%d]", 1));
            this.expectedLogs.verifyWarn(String.format("Error writing to SNS. Retry attempt[%d]", 2));
            this.expectedLogs.verifyWarn(String.format("Error writing to SNS. Retry attempt[%d]", 3));
            throw e.getCause();
        }
        Assert.fail((String)"Pipeline is expected to fail because we were unable to write to SNS.");
    }
}

