package org.apache.beam.sdk.io.aws.sqs;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import junit.framework.TestCase;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.CoderUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/aws/sqs/SqsUnboundedReaderTest.class */
public class SqsUnboundedReaderTest {
    private static final String DATA = "testData";

    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    @Rule
    public EmbeddedSqsServer embeddedSqsRestServer = new EmbeddedSqsServer();
    private SqsUnboundedSource source;

    private void setupOneMessage() {
        AmazonSQS client = this.embeddedSqsRestServer.getClient();
        String queueUrl = this.embeddedSqsRestServer.getQueueUrl();
        client.sendMessage(queueUrl, DATA);
        this.source = new SqsUnboundedSource(SqsIO.read().withQueueUrl(queueUrl).withMaxNumRecords(1L), new SqsConfiguration(this.pipeline.getOptions().as(AwsOptions.class)), SqsMessageCoder.of());
    }

    private void setupMessages(List<String> list) {
        AmazonSQS client = this.embeddedSqsRestServer.getClient();
        String queueUrl = this.embeddedSqsRestServer.getQueueUrl();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            client.sendMessage(queueUrl, it.next());
        }
        this.source = new SqsUnboundedSource(SqsIO.read().withQueueUrl(queueUrl).withMaxNumRecords(1L), new SqsConfiguration(this.pipeline.getOptions().as(AwsOptions.class)), SqsMessageCoder.of());
    }

    @Test
    public void testReadOneMessage() throws IOException {
        setupOneMessage();
        UnboundedSource.UnboundedReader createReader = this.source.createReader(this.pipeline.getOptions(), (SqsCheckpointMark) null);
        Assert.assertTrue(createReader.start());
        Assert.assertEquals(DATA, ((Message) createReader.getCurrent()).getBody());
        TestCase.assertFalse(createReader.advance());
        createReader.getCheckpointMark().finalizeCheckpoint();
        createReader.close();
    }

    @Test
    public void testTimeoutAckAndRereadOneMessage() throws IOException {
        setupOneMessage();
        UnboundedSource.UnboundedReader createReader = this.source.createReader(this.pipeline.getOptions(), (SqsCheckpointMark) null);
        AmazonSQS client = this.embeddedSqsRestServer.getClient();
        Assert.assertTrue(createReader.start());
        Assert.assertEquals(DATA, ((Message) createReader.getCurrent()).getBody());
        client.changeMessageVisibility(this.source.getRead().queueUrl(), ((Message) createReader.getCurrent()).getReceiptHandle(), 0);
        Assert.assertTrue(createReader.advance());
        Assert.assertEquals(DATA, ((Message) createReader.getCurrent()).getBody());
        TestCase.assertFalse(createReader.advance());
        createReader.getCheckpointMark().finalizeCheckpoint();
        createReader.close();
    }

    @Test
    public void testMultipleReaders() throws IOException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            arrayList.add(String.format("data_%d", Integer.valueOf(i)));
        }
        setupMessages(arrayList);
        UnboundedSource.UnboundedReader createReader = this.source.createReader(this.pipeline.getOptions(), (SqsCheckpointMark) null);
        Assert.assertTrue(createReader.start());
        Assert.assertEquals("data_0", ((Message) createReader.getCurrent()).getBody());
        SqsCheckpointMark checkpointMark = createReader.getCheckpointMark();
        checkpointMark.finalizeCheckpoint();
        Assert.assertEquals(1L, checkpointMark.notYetReadReceipts.size());
        Assert.assertTrue(createReader.advance());
        Assert.assertEquals("data_1", ((Message) createReader.getCurrent()).getBody());
        SqsCheckpointMark sqsCheckpointMark = (SqsCheckpointMark) CoderUtils.decodeFromByteArray(this.source.getCheckpointMarkCoder(), CoderUtils.encodeToByteArray(this.source.getCheckpointMarkCoder(), checkpointMark));
        Assert.assertEquals(1L, sqsCheckpointMark.notYetReadReceipts.size());
        UnboundedSource.UnboundedReader createReader2 = this.source.createReader(this.pipeline.getOptions(), sqsCheckpointMark);
        Assert.assertTrue(createReader2.start());
        Assert.assertEquals("data_1", ((Message) createReader2.getCurrent()).getBody());
        TestCase.assertFalse(createReader2.advance());
        createReader2.getCheckpointMark().finalizeCheckpoint();
        createReader2.close();
    }

    @Test
    public void testReadMany() throws IOException {
        HashSet hashSet = new HashSet();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            hashSet.add(String.format("data_%d", Integer.valueOf(i)));
            arrayList.add(String.format("data_%d", Integer.valueOf(i)));
        }
        setupMessages(arrayList);
        SqsUnboundedReader createReader = this.source.createReader(this.pipeline.getOptions(), (SqsCheckpointMark) null);
        for (int i2 = 0; i2 < 100; i2++) {
            if (i2 == 0) {
                Assert.assertTrue(createReader.start());
            } else {
                Assert.assertTrue(createReader.advance());
            }
            Assert.assertTrue(hashSet.remove(createReader.getCurrent().getBody()));
        }
        TestCase.assertFalse(createReader.advance());
        Assert.assertTrue(hashSet.isEmpty());
        createReader.close();
    }

    @Test
    public void testCloseWithActiveCheckpoints() throws Exception {
        setupOneMessage();
        UnboundedSource.UnboundedReader createReader = this.source.createReader(this.pipeline.getOptions(), (SqsCheckpointMark) null);
        createReader.start();
        UnboundedSource.CheckpointMark checkpointMark = createReader.getCheckpointMark();
        createReader.close();
        checkpointMark.finalizeCheckpoint();
    }
}
