package org.apache.beam.sdk.io;

import com.google.api.client.util.Clock;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.PubsubUnboundedSource;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.PubsubClient;
import org.apache.beam.sdk.util.PubsubTestClient;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/PubsubUnboundedSourceTest.class */
public class PubsubUnboundedSourceTest {
    private static final PubsubClient.SubscriptionPath SUBSCRIPTION = PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
    private static final String DATA = "testData";
    private static final long TIMESTAMP = 1234;
    private static final long REQ_TIME = 6373;
    private static final String TIMESTAMP_LABEL = "timestamp";
    private static final String ID_LABEL = "id";
    private static final String ACK_ID = "testAckId";
    private static final String RECORD_ID = "testRecordId";
    private static final int ACK_TIMEOUT_S = 60;
    private AtomicLong now;
    private Clock clock;
    private PubsubTestClient.PubsubTestClientFactory factory;
    private PubsubUnboundedSource.PubsubSource<String> primSource;

    private void setupOneMessage(Iterable<PubsubClient.IncomingMessage> iterable) {
        this.now = new AtomicLong(REQ_TIME);
        this.clock = new Clock() { // from class: org.apache.beam.sdk.io.PubsubUnboundedSourceTest.1
            public long currentTimeMillis() {
                return PubsubUnboundedSourceTest.this.now.get();
            }
        };
        this.factory = PubsubTestClient.createFactoryForPull(this.clock, SUBSCRIPTION, ACK_TIMEOUT_S, iterable);
        this.primSource = new PubsubUnboundedSource.PubsubSource<>(new PubsubUnboundedSource(this.clock, this.factory, (PubsubClient.ProjectPath) null, (PubsubClient.TopicPath) null, SUBSCRIPTION, StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL));
    }

    private void setupOneMessage() {
        setupOneMessage(ImmutableList.of(new PubsubClient.IncomingMessage(DATA.getBytes(), TIMESTAMP, 0L, ACK_ID, RECORD_ID)));
    }

    @After
    public void after() throws IOException {
        this.factory.close();
        this.now = null;
        this.clock = null;
        this.primSource = null;
        this.factory = null;
    }

    @Test
    public void checkpointCoderIsSane() throws Exception {
        setupOneMessage(ImmutableList.of());
        CoderProperties.coderSerializable(this.primSource.getCheckpointMarkCoder());
    }

    @Test
    public void readOneMessage() throws IOException {
        setupOneMessage();
        PubsubUnboundedSource.PubsubReader createReader = this.primSource.createReader(TestPipeline.create().getOptions(), (PubsubUnboundedSource.PubsubCheckpoint) null);
        Assert.assertTrue(createReader.start());
        Assert.assertEquals(DATA, createReader.getCurrent());
        TestCase.assertFalse(createReader.advance());
        createReader.getCheckpointMark().finalizeCheckpoint();
        createReader.close();
    }

    @Test
    public void timeoutAckAndRereadOneMessage() throws IOException {
        setupOneMessage();
        PubsubUnboundedSource.PubsubReader createReader = this.primSource.createReader(TestPipeline.create().getOptions(), (PubsubUnboundedSource.PubsubCheckpoint) null);
        PubsubTestClient pubsubClient = createReader.getPubsubClient();
        Assert.assertTrue(createReader.start());
        Assert.assertEquals(DATA, createReader.getCurrent());
        this.now.addAndGet(65000L);
        pubsubClient.advance();
        Assert.assertTrue(createReader.advance());
        Assert.assertEquals(DATA, createReader.getCurrent());
        TestCase.assertFalse(createReader.advance());
        createReader.getCheckpointMark().finalizeCheckpoint();
        createReader.close();
    }

    @Test
    public void extendAck() throws IOException {
        setupOneMessage();
        PubsubUnboundedSource.PubsubReader createReader = this.primSource.createReader(TestPipeline.create().getOptions(), (PubsubUnboundedSource.PubsubCheckpoint) null);
        PubsubTestClient pubsubClient = createReader.getPubsubClient();
        Assert.assertTrue(createReader.start());
        Assert.assertEquals(DATA, createReader.getCurrent());
        this.now.addAndGet(55000L);
        pubsubClient.advance();
        TestCase.assertFalse(createReader.advance());
        this.now.addAndGet(25000L);
        pubsubClient.advance();
        TestCase.assertFalse(createReader.advance());
        createReader.getCheckpointMark().finalizeCheckpoint();
        createReader.close();
    }

