/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.sparkreceiver;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.io.sparkreceiver.CustomReceiverWithOffset;
import org.apache.beam.sdk.io.sparkreceiver.ReadFromSparkReceiverWithOffsetDoFn;
import org.apache.beam.sdk.io.sparkreceiver.ReceiverBuilder;
import org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;

public class ReadFromSparkReceiverWithOffsetDoFnTest {
    private static final byte[] TEST_ELEMENT = new byte[0];
    private final ReadFromSparkReceiverWithOffsetDoFn<String> dofnInstance = new ReadFromSparkReceiverWithOffsetDoFn(this.makeReadTransform());
    private final ManualWatermarkEstimator<Instant> mockWatermarkEstimator = new ManualWatermarkEstimator<Instant>(){

        public void setWatermark(Instant watermark) {
        }

        public Instant currentWatermark() {
            return null;
        }

        public Instant getState() {
            return null;
        }
    };

    private SparkReceiverIO.Read<String> makeReadTransform() {
        ReceiverBuilder receiverBuilder = new ReceiverBuilder(CustomReceiverWithOffset.class).withConstructorArgs(new Object[0]);
        return SparkReceiverIO.read().withSparkReceiverBuilder(receiverBuilder).withGetOffsetFn(Long::valueOf).withTimestampFn(Instant::parse);
    }

    private List<String> createExpectedRecords(int numRecords) {
        ArrayList<String> records = new ArrayList<String>();
        for (int i = 0; i < numRecords; ++i) {
            records.add(String.valueOf(i));
        }
        return records;
    }

    @Test
    public void testInitialRestriction() {
        long expectedStartOffset = 0L;
        OffsetRange result = this.dofnInstance.initialRestriction(TEST_ELEMENT);
        Assert.assertEquals((Object)new OffsetRange(expectedStartOffset, Long.MAX_VALUE), (Object)result);
    }

    @Test
    public void testRestrictionTrackerSplit() {
        OffsetRangeTracker offsetRangeTracker = this.dofnInstance.restrictionTracker(TEST_ELEMENT, this.dofnInstance.initialRestriction(TEST_ELEMENT));
        Assert.assertEquals((long)0L, (long)offsetRangeTracker.currentRestriction().getFrom());
        Assert.assertEquals((long)Long.MAX_VALUE, (long)offsetRangeTracker.currentRestriction().getTo());
        Assert.assertNull((Object)offsetRangeTracker.trySplit(0.0));
        offsetRangeTracker = this.dofnInstance.restrictionTracker(TEST_ELEMENT, this.dofnInstance.initialRestriction(TEST_ELEMENT));
        Assert.assertTrue((boolean)offsetRangeTracker.tryClaim(Long.valueOf(0L)));
        Assert.assertNull((Object)offsetRangeTracker.trySplit(0.0));
        offsetRangeTracker.checkDone();
        Assert.assertEquals((Object)SplitResult.of((Object)new OffsetRange(0L, 1L), (Object)new OffsetRange(1L, Long.MAX_VALUE)), (Object)offsetRangeTracker.trySplit(0.0));
    }

    @Test
    public void testProcessElement() {
        MockOutputReceiver receiver = new MockOutputReceiver();
        DoFn.ProcessContinuation result = this.dofnInstance.processElement(TEST_ELEMENT, (RestrictionTracker)this.dofnInstance.restrictionTracker(TEST_ELEMENT, this.dofnInstance.initialRestriction(TEST_ELEMENT)), this.mockWatermarkEstimator, (DoFn.OutputReceiver)receiver);
        Assert.assertEquals((Object)DoFn.ProcessContinuation.resume(), (Object)result);
        Assert.assertEquals(this.createExpectedRecords(20), receiver.getOutputs());
    }

    private static class MockOutputReceiver
    implements DoFn.OutputReceiver<String> {
        private final List<String> records = new ArrayList<String>();

        private MockOutputReceiver() {
        }

        public void output(String output) {
        }

        public void outputWithTimestamp(String output, @UnknownKeyFor @NonNull @Initialized Instant timestamp) {
            this.records.add(output);
        }

        public List<String> getOutputs() {
            return this.records;
        }
    }
}

