/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.sparkreceiver;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.io.sparkreceiver.ReceiverBuilder;
import org.apache.beam.sdk.io.sparkreceiver.WrappedSupervisor;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class ReceiverBuilderTest {
    private static final Logger LOG = LoggerFactory.getLogger(ReceiverBuilderTest.class);
    public static final String TEST_MESSAGE = "testMessage";

    @Test
    public void testCreatingCustomSparkReceiver() {
        try {
            AtomicBoolean customStoreConsumerWasUsed = new AtomicBoolean(false);
            ReceiverBuilder receiverBuilder = new ReceiverBuilder(CustomReceiver.class);
            Receiver receiver = receiverBuilder.withConstructorArgs(new Object[]{StorageLevel.DISK_ONLY()}).build();
            new WrappedSupervisor(receiver, new SparkConf(), (SerializableFunction & Serializable)args -> {
                customStoreConsumerWasUsed.set(true);
                return null;
            });
            receiver.onStart();
            Assert.assertTrue((boolean)(receiver.supervisor() instanceof WrappedSupervisor));
            receiver.store((Object)TEST_MESSAGE);
            Assert.assertTrue((boolean)customStoreConsumerWasUsed.get());
        }
        catch (Exception e) {
            LOG.error("Can not get receiver", (Throwable)e);
        }
    }

    private static class CustomReceiver
    extends Receiver<String> {
        public CustomReceiver(StorageLevel storageLevel) {
            super(storageLevel);
        }

        public void onStart() {
            LOG.info("Receiver onStart()");
        }

        public void onStop() {
            LOG.info("Receiver onStop()");
        }
    }
}

