/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.pulsar;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.pulsar.FakePulsarClient;
import org.apache.beam.sdk.io.pulsar.FakePulsarReader;
import org.apache.beam.sdk.io.pulsar.PulsarIO;
import org.apache.beam.sdk.io.pulsar.PulsarMessage;
import org.apache.beam.sdk.io.pulsar.PulsarSourceDescriptor;
import org.apache.beam.sdk.io.pulsar.ReadFromPulsarDoFn;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class ReadFromPulsarDoFnTest {
    public static final String SERVICE_URL = "pulsar://localhost:6650";
    public static final String ADMIN_URL = "http://localhost:8080";
    public static final String TOPIC = "PULSARIO_READFROMPULSAR_TEST";
    public static final int NUMBEROFMESSAGES = 100;
    private final ReadFromPulsarDoFn dofnInstance = new ReadFromPulsarDoFn(this.readSourceDescriptor());
    public FakePulsarReader fakePulsarReader = new FakePulsarReader("PULSARIO_READFROMPULSAR_TEST", 100);
    private FakePulsarClient fakePulsarClient = new FakePulsarClient(this.fakePulsarReader);

    private PulsarIO.Read readSourceDescriptor() {
        return PulsarIO.read().withClientUrl(SERVICE_URL).withTopic(TOPIC).withAdminUrl(ADMIN_URL).withPublishTime().withPulsarClient((SerializableFunction)new SerializableFunction<String, PulsarClient>(){

            public PulsarClient apply(String input) {
                return ReadFromPulsarDoFnTest.this.fakePulsarClient;
            }
        });
    }

    @Before
    public void setup() throws Exception {
        this.dofnInstance.initPulsarClients();
        this.fakePulsarReader.reset();
    }

    @Test
    public void testInitialRestrictionWhenHasStartOffset() throws Exception {
        long expectedStartOffset = 0L;
        OffsetRange result = this.dofnInstance.getInitialRestriction(PulsarSourceDescriptor.of((String)TOPIC, (Long)expectedStartOffset, null, null, (String)SERVICE_URL, (String)ADMIN_URL));
        Assert.assertEquals((Object)new OffsetRange(expectedStartOffset, Long.MAX_VALUE), (Object)result);
    }

    @Test
    public void testInitialRestrictionWithConsumerPosition() throws Exception {
        long expectedStartOffset = Instant.now().getMillis();
        OffsetRange result = this.dofnInstance.getInitialRestriction(PulsarSourceDescriptor.of((String)TOPIC, (Long)expectedStartOffset, null, null, (String)SERVICE_URL, (String)ADMIN_URL));
        Assert.assertEquals((Object)new OffsetRange(expectedStartOffset, Long.MAX_VALUE), (Object)result);
    }

    @Test
    public void testInitialRestrictionWithConsumerEndPosition() throws Exception {
        long startOffset = this.fakePulsarReader.getStartTimestamp();
        long endOffset = this.fakePulsarReader.getEndTimestamp();
        OffsetRange result = this.dofnInstance.getInitialRestriction(PulsarSourceDescriptor.of((String)TOPIC, (Long)startOffset, (Long)endOffset, null, (String)SERVICE_URL, (String)ADMIN_URL));
        Assert.assertEquals((Object)new OffsetRange(startOffset, endOffset), (Object)result);
    }

    @Test
    public void testProcessElement() throws Exception {
        MockOutputReceiver receiver = new MockOutputReceiver();
        long startOffset = this.fakePulsarReader.getStartTimestamp();
        long endOffset = this.fakePulsarReader.getEndTimestamp();
        OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(startOffset, endOffset));
        PulsarSourceDescriptor descriptor = PulsarSourceDescriptor.of((String)TOPIC, (Long)startOffset, (Long)endOffset, null, (String)SERVICE_URL, (String)ADMIN_URL);
        DoFn.ProcessContinuation result = this.dofnInstance.processElement(descriptor, (RestrictionTracker)tracker, null, (DoFn.OutputReceiver)receiver);
        int expectedResultWithoutCountingLastOffset = 99;
        Assert.assertEquals((Object)DoFn.ProcessContinuation.stop(), (Object)result);
        Assert.assertEquals((long)expectedResultWithoutCountingLastOffset, (long)receiver.getOutputs().size());
    }

    @Test
    public void testProcessElementWhenEndMessageIdIsDefined() throws Exception {
        MockOutputReceiver receiver = new MockOutputReceiver();
        OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
        MessageId endMessageId = DefaultImplementation.newMessageId((long)50L, (long)50L, (int)50);
        DoFn.ProcessContinuation result = this.dofnInstance.processElement(PulsarSourceDescriptor.of((String)TOPIC, null, null, (MessageId)endMessageId, (String)SERVICE_URL, (String)ADMIN_URL), (RestrictionTracker)tracker, null, (DoFn.OutputReceiver)receiver);
        Assert.assertEquals((Object)DoFn.ProcessContinuation.stop(), (Object)result);
        Assert.assertEquals((long)50L, (long)receiver.getOutputs().size());
    }

    @Test
    public void testProcessElementWithEmptyRecords() throws Exception {
        MockOutputReceiver receiver = new MockOutputReceiver();
        this.fakePulsarReader.emptyMockRecords();
        OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
        DoFn.ProcessContinuation result = this.dofnInstance.processElement(PulsarSourceDescriptor.of((String)TOPIC, null, null, null, (String)SERVICE_URL, (String)ADMIN_URL), (RestrictionTracker)tracker, null, (DoFn.OutputReceiver)receiver);
        Assert.assertEquals((Object)DoFn.ProcessContinuation.resume(), (Object)result);
        Assert.assertTrue((boolean)receiver.getOutputs().isEmpty());
    }

    @Test
    public void testProcessElementWhenHasReachedEndTopic() throws Exception {
        MockOutputReceiver receiver = new MockOutputReceiver();
        this.fakePulsarReader.setReachedEndOfTopic(true);
        OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
        DoFn.ProcessContinuation result = this.dofnInstance.processElement(PulsarSourceDescriptor.of((String)TOPIC, null, null, null, (String)SERVICE_URL, (String)ADMIN_URL), (RestrictionTracker)tracker, null, (DoFn.OutputReceiver)receiver);
        Assert.assertEquals((Object)DoFn.ProcessContinuation.stop(), (Object)result);
    }

    private static class MockOutputReceiver
    implements DoFn.OutputReceiver<PulsarMessage> {
        private final List<PulsarMessage> records = new ArrayList<PulsarMessage>();

        private MockOutputReceiver() {
        }

        public void output(PulsarMessage output) {
        }

        public void outputWithTimestamp(PulsarMessage output, @UnknownKeyFor @NonNull @Initialized Instant timestamp) {
            this.records.add(output);
        }

        public List<PulsarMessage> getOutputs() {
            return this.records;
        }
    }
}

