package org.apache.flink.streaming.connectors.akka.utils;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.UntypedActor;
import java.util.Iterator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/* loaded from: input_file:org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.class */
/**
 * Akka actor that forwards every message it receives into a Flink
 * {@link SourceFunction.SourceContext}.
 *
 * <p>On start it sends a {@code SubscribeReceiver} carrying its own reference to the actor(s)
 * selected by {@code urlOfPublisher}; on stop it sends an {@code UnsubscribeReceiver} to the same
 * selection. Incoming messages are dispatched by runtime type:
 * <ul>
 *   <li>an {@link Iterable} is emitted element by element;</li>
 *   <li>a {@link Tuple2} whose second field is a {@link Long} is emitted as
 *       {@code (f0, timestamp = f1)};</li>
 *   <li>anything else (including a {@code Tuple2} without a {@code Long} second field) is
 *       emitted unchanged as a single element.</li>
 * </ul>
 * When {@code autoAck} is set, the string {@code "ack"} is sent back to the sender after each
 * message has been handed to the source context.
 */
public class ReceiverActor extends UntypedActor {
    /** Flink source context that receives every element this actor emits. */
    private final SourceFunction.SourceContext<Object> ctx;

    /** Actor path passed to {@code actorSelection} to locate the publisher. */
    private final String urlOfPublisher;

    /** Whether to reply {@code "ack"} to the sender after each handled message. */
    private final boolean autoAck;

    /** Selection of the publisher; assigned in {@link #preStart()}, {@code null} before that. */
    private ActorSelection remotePublisher;

    /**
     * Creates the receiver.
     *
     * @param sourceContext Flink source context that collected elements are written to
     * @param str actor path of the publisher to subscribe to
     * @param z {@code true} to acknowledge every received message with {@code "ack"}
     */
    public ReceiverActor(SourceFunction.SourceContext<Object> sourceContext, String str, boolean z) {
        this.ctx = sourceContext;
        this.urlOfPublisher = str;
        this.autoAck = z;
    }

    /**
     * Resolves the publisher selection and subscribes this actor to it.
     *
     * @throws Exception declared by the actor lifecycle contract
     */
    public void preStart() throws Exception {
        this.remotePublisher = getContext().actorSelection(this.urlOfPublisher);
        this.remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
    }

    /**
     * Emits the received message into the source context and optionally acknowledges it.
     *
     * @param obj the message delivered to this actor
     * @throws Exception declared by the actor contract
     */
    public void onReceive(Object obj) throws Exception {
        if (obj instanceof Iterable) {
            // Element type is erased at runtime; elements are only ever handled as Object.
            @SuppressWarnings("unchecked")
            Iterable<Object> batch = (Iterable<Object>) obj;
            collect(batch);
        } else if (obj instanceof Tuple2 && ((Tuple2<?, ?>) obj).f1 instanceof Long) {
            // Only a tuple carrying a non-null Long in f1 is a (payload, timestamp) pair.
            // Any other tuple falls through and is emitted as ordinary data instead of
            // failing with a ClassCastException / NullPointerException.
            Tuple2<?, ?> timestamped = (Tuple2<?, ?>) obj;
            collect(timestamped.f0, ((Long) timestamped.f1).longValue());
        } else {
            collect(obj);
        }
        if (this.autoAck) {
            getSender().tell("ack", getSelf());
        }
    }

    /** Emits each element of {@code iterable} individually, in iteration order. */
    private void collect(Iterable<Object> iterable) {
        for (Object element : iterable) {
            this.ctx.collect(element);
        }
    }

    /** Emits {@code obj} as a single element. */
    private void collect(Object obj) {
        this.ctx.collect(obj);
    }

    /** Emits {@code obj} with the explicit timestamp {@code j}. */
    private void collect(Object obj, long j) {
        this.ctx.collectWithTimestamp(obj, j);
    }

    /**
     * Notifies the publisher selection that this receiver is going away.
     *
     * @throws Exception declared by the actor lifecycle contract
     */
    public void postStop() throws Exception {
        if (this.remotePublisher == null) {
            // preStart never got as far as resolving the publisher; nothing to unsubscribe from.
            return;
        }
        // NOTE(review): preStart subscribes with getSelf(), but this message carries
        // ActorRef.noSender() both as payload and as sender, so the publisher presumably
        // cannot tell which receiver is unsubscribing. Left unchanged because
        // UnsubscribeReceiver and the publisher are not visible here -- confirm whether
        // getSelf() was intended.
        this.remotePublisher.tell(new UnsubscribeReceiver(ActorRef.noSender()), ActorRef.noSender());
    }
}
