/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.sqs;

import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.stream.IntStream;
import junit.framework.TestCase;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer;
import org.apache.beam.sdk.io.aws2.sqs.SqsCheckpointMark;
import org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedReader;
import org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.CoderUtils;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsClientBuilder;

/**
 * Tests for {@link SqsUnboundedReader}, executed against a shared {@link EmbeddedSqsServer}.
 *
 * <p>Each test publishes its messages through the per-test client supplied by
 * {@link EmbeddedSqsServer.TestCaseEnv}, registers that client via {@link MockClientBuilderFactory}
 * and stubs the mocked {@link SqsUnboundedSource} to report the per-test queue URL.
 */
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class SqsUnboundedReaderTest {
    private static final String DATA = "testData";

    /** Embedded SQS server shared by all tests of this class. */
    @ClassRule
    public static EmbeddedSqsServer sqsServer = new EmbeddedSqsServer();

    /** Per-test environment providing the client and queue URL used by the test. */
    @Rule
    public EmbeddedSqsServer.TestCaseEnv testCase = new EmbeddedSqsServer.TestCaseEnv(sqsServer);

    /** Deep-stubbed source so that {@code getRead().queueUrl()} can be stubbed in one chain. */
    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    public SqsUnboundedSource mockSource;

    private final AwsOptions options = PipelineOptionsFactory.create().as(AwsOptions.class);

    /**
     * Sends the given message bodies to the test queue, registers the test client for
     * {@link SqsClientBuilder} on the pipeline options and stubs the mocked source's queue URL.
     *
     * @param messages message bodies to send, in order
     */
    private void setupMessages(String... messages) {
        SqsClient client = testCase.getClient();
        String queueUrl = testCase.getQueueUrl();
        for (String body : messages) {
            client.sendMessage(b -> b.queueUrl(queueUrl).messageBody(body));
        }
        MockClientBuilderFactory.set(options, SqsClientBuilder.class, client);
        Mockito.when(mockSource.getRead().queueUrl()).thenReturn(queueUrl);
    }

    /** Reads a single message, then finalizes the checkpoint before the reader is closed. */
    @Test
    public void testReadOneMessage() throws IOException {
        setupMessages(DATA);
        try (SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null, options)) {
            Assert.assertTrue(reader.start());
            Assert.assertEquals(DATA, reader.getCurrent().getBody());
            Assert.assertFalse(reader.advance());

            UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
            checkpoint.finalizeCheckpoint();
        }
    }

    /**
     * Deletes the message directly through the test client after it was read; finalizing the
     * checkpoint afterwards is expected to complete without throwing.
     */
    @Test
    public void testAckDeletedMessage() throws IOException {
        setupMessages(DATA);
        try (SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null, options)) {
            Assert.assertTrue(reader.start());
            Assert.assertEquals(DATA, reader.getCurrent().getBody());
            String receiptHandle = reader.getCurrent().getReceiptHandle();
            Assert.assertFalse(reader.advance());

            // Remove the message behind the reader's back before the checkpoint is finalized.
            testCase.getClient()
                .deleteMessage(b -> b.queueUrl(testCase.getQueueUrl()).receiptHandle(receiptHandle));

            UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
            checkpoint.finalizeCheckpoint();
        }
    }

    /**
     * Deletes the message after it was read, moves the mocked clock forward by 80% of the
     * reader's visibility timeout and expects {@code advance()} to return {@code false} without
     * throwing.
     */
    @Test
    public void testExtendDeletedMessage() throws IOException {
        setupMessages(DATA);
        Clock clock = Mockito.mock(Clock.class);
        Mockito.when(clock.millis()).thenReturn(System.currentTimeMillis());

        try (SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null, options, clock)) {
            Assert.assertTrue(reader.start());
            Assert.assertEquals(DATA, reader.getCurrent().getBody());
            String receiptHandle = reader.getCurrent().getReceiptHandle();

            testCase.getClient()
                .deleteMessage(b -> b.queueUrl(testCase.getQueueUrl()).receiptHandle(receiptHandle));

            // NOTE(review): presumably 80% of the timeout is late enough for the reader to attempt a
            // visibility extension of the (now deleted) message during advance() - confirm.
            Mockito.when(clock.millis())
                .thenReturn(System.currentTimeMillis() + reader.getVisibilityTimeoutMs() * 8L / 10L);
            Assert.assertFalse(reader.advance());
        }
    }

    /**
     * Resets the visibility timeout of the message that was just read to zero and expects the
     * same body to be returned again by the next {@code advance()}.
     */
    @Test
    public void testRereadExpiredMessage() throws IOException {
        setupMessages(DATA);
        try (SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null, options)) {
            Assert.assertTrue(reader.start());
            Assert.assertEquals(DATA, reader.getCurrent().getBody());
            String receiptHandle = reader.getCurrent().getReceiptHandle();

            // A visibility timeout of 0 makes the message available for receipt again.
            testCase.getClient()
                .changeMessageVisibility(
                    b ->
                        b.queueUrl(testCase.getQueueUrl())
                            .receiptHandle(receiptHandle)
                            .visibilityTimeout(0));

            Assert.assertTrue(reader.advance());
            Assert.assertEquals(DATA, reader.getCurrent().getBody());
            Assert.assertFalse(reader.advance());

            UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
            checkpoint.finalizeCheckpoint();
        }
    }

    /**
     * Takes a checkpoint after the first of two messages, round-trips it through
     * {@link SerializableCoder} and verifies that a reader created from the decoded checkpoint
     * returns the second message.
     */
    @Test
    public void testRestoreReaderFromCheckpoint() throws IOException {
        setupMessages("data_0", "data_1");

        // The initial reader is not closed here, exactly as before this change.
        // NOTE(review): both readers use the same registered client; whether closing the first
        // reader would affect the restored one is not visible from this test - confirm before
        // adding a close().
        SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null, options);
        Assert.assertTrue(reader.start());
        Assert.assertEquals("data_0", reader.getCurrent().getBody());

        SqsCheckpointMark checkpoint = (SqsCheckpointMark) reader.getCheckpointMark();
        checkpoint.finalizeCheckpoint();
        Assert.assertEquals(1, checkpoint.notYetReadReceipts.size());

        Assert.assertTrue(reader.advance());
        Assert.assertEquals("data_1", reader.getCurrent().getBody());

        // Encode and decode the checkpoint to verify it survives serialization.
        Coder<SqsCheckpointMark> coder = SerializableCoder.of(SqsCheckpointMark.class);
        byte[] checkpointBytes = CoderUtils.encodeToByteArray(coder, checkpoint);
        checkpoint = CoderUtils.decodeFromByteArray(coder, checkpointBytes);
        Assert.assertEquals(1, checkpoint.notYetReadReceipts.size());

        try (SqsUnboundedReader restored = new SqsUnboundedReader(mockSource, checkpoint, options)) {
            Assert.assertTrue(restored.start());
            Assert.assertEquals("data_1", restored.getCurrent().getBody());
            Assert.assertFalse(restored.advance());

            checkpoint = (SqsCheckpointMark) restored.getCheckpointMark();
            checkpoint.finalizeCheckpoint();
        }
    }

    /** Sends 100 distinct messages and expects each body to be read exactly once. */
    @Test
    public void testReadManyMessages() throws IOException {
        ArrayList<String> receivedMessages = new ArrayList<>();
        setupMessages(IntStream.range(0, 100).mapToObj(Integer::toString).toArray(String[]::new));

        try (SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null, options)) {
            Assert.assertTrue(reader.start());
            do {
                receivedMessages.add(reader.getCurrent().getBody());
            } while (reader.advance());

            Assertions.assertThat(receivedMessages).hasSize(100);
            Assertions.assertThat(receivedMessages).doesNotHaveDuplicates();

            UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
            checkpoint.finalizeCheckpoint();
        }
    }

    /**
     * Closes the reader while a checkpoint is still outstanding; finalizing that checkpoint
     * afterwards is expected to complete without throwing.
     */
    @Test
    public void testCloseWithActiveCheckpoints() throws Exception {
        setupMessages(DATA);
        UnboundedSource.CheckpointMark checkpoint;
        try (SqsUnboundedReader reader = new SqsUnboundedReader(mockSource, null, options)) {
            reader.start();
            checkpoint = reader.getCheckpointMark();
        }
        // The reader is closed at this point; finalization must happen only afterwards.
        checkpoint.finalizeCheckpoint();
    }
}

