package org.apache.flink.ml.common.broadcast;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
import org.apache.flink.ml.common.broadcast.operator.BroadcastVariableReceiverOperatorFactory;
import org.apache.flink.ml.common.broadcast.operator.BroadcastWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/ml/common/broadcast/BroadcastUtils.class */
public class BroadcastUtils {
    @Internal
    public static <OUT> DataStream<OUT> withBroadcastStream(List<DataStream<?>> list, Map<String, DataStream<?>> map, Function<List<DataStream<?>>, DataStream<OUT>> function) {
        Preconditions.checkArgument(list.size() > 0);
        StreamExecutionEnvironment executionEnvironment = list.get(0).getExecutionEnvironment();
        String[] strArr = new String[map.size()];
        DataStream[] dataStreamArr = new DataStream[map.size()];
        TypeInformation[] typeInformationArr = new TypeInformation[map.size()];
        int i = 0;
        String hexString = new AbstractID().toHexString();
        for (String str : map.keySet()) {
            strArr[i] = hexString + "-" + str;
            dataStreamArr[i] = map.get(str);
            typeInformationArr[i] = dataStreamArr[i].getType();
            i++;
        }
        DataStream resultStream = getResultStream(executionEnvironment, list, strArr, function);
        TypeInformation type = resultStream.getType();
        String str2 = "broadcast-co-location-" + UUID.randomUUID();
        DataStream cacheBroadcastVariables = cacheBroadcastVariables(executionEnvironment, strArr, dataStreamArr, typeInformationArr, resultStream.getParallelism(), type);
        if (!((cacheBroadcastVariables.getTransformation() instanceof PhysicalTransformation) && (resultStream.getTransformation() instanceof PhysicalTransformation))) {
            throw new UnsupportedOperationException("cannot set chaining strategy on " + cacheBroadcastVariables.getTransformation() + " and " + resultStream.getTransformation() + ".");
        }
        cacheBroadcastVariables.getTransformation().setChainingStrategy(ChainingStrategy.HEAD);
        resultStream.getTransformation().setChainingStrategy(ChainingStrategy.HEAD);
        cacheBroadcastVariables.getTransformation().setCoLocationGroupKey(str2);
        resultStream.getTransformation().setCoLocationGroupKey(str2);
        return cacheBroadcastVariables.union(new DataStream[]{resultStream});
    }

    private static <OUT> DataStream<OUT> cacheBroadcastVariables(StreamExecutionEnvironment streamExecutionEnvironment, String[] strArr, DataStream<?>[] dataStreamArr, TypeInformation<?>[] typeInformationArr, int i, TypeInformation<OUT> typeInformation) {
        MultipleInputTransformation multipleInputTransformation = new MultipleInputTransformation("broadcastInputs", new BroadcastVariableReceiverOperatorFactory(strArr, typeInformationArr), typeInformation, i);
        for (DataStream<?> dataStream : dataStreamArr) {
            multipleInputTransformation.addInput(dataStream.broadcast().getTransformation());
        }
        streamExecutionEnvironment.addOperator(multipleInputTransformation);
        return new MultipleConnectedStreams(streamExecutionEnvironment).transform(multipleInputTransformation);
    }

    private static <OUT> DataStream<OUT> getResultStream(StreamExecutionEnvironment streamExecutionEnvironment, List<DataStream<?>> list, String[] strArr, Function<List<DataStream<?>>, DataStream<OUT>> function) {
        TypeInformation[] typeInformationArr = new TypeInformation[list.size()];
        for (int i = 0; i < list.size(); i++) {
            typeInformationArr[i] = list.get(i).getType();
        }
        boolean[] zArr = new boolean[list.size()];
        Arrays.fill(zArr, false);
        DraftExecutionEnvironment draftExecutionEnvironment = new DraftExecutionEnvironment(streamExecutionEnvironment, new BroadcastWrapper(strArr, typeInformationArr, zArr));
        ArrayList arrayList = new ArrayList();
        for (DataStream<?> dataStream : list) {
            arrayList.add(draftExecutionEnvironment.addDraftSource(dataStream, dataStream.getType()));
        }
        DataStream<OUT> apply = function.apply(arrayList);
        Preconditions.checkState(draftExecutionEnvironment.getStreamGraph(false).getStreamNodes().size() == 1 + list.size(), "cannot add more than one operator in withBroadcastStream's lambda function.");
        draftExecutionEnvironment.copyToActualEnvironment();
        return draftExecutionEnvironment.getActualStream(apply.getId());
    }
}
