package org.apache.beam.sdk.io.aws2.sqs;

import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.stream.IntStream;
import junit.framework.TestCase;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer;
import org.apache.beam.sdk.util.CoderUtils;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.sqs.SqsClient;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.class */
public class SqsUnboundedReaderTest {
    private static final String DATA = "testData";

    @ClassRule
    public static EmbeddedSqsServer sqsServer = new EmbeddedSqsServer();

    @Rule
    public EmbeddedSqsServer.TestCaseEnv testCase = new EmbeddedSqsServer.TestCaseEnv(sqsServer);

    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    public SqsUnboundedSource mockSource;

    @Mock
    public AwsOptions options;

    private void setupMessages(String... strArr) {
        SqsClient client = this.testCase.getClient();
        String queueUrl = this.testCase.getQueueUrl();
        for (String str : strArr) {
            client.sendMessage(builder -> {
                builder.queueUrl(queueUrl).messageBody(str);
            });
        }
        Mockito.when(this.mockSource.getRead().sqsClientProvider()).thenReturn(StaticSqsClientProvider.of(client));
        Mockito.when(this.mockSource.getRead().queueUrl()).thenReturn(queueUrl);
    }

    @Test
    public void testReadOneMessage() throws IOException {
        setupMessages(DATA);
        SqsUnboundedReader sqsUnboundedReader = new SqsUnboundedReader(this.mockSource, (SqsCheckpointMark) null, this.options);
        Assert.assertTrue(sqsUnboundedReader.start());
        Assert.assertEquals(DATA, sqsUnboundedReader.getCurrent().getBody());
        TestCase.assertFalse(sqsUnboundedReader.advance());
        sqsUnboundedReader.getCheckpointMark().finalizeCheckpoint();
        sqsUnboundedReader.close();
    }

    @Test
    public void testAckDeletedMessage() throws IOException {
        setupMessages(DATA);
        SqsUnboundedReader sqsUnboundedReader = new SqsUnboundedReader(this.mockSource, (SqsCheckpointMark) null, this.options);
        Assert.assertTrue(sqsUnboundedReader.start());
        Assert.assertEquals(DATA, sqsUnboundedReader.getCurrent().getBody());
        String receiptHandle = sqsUnboundedReader.getCurrent().getReceiptHandle();
        TestCase.assertFalse(sqsUnboundedReader.advance());
        this.testCase.getClient().deleteMessage(builder -> {
            builder.queueUrl(this.testCase.getQueueUrl()).receiptHandle(receiptHandle);
        });
        sqsUnboundedReader.getCheckpointMark().finalizeCheckpoint();
        sqsUnboundedReader.close();
    }

    @Test
    public void testExtendDeletedMessage() throws IOException {
        setupMessages(DATA);
        Clock clock = (Clock) Mockito.mock(Clock.class);
        Mockito.when(Long.valueOf(clock.millis())).thenReturn(Long.valueOf(System.currentTimeMillis()));
        SqsUnboundedReader sqsUnboundedReader = new SqsUnboundedReader(this.mockSource, (SqsCheckpointMark) null, this.options, clock);
        Assert.assertTrue(sqsUnboundedReader.start());
        Assert.assertEquals(DATA, sqsUnboundedReader.getCurrent().getBody());
        String receiptHandle = sqsUnboundedReader.getCurrent().getReceiptHandle();
        this.testCase.getClient().deleteMessage(builder -> {
            builder.queueUrl(this.testCase.getQueueUrl()).receiptHandle(receiptHandle);
        });
        Mockito.when(Long.valueOf(clock.millis())).thenReturn(Long.valueOf(System.currentTimeMillis() + ((sqsUnboundedReader.getVisibilityTimeoutMs() * 8) / 10)));
        TestCase.assertFalse(sqsUnboundedReader.advance());
        sqsUnboundedReader.close();
    }

