package org.apache.flink.statefun.flink.core.translation;

import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.StatefulFunctionsJobConstants;
import org.apache.flink.statefun.flink.core.common.KeyBy;
import org.apache.flink.statefun.flink.core.common.SerializableFunction;
import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
import org.apache.flink.statefun.flink.core.feedback.FeedbackSinkOperator;
import org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperatorFactory;
import org.apache.flink.statefun.flink.core.functions.FunctionGroupDispatchFactory;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.MessageKeySelector;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/translation/StatefulFunctionTranslator.class */
/**
 * Translates a set of {@link Sources} and {@link Sinks} into three chained stream operators:
 * a feedback-union operator, a function operator, and a feedback write-back operator.
 *
 * <p>The three operators share the co-location group key derived from the {@link FeedbackKey}
 * and are forced to the same parallelism (see {@link #coLocate}).
 */
final class StatefulFunctionTranslator {
    private final FeedbackKey<Message> feedbackKey;
    private final StatefulFunctionsConfig configuration;

    /**
     * Key selector handed to the feedback-union operator factory; maps a message to
     * {@code KeyBy.apply(message.target())}.
     */
    public static final class FeedbackKeySelector implements SerializableFunction<Message, String> {
        private static final long serialVersionUID = 1;

        private FeedbackKeySelector() {
        }

        /**
         * @param message the message whose target determines the key
         * @return the key computed by {@link KeyBy#apply} for the message's target
         */
        @Override
        public String apply(Message message) {
            return KeyBy.apply(message.target());
        }
    }

    /**
     * Function handed to the feedback-union operator factory; delegates to
     * {@link Message#isBarrierMessage()}.
     */
    public static final class IsCheckpointBarrier implements SerializableFunction<Message, OptionalLong> {
        private static final long serialVersionUID = 1;

        private IsCheckpointBarrier() {
        }

        /**
         * @param message the message to inspect
         * @return whatever {@link Message#isBarrierMessage()} reports for the message
         */
        @Override
        public OptionalLong apply(Message message) {
            return message.isBarrierMessage();
        }
    }

    /**
     * Creates a translator.
     *
     * @param feedbackKey the feedback key shared by the union and write-back operators
     * @param statefulFunctionsConfig the configuration passed on to the operator factories
     * @throws NullPointerException if either argument is {@code null}
     */
    public StatefulFunctionTranslator(FeedbackKey<Message> feedbackKey, StatefulFunctionsConfig statefulFunctionsConfig) {
        // requireNonNull already returns the argument's static type, so no cast is needed;
        // the message names the offending argument in the resulting NullPointerException.
        this.feedbackKey = Objects.requireNonNull(feedbackKey, "feedbackKey");
        this.configuration = Objects.requireNonNull(statefulFunctionsConfig, "statefulFunctionsConfig");
    }

    /**
     * Builds the union, function, and write-back operators on top of the unioned source stream
     * and co-locates them.
     *
     * @param sources supplies the unioned input stream
     * @param sinks supplies the side-output tags and resolves the side-output streams
     * @return the result of {@code sinks.sideOutputStreams(...)} applied to the function operator
     */
    public Map<EgressIdentifier<?>, DataStream<?>> translate(Sources sources, Sinks sinks) {
        SingleOutputStreamOperator<Message> unionOperator = feedbackUnionOperator(sources.unionStream());
        SingleOutputStreamOperator<Message> functionOperator = functionOperator(unionOperator, sinks.sideOutputTags());
        SingleOutputStreamOperator<Void> writeBackOperator = feedbackOperator(functionOperator);

        coLocate(unionOperator, functionOperator, writeBackOperator);
        return sinks.sideOutputStreams(functionOperator);
    }

    /**
     * Keys the input by {@link MessageKeySelector} and applies the feedback-union operator.
     *
     * @param dataStream the unioned source stream
     * @return the feedback-union operator's output stream, typed like the input
     */
    private SingleOutputStreamOperator<Message> feedbackUnionOperator(DataStream<Message> dataStream) {
        return dataStream
                .keyBy(new MessageKeySelector())
                .transform(
                        StatefulFunctionsJobConstants.FEEDBACK_UNION_OPERATOR_NAME,
                        dataStream.getType(),
                        new FeedbackUnionOperatorFactory(
                                this.configuration,
                                this.feedbackKey,
                                new IsCheckpointBarrier(),
                                new FeedbackKeySelector()))
                .uid(StatefulFunctionsJobConstants.FEEDBACK_UNION_OPERATOR_UID);
    }

    /**
     * Applies the function-dispatch operator to the output of the feedback-union operator.
     *
     * <p>The input is passed through {@link DataStreamUtils#reinterpretAsKeyedStream} with a
     * {@link MessageKeySelector} instead of a fresh {@code keyBy}.
     *
     * @param dataStream the stream produced by the feedback-union operator
     * @param map the side-output tag of each egress
     * @return the function operator's output stream, typed like the input
     */
    private SingleOutputStreamOperator<Message> functionOperator(DataStream<Message> dataStream, Map<EgressIdentifier<?>, OutputTag<Object>> map) {
        return DataStreamUtils.reinterpretAsKeyedStream(dataStream, new MessageKeySelector())
                .transform(
                        StatefulFunctionsJobConstants.FUNCTION_OPERATOR_NAME,
                        dataStream.getType(),
                        new FunctionGroupDispatchFactory(this.configuration, map))
                .uid(StatefulFunctionsJobConstants.FUNCTION_OPERATOR_UID);
    }

    /**
     * Keys the function operator's output by {@link MessageKeySelector} and applies the
     * {@link FeedbackSinkOperator} that writes back under {@code feedbackKey}.
     *
     * @param singleOutputStreamOperator the function operator's output stream
     * @return the write-back operator, declared with a {@code Void} element type
     */
    private SingleOutputStreamOperator<Void> feedbackOperator(SingleOutputStreamOperator<Message> singleOutputStreamOperator) {
        return singleOutputStreamOperator
                .keyBy(new MessageKeySelector())
                .transform(
                        StatefulFunctionsJobConstants.WRITE_BACK_OPERATOR_NAME,
                        TypeInformation.of(Void.class),
                        new FeedbackSinkOperator(
                                this.feedbackKey,
                                new CheckpointToMessage(this.configuration.getFactoryType())))
                .uid(StatefulFunctionsJobConstants.WRITE_BACK_OPERATOR_UID);
    }

    /**
     * Assigns the three streams' transformations to the co-location group named by
     * {@code feedbackKey.asColocationKey()} and aligns the parallelism of the first and third
     * with that of the second.
     *
     * @param dataStream the first stream; its parallelism is overwritten
     * @param dataStream2 the stream whose parallelism is used as the reference
     * @param dataStream3 the third stream; its parallelism is overwritten
     */
    private void coLocate(DataStream<?> dataStream, DataStream<?> dataStream2, DataStream<?> dataStream3) {
        String groupKey = this.feedbackKey.asColocationKey();
        dataStream.getTransformation().setCoLocationGroupKey(groupKey);
        dataStream2.getTransformation().setCoLocationGroupKey(groupKey);
        dataStream3.getTransformation().setCoLocationGroupKey(groupKey);

        // The second stream's parallelism is re-read for each assignment, as in the original.
        dataStream.getTransformation().setParallelism(dataStream2.getParallelism());
        dataStream3.getTransformation().setParallelism(dataStream2.getParallelism());
    }
}
