/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.functions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.flink.translation.functions.AbstractFlinkCombineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;

/**
 * Combine runner that groups the values of a single key by window using a hash map.
 *
 * <p>All input elements are first materialised, the set of windows they belong to is collected
 * and (for merging window functions) merged, and then one accumulator plus one output timestamp
 * is maintained per resulting window. Finally one output element is emitted per window.
 *
 * @param <K> key type
 * @param <InputT> input value type
 * @param <AccumT> accumulator type
 * @param <OutputT> output value type
 * @param <W> window type
 */
public class HashingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends BoundedWindow>
extends AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> {
    /**
     * Combines {@code elements} per (merged) window and emits one result per window to
     * {@code out}.
     *
     * <p>The output key is read from the first element only; the method assumes that every
     * element carries the same key (NOTE(review): confirm against the caller).
     *
     * @param flinkCombiner creates, updates and finalises the accumulators
     * @param windowingStrategy supplies the window function and the timestamp combiner
     * @param sideInputReader passed through unchanged to {@code flinkCombiner}
     * @param options passed through unchanged to {@code flinkCombiner}
     * @param elements the values to combine; if empty, nothing is emitted
     * @param out receives one {@link WindowedValue} per resulting window, with
     *     {@link PaneInfo#NO_FIRING}
     * @throws Exception if merging the windows fails
     */
    @Override
    public void combine(AbstractFlinkCombineRunner.FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner, WindowingStrategy<Object, W> windowingStrategy, SideInputReader sideInputReader, PipelineOptions options, Iterable<WindowedValue<KV<K, InputT>>> elements, Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
        TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
        WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();

        // Materialise the input: it is traversed twice below (once to collect the windows,
        // once to combine the values).
        Collection<WindowedValue<KV<K, InputT>>> inputs = new ArrayList<WindowedValue<KV<K, InputT>>>();
        Iterables.addAll(inputs, elements);
        if (inputs.isEmpty()) {
            // Without any element there is no key and no window, hence nothing to emit.
            return;
        }

        Set<W> windows = this.collectWindows(inputs);
        Map<W, W> windowToMergeResult = this.mergeWindows(windowingStrategy, windows);

        // Per resulting window: f0 is the accumulator, f1 the combined output timestamp.
        Map<W, Tuple2<AccumT, Instant>> mapState = new HashMap<W, Tuple2<AccumT, Instant>>();
        K key = inputs.iterator().next().getValue().getKey();
        for (WindowedValue<KV<K, InputT>> currentValue : inputs) {
            InputT inputValue = currentValue.getValue().getValue();
            for (BoundedWindow untypedWindow : currentValue.getWindows()) {
                @SuppressWarnings("unchecked")
                W currentWindow = (W) untypedWindow;
                // A window without an entry in the merge result was not merged and stands for itself.
                W mergedWindow = windowToMergeResult.get(currentWindow);
                if (mergedWindow == null) {
                    mergedWindow = currentWindow;
                }
                Set<W> singletonW = Collections.singleton(mergedWindow);
                Instant elementTimestamp = timestampCombiner.assign(mergedWindow, windowFn.getOutputTime(currentValue.getTimestamp(), mergedWindow));

                Tuple2<AccumT, Instant> accumAndInstant = mapState.get(mergedWindow);
                if (accumAndInstant == null) {
                    AccumT accumT = flinkCombiner.firstInput(key, inputValue, options, sideInputReader, singletonW);
                    mapState.put(mergedWindow, new Tuple2<AccumT, Instant>(accumT, elementTimestamp));
                } else {
                    accumAndInstant.f0 = flinkCombiner.addInput(key, accumAndInstant.f0, inputValue, options, sideInputReader, singletonW);
                    accumAndInstant.f1 = timestampCombiner.combine(accumAndInstant.f1, elementTimestamp);
                }
            }
        }

        // Emit the final value of every per-window accumulator.
        for (Map.Entry<W, Tuple2<AccumT, Instant>> entry : mapState.entrySet()) {
            W window = entry.getKey();
            AccumT accumulator = entry.getValue().f0;
            Instant windowTimestamp = entry.getValue().f1;
            OutputT output = flinkCombiner.extractOutput(key, accumulator, options, sideInputReader, Collections.singleton(window));
            out.collect(WindowedValue.of(KV.of(key, output), windowTimestamp, window, PaneInfo.NO_FIRING));
        }
    }

    /**
     * Runs the window function's merge logic over {@code windows}.
     *
     * @param windowingStrategy supplies the window function
     * @param windows all windows present in the input
     * @return a map from each merged source window to the window it was merged into; empty if
     *     the window function is non-merging
     * @throws Exception if the window function fails while merging
     */
    private Map<W, W> mergeWindows(WindowingStrategy<Object, W> windowingStrategy, Set<W> windows) throws Exception {
        WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn();
        if (windowFn.isNonMerging()) {
            // Empty map: no window is replaced by a merge result.
            return Collections.emptyMap();
        }
        Map<W, W> windowToMergeResult = new HashMap<W, W>();
        windowFn.mergeWindows(new MergeContextImpl(windowFn, windows, windowToMergeResult));
        return windowToMergeResult;
    }

    /**
     * Collects the distinct windows of all given values.
     *
     * @param values the windowed input values
     * @return the set of every window that at least one value belongs to
     */
    private Set<W> collectWindows(Iterable<WindowedValue<KV<K, InputT>>> values) {
        Set<W> windows = new HashSet<W>();
        for (WindowedValue<KV<K, InputT>> value : values) {
            for (BoundedWindow untypedWindow : value.getWindows()) {
                @SuppressWarnings("unchecked")
                W window = (W) untypedWindow;
                windows.add(window);
            }
        }
        return windows;
    }

    /**
     * Merge context that exposes a fixed set of windows and records every requested merge in a
     * caller-supplied map (source window to merge result).
     */
    private class MergeContextImpl
    extends WindowFn<Object, W>.MergeContext {
        private final Set<W> windows;
        private final Map<W, W> windowToMergeResult;

        MergeContextImpl(WindowFn<Object, W> windowFn, Set<W> windows, Map<W, W> windowToMergeResult) {
            // MergeContext is treated as an inner class of WindowFn, so the enclosing WindowFn
            // instance is supplied through a qualified superclass constructor call.
            windowFn.super();
            this.windows = windows;
            this.windowToMergeResult = windowToMergeResult;
        }

        /** Returns the windows this context was created with. */
        @Override
        public Collection<W> windows() {
            return this.windows;
        }

        /** Records {@code mergeResult} as the replacement for every window in {@code toBeMerged}. */
        @Override
        public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
            for (W w : toBeMerged) {
                this.windowToMergeResult.put(w, mergeResult);
            }
        }
    }
}

