package org.apache.flink.statefun.flink.core.translation;

import com.google.protobuf.Any;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.MessageFactory;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.Router;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/translation/IngressRouterOperator.class */
/**
 * Stream operator that hands every element read from a single ingress to the routers registered
 * for that ingress, and emits the messages those routers forward as {@link Message} records.
 *
 * <p>The router list and the downstream collector are {@code transient}; they are (re)created in
 * {@link #open()} rather than being serialized with the operator.
 *
 * @param <T> the type of the elements produced by the ingress.
 */
public final class IngressRouterOperator<T> extends AbstractStreamOperator<Message>
        implements OneInputStreamOperator<T, Message> {
    private static final Logger LOG = LoggerFactory.getLogger(IngressRouterOperator.class);
    private static final long serialVersionUID = 1L;

    private final StatefulFunctionsConfig configuration;
    private final IngressIdentifier<T> id;

    // Both fields are populated in open().
    private transient List<Router<T>> routers;
    private transient DownstreamCollector<T> downstream;

    /**
     * {@link Router.Downstream} implementation that turns every forwarded payload into a
     * {@link Message} and emits it through the operator's {@link Output}.
     *
     * @param <T> the ingress element type of the routers that use this collector.
     */
    @VisibleForTesting
    static final class DownstreamCollector<T> implements Router.Downstream<T> {
        private final MessageFactory factory;
        private final boolean multiLanguagePayloads;
        // A single record instance is recycled for every emitted message (see forward).
        private final StreamRecord<Message> reuse = new StreamRecord<>(null);
        private final Output<StreamRecord<Message>> output;

        /**
         * @param messageFactoryType selects the {@link MessageFactory} used to build messages; when
         *     it is {@link MessageFactoryType#WITH_PROTOBUF_PAYLOADS_MULTILANG}, payloads are wrapped
         *     as Protobuf {@link Any} before being sent.
         * @param output the output to emit records to; must not be {@code null}.
         * @throws NullPointerException if {@code output} is {@code null}.
         */
        DownstreamCollector(MessageFactoryType messageFactoryType, Output<StreamRecord<Message>> output) {
            this.factory = MessageFactory.forType(messageFactoryType);
            this.output = Objects.requireNonNull(output);
            this.multiLanguagePayloads = messageFactoryType == MessageFactoryType.WITH_PROTOBUF_PAYLOADS_MULTILANG;
        }

        /**
         * Emits {@code obj} as a message addressed to {@code address}. The message is created with a
         * {@code null} source address.
         *
         * @param address the target address; must not be {@code null}.
         * @param obj the payload; must not be {@code null}.
         * @throws NullPointerException if {@code address} or {@code obj} is {@code null}.
         * @throws ClassCastException if multi-language payloads are enabled and {@code obj} is not a
         *     Protobuf message.
         */
        @Override // org.apache.flink.statefun.sdk.io.Router.Downstream
        public void forward(Address address, Object obj) {
            Objects.requireNonNull(address, "Unable to send a message downstream without an address.");
            Objects.requireNonNull(obj, "message is mandatory parameter and can not be NULL.");

            final Object payload = multiLanguagePayloads ? wrapAsProtobufAny(obj) : obj;
            final Message envelope = factory.from(null, address, payload);
            output.collect(reuse.replace(envelope));
        }

        /**
         * Returns {@code obj} unchanged when it already is an {@link Any}; otherwise packs it into
         * one. The cast fails with a {@link ClassCastException} for payloads that are not Protobuf
         * messages.
         */
        private Object wrapAsProtobufAny(Object obj) {
            if (obj instanceof Any) {
                return obj;
            }
            return Any.pack((com.google.protobuf.Message) obj);
        }
    }

    /**
     * Creates a router operator for the given ingress and sets its chaining strategy to
     * {@link ChainingStrategy#ALWAYS}.
     *
     * @param statefulFunctionsConfig the configuration later used in {@link #open()} to obtain the
     *     {@link StatefulFunctionsUniverse}.
     * @param ingressIdentifier the ingress whose elements are routed; must not be {@code null}.
     * @throws NullPointerException if {@code ingressIdentifier} is {@code null}.
     */
    public IngressRouterOperator(StatefulFunctionsConfig statefulFunctionsConfig, IngressIdentifier<T> ingressIdentifier) {
        this.configuration = statefulFunctionsConfig;
        this.id = Objects.requireNonNull(ingressIdentifier);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    /**
     * Obtains the {@link StatefulFunctionsUniverse} via the current thread's context class loader,
     * then creates the downstream collector and resolves the routers attached to this ingress.
     *
     * @throws IllegalStateException if no routers are registered for the ingress.
     */
    @Override
    public void open() throws Exception {
        super.open();

        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        final StatefulFunctionsUniverse universe = StatefulFunctionsUniverses.get(contextClassLoader, configuration);
        final MessageFactoryType messageFactoryType = universe.messageFactoryType();

        LOG.info("Using message factory type {}", messageFactoryType);

        this.downstream = new DownstreamCollector<>(messageFactoryType, output);
        this.routers = loadRoutersAttachedToIngress(id, universe.routers());
    }

    /**
     * Passes the value of the incoming record to every router of this ingress, in list order.
     *
     * @param streamRecord the record read from the ingress.
     */
    @Override
    public void processElement(StreamRecord<T> streamRecord) {
        final T value = streamRecord.getValue();
        for (Router<T> router : routers) {
            router.route(value, downstream);
        }
    }

    /**
     * Looks up the routers registered under {@code ingressIdentifier}.
     *
     * @param ingressIdentifier the ingress to look up.
     * @param map all defined routers, keyed by ingress identifier.
     * @return the routers registered for the ingress.
     * @throws IllegalStateException if the map holds no entry for the ingress.
     */
    @SuppressWarnings("unchecked")
    private static <T> List<Router<T>> loadRoutersAttachedToIngress(
            IngressIdentifier<T> ingressIdentifier, Map<IngressIdentifier<?>, List<Router<?>>> map) {
        final List<Router<?>> attached = map.get(ingressIdentifier);
        Preconditions.checkState(attached != null, "unable to find a router for ingress " + ingressIdentifier);
        // The map is heterogeneous; this assumes routers stored under an IngressIdentifier<T>
        // are Router<T> instances, which the compiler cannot verify here.
        return (List<Router<T>>) (List<?>) attached;
    }
}