    @Test
    public void testRereadExpiredMessage() throws IOException {
        setupMessages(DATA);
        SqsUnboundedReader sqsUnboundedReader = new SqsUnboundedReader(this.mockSource, (SqsCheckpointMark) null, this.options);
        Assert.assertTrue(sqsUnboundedReader.start());
        Assert.assertEquals(DATA, sqsUnboundedReader.getCurrent().getBody());
        String receiptHandle = sqsUnboundedReader.getCurrent().getReceiptHandle();
        this.testCase.getClient().changeMessageVisibility(builder -> {
            builder.queueUrl(this.testCase.getQueueUrl()).receiptHandle(receiptHandle).visibilityTimeout(0);
        });
        Assert.assertTrue(sqsUnboundedReader.advance());
        Assert.assertEquals(DATA, sqsUnboundedReader.getCurrent().getBody());
        TestCase.assertFalse(sqsUnboundedReader.advance());
        sqsUnboundedReader.getCheckpointMark().finalizeCheckpoint();
        sqsUnboundedReader.close();
    }

    @Test
    public void testRestoreReaderFromCheckpoint() throws IOException {
        setupMessages("data_0", "data_1");
        SqsUnboundedReader sqsUnboundedReader = new SqsUnboundedReader(this.mockSource, (SqsCheckpointMark) null, this.options);
        Assert.assertTrue(sqsUnboundedReader.start());
        Assert.assertEquals("data_0", sqsUnboundedReader.getCurrent().getBody());
        SqsCheckpointMark checkpointMark = sqsUnboundedReader.getCheckpointMark();
        checkpointMark.finalizeCheckpoint();
        Assert.assertEquals(1L, checkpointMark.notYetReadReceipts.size());
        Assert.assertTrue(sqsUnboundedReader.advance());
        Assert.assertEquals("data_1", sqsUnboundedReader.getCurrent().getBody());
        SerializableCoder of = SerializableCoder.of(SqsCheckpointMark.class);
        SqsCheckpointMark sqsCheckpointMark = (SqsCheckpointMark) CoderUtils.decodeFromByteArray(of, CoderUtils.encodeToByteArray(of, checkpointMark));
        Assert.assertEquals(1L, sqsCheckpointMark.notYetReadReceipts.size());
        SqsUnboundedReader sqsUnboundedReader2 = new SqsUnboundedReader(this.mockSource, sqsCheckpointMark, this.options);
        Assert.assertTrue(sqsUnboundedReader2.start());
        Assert.assertEquals("data_1", sqsUnboundedReader2.getCurrent().getBody());
        TestCase.assertFalse(sqsUnboundedReader2.advance());
        sqsUnboundedReader2.getCheckpointMark().finalizeCheckpoint();
        sqsUnboundedReader2.close();
    }

    @Test
    public void testReadManyMessages() throws IOException {
        ArrayList arrayList = new ArrayList();
        setupMessages((String[]) IntStream.range(0, 100).mapToObj(Integer::toString).toArray(i -> {
            return new String[i];
        }));
        SqsUnboundedReader sqsUnboundedReader = new SqsUnboundedReader(this.mockSource, (SqsCheckpointMark) null, this.options);
        Assert.assertTrue(sqsUnboundedReader.start());
        do {
            arrayList.add(sqsUnboundedReader.getCurrent().getBody());
        } while (sqsUnboundedReader.advance());
        Assertions.assertThat(arrayList).hasSize(100);
        Assertions.assertThat(arrayList).doesNotHaveDuplicates();
        sqsUnboundedReader.getCheckpointMark().finalizeCheckpoint();
        sqsUnboundedReader.close();
    }

    @Test
    public void testCloseWithActiveCheckpoints() throws Exception {
        setupMessages(DATA);
        SqsUnboundedReader sqsUnboundedReader = new SqsUnboundedReader(this.mockSource, (SqsCheckpointMark) null, this.options);
        sqsUnboundedReader.start();
        UnboundedSource.CheckpointMark checkpointMark = sqsUnboundedReader.getCheckpointMark();
        sqsUnboundedReader.close();
        checkpointMark.finalizeCheckpoint();
    }
}
