/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.sqs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import junit.framework.TestCase;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.aws2.sqs.EmbeddedSqsServer;
import org.apache.beam.sdk.io.aws2.sqs.SqsCheckpointMark;
import org.apache.beam.sdk.io.aws2.sqs.SqsClientProvider;
import org.apache.beam.sdk.io.aws2.sqs.SqsClientProviderMock;
import org.apache.beam.sdk.io.aws2.sqs.SqsIO;
import org.apache.beam.sdk.io.aws2.sqs.SqsMessage;
import org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedReader;
import org.apache.beam.sdk.io.aws2.sqs.SqsUnboundedSource;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.CoderUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

/**
 * Tests for readers created from {@link SqsUnboundedSource}, run against the queue provided by
 * the {@link EmbeddedSqsServer} rule.
 */
@RunWith(JUnit4.class)
public class SqsUnboundedReaderTest {
    private static final String DATA = "testData";

    @Rule
    public final TestPipeline pipeline = TestPipeline.create();

    @Rule
    public final EmbeddedSqsServer embeddedSqsRestServer = new EmbeddedSqsServer();

    /** Source under test; assigned by {@link #setupMessages(List)} before each use. */
    private SqsUnboundedSource source;

    /** Sends a single message with body {@link #DATA} and builds a source limited to one record. */
    private void setupOneMessage() {
        List<String> single = new ArrayList<>(1);
        single.add(DATA);
        setupMessages(single);
    }

    /**
     * Sends every given body to the embedded queue, in list order, and assigns {@link #source} to a
     * new source reading that queue with its maximum record count set to the number of bodies.
     *
     * @param messages message bodies to send
     */
    private void setupMessages(List<String> messages) {
        SqsClient client = embeddedSqsRestServer.getClient();
        String queueUrl = embeddedSqsRestServer.getQueueUrl();
        for (String body : messages) {
            SendMessageRequest request =
                SendMessageRequest.builder().queueUrl(queueUrl).messageBody(body).build();
            client.sendMessage(request);
        }
        SqsClientProvider clientProvider = SqsClientProviderMock.of(client);
        source =
            new SqsUnboundedSource(
                SqsIO.read()
                    .withQueueUrl(queueUrl)
                    .withSqsClientProvider(clientProvider)
                    .withMaxNumRecords((long) messages.size()));
    }

    /**
     * Builds the bodies {@code data_0 .. data_(count-1)} in ascending order.
     *
     * @param count number of bodies to generate
     * @return a new mutable list holding the generated bodies
     */
    private static List<String> numberedMessages(int count) {
        List<String> bodies = new ArrayList<>(count);
        for (int i = 0; i < count; ++i) {
            bodies.add(String.format("data_%d", i));
        }
        return bodies;
    }

    /**
     * Reads the single queued message, expects no further message, then finalizes the checkpoint
     * and closes the reader.
     *
     * @throws IOException if the reader or the checkpoint fails
     */
    @Test
    public void testReadOneMessage() throws IOException {
        setupOneMessage();
        UnboundedSource.UnboundedReader<SqsMessage> reader =
            source.createReader(pipeline.getOptions(), null);

        Assert.assertTrue(reader.start());
        Assert.assertEquals(DATA, reader.getCurrent().getBody());
        Assert.assertFalse(reader.advance());

        UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
        checkpoint.finalizeCheckpoint();
        reader.close();
    }

    /**
     * Reads one message, sets its visibility timeout to zero through the source's client, and
     * expects the reader to hand out the same body once more before running dry.
     *
     * @throws IOException if the reader or the checkpoint fails
     */
    @Test
    public void testTimeoutAckAndRereadOneMessage() throws IOException {
        setupOneMessage();
        UnboundedSource.UnboundedReader<SqsMessage> reader =
            source.createReader(pipeline.getOptions(), null);
        SqsClient sqsClient = source.getSqs();

        Assert.assertTrue(reader.start());
        Assert.assertEquals(DATA, reader.getCurrent().getBody());

        // Zero out the visibility timeout of the message that was just received.
        String receiptHandle = reader.getCurrent().getReceiptHandle();
        ChangeMessageVisibilityRequest makeVisible =
            ChangeMessageVisibilityRequest.builder()
                .queueUrl(source.getRead().queueUrl())
                .receiptHandle(receiptHandle)
                .visibilityTimeout(0)
                .build();
        sqsClient.changeMessageVisibility(makeVisible);

        // The same body must be delivered a second time, and nothing after it.
        Assert.assertTrue(reader.advance());
        Assert.assertEquals(DATA, reader.getCurrent().getBody());
        Assert.assertFalse(reader.advance());

        UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();
        checkpoint.finalizeCheckpoint();
        reader.close();
    }

