package org.apache.flink.streaming.connectors.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import java.util.Collections;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.akka.utils.ReceiverActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;

/* loaded from: input_file:org/apache/flink/streaming/connectors/akka/AkkaSource.class */
/**
 * Flink source that starts a local Akka actor system with a single receiver actor.
 *
 * <p>The receiver actor is created from {@link ReceiverActor} and is handed the Flink
 * {@code SourceContext}, the publisher URL and the auto-ack flag as constructor arguments.
 * How the actor uses them is defined in {@link ReceiverActor}, not here.
 */
public class AkkaSource extends RichSourceFunction<Object> {
    private static final Logger LOG = LoggerFactory.getLogger(AkkaSource.class);
    private static final long serialVersionUID = 1L;

    /** Name given to the actor system created by this source. */
    private static final String ACTOR_SYSTEM_NAME = "receiver-actor-system";
    /** Configuration path read to decide whether auto-acknowledgement is enabled. */
    private static final String AUTO_ACK_PATH = "akka.remote.auto-ack";
    /** Configuration path of the actor provider; defaulted when absent. */
    private static final String ACTOR_PROVIDER_PATH = "akka.actor.provider";
    /** Configuration path of the enabled remote transports; defaulted when absent. */
    private static final String ENABLED_TRANSPORTS_PATH = "akka.remote.enabled-transports";

    private final Class<?> classForActor = ReceiverActor.class;
    private final String actorName;
    private final String urlOfPublisher;
    private final Config configuration;
    private transient ActorSystem receiverActorSystem;
    private transient ActorRef receiverActor;
    protected transient boolean autoAck;

    /**
     * Creates a new source.
     *
     * @param actorName name under which the receiver actor is registered in the actor system
     * @param urlOfPublisher publisher URL passed to the receiver actor
     * @param configuration configuration used to create the actor system
     */
    public AkkaSource(String actorName, String urlOfPublisher, Config configuration) {
        this.actorName = actorName;
        this.urlOfPublisher = urlOfPublisher;
        this.configuration = configuration;
    }

    /**
     * Creates the actor system and reads the auto-ack flag from the configuration.
     *
     * <p>{@code autoAck} becomes {@code true} only when {@code akka.remote.auto-ack} is present
     * and its string value equals {@code "on"}.
     *
     * @param parameters Flink configuration; not used by this implementation
     * @throws Exception if the actor system cannot be created or the configuration cannot be read
     */
    public void open(Configuration parameters) throws Exception {
        this.receiverActorSystem = createDefaultActorSystem();
        this.autoAck = this.configuration.hasPath(AUTO_ACK_PATH)
                && "on".equals(this.configuration.getString(AUTO_ACK_PATH));
    }

    /**
     * Starts the receiver actor and blocks until the actor system has terminated.
     *
     * @param sourceContext Flink source context handed to the receiver actor
     * @throws Exception if the actor cannot be created or waiting for termination fails
     */
    public void run(SourceFunction.SourceContext<Object> sourceContext) throws Exception {
        LOG.info("Starting the Receiver actor {}", this.actorName);
        Props receiverProps = Props.create(
                this.classForActor,
                new Object[] {sourceContext, this.urlOfPublisher, Boolean.valueOf(this.autoAck)});
        this.receiverActor = this.receiverActorSystem.actorOf(receiverProps, this.actorName);
        LOG.info("Started the Receiver actor {} successfully", this.actorName);
        // Blocks the calling thread until terminate() has completed on the actor system.
        Await.result(this.receiverActorSystem.whenTerminated(), Duration.Inf());
    }

    /**
     * Stops the receiver actor (if one was started) and terminates the actor system (if one was
     * created).
     *
     * <p>The two references are checked independently: the actor only exists once {@link #run}
     * has created it, so closing between {@link #open} and {@link #run} must still terminate the
     * actor system instead of failing on a null actor reference.
     */
    public void close() {
        LOG.info("Closing source");
        if (this.receiverActor != null) {
            this.receiverActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
        if (this.receiverActorSystem != null) {
            this.receiverActorSystem.terminate();
        }
    }

    /** Cancels the source by delegating to {@link #close()}. */
    public void cancel() {
        LOG.info("Cancelling akka source");
        close();
    }

    /** Creates the actor system from the user configuration plus the mandatory defaults. */
    private ActorSystem createDefaultActorSystem() {
        return ActorSystem.create(ACTOR_SYSTEM_NAME, getOrCreateMandatoryProperties(this.configuration));
    }

    /**
     * Returns a configuration that always defines the actor provider and the enabled transports.
     *
     * <p>Values already present in {@code config} are kept; only missing paths are filled in.
     *
     * @param config user supplied configuration
     * @return {@code config} itself, or a copy with the missing paths added
     */
    private Config getOrCreateMandatoryProperties(Config config) {
        Config effective = config;
        if (!effective.hasPath(ACTOR_PROVIDER_PATH)) {
            effective = effective.withValue(
                    ACTOR_PROVIDER_PATH,
                    ConfigValueFactory.fromAnyRef("akka.remote.RemoteActorRefProvider"));
        }
        if (!effective.hasPath(ENABLED_TRANSPORTS_PATH)) {
            effective = effective.withValue(
                    ENABLED_TRANSPORTS_PATH,
                    ConfigValueFactory.fromAnyRef(Collections.singletonList("akka.remote.netty.tcp")));
        }
        return effective;
    }
}
