package org.apache.beam.sdk.io.pulsar;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.pulsar.PulsarIO;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.class */
public class ReadFromPulsarDoFnTest {
    public static final String SERVICE_URL = "pulsar://localhost:6650";
    public static final String ADMIN_URL = "http://localhost:8080";
    public static final String TOPIC = "PULSARIO_READFROMPULSAR_TEST";
    public static final int NUMBEROFMESSAGES = 100;
    private final ReadFromPulsarDoFn dofnInstance = new ReadFromPulsarDoFn(readSourceDescriptor());
    public FakePulsarReader fakePulsarReader = new FakePulsarReader(TOPIC, 100);
    private FakePulsarClient fakePulsarClient = new FakePulsarClient(this.fakePulsarReader);

    /* loaded from: input_file:org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest$MockOutputReceiver.class */
    private static class MockOutputReceiver implements DoFn.OutputReceiver<PulsarMessage> {
        private final List<PulsarMessage> records;

        private MockOutputReceiver() {
            this.records = new ArrayList();
        }

        public void output(PulsarMessage pulsarMessage) {
        }

        public void outputWithTimestamp(PulsarMessage pulsarMessage, Instant instant) {
            this.records.add(pulsarMessage);
        }

        public List<PulsarMessage> getOutputs() {
            return this.records;
        }
    }

    private PulsarIO.Read readSourceDescriptor() {
        return PulsarIO.read().withClientUrl(SERVICE_URL).withTopic(TOPIC).withAdminUrl(ADMIN_URL).withPublishTime().withPulsarClient(new SerializableFunction<String, PulsarClient>() { // from class: org.apache.beam.sdk.io.pulsar.ReadFromPulsarDoFnTest.1
            public PulsarClient apply(String str) {
                return ReadFromPulsarDoFnTest.this.fakePulsarClient;
            }
        });
    }

    @Before
    public void setup() throws Exception {
        this.dofnInstance.initPulsarClients();
        this.fakePulsarReader.reset();
    }

    @Test
    public void testInitialRestrictionWhenHasStartOffset() throws Exception {
        Assert.assertEquals(new OffsetRange(0L, Long.MAX_VALUE), this.dofnInstance.getInitialRestriction(PulsarSourceDescriptor.of(TOPIC, 0L, (Long) null, (MessageId) null, SERVICE_URL, ADMIN_URL)));
    }

    @Test
    public void testInitialRestrictionWithConsumerPosition() throws Exception {
        long millis = Instant.now().getMillis();
        Assert.assertEquals(new OffsetRange(millis, Long.MAX_VALUE), this.dofnInstance.getInitialRestriction(PulsarSourceDescriptor.of(TOPIC, Long.valueOf(millis), (Long) null, (MessageId) null, SERVICE_URL, ADMIN_URL)));
    }

    @Test
    public void testInitialRestrictionWithConsumerEndPosition() throws Exception {
        long startTimestamp = this.fakePulsarReader.getStartTimestamp();
        long endTimestamp = this.fakePulsarReader.getEndTimestamp();
        Assert.assertEquals(new OffsetRange(startTimestamp, endTimestamp), this.dofnInstance.getInitialRestriction(PulsarSourceDescriptor.of(TOPIC, Long.valueOf(startTimestamp), Long.valueOf(endTimestamp), (MessageId) null, SERVICE_URL, ADMIN_URL)));
    }

    @Test
    public void testProcessElement() throws Exception {
        MockOutputReceiver mockOutputReceiver = new MockOutputReceiver();
        long startTimestamp = this.fakePulsarReader.getStartTimestamp();
        long endTimestamp = this.fakePulsarReader.getEndTimestamp();
        Assert.assertEquals(DoFn.ProcessContinuation.stop(), this.dofnInstance.processElement(PulsarSourceDescriptor.of(TOPIC, Long.valueOf(startTimestamp), Long.valueOf(endTimestamp), (MessageId) null, SERVICE_URL, ADMIN_URL), new OffsetRangeTracker(new OffsetRange(startTimestamp, endTimestamp)), (WatermarkEstimator) null, mockOutputReceiver));
        Assert.assertEquals(99, mockOutputReceiver.getOutputs().size());
    }

    @Test
    public void testProcessElementWhenEndMessageIdIsDefined() throws Exception {
        Assert.assertEquals(DoFn.ProcessContinuation.stop(), this.dofnInstance.processElement(PulsarSourceDescriptor.of(TOPIC, (Long) null, (Long) null, DefaultImplementation.newMessageId(50L, 50L, 50), SERVICE_URL, ADMIN_URL), new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE)), (WatermarkEstimator) null, new MockOutputReceiver()));
        Assert.assertEquals(50L, r0.getOutputs().size());
    }

    @Test
    public void testProcessElementWithEmptyRecords() throws Exception {
        MockOutputReceiver mockOutputReceiver = new MockOutputReceiver();
        this.fakePulsarReader.emptyMockRecords();
        Assert.assertEquals(DoFn.ProcessContinuation.resume(), this.dofnInstance.processElement(PulsarSourceDescriptor.of(TOPIC, (Long) null, (Long) null, (MessageId) null, SERVICE_URL, ADMIN_URL), new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE)), (WatermarkEstimator) null, mockOutputReceiver));
        Assert.assertTrue(mockOutputReceiver.getOutputs().isEmpty());
    }

    @Test
    public void testProcessElementWhenHasReachedEndTopic() throws Exception {
        MockOutputReceiver mockOutputReceiver = new MockOutputReceiver();
        this.fakePulsarReader.setReachedEndOfTopic(true);
        Assert.assertEquals(DoFn.ProcessContinuation.stop(), this.dofnInstance.processElement(PulsarSourceDescriptor.of(TOPIC, (Long) null, (Long) null, (MessageId) null, SERVICE_URL, ADMIN_URL), new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE)), (WatermarkEstimator) null, mockOutputReceiver));
    }
}
