package org.apache.beam.sdk.util;

import org.apache.beam.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.sdk.repackaged.com.google.common.base.Function;
import org.apache.beam.sdk.repackaged.com.google.common.base.Predicate;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

/* loaded from: input_file:org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.class */
public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWindow> implements DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
    private final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner;
    private final LateDataFilter lateDataFilter;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/sdk/util/LateDataDroppingDoFnRunner$LateDataFilter.class */
    public static class LateDataFilter {
        private final WindowingStrategy<?, ?> windowingStrategy;
        private final TimerInternals timerInternals;
        private final Aggregator<Long, Long> droppedDueToLateness;

        public LateDataFilter(WindowingStrategy<?, ?> windowingStrategy, TimerInternals timerInternals, Aggregator<Long, Long> aggregator) {
            this.windowingStrategy = windowingStrategy;
            this.timerInternals = timerInternals;
            this.droppedDueToLateness = aggregator;
        }

        public <K, InputT> Iterable<WindowedValue<InputT>> filter(final K k, Iterable<WindowedValue<InputT>> iterable) {
            return Iterables.filter(Iterables.concat(Iterables.transform(iterable, new Function<WindowedValue<InputT>, Iterable<WindowedValue<InputT>>>() { // from class: org.apache.beam.sdk.util.LateDataDroppingDoFnRunner.LateDataFilter.1
                @Override // org.apache.beam.sdk.repackaged.com.google.common.base.Function
                public Iterable<WindowedValue<InputT>> apply(final WindowedValue<InputT> windowedValue) {
                    return Iterables.transform(windowedValue.getWindows(), new Function<BoundedWindow, WindowedValue<InputT>>() { // from class: org.apache.beam.sdk.util.LateDataDroppingDoFnRunner.LateDataFilter.1.1
                        @Override // org.apache.beam.sdk.repackaged.com.google.common.base.Function
                        public WindowedValue<InputT> apply(BoundedWindow boundedWindow) {
                            return WindowedValue.of(windowedValue.getValue(), windowedValue.getTimestamp(), boundedWindow, windowedValue.getPane());
                        }
                    });
                }
            })), new Predicate<WindowedValue<InputT>>() { // from class: org.apache.beam.sdk.util.LateDataDroppingDoFnRunner.LateDataFilter.2
                @Override // org.apache.beam.sdk.repackaged.com.google.common.base.Predicate
                public boolean apply(WindowedValue<InputT> windowedValue) {
                    BoundedWindow boundedWindow = (BoundedWindow) Iterables.getOnlyElement(windowedValue.getWindows());
                    if (!LateDataFilter.this.canDropDueToExpiredWindow(boundedWindow)) {
                        return true;
                    }
                    LateDataFilter.this.droppedDueToLateness.addValue(1L);
                    WindowTracing.debug("ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} since too far behind inputWatermark:{}; outputWatermark:{}", windowedValue.getTimestamp(), k, boundedWindow, LateDataFilter.this.timerInternals.currentInputWatermarkTime(), LateDataFilter.this.timerInternals.currentOutputWatermarkTime());
                    return false;
                }
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean canDropDueToExpiredWindow(BoundedWindow boundedWindow) {
            return boundedWindow.maxTimestamp().plus(this.windowingStrategy.getAllowedLateness()).isBefore(this.timerInternals.currentInputWatermarkTime());
        }
    }

    public LateDataDroppingDoFnRunner(DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner, WindowingStrategy<?, ?> windowingStrategy, TimerInternals timerInternals, Aggregator<Long, Long> aggregator) {
        this.doFnRunner = doFnRunner;
        this.lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals, aggregator);
    }

    @Override // org.apache.beam.sdk.util.DoFnRunner
    public void startBundle() {
        this.doFnRunner.startBundle();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.beam.sdk.util.DoFnRunner
    public void processElement(WindowedValue<KeyedWorkItem<K, InputT>> windowedValue) {
        this.doFnRunner.processElement(windowedValue.withValue(KeyedWorkItems.workItem(windowedValue.getValue().key(), windowedValue.getValue().timersIterable(), this.lateDataFilter.filter(windowedValue.getValue().key(), windowedValue.getValue().elementsIterable()))));
    }

    @Override // org.apache.beam.sdk.util.DoFnRunner
    public void finishBundle() {
        this.doFnRunner.finishBundle();
    }
}
