package org.apache.flink.streaming.connectors.akka;

import akka.actor.ActorSystem;
import akka.actor.Props;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.akka.utils.FeederActor;
import org.apache.flink.streaming.connectors.akka.utils.Message;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;

/* loaded from: input_file:org/apache/flink/streaming/connectors/akka/AkkaSourceTest.class */
public class AkkaSourceTest {
    private AkkaSource source;
    private static final String feederActorName = "JavaFeederActor";
    private static final String receiverActorName = "receiverActor";
    private static final String urlOfFeeder = "akka.tcp://feederActorSystem@127.0.0.1:5150/user/JavaFeederActor";
    private ActorSystem feederActorSystem;
    private Configuration config = new Configuration();
    private Config sourceConfiguration = ConfigFactory.empty();
    private Thread sourceThread;
    private SourceFunction.SourceContext<Object> sourceContext;
    private volatile Exception exception;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/akka/AkkaSourceTest$AkkaTestSource.class */
    private class AkkaTestSource extends AkkaSource {
        private AkkaTestSource(Config config) {
            super(AkkaSourceTest.receiverActorName, AkkaSourceTest.urlOfFeeder, config);
        }

        public RuntimeContext getRuntimeContext() {
            return (RuntimeContext) Mockito.mock(StreamingRuntimeContext.class);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/akka/AkkaSourceTest$DummySourceContext.class */
    private static class DummySourceContext implements SourceFunction.SourceContext<Object> {
        private static final Object lock = new Object();
        private static long numElementsCollected;
        private static List<Object> message;

        private DummySourceContext() {
            numElementsCollected = 0L;
            message = new ArrayList();
        }

        public void collect(Object obj) {
            message.add(obj);
            numElementsCollected++;
        }

        public void collectWithTimestamp(Object obj, long j) {
            message.add(obj);
            numElementsCollected++;
        }

        public void emitWatermark(Watermark watermark) {
        }

        public void markAsTemporarilyIdle() {
            throw new UnsupportedOperationException();
        }

        public Object getCheckpointLock() {
            return lock;
        }

        public void close() {
        }
    }

    @BeforeEach
    public void beforeTest() throws Exception {
        this.feederActorSystem = ActorSystem.create("feederActorSystem", getFeederActorConfig());
        this.sourceContext = new DummySourceContext();
        this.sourceThread = new Thread(new Runnable() { // from class: org.apache.flink.streaming.connectors.akka.AkkaSourceTest.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    AkkaSourceTest.this.source.run(new DummySourceContext());
                } catch (Exception e) {
                    AkkaSourceTest.this.exception = e;
                }
            }
        });
    }

    @AfterEach
    public void afterTest() throws Exception {
        this.feederActorSystem.terminate();
        Await.result(this.feederActorSystem.whenTerminated(), Duration.Inf());
        this.source.cancel();
        this.sourceThread.join();
    }

    @Test
    public void testWithSingleData() throws Exception {
        this.source = new AkkaTestSource(this.sourceConfiguration);
        this.feederActorSystem.actorOf(Props.create(FeederActor.class, new Object[]{FeederActor.MessageTypes.SINGLE_DATA}), feederActorName);
        this.source.autoAck = false;
        this.source.open(this.config);
        this.sourceThread.start();
        while (DummySourceContext.numElementsCollected != 1) {
            Thread.sleep(5L);
        }
        Assertions.assertEquals(DummySourceContext.message.get(0).toString(), Message.WELCOME_MESSAGE);
    }

    @Test
    public void testWithIterableData() throws Exception {
        this.source = new AkkaTestSource(this.sourceConfiguration);
        this.feederActorSystem.actorOf(Props.create(FeederActor.class, new Object[]{FeederActor.MessageTypes.ITERABLE_DATA}), feederActorName);
        this.source.autoAck = false;
        this.source.open(this.config);
        this.sourceThread.start();
        while (DummySourceContext.numElementsCollected != 2) {
            Thread.sleep(5L);
        }
        List list = DummySourceContext.message;
        Assertions.assertEquals(list.get(0).toString(), Message.WELCOME_MESSAGE);
        Assertions.assertEquals(list.get(1).toString(), Message.FEEDER_MESSAGE);
    }

    @Test
    public void testWithByteArrayData() throws Exception {
        this.source = new AkkaTestSource(this.sourceConfiguration);
        this.feederActorSystem.actorOf(Props.create(FeederActor.class, new Object[]{FeederActor.MessageTypes.BYTES_DATA}), feederActorName);
        this.source.autoAck = false;
        this.source.open(this.config);
        this.sourceThread.start();
        while (DummySourceContext.numElementsCollected != 1) {
            Thread.sleep(5L);
        }
        List list = DummySourceContext.message;
        if (list.get(0) instanceof byte[]) {
            Assertions.assertEquals(new String((byte[]) list.get(0)), Message.WELCOME_MESSAGE);
        }
    }

    @Test
    public void testWithSingleDataWithTimestamp() throws Exception {
        this.source = new AkkaTestSource(this.sourceConfiguration);
        this.feederActorSystem.actorOf(Props.create(FeederActor.class, new Object[]{FeederActor.MessageTypes.SINGLE_DATA_WITH_TIMESTAMP}), feederActorName);
        this.source.autoAck = false;
        this.source.open(this.config);
        this.sourceThread.start();
        while (DummySourceContext.numElementsCollected != 1) {
            Thread.sleep(5L);
        }
        Assertions.assertEquals(DummySourceContext.message.get(0).toString(), Message.WELCOME_MESSAGE);
    }

    @Test
    public void testAcksWithSingleData() throws Exception {
        this.sourceConfiguration = this.sourceConfiguration.withValue("akka.remote.auto-ack", ConfigValueFactory.fromAnyRef("on"));
        this.source = new AkkaTestSource(this.sourceConfiguration);
        this.feederActorSystem.actorOf(Props.create(FeederActor.class, new Object[]{FeederActor.MessageTypes.SINGLE_DATA}), feederActorName);
        this.source.open(this.config);
        this.sourceThread.start();
        while (DummySourceContext.numElementsCollected != 1) {
            Thread.sleep(5L);
        }
        for (int i = 1; Message.ACK_MESSAGE == null && i <= 5; i++) {
            Thread.sleep(5L);
        }
        Assertions.assertEquals("ack", Message.ACK_MESSAGE);
    }

    private Config getFeederActorConfig() {
        return ConfigFactory.parseFile(new File(getClass().getClassLoader().getResource("feeder_actor.conf").getFile()));
    }
}
