package org.apache.flink.statefun.flink.state.processor.union;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.statefun.flink.state.processor.BootstrapDataRouterProvider;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.Router;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/statefun/flink/state/processor/union/BootstrapDatasetUnion.class */
public final class BootstrapDatasetUnion {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/statefun/flink/state/processor/union/BootstrapDatasetUnion$BootstrapRouterFlatMap.class */
    public static class BootstrapRouterFlatMap<T> extends RichFlatMapFunction<T, TaggedBootstrapData> {
        private static final long serialVersionUID = 1;
        private final BootstrapDataRouterProvider<T> routerProvider;
        private final int unionIndex;
        private transient Router<T> router;

        BootstrapRouterFlatMap(BootstrapDataRouterProvider<T> bootstrapDataRouterProvider, int i) {
            this.routerProvider = (BootstrapDataRouterProvider) Objects.requireNonNull(bootstrapDataRouterProvider);
            Preconditions.checkArgument(i >= 0);
            this.unionIndex = i;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.router = this.routerProvider.provide();
        }

        public void flatMap(T t, Collector<TaggedBootstrapData> collector) throws Exception {
            this.router.route(t, new TaggingBootstrapDataCollector(collector, this.unionIndex));
        }
    }

    /* loaded from: input_file:org/apache/flink/statefun/flink/state/processor/union/BootstrapDatasetUnion$TaggingBootstrapDataCollector.class */
    private static class TaggingBootstrapDataCollector<T> implements Router.Downstream<T> {
        private final Collector<TaggedBootstrapData> out;
        private final int unionIndex;

        TaggingBootstrapDataCollector(Collector<TaggedBootstrapData> collector, int i) {
            this.out = (Collector) Objects.requireNonNull(collector);
            this.unionIndex = i;
        }

        public void forward(FunctionType functionType, String str, T t) {
            this.out.collect(new TaggedBootstrapData(new Address(functionType, str), t, this.unionIndex));
        }

        public void forward(Address address, T t) {
            this.out.collect(new TaggedBootstrapData(address, t, this.unionIndex));
        }
    }

    public static DataSet<TaggedBootstrapData> apply(List<BootstrapDataset<?>> list) {
        Objects.requireNonNull(list);
        Preconditions.checkArgument(list.size() > 0);
        ArrayList arrayList = new ArrayList(list.size());
        TypeInformation<TaggedBootstrapData> createUnionTypeInfo = createUnionTypeInfo(list);
        int i = 0;
        Iterator<BootstrapDataset<?>> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(toTaggedFlinkDataSet(it.next(), i, createUnionTypeInfo));
            i++;
        }
        return unionTaggedBootstrapDataSets(arrayList);
    }

    private static TypeInformation<TaggedBootstrapData> createUnionTypeInfo(List<BootstrapDataset<?>> list) {
        return new TaggedBootstrapDataTypeInfo((List) list.stream().map(bootstrapDataset -> {
            return bootstrapDataset.getDataSet().getType();
        }).collect(Collectors.toList()));
    }

    private static <T> DataSet<TaggedBootstrapData> toTaggedFlinkDataSet(BootstrapDataset<T> bootstrapDataset, int i, TypeInformation<TaggedBootstrapData> typeInformation) {
        return bootstrapDataset.getDataSet().flatMap(new BootstrapRouterFlatMap(bootstrapDataset.getRouterProvider(), i)).returns(typeInformation);
    }

    private static DataSet<TaggedBootstrapData> unionTaggedBootstrapDataSets(List<DataSet<TaggedBootstrapData>> list) {
        UnionOperator unionOperator = null;
        Iterator<DataSet<TaggedBootstrapData>> it = list.iterator();
        while (it.hasNext()) {
            UnionOperator unionOperator2 = (DataSet) it.next();
            unionOperator = unionOperator != null ? unionOperator.union(unionOperator2) : unionOperator2;
        }
        return unionOperator;
    }
}