    /**
     * Takes a checkpoint after the first of two messages, round-trips it through the source's
     * checkpoint coder, and expects a second reader created from the decoded checkpoint to deliver
     * the second message.
     *
     * @throws IOException if a reader, the checkpoint, or the coder fails
     */
    @Test
    public void testMultipleReaders() throws IOException {
        final int messageCount = 2;
        setupMessages(numberedMessages(messageCount));
        UnboundedSource.UnboundedReader<SqsMessage> reader =
            source.createReader(pipeline.getOptions(), null);

        Assert.assertTrue(reader.start());
        Assert.assertEquals("data_0", reader.getCurrent().getBody());

        // Checkpoint taken after the first read; it must list exactly one not-yet-read receipt.
        SqsCheckpointMark checkpoint = (SqsCheckpointMark) reader.getCheckpointMark();
        checkpoint.finalizeCheckpoint();
        Assert.assertEquals(1L, (long) checkpoint.notYetReadReceipts.size());

        Assert.assertTrue(reader.advance());
        Assert.assertEquals("data_1", reader.getCurrent().getBody());

        // Encode and decode the checkpoint; the pending receipt must survive the round trip.
        Coder<SqsCheckpointMark> checkpointCoder = source.getCheckpointMarkCoder();
        byte[] checkpointBytes = CoderUtils.encodeToByteArray(checkpointCoder, checkpoint);
        checkpoint = CoderUtils.decodeFromByteArray(checkpointCoder, checkpointBytes);
        Assert.assertEquals(1L, (long) checkpoint.notYetReadReceipts.size());

        // NOTE(review): the first reader is never closed before it is replaced here. Left as-is
        // because the effect of close() on the shared client is not visible in this file - confirm.
        reader = source.createReader(pipeline.getOptions(), checkpoint);
        Assert.assertTrue(reader.start());
        Assert.assertEquals("data_1", reader.getCurrent().getBody());
        Assert.assertFalse(reader.advance());

        checkpoint = (SqsCheckpointMark) reader.getCheckpointMark();
        checkpoint.finalizeCheckpoint();
        reader.close();
    }

    /**
     * Sends 100 distinct messages and expects the reader to deliver each body exactly once, in
     * any order, and nothing afterwards.
     *
     * @throws IOException if the reader fails
     */
    @Test
    public void testReadMany() throws IOException {
        final int messageCount = 100;
        List<String> incoming = numberedMessages(messageCount);
        HashSet<String> pending = new HashSet<>(incoming);
        setupMessages(incoming);

        SqsUnboundedReader reader =
            (SqsUnboundedReader) source.createReader(pipeline.getOptions(), null);
        for (int i = 0; i < messageCount; ++i) {
            // The first element is fetched with start(), every later one with advance().
            boolean available = (i == 0) ? reader.start() : reader.advance();
            Assert.assertTrue(available);

            String body = reader.getCurrent().getBody();
            // remove() yields false for an unknown or already-seen body.
            Assert.assertTrue("unexpected or duplicate body: " + body, pending.remove(body));
        }

        Assert.assertFalse(reader.advance());
        Assert.assertTrue(pending.isEmpty());
        reader.close();
    }

    /**
     * Closes the reader while a checkpoint obtained from it has not been finalized yet, then
     * finalizes that checkpoint; the test passes when neither call throws.
     *
     * @throws Exception if the reader or the checkpoint fails
     */
    @Test
    public void testCloseWithActiveCheckpoints() throws Exception {
        setupOneMessage();
        UnboundedSource.UnboundedReader<SqsMessage> reader =
            source.createReader(pipeline.getOptions(), null);

        // Make sure a message was actually received before the checkpoint is taken.
        Assert.assertTrue(reader.start());
        UnboundedSource.CheckpointMark checkpoint = reader.getCheckpointMark();

        // Order matters here: close first, finalize afterwards.
        reader.close();
        checkpoint.finalizeCheckpoint();
    }
}

