package org.apache.flink.statefun.flink.datastream;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.shaded.guava18.com.google.common.base.Optional;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
import org.apache.flink.statefun.flink.core.message.RoutableMessage;
import org.apache.flink.statefun.flink.core.translation.EmbeddedTranslator;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/* loaded from: input_file:org/apache/flink/statefun/flink/datastream/StatefulFunctionDataStreamBuilder.class */
/**
 * Fluent builder that collects ingress streams, function providers, remote (request/reply)
 * function specs and egress identifiers, and hands them to an {@link EmbeddedTranslator} when
 * {@link #build(StreamExecutionEnvironment)} is called.
 *
 * <p>Every registration method returns {@code this} so calls can be chained. Registering the same
 * function type or egress identifier twice within one registry is rejected with an
 * {@link IllegalStateException}; the binding that was registered first is kept.
 */
public final class StatefulFunctionDataStreamBuilder {
    /** Process-wide counter used to give each built pipeline a distinct feedback invocation id. */
    private static final AtomicInteger FEEDBACK_INVOCATION_ID_SEQ = new AtomicInteger();

    private final String pipelineName;
    private final List<DataStream<RoutableMessage>> definedIngresses = new ArrayList<>();
    private final Map<FunctionType, SerializableStatefulFunctionProvider> functionProviders = new HashMap<>();
    private final Map<FunctionType, HttpFunctionEndpointSpec> requestReplyFunctions = new HashMap<>();
    // LinkedHashSet: egress ids are handed to the translator in registration order.
    private final Set<EgressIdentifier<?>> egressesIds = new LinkedHashSet<>();

    /** Explicit configuration; when left {@code null} it is derived from the environment in build. */
    @Nullable
    private StatefulFunctionsConfig config;

    /**
     * Creates a new builder.
     *
     * @param str the pipeline name; it becomes part of the {@link FeedbackKey} created in
     *     {@link #build(StreamExecutionEnvironment)}
     * @return a fresh, empty builder
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public static StatefulFunctionDataStreamBuilder builder(String str) {
        return new StatefulFunctionDataStreamBuilder(str);
    }

    private StatefulFunctionDataStreamBuilder(String str) {
        this.pipelineName = Objects.requireNonNull(str, "pipelineName");
    }

    /**
     * Adds a stream of routable messages as an ingress. The same stream may be added repeatedly;
     * no duplicate check is performed.
     *
     * @param dataStream the ingress stream
     * @return this builder
     * @throws NullPointerException if {@code dataStream} is {@code null}
     */
    public StatefulFunctionDataStreamBuilder withDataStreamAsIngress(DataStream<RoutableMessage> dataStream) {
        Objects.requireNonNull(dataStream, "dataStream");
        this.definedIngresses.add(dataStream);
        return this;
    }

    /**
     * Binds a function provider to a function type.
     *
     * @param functionType the function type to bind
     * @param serializableStatefulFunctionProvider the provider for that type
     * @return this builder
     * @throws NullPointerException if either argument is {@code null}
     * @throws IllegalStateException if a provider is already bound to {@code functionType}
     */
    public StatefulFunctionDataStreamBuilder withFunctionProvider(FunctionType functionType, SerializableStatefulFunctionProvider serializableStatefulFunctionProvider) {
        Objects.requireNonNull(functionType, "functionType");
        Objects.requireNonNull(serializableStatefulFunctionProvider, "serializableStatefulFunctionProvider");
        putAndThrowIfPresent(this.functionProviders, functionType, serializableStatefulFunctionProvider);
        return this;
    }

    /**
     * Registers a remote request/reply function, keyed by the specific function type of the
     * endpoint spec produced by the given builder.
     *
     * @param requestReplyFunctionBuilder builder yielding the HTTP endpoint spec
     * @return this builder
     * @throws NullPointerException if {@code requestReplyFunctionBuilder} is {@code null}
     * @throws IllegalStateException if a remote function is already registered for the same type
     */
    public StatefulFunctionDataStreamBuilder withRequestReplyRemoteFunction(RequestReplyFunctionBuilder requestReplyFunctionBuilder) {
        Objects.requireNonNull(requestReplyFunctionBuilder, "requestReplyFunctionBuilder");
        HttpFunctionEndpointSpec endpointSpec = requestReplyFunctionBuilder.spec();
        FunctionType targetType = endpointSpec.targetFunctions().asSpecificFunctionType();
        putAndThrowIfPresent(this.requestReplyFunctions, targetType, endpointSpec);
        return this;
    }

