/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.api.client.util.Clock;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class PubsubUnboundedSourceTest {
    private static final PubsubClient.SubscriptionPath SUBSCRIPTION = PubsubClient.subscriptionPathFromName((String)"testProject", (String)"testSubscription");
    private static final String DATA = "testData";
    private static final long TIMESTAMP = 1234L;
    private static final long REQ_TIME = 6373L;
    private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
    private static final String ID_ATTRIBUTE = "id";
    private static final String ACK_ID = "testAckId";
    private static final String RECORD_ID = "testRecordId";
    private static final int ACK_TIMEOUT_S = 60;
    private AtomicLong now;
    private Clock clock;
    private PubsubTestClient.PubsubTestClientFactory factory;
    private PubsubUnboundedSource.PubsubSource primSource;
    @Rule
    public TestPipeline p = TestPipeline.create();

    private void setupOneMessage(Iterable<PubsubClient.IncomingMessage> incoming) {
        this.now = new AtomicLong(6373L);
        this.clock = () -> this.now.get();
        this.factory = PubsubTestClient.createFactoryForPull((Clock)this.clock, (PubsubClient.SubscriptionPath)SUBSCRIPTION, (int)60, incoming);
        PubsubUnboundedSource source = new PubsubUnboundedSource(this.clock, (PubsubClient.PubsubClientFactory)this.factory, null, null, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)SUBSCRIPTION), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, true);
        this.primSource = new PubsubUnboundedSource.PubsubSource(source);
    }

    private void setupOneMessage() {
        this.setupOneMessage((Iterable<PubsubClient.IncomingMessage>)ImmutableList.of((Object)new PubsubClient.IncomingMessage(DATA.getBytes(StandardCharsets.UTF_8), null, 1234L, 0L, ACK_ID, RECORD_ID)));
    }

    @After
    public void after() throws IOException {
        this.factory.close();
        this.now = null;
        this.clock = null;
        this.primSource = null;
        this.factory = null;
    }

    private static String data(PubsubMessage message) {
        return new String(message.getPayload(), StandardCharsets.UTF_8);
    }

    @Test
    public void checkpointCoderIsSane() {
        this.setupOneMessage((Iterable<PubsubClient.IncomingMessage>)ImmutableList.of());
        CoderProperties.coderSerializable((Coder)this.primSource.getCheckpointMarkCoder());
    }

    @Test
    public void readOneMessage() throws IOException {
        this.setupOneMessage();
        PubsubUnboundedSource.PubsubReader reader = this.primSource.createReader(this.p.getOptions(), null);
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((Object)DATA, (Object)PubsubUnboundedSourceTest.data(reader.getCurrent()));
        TestCase.assertFalse((boolean)reader.advance());
        PubsubUnboundedSource.PubsubCheckpoint checkpoint = reader.getCheckpointMark();
        checkpoint.finalizeCheckpoint();
        reader.close();
    }

    @Test
    public void timeoutAckAndRereadOneMessage() throws IOException {
        this.setupOneMessage();
        PubsubUnboundedSource.PubsubReader reader = this.primSource.createReader(this.p.getOptions(), null);
        PubsubTestClient pubsubClient = (PubsubTestClient)reader.getPubsubClient();
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((Object)DATA, (Object)PubsubUnboundedSourceTest.data(reader.getCurrent()));
        this.now.addAndGet(65000L);
        pubsubClient.advance();
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((Object)DATA, (Object)PubsubUnboundedSourceTest.data(reader.getCurrent()));
        TestCase.assertFalse((boolean)reader.advance());
        PubsubUnboundedSource.PubsubCheckpoint checkpoint = reader.getCheckpointMark();
        checkpoint.finalizeCheckpoint();
        reader.close();
    }

    @Test
    public void extendAck() throws IOException {
        this.setupOneMessage();
        PubsubUnboundedSource.PubsubReader reader = this.primSource.createReader(this.p.getOptions(), null);
        PubsubTestClient pubsubClient = (PubsubTestClient)reader.getPubsubClient();
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((Object)DATA, (Object)PubsubUnboundedSourceTest.data(reader.getCurrent()));
        this.now.addAndGet(55000L);
        pubsubClient.advance();
        TestCase.assertFalse((boolean)reader.advance());
        this.now.addAndGet(25000L);
        pubsubClient.advance();
        TestCase.assertFalse((boolean)reader.advance());
        PubsubUnboundedSource.PubsubCheckpoint checkpoint = reader.getCheckpointMark();
        checkpoint.finalizeCheckpoint();
        reader.close();
    }

    @Test
    public void timeoutAckExtensions() throws IOException {
        this.setupOneMessage();
        PubsubUnboundedSource.PubsubReader reader = this.primSource.createReader(this.p.getOptions(), null);
        PubsubTestClient pubsubClient = (PubsubTestClient)reader.getPubsubClient();
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((Object)DATA, (Object)PubsubUnboundedSourceTest.data(reader.getCurrent()));
        this.now.addAndGet(55000L);
        pubsubClient.advance();
        TestCase.assertFalse((boolean)reader.advance());
        for (int i = 0; i < 3; ++i) {
            this.now.addAndGet(25000L);
            pubsubClient.advance();
            TestCase.assertFalse((boolean)reader.advance());
        }
        this.now.addAndGet(25000L);
        pubsubClient.advance();
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((Object)DATA, (Object)PubsubUnboundedSourceTest.data(reader.getCurrent()));
        PubsubUnboundedSource.PubsubCheckpoint checkpoint = reader.getCheckpointMark();
        checkpoint.finalizeCheckpoint();
        reader.close();
    }

    @Test
    public void multipleReaders() throws IOException {
        ArrayList<PubsubClient.IncomingMessage> incoming = new ArrayList<PubsubClient.IncomingMessage>();
        for (int i = 0; i < 2; ++i) {
            String data = String.format("data_%d", i);
            String ackid = String.format("ackid_%d", i);
            incoming.add(new PubsubClient.IncomingMessage(data.getBytes(StandardCharsets.UTF_8), null, 1234L, 0L, ackid, RECORD_ID));
        }
        this.setupOneMessage(incoming);
        PubsubUnboundedSource.PubsubReader reader = this.primSource.createReader(this.p.getOptions(), null);
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((Object)"data_0", (Object)PubsubUnboundedSourceTest.data(reader.getCurrent()));
        PubsubUnboundedSource.PubsubCheckpoint checkpoint = reader.getCheckpointMark();
        checkpoint.finalizeCheckpoint();
        Assert.assertEquals((long)1L, (long)checkpoint.notYetReadIds.size());
        Assert.assertEquals((Object)"ackid_1", checkpoint.notYetReadIds.get(0));
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((Object)"data_1", (Object)PubsubUnboundedSourceTest.data(reader.getCurrent()));
        byte[] checkpointBytes = CoderUtils.encodeToByteArray((Coder)this.primSource.getCheckpointMarkCoder(), (Object)checkpoint);
        checkpoint = (PubsubUnboundedSource.PubsubCheckpoint)CoderUtils.decodeFromByteArray((Coder)this.primSource.getCheckpointMarkCoder(), (byte[])checkpointBytes);
        Assert.assertEquals((long)1L, (long)checkpoint.notYetReadIds.size());
        Assert.assertEquals((Object)"ackid_1", checkpoint.notYetReadIds.get(0));
        reader = this.primSource.createReader(this.p.getOptions(), checkpoint);
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((Object)"data_1", (Object)PubsubUnboundedSourceTest.data(reader.getCurrent()));
        TestCase.assertFalse((boolean)reader.advance());
        checkpoint = reader.getCheckpointMark();
        checkpoint.finalizeCheckpoint();
        reader.close();
    }

    private long messageNumToTimestamp(int messageNum) {
        return 1234L + (long)(messageNum * 100);
    }

    @Test
    public void readManyMessages() throws IOException {
        HashMap<String, Integer> dataToMessageNum = new HashMap<String, Integer>();
        int m = 97;
        int n = 10000;
        ArrayList<PubsubClient.IncomingMessage> incoming = new ArrayList<PubsubClient.IncomingMessage>();
        for (int i = 0; i < 10000; ++i) {
            int messageNum = i / 97 * 97 + 96 - i % 97;
            String data = String.format("data_%d", messageNum);
            dataToMessageNum.put(data, messageNum);
            String recid = String.format("recordid_%d", messageNum);
            String ackId = String.format("ackid_%d", messageNum);
            incoming.add(new PubsubClient.IncomingMessage(data.getBytes(StandardCharsets.UTF_8), null, this.messageNumToTimestamp(messageNum), 0L, ackId, recid));
        }
        this.setupOneMessage(incoming);
        PubsubUnboundedSource.PubsubReader reader = this.primSource.createReader(this.p.getOptions(), null);
        PubsubTestClient pubsubClient = (PubsubTestClient)reader.getPubsubClient();
        for (int i = 0; i < 10000; ++i) {
            if (i == 0) {
                Assert.assertTrue((boolean)reader.start());
            } else {
                Assert.assertTrue((boolean)reader.advance());
            }
            this.now.addAndGet(30L);
            pubsubClient.advance();
            String data = PubsubUnboundedSourceTest.data(reader.getCurrent());
            Integer messageNum = (Integer)dataToMessageNum.remove(data);
            Assert.assertNotNull((Object)messageNum);
            Assert.assertEquals((Object)new Instant(this.messageNumToTimestamp(messageNum)), (Object)reader.getCurrentTimestamp());
            String recid = String.format("recordid_%d", messageNum);
            Assert.assertArrayEquals((byte[])recid.getBytes(StandardCharsets.UTF_8), (byte[])reader.getCurrentRecordId());
            if (i % 1000 != 999) continue;
            long watermark = reader.getWatermark().getMillis();
            long minOutstandingTimestamp = Long.MAX_VALUE;
            for (Integer outstandingMessageNum : dataToMessageNum.values()) {
                minOutstandingTimestamp = Math.min(minOutstandingTimestamp, this.messageNumToTimestamp(outstandingMessageNum));
            }
            Assert.assertThat((Object)watermark, (Matcher)Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(minOutstandingTimestamp)));
            PubsubUnboundedSource.PubsubCheckpoint checkpoint = reader.getCheckpointMark();
            if (i % 2000 != 1999) continue;
            checkpoint.finalizeCheckpoint();
        }
        TestCase.assertFalse((boolean)reader.advance());
        Assert.assertTrue((boolean)dataToMessageNum.isEmpty());
        reader.close();
    }

    @Test
    public void noSubscriptionSplitGeneratesSubscription() throws Exception {
        PubsubClient.TopicPath topicPath = PubsubClient.topicPathFromName((String)"my_project", (String)"my_topic");
        this.factory = PubsubTestClient.createFactoryForCreateSubscription();
        PubsubUnboundedSource source = new PubsubUnboundedSource((PubsubClient.PubsubClientFactory)this.factory, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)PubsubClient.projectPathFromId((String)"my_project")), (ValueProvider)ValueProvider.StaticValueProvider.of((Object)topicPath), null, null, null, false);
        Assert.assertThat((Object)source.getSubscription(), (Matcher)Matchers.nullValue());
        Assert.assertThat((Object)source.getSubscription(), (Matcher)Matchers.nullValue());
        PipelineOptions options = PipelineOptionsFactory.create();
        List splits = new PubsubUnboundedSource.PubsubSource(source).split(3, options);
        Assert.assertThat((Object)splits, (Matcher)Matchers.hasSize((Matcher)Matchers.greaterThan((Comparable)Integer.valueOf(0))));
        for (PubsubUnboundedSource.PubsubSource split : splits) {
            Assert.assertThat((Object)split, (Matcher)Matchers.equalTo((Object)((PubsubUnboundedSource.PubsubSource)splits.get(0))));
        }
        Assert.assertThat((Object)((PubsubUnboundedSource.PubsubSource)splits.get((int)0)).subscriptionPath, (Matcher)Matchers.not((Matcher)Matchers.nullValue()));
    }

    @Test
    public void noSubscriptionNoSplitGeneratesSubscription() throws Exception {
        PubsubClient.TopicPath topicPath = PubsubClient.topicPathFromName((String)"my_project", (String)"my_topic");
        this.factory = PubsubTestClient.createFactoryForCreateSubscription();
        PubsubUnboundedSource source = new PubsubUnboundedSource((PubsubClient.PubsubClientFactory)this.factory, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)PubsubClient.projectPathFromId((String)"my_project")), (ValueProvider)ValueProvider.StaticValueProvider.of((Object)topicPath), null, null, null, false);
        Assert.assertThat((Object)source.getSubscription(), (Matcher)Matchers.nullValue());
        Assert.assertThat((Object)source.getSubscription(), (Matcher)Matchers.nullValue());
        PipelineOptions options = PipelineOptionsFactory.create();
        PubsubUnboundedSource.PubsubSource actualSource = new PubsubUnboundedSource.PubsubSource(source);
        PubsubUnboundedSource.PubsubReader reader = actualSource.createReader(options, null);
        PubsubClient.SubscriptionPath createdSubscription = reader.subscription;
        Assert.assertThat((Object)createdSubscription, (Matcher)Matchers.not((Matcher)Matchers.nullValue()));
        PubsubUnboundedSource.PubsubCheckpoint checkpoint = reader.getCheckpointMark();
        Assert.assertThat((Object)checkpoint.subscriptionPath, (Matcher)Matchers.equalTo((Object)createdSubscription.getPath()));
        checkpoint.finalizeCheckpoint();
        PubsubUnboundedSource.PubsubCheckpoint deserCheckpoint = (PubsubUnboundedSource.PubsubCheckpoint)CoderUtils.clone((Coder)actualSource.getCheckpointMarkCoder(), (Object)checkpoint);
        Assert.assertThat((Object)checkpoint.subscriptionPath, (Matcher)Matchers.not((Matcher)Matchers.nullValue()));
        Assert.assertThat((Object)checkpoint.subscriptionPath, (Matcher)Matchers.equalTo((Object)deserCheckpoint.subscriptionPath));
        PubsubUnboundedSource.PubsubReader readerFromOriginal = actualSource.createReader(options, checkpoint);
        PubsubUnboundedSource.PubsubReader readerFromDeser = actualSource.createReader(options, deserCheckpoint);
        Assert.assertThat((Object)readerFromOriginal.subscription, (Matcher)Matchers.equalTo((Object)createdSubscription));
        Assert.assertThat((Object)readerFromDeser.subscription, (Matcher)Matchers.equalTo((Object)createdSubscription));
    }

    @Test
    public void closeWithActiveCheckpoints() throws Exception {
        this.setupOneMessage();
        PubsubUnboundedSource.PubsubReader reader = this.primSource.createReader(this.p.getOptions(), null);
        reader.start();
        PubsubUnboundedSource.PubsubCheckpoint checkpoint = reader.getCheckpointMark();
        reader.close();
        checkpoint.finalizeCheckpoint();
    }
}

