package org.apache.beam.runners.direct.portable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.GroupAlsoByWindowsAggregators;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.OutputWindowedValue;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.ReduceFnRunner;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SystemReduceFn;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.RehydratedComponents;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.TriggerTranslation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.wire.WireCoders;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.ExecutableGraph;
import org.apache.beam.runners.direct.portable.DirectGroupByKey;
import org.apache.beam.runners.direct.portable.StepStateAndTimers;
import org.apache.beam.sdk.coders.IterableLikeCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/portable/GroupAlsoByWindowEvaluatorFactory.class */
public class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
    private final BundleFactory bundleFactory;
    private final ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> graph;
    private final RunnerApi.Components components;
    private final StepStateAndTimers.Provider stp;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/portable/GroupAlsoByWindowEvaluatorFactory$GroupAlsoByWindowEvaluator.class */
    /**
     * {@link TransformEvaluator} that feeds the elements and timers of each incoming {@link KeyedWorkItem}
     * through a {@link ReduceFnRunner} and writes the runner's output into keyed {@link UncommittedBundle}s.
     *
     * <p>State and timers are taken from the {@link StepStateAndTimers} supplied at construction; the state
     * is committed and reported, together with the timer update, in {@link #finishBundle()}.
     *
     * @param <K> type of the key of the work items
     * @param <V> type of the values buffered per key and window
     */
    public static class GroupAlsoByWindowEvaluator<K, V> implements TransformEvaluator<KeyedWorkItem<K, V>> {
        private final BundleFactory bundleFactory;
        private final PipelineNode.PTransformNode application;
        private final PipelineNode.PCollectionNode outputCollection;
        private final StructuralKey<?> key;
        private final CopyOnAccessInMemoryStateInternals<K> stateInternals;
        private final DirectTimerInternals timerInternals;
        private final WindowingStrategy<?, BoundedWindow> windowingStrategy;
        private final Collection<UncommittedBundle<?>> outputBundles;
        private final SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
        private final Counter droppedDueToLateness;

        /**
         * Creates an evaluator for one transform application and one key.
         *
         * @param bundleFactory used to create one keyed output bundle per processed work item
         * @param structuralKey key under which output bundles are created
         * @param pTransformNode the transform being evaluated; it must produce exactly one PCollection and
         *     have exactly one per-element input (enforced by {@code Iterables.getOnlyElement})
         * @param executableGraph graph used to look up the transform's input and output collections
         * @param components pipeline components used to rehydrate the windowing strategy and the wire coder
         * @param stepStateAndTimers source of the state and timer internals used by this evaluator
         * @throws RuntimeException wrapping the {@link IOException} if the windowing strategy or the wire
         *     coder cannot be rehydrated
         * @throws IllegalArgumentException if the output wire coder is not a windowed-value coder of a
         *     {@link KvCoder} whose value coder is an {@link IterableLikeCoder}
         */
        private GroupAlsoByWindowEvaluator(BundleFactory bundleFactory, StructuralKey<K> structuralKey, PipelineNode.PTransformNode pTransformNode, ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> executableGraph, RunnerApi.Components components, StepStateAndTimers<K> stepStateAndTimers) {
            this.bundleFactory = bundleFactory;
            this.application = pTransformNode;
            this.outputCollection = Iterables.getOnlyElement(executableGraph.getProduced(pTransformNode));
            this.key = structuralKey;
            this.stateInternals = stepStateAndTimers.stateInternals();
            this.timerInternals = stepStateAndTimers.timerInternals();

            PipelineNode.PCollectionNode inputCollection = Iterables.getOnlyElement(executableGraph.getPerElementInputs(pTransformNode));
            this.windowingStrategy = rehydrateWindowingStrategy(inputCollection, components);
            this.outputBundles = new ArrayList<>();
            this.reduceFn = bufferingReduceFn(this.outputCollection, components);
            this.droppedDueToLateness = Metrics.counter(GroupAlsoByWindowEvaluator.class, GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER);
        }

        /** Rehydrates the windowing strategy of {@code inputCollection}, wrapping any {@link IOException}. */
        @SuppressWarnings("unchecked") // The rehydrated strategy is narrowed to the BoundedWindow bound used by this class.
        private static WindowingStrategy<?, BoundedWindow> rehydrateWindowingStrategy(PipelineNode.PCollectionNode inputCollection, RunnerApi.Components components) {
            try {
                return (WindowingStrategy<?, BoundedWindow>) RehydratedComponents.forComponents(components).getWindowingStrategy(inputCollection.getPCollection().getWindowingStrategyId());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Builds the buffering reduce function from the element coder nested inside the output wire coder.
         *
         * <p>Each layer is checked with {@code instanceof} BEFORE it is narrowed, so an unexpected coder
         * shape is reported as an {@link IllegalArgumentException} by the precondition.
         */
        @SuppressWarnings("unchecked") // The element coder's type argument cannot be verified at runtime.
        private static <KeyT, ElemT> SystemReduceFn<KeyT, ElemT, Iterable<ElemT>, Iterable<ElemT>, BoundedWindow> bufferingReduceFn(PipelineNode.PCollectionNode outputCollection, RunnerApi.Components components) {
            try {
                Object wireCoder = WireCoders.instantiateRunnerWireCoder(outputCollection, components);
                Preconditions.checkArgument(wireCoder instanceof WindowedValue.WindowedValueCoder);
                Object kvCoder = ((WindowedValue.WindowedValueCoder<?>) wireCoder).getValueCoder();
                Preconditions.checkArgument(kvCoder instanceof KvCoder);
                Object groupedValuesCoder = ((KvCoder<?, ?>) kvCoder).getValueCoder();
                Preconditions.checkArgument(groupedValuesCoder instanceof IterableLikeCoder);
                return SystemReduceFn.buffering(((IterableLikeCoder<ElemT, ?>) groupedValuesCoder).getElemCoder());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Processes one keyed work item: creates a new keyed output bundle, runs the non-expired elements and
         * then the timers of the work item through a fresh {@link ReduceFnRunner}, and persists its state.
         *
         * @param windowedValue wrapper around the {@link KeyedWorkItem} to process
         * @throws Exception propagated from the {@link ReduceFnRunner}
         */
        @Override // org.apache.beam.runners.direct.portable.TransformEvaluator
        public void processElement(WindowedValue<KeyedWorkItem<K, V>> windowedValue) throws Exception {
            KeyedWorkItem<K, V> workItem = windowedValue.getValue();
            UncommittedBundle<KV<K, Iterable<V>>> bundle = this.bundleFactory.createKeyedBundle(this.key, this.outputCollection);
            this.outputBundles.add(bundle);
            ExecutableTriggerStateMachine triggerStateMachine = ExecutableTriggerStateMachine.create(TriggerStateMachines.stateMachineForTrigger(TriggerTranslation.toProto(this.windowingStrategy.getTrigger())));
            // The two null arguments are passed exactly as in the original call (seventh and ninth constructor parameters).
            ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = new ReduceFnRunner<>(workItem.key(), this.windowingStrategy, triggerStateMachine, this.stateInternals, this.timerInternals, new OutputWindowedValueToBundle<>(bundle), null, this.reduceFn, null);
            reduceFnRunner.processElements(dropExpiredWindows(workItem.key(), workItem.elementsIterable(), this.timerInternals));
            reduceFnRunner.onTimers(workItem.timersIterable());
            reduceFnRunner.persist();
        }

        /**
         * Commits the state internals and reports the result of this bundle.
         *
         * @return a result carrying the earliest watermark hold of the committed state, the committed state
         *     itself, every output bundle created by {@link #processElement}, and the timer update
         * @throws Exception declared by the {@link TransformEvaluator} contract
         */
        @Override // org.apache.beam.runners.direct.portable.TransformEvaluator
        public TransformResult<KeyedWorkItem<K, V>> finishBundle() throws Exception {
            CopyOnAccessInMemoryStateInternals<K> committedState = this.stateInternals.commit();
            // The explicit type witness pins the builder's input type; it cannot be inferred through the call chain.
            return StepTransformResult.<KeyedWorkItem<K, V>>withHold(this.application, committedState.getEarliestWatermarkHold())
                    .withState(committedState)
                    .addOutput(this.outputBundles)
                    .withTimerUpdate(this.timerInternals.getTimerUpdate())
                    .build();
        }

        /**
         * Explodes every element into one value per window and keeps only the values whose window has not
         * expired. A window counts as expired when its max timestamp plus the allowed lateness is before the
         * current input watermark; each dropped value increments the dropped-due-to-lateness counter and is
         * traced.
         *
         * @param k key of the work item, used only in the trace message
         * @param iterable the elements of the work item
         * @param timerInternals source of the current input watermark
         * @return the surviving single-window values, collected into a list
         */
        Iterable<WindowedValue<V>> dropExpiredWindows(K k, Iterable<WindowedValue<V>> iterable, TimerInternals timerInternals) {
            return StreamSupport.stream(iterable.spliterator(), false)
                    .flatMap(element -> StreamSupport.stream(element.explodeWindows().spliterator(), false))
                    .filter(exploded -> {
                        // After explodeWindows() each value is expected to carry exactly one window.
                        BoundedWindow window = Iterables.getOnlyElement(exploded.getWindows());
                        boolean expired = window.maxTimestamp().plus(this.windowingStrategy.getAllowedLateness()).isBefore(timerInternals.currentInputWatermarkTime());
                        if (expired) {
                            this.droppedDueToLateness.inc();
                            WindowTracing.debug("{}: Dropping element at {} for key: {}; window: {} since it is too far behind inputWatermark: {}", DirectGroupByKey.DirectGroupAlsoByWindow.class.getSimpleName(), exploded.getTimestamp(), k, window, timerInternals.currentInputWatermarkTime());
                        }
                        return !expired;
                    })
                    .collect(Collectors.toList());
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/portable/GroupAlsoByWindowEvaluatorFactory$OutputWindowedValueToBundle.class */
    private static class OutputWindowedValueToBundle<K, V> implements OutputWindowedValue<KV<K, Iterable<V>>> {
        private final UncommittedBundle<KV<K, Iterable<V>>> bundle;

        private OutputWindowedValueToBundle(UncommittedBundle<KV<K, Iterable<V>>> uncommittedBundle) {
            this.bundle = uncommittedBundle;
        }

        public void outputWindowedValue(KV<K, Iterable<V>> kv, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            this.bundle.add(WindowedValue.of(kv, instant, collection, paneInfo));
        }

        @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.core.OutputWindowedValue
        public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tupleTag, AdditionalOutputT additionaloutputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            throw new UnsupportedOperationException(String.format("%s should not use tagged outputs", "DirectGroupAlsoByWindow"));
        }

        @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.core.OutputWindowedValue
        public /* bridge */ /* synthetic */ void outputWindowedValue(Object obj, Instant instant, Collection collection, PaneInfo paneInfo) {
            outputWindowedValue((KV) obj, instant, (Collection<? extends BoundedWindow>) collection, paneInfo);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the collaborators that are handed to every evaluator this factory creates.
     *
     * @param executableGraph graph passed on to each created evaluator
     * @param components pipeline components passed on to each created evaluator
     * @param bundleFactory bundle factory passed on to each created evaluator
     * @param provider supplies the per-step, per-key state and timers for each created evaluator
     */
    public GroupAlsoByWindowEvaluatorFactory(ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> executableGraph, RunnerApi.Components components, BundleFactory bundleFactory, StepStateAndTimers.Provider provider) {
        // Assigned in parameter order; the assignments are independent of each other.
        this.graph = executableGraph;
        this.components = components;
        this.bundleFactory = bundleFactory;
        this.stp = provider;
    }

    /**
     * Creates an evaluator for the given transform application and input bundle.
     *
     * <p>The wildcard bundle is passed on as a raw type and the resulting evaluator is cast to the
     * caller's {@code InputT}; neither conversion can be checked at compile time or at runtime.
     *
     * @param pTransformNode the transform to evaluate
     * @param committedBundle the input bundle; its key is used to look up state and timers
     * @return an evaluator for the elements of {@code committedBundle}
     */
    @Override // org.apache.beam.runners.direct.portable.TransformEvaluatorFactory
    public <InputT> TransformEvaluator<InputT> forApplication(PipelineNode.PTransformNode pTransformNode, CommittedBundle<?> committedBundle) {
        @SuppressWarnings({"unchecked", "rawtypes"}) // See method Javadoc: the element type is not statically known here.
        TransformEvaluator<InputT> evaluator = (TransformEvaluator<InputT>) createEvaluator(pTransformNode, (CommittedBundle) committedBundle);
        return evaluator;
    }

    /** Intentionally a no-op: this implementation performs no work on cleanup. */
    @Override // org.apache.beam.runners.direct.portable.TransformEvaluatorFactory
    public void cleanup() {
        // Nothing to do.
    }

    /**
     * Builds a {@link GroupAlsoByWindowEvaluator} for the key of the given bundle, using the state and
     * timers that the provider returns for that transform and key.
     *
     * @param pTransformNode the transform to evaluate
     * @param committedBundle the keyed input bundle
     * @return a new evaluator bound to the bundle's key
     */
    private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(PipelineNode.PTransformNode pTransformNode, CommittedBundle<KeyedWorkItem<K, V>> committedBundle) {
        // The bundle exposes its key with a wildcard type; narrow it to K, which the evaluator constructor requires.
        @SuppressWarnings("unchecked")
        StructuralKey<K> key = (StructuralKey<K>) committedBundle.getKey();
        return new GroupAlsoByWindowEvaluator<>(this.bundleFactory, key, pTransformNode, this.graph, this.components, this.stp.forStepAndKey(pTransformNode, key));
    }
}
