/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.sparkreceiver;

import java.io.Serializable;
import java.util.ArrayList;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.sparkreceiver.ArrayBufferDataReceiver;
import org.apache.beam.sdk.io.sparkreceiver.ByteBufferDataReceiver;
import org.apache.beam.sdk.io.sparkreceiver.CustomReceiverWithOffset;
import org.apache.beam.sdk.io.sparkreceiver.IteratorDataReceiver;
import org.apache.beam.sdk.io.sparkreceiver.ReceiverBuilder;
import org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class SparkReceiverIOTest {
    public static final TestPipelineOptions OPTIONS = (TestPipelineOptions)TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
    public static final long PULL_FREQUENCY_SEC = 1L;
    public static final long START_POLL_TIMEOUT_SEC = 2L;
    public static final long START_OFFSET = 0L;
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.fromOptions((PipelineOptions)OPTIONS);

    @Test
    public void testReadBuildsCorrectly() {
        ReceiverBuilder receiverBuilder = new ReceiverBuilder(CustomReceiverWithOffset.class).withConstructorArgs(new Object[0]);
        SerializableFunction & Serializable offsetFn = Long::valueOf;
        SerializableFunction & Serializable timestampFn = Instant::parse;
        SparkReceiverIO.Read read = SparkReceiverIO.read().withGetOffsetFn((SerializableFunction)offsetFn).withTimestampFn((SerializableFunction)timestampFn).withPullFrequencySec(Long.valueOf(1L)).withStartPollTimeoutSec(Long.valueOf(2L)).withStartOffset(Long.valueOf(0L)).withSparkReceiverBuilder(receiverBuilder);
        Assert.assertEquals((Object)offsetFn, (Object)read.getGetOffsetFn());
        Assert.assertEquals((Object)receiverBuilder, (Object)read.getSparkReceiverBuilder());
    }

    @Test
    public void testReadObjectCreationFailsIfReceiverBuilderIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> SparkReceiverIO.read().withSparkReceiverBuilder(null));
    }

    @Test
    public void testReadObjectCreationFailsIfGetOffsetFnIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> SparkReceiverIO.read().withGetOffsetFn(null));
    }

    @Test
    public void testReadObjectCreationFailsIfTimestampFnIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> SparkReceiverIO.read().withTimestampFn(null));
    }

    @Test
    public void testReadObjectCreationFailsIfPullFrequencySecIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> SparkReceiverIO.read().withPullFrequencySec(null));
    }

    @Test
    public void testReadObjectCreationFailsIfStartPollTimeoutSecIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> SparkReceiverIO.read().withStartPollTimeoutSec(null));
    }

    @Test
    public void testReadObjectCreationFailsIfStartOffsetIsNull() {
        Assert.assertThrows(IllegalArgumentException.class, () -> SparkReceiverIO.read().withStartOffset(null));
    }

    @Test
    public void testReadValidationFailsMissingReceiverBuilder() {
        SparkReceiverIO.Read read = SparkReceiverIO.read();
        Assert.assertThrows(IllegalStateException.class, () -> ((SparkReceiverIO.Read)read).validateTransform());
    }

    @Test
    public void testReadValidationFailsMissingSparkConsumer() {
        ReceiverBuilder receiverBuilder = new ReceiverBuilder(CustomReceiverWithOffset.class).withConstructorArgs(new Object[0]);
        SparkReceiverIO.Read read = SparkReceiverIO.read().withSparkReceiverBuilder(receiverBuilder);
        Assert.assertThrows(IllegalStateException.class, () -> ((SparkReceiverIO.Read)read).validateTransform());
    }

    @Test
    public void testReadFromCustomReceiverWithOffset() {
        CustomReceiverWithOffset.shouldFailInTheMiddle = false;
        ReceiverBuilder receiverBuilder = new ReceiverBuilder(CustomReceiverWithOffset.class).withConstructorArgs(new Object[0]);
        SparkReceiverIO.Read reader = SparkReceiverIO.read().withGetOffsetFn(Long::valueOf).withTimestampFn(Instant::parse).withPullFrequencySec(Long.valueOf(1L)).withStartPollTimeoutSec(Long.valueOf(2L)).withStartOffset(Long.valueOf(0L)).withSparkReceiverBuilder(receiverBuilder);
        ArrayList<String> expected = new ArrayList<String>();
        for (int i = 0; i < 20; ++i) {
            expected.add(String.valueOf(i));
        }
        PCollection actual = ((PCollection)this.pipeline.apply((PTransform)reader)).setCoder((Coder)StringUtf8Coder.of());
        PAssert.that((PCollection)actual).containsInAnyOrder(expected);
        this.pipeline.run().waitUntilFinish(Duration.standardSeconds((long)15L));
    }

    @Test
    public void testReadFromCustomReceiverWithOffsetFailsAndReread() {
        CustomReceiverWithOffset.shouldFailInTheMiddle = true;
        ReceiverBuilder receiverBuilder = new ReceiverBuilder(CustomReceiverWithOffset.class).withConstructorArgs(new Object[0]);
        SparkReceiverIO.Read reader = SparkReceiverIO.read().withGetOffsetFn(Long::valueOf).withTimestampFn(Instant::parse).withPullFrequencySec(Long.valueOf(1L)).withStartPollTimeoutSec(Long.valueOf(2L)).withStartOffset(Long.valueOf(0L)).withSparkReceiverBuilder(receiverBuilder);
        ArrayList<String> expected = new ArrayList<String>();
        for (int i = 0; i < 20; ++i) {
            expected.add(String.valueOf(i));
        }
        PCollection actual = ((PCollection)this.pipeline.apply((PTransform)reader)).setCoder((Coder)StringUtf8Coder.of());
        PAssert.that((PCollection)actual).containsInAnyOrder(expected);
        this.pipeline.run().waitUntilFinish(Duration.standardSeconds((long)15L));
    }

    @Test
    public void testReadFromReceiverArrayBufferData() {
        ReceiverBuilder receiverBuilder = new ReceiverBuilder(ArrayBufferDataReceiver.class).withConstructorArgs(new Object[0]);
        SparkReceiverIO.Read reader = SparkReceiverIO.read().withGetOffsetFn(Long::valueOf).withTimestampFn(Instant::parse).withPullFrequencySec(Long.valueOf(1L)).withStartPollTimeoutSec(Long.valueOf(2L)).withStartOffset(Long.valueOf(0L)).withSparkReceiverBuilder(receiverBuilder);
        ArrayList<String> expected = new ArrayList<String>();
        for (int i = 0; i < 20; ++i) {
            expected.add(String.valueOf(i));
        }
        PCollection actual = ((PCollection)this.pipeline.apply((PTransform)reader)).setCoder((Coder)StringUtf8Coder.of());
        PAssert.that((PCollection)actual).containsInAnyOrder(expected);
        this.pipeline.run().waitUntilFinish(Duration.standardSeconds((long)15L));
    }

    @Test
    public void testReadFromReceiverByteBufferData() {
        ReceiverBuilder receiverBuilder = new ReceiverBuilder(ByteBufferDataReceiver.class).withConstructorArgs(new Object[0]);
        SparkReceiverIO.Read reader = SparkReceiverIO.read().withGetOffsetFn(Long::valueOf).withTimestampFn(Instant::parse).withPullFrequencySec(Long.valueOf(1L)).withStartPollTimeoutSec(Long.valueOf(2L)).withStartOffset(Long.valueOf(0L)).withSparkReceiverBuilder(receiverBuilder);
        ArrayList<String> expected = new ArrayList<String>();
        for (int i = 0; i < 20; ++i) {
            expected.add(String.valueOf(i));
        }
        PCollection actual = ((PCollection)this.pipeline.apply((PTransform)reader)).setCoder((Coder)StringUtf8Coder.of());
        PAssert.that((PCollection)actual).containsInAnyOrder(expected);
        this.pipeline.run().waitUntilFinish(Duration.standardSeconds((long)15L));
    }

    @Test
    public void testReadFromReceiverIteratorData() {
        ReceiverBuilder receiverBuilder = new ReceiverBuilder(IteratorDataReceiver.class).withConstructorArgs(new Object[0]);
        SparkReceiverIO.Read reader = SparkReceiverIO.read().withGetOffsetFn(Long::valueOf).withTimestampFn(Instant::parse).withPullFrequencySec(Long.valueOf(1L)).withStartPollTimeoutSec(Long.valueOf(2L)).withStartOffset(Long.valueOf(0L)).withSparkReceiverBuilder(receiverBuilder);
        ArrayList<String> expected = new ArrayList<String>();
        for (int i = 0; i < 20; ++i) {
            expected.add(String.valueOf(i));
        }
        PCollection actual = ((PCollection)this.pipeline.apply((PTransform)reader)).setCoder((Coder)StringUtf8Coder.of());
        PAssert.that((PCollection)actual).containsInAnyOrder(expected);
        this.pipeline.run().waitUntilFinish(Duration.standardSeconds((long)15L));
    }

    static {
        OPTIONS.setBlockOnRun(false);
    }
}