    /**
     * Registers an egress identifier.
     *
     * @param egressIdentifier the egress identifier to register
     * @return this builder
     * @throws NullPointerException if {@code egressIdentifier} is {@code null}
     * @throws IllegalStateException if the identifier was already registered
     */
    public StatefulFunctionDataStreamBuilder withEgressId(EgressIdentifier<?> egressIdentifier) {
        Objects.requireNonNull(egressIdentifier, "egressIdentifier");
        putAndThrowIfPresent(this.egressesIds, egressIdentifier);
        return this;
    }

    /**
     * Sets an explicit configuration, replacing any configuration set earlier.
     *
     * @param statefulFunctionsConfig the configuration to use when building
     * @return this builder
     * @throws NullPointerException if {@code statefulFunctionsConfig} is {@code null}
     */
    public StatefulFunctionDataStreamBuilder withConfiguration(StatefulFunctionsConfig statefulFunctionsConfig) {
        Objects.requireNonNull(statefulFunctionsConfig, "statefulFunctionsConfig");
        this.config = statefulFunctionsConfig;
        return this;
    }

    /**
     * Translates everything registered so far into egress streams.
     *
     * <p>If no configuration was supplied via {@link #withConfiguration(StatefulFunctionsConfig)},
     * one is obtained from {@code streamExecutionEnvironment}; the environment is not otherwise
     * used by this method.
     *
     * <p>NOTE: remote functions are merged into this builder's provider map here with a plain
     * {@code put}, so (a) the builder's state is modified by this call and (b) a remote function
     * registered for a type that also has an embedded provider silently replaces that provider.
     *
     * @param streamExecutionEnvironment environment used to derive the configuration when none
     *     was set explicitly
     * @return the egress streams wrapping the translator's result
     */
    public StatefulFunctionEgressStreams build(StreamExecutionEnvironment streamExecutionEnvironment) {
        StatefulFunctionsConfig effectiveConfig =
                Optional.fromNullable(this.config)
                        .or(() -> StatefulFunctionsConfig.fromEnvironment(streamExecutionEnvironment));

        // Wrap every remote endpoint spec in an HTTP-backed provider so the translator sees a
        // single, uniform provider map.
        this.requestReplyFunctions.forEach(
                (functionType, endpointSpec) ->
                        this.functionProviders.put(functionType, new SerializableHttpFunctionProvider(endpointSpec)));

        // Each build gets its own feedback key: pipeline name plus the next sequence number.
        FeedbackKey feedbackKey = new FeedbackKey(this.pipelineName, FEEDBACK_INVOCATION_ID_SEQ.incrementAndGet());
        EmbeddedTranslator translator = new EmbeddedTranslator(effectiveConfig, feedbackKey);
        return new StatefulFunctionEgressStreams(
                translator.translate(this.definedIngresses, this.egressesIds, this.functionProviders));
    }

    /**
     * Adds {@code k -> v} to {@code map} unless {@code k} is already bound to a non-null value.
     *
     * <p>Uses {@code putIfAbsent} rather than {@code put} so that a rejected duplicate leaves the
     * existing binding untouched (the map is unchanged when the exception is thrown).
     *
     * @throws IllegalStateException if {@code k} is already bound
     */
    private static <K, V> void putAndThrowIfPresent(Map<K, V> map, K k, V v) {
        if (map.putIfAbsent(k, v) != null) {
            throw new IllegalStateException(String.format("A binding for the key %s was previously defined.", k));
        }
    }

    /**
     * Adds {@code k} to {@code set}.
     *
     * @throws IllegalStateException if {@code k} is already contained in the set
     */
    private static <K> void putAndThrowIfPresent(Set<K> set, K k) {
        if (!set.add(k)) {
            throw new IllegalStateException(String.format("A binding for the key %s was previously defined.", k));
        }
    }
}
