package io.dstream.local.ri;

import io.dstream.DStreamConstants;
import io.dstream.DStreamExecutionGraph;
import io.dstream.DStreamOperation;
import io.dstream.SerializableStreamAssets;
import io.dstream.local.ri.ShuffleHelper;
import io.dstream.support.AbstractPartitionedStreamProducingSourceSupplier;
import io.dstream.support.Aggregators;
import io.dstream.support.Classifier;
import io.dstream.support.HashClassifier;
import io.dstream.support.PartitionIdHelper;
import io.dstream.support.SourceSupplier;
import io.dstream.support.UriSourceSupplier;
import io.dstream.utils.KVUtils;
import io.dstream.utils.ReflectionUtils;
import io.dstream.utils.SingleValueIterator;
import java.lang.reflect.Field;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:io/dstream/local/ri/LocalDStreamExecutionEngine.class */
final class LocalDStreamExecutionEngine {
    private final Properties executionConfig;
    private final String executionName;
    private final Classifier classifier = determineClassifier();
    private List<List<?>> realizedStageResults;
    private final ThreadLocal<Integer> partitionIdHolder;

    public LocalDStreamExecutionEngine(String str, Properties properties) {
        this.executionName = str;
        this.executionConfig = properties;
        try {
            Field findField = ReflectionUtils.findField(PartitionIdHelper.class, "partitionIdHolder", ThreadLocal.class);
            findField.setAccessible(true);
            this.partitionIdHolder = (ThreadLocal) findField.get(null);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public Stream<Stream<?>> execute(DStreamExecutionGraph dStreamExecutionGraph) {
        return execute(dStreamExecutionGraph, false);
    }

    private Stream<Stream<?>> execute(DStreamExecutionGraph dStreamExecutionGraph, boolean z) {
        List<DStreamOperation> operations = dStreamExecutionGraph.getOperations();
        for (int i = 0; i < operations.size(); i++) {
            doExecuteStage(operations.get(i), z, dStreamExecutionGraph.getName());
        }
        return this.realizedStageResults.stream().map(list -> {
            return list.stream();
        });
    }

    private void doExecuteStage(DStreamOperation dStreamOperation, boolean z, String str) {
        SerializableStreamAssets.SerFunction<Stream<?>, Stream<?>> streamOperationFunction = dStreamOperation.getStreamOperationFunction();
        if (this.realizedStageResults == null) {
            List<List<?>> list = (List) Stream.of(streamOperationFunction.apply(createInitialStream(str))).map(stream -> {
                return (List) stream.collect(Collectors.toList());
            }).collect(Collectors.toList());
            if (z) {
                list = (List) unmapPartitions(partitionStream((Stream) list.stream().map(list2 -> {
                    return list2.stream();
                }).reduce((stream2, stream3) -> {
                    return Stream.concat(stream2, stream3);
                }).get())).map(stream4 -> {
                    return (List) stream4.collect(Collectors.toList());
                }).collect(Collectors.toList());
            }
            this.realizedStageResults = list;
            return;
        }
        Stream<Stream<?>> unmapPartitions = unmapPartitions(partitionStream((Stream) this.realizedStageResults.stream().map(list3 -> {
            return list3.stream();
        }).reduce((stream5, stream6) -> {
            return Stream.concat(stream5, stream6);
        }).get()));
        if (dStreamOperation.getCombinableExecutionGraphs().size() > 0) {
            List list4 = (List) unmapPartitions.collect(Collectors.toList());
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            for (int i = 0; i < list4.size(); i++) {
                linkedHashMap.merge(Integer.valueOf(i), list4.get(i), Aggregators::aggregateToList);
            }
            Iterator<DStreamExecutionGraph> it = dStreamOperation.getCombinableExecutionGraphs().iterator();
            while (it.hasNext()) {
                List list5 = (List) new LocalDStreamExecutionEngine(this.executionName, this.executionConfig).execute(it.next(), true).collect(Collectors.toList());
                for (int i2 = 0; i2 < list5.size(); i2++) {
                    linkedHashMap.merge(Integer.valueOf(i2), list5.get(i2), Aggregators::aggregateToList);
                }
            }
            unmapPartitions = linkedHashMap.values().stream().map(obj -> {
                return ((List) obj).stream();
            });
        }
        this.realizedStageResults = (List) unmapPartitions.map(stream7 -> {
            return (Stream) streamOperationFunction.apply(stream7);
        }).map(stream8 -> {
            return (List) stream8.collect(Collectors.toList());
        }).collect(Collectors.toList());
    }

    private Stream<Stream<?>> unmapPartitions(Stream<Map.Entry<Integer, List<Object>>> stream) {
        return stream.map(entry -> {
            return ((List) entry.getValue()).stream().map(obj -> {
                this.partitionIdHolder.set(entry.getKey());
                return obj;
            });
        });
    }

    private <R> Stream<R> createInitialStream(String str) {
        SourceSupplier create = SourceSupplier.create(this.executionConfig, str, null);
        if (create instanceof UriSourceSupplier) {
            return (Stream) ((UriSourceSupplier) create).get().map(this::buildStreamFromURI).reduce(Stream::concat).get();
        }
        if (create instanceof AbstractPartitionedStreamProducingSourceSupplier) {
            return ((AbstractPartitionedStreamProducingSourceSupplier) create).get();
        }
        throw new IllegalStateException("Unsupported SourceSupplier " + create.getClass().getName());
    }

    private Stream<String> buildStreamFromURI(URI uri) {
        try {
            return Files.lines(Paths.get(uri));
        } catch (Exception e) {
            throw new IllegalStateException("Failed to create Stream from URI: " + uri, e);
        }
    }

    private Stream<Map.Entry<Integer, List<Object>>> partitionStream(Stream<?> stream) {
        return Stream.of(stream.map(obj -> {
            return KVUtils.kv(this.classifier.getClassificationId(obj), obj);
        })).map(stream2 -> {
            return (Map) stream2.collect(Collectors.toMap(entry -> {
                return (Integer) entry.getKey();
            }, entry2 -> {
                return new ShuffleHelper.RefHolder(entry2.getValue());
            }, ShuffleHelper::group));
        }).flatMap(map -> {
            return map.entrySet().stream();
        }).map(entry -> {
            Map.Entry kv;
            Object value = entry.getValue();
            if (value instanceof ShuffleHelper.RefHolder) {
                Object obj2 = ((ShuffleHelper.RefHolder) value).ref;
                value = obj2 instanceof Map.Entry ? Stream.of((Map.Entry) obj2).collect(Collectors.toMap(entry -> {
                    return entry.getKey();
                }, entry2 -> {
                    return Collections.singletonList(entry2.getValue());
                })) : Stream.of(obj2).collect(Collectors.toList());
            }
            if (value instanceof Map) {
                Map map2 = (Map) value;
                map2.forEach((obj3, obj4) -> {
                    map2.replace(obj3, obj4 instanceof List ? ((List) obj4).iterator() : new SingleValueIterator(obj4));
                });
                kv = KVUtils.kv(entry.getKey(), new ArrayList(new TreeMap(map2).entrySet()));
            } else {
                kv = KVUtils.kv(entry.getKey(), (List) value);
            }
            return kv;
        });
    }

    private Classifier determineClassifier() {
        String property = this.executionConfig.getProperty(DStreamConstants.PARALLELISM);
        String property2 = this.executionConfig.getProperty(DStreamConstants.CLASSIFIER);
        int parseInt = property == null ? 1 : Integer.parseInt(property);
        return property2 != null ? (Classifier) ReflectionUtils.newInstance(property2, (Class<?>[]) new Class[]{Integer.TYPE}, new Object[]{Integer.valueOf(parseInt)}) : new HashClassifier(parseInt);
    }
}