    @Test
    public void timeoutAckExtensions() throws IOException {
        setupOneMessage();
        PubsubUnboundedSource.PubsubReader createReader = this.primSource.createReader(TestPipeline.create().getOptions(), (PubsubUnboundedSource.PubsubCheckpoint) null);
        PubsubTestClient pubsubClient = createReader.getPubsubClient();
        Assert.assertTrue(createReader.start());
        Assert.assertEquals(DATA, createReader.getCurrent());
        this.now.addAndGet(55000L);
        pubsubClient.advance();
        TestCase.assertFalse(createReader.advance());
        for (int i = 0; i < 3; i++) {
            this.now.addAndGet(25000L);
            pubsubClient.advance();
            TestCase.assertFalse(createReader.advance());
        }
        this.now.addAndGet(25000L);
        pubsubClient.advance();
        Assert.assertTrue(createReader.advance());
        Assert.assertEquals(DATA, createReader.getCurrent());
        createReader.getCheckpointMark().finalizeCheckpoint();
        createReader.close();
    }

    @Test
    public void multipleReaders() throws IOException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            arrayList.add(new PubsubClient.IncomingMessage(String.format("data_%d", Integer.valueOf(i)).getBytes(), TIMESTAMP, 0L, String.format("ackid_%d", Integer.valueOf(i)), RECORD_ID));
        }
        setupOneMessage(arrayList);
        TestPipeline create = TestPipeline.create();
        PubsubUnboundedSource.PubsubReader createReader = this.primSource.createReader(create.getOptions(), (PubsubUnboundedSource.PubsubCheckpoint) null);
        Assert.assertTrue(createReader.start());
        Assert.assertEquals("data_0", createReader.getCurrent());
        PubsubUnboundedSource.PubsubCheckpoint checkpointMark = createReader.getCheckpointMark();
        checkpointMark.finalizeCheckpoint();
        Assert.assertEquals(1L, checkpointMark.notYetReadIds.size());
        Assert.assertEquals("ackid_1", checkpointMark.notYetReadIds.get(0));
        Assert.assertTrue(createReader.advance());
        Assert.assertEquals("data_1", createReader.getCurrent());
        PubsubUnboundedSource.PubsubCheckpoint pubsubCheckpoint = (PubsubUnboundedSource.PubsubCheckpoint) CoderUtils.decodeFromByteArray(this.primSource.getCheckpointMarkCoder(), CoderUtils.encodeToByteArray(this.primSource.getCheckpointMarkCoder(), checkpointMark));
        Assert.assertEquals(1L, pubsubCheckpoint.notYetReadIds.size());
        Assert.assertEquals("ackid_1", pubsubCheckpoint.notYetReadIds.get(0));
        PubsubUnboundedSource.PubsubReader createReader2 = this.primSource.createReader(create.getOptions(), pubsubCheckpoint);
        Assert.assertTrue(createReader2.start());
        Assert.assertEquals("data_1", createReader2.getCurrent());
        TestCase.assertFalse(createReader2.advance());
        createReader2.getCheckpointMark().finalizeCheckpoint();
        createReader2.close();
    }

    private long messageNumToTimestamp(int i) {
        return TIMESTAMP + (i * 100);
    }

    @Test
    public void readManyMessages() throws IOException {
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10000; i++) {
            int i2 = (((i / 97) * 97) + 96) - (i % 97);
            String format = String.format("data_%d", Integer.valueOf(i2));
            hashMap.put(format, Integer.valueOf(i2));
            arrayList.add(new PubsubClient.IncomingMessage(format.getBytes(), messageNumToTimestamp(i2), 0L, String.format("ackid_%d", Integer.valueOf(i2)), String.format("recordid_%d", Integer.valueOf(i2))));
        }
        setupOneMessage(arrayList);
        PubsubUnboundedSource.PubsubReader createReader = this.primSource.createReader(TestPipeline.create().getOptions(), (PubsubUnboundedSource.PubsubCheckpoint) null);
        PubsubTestClient pubsubClient = createReader.getPubsubClient();
        for (int i3 = 0; i3 < 10000; i3++) {
            if (i3 == 0) {
                Assert.assertTrue(createReader.start());
            } else {
                Assert.assertTrue(createReader.advance());
            }
            this.now.addAndGet(30L);
            pubsubClient.advance();
            Integer num = (Integer) hashMap.remove((String) createReader.getCurrent());
            Assert.assertNotNull(num);
            Assert.assertEquals(new Instant(messageNumToTimestamp(num.intValue())), createReader.getCurrentTimestamp());
            Assert.assertArrayEquals(String.format("recordid_%d", num).getBytes(), createReader.getCurrentRecordId());
            if (i3 % 1000 == 999) {
                long millis = createReader.getWatermark().getMillis();
                long j = Long.MAX_VALUE;
                Iterator it = hashMap.values().iterator();
                while (it.hasNext()) {
                    j = Math.min(j, messageNumToTimestamp(((Integer) it.next()).intValue()));
                }
                Assert.assertThat(Long.valueOf(millis), Matchers.lessThanOrEqualTo(Long.valueOf(j)));
                PubsubUnboundedSource.PubsubCheckpoint checkpointMark = createReader.getCheckpointMark();
                if (i3 % 2000 == 1999) {
                    checkpointMark.finalizeCheckpoint();
                }
            }
        }
        TestCase.assertFalse(createReader.advance());
        Assert.assertTrue(hashMap.isEmpty());
        createReader.close();
    }
}
