package org.apache.beam.runners.core;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/core/LateDataDroppingDoFnRunner.class */
/**
 * A {@link DoFnRunner} decorator that removes late data from each incoming {@link KeyedWorkItem}
 * before handing it to the wrapped runner.
 *
 * <p>Only {@link #processElement} alters its input: the work item's elements are passed through a
 * {@link LateDataFilter}, while its key and timers are forwarded untouched. Every other method
 * delegates directly to the wrapped runner.
 *
 * @param <K> key type of the work items
 * @param <InputT> element type carried by the work items
 * @param <OutputT> value type of the emitted {@link KV}s
 * @param <W> window type parameter (not referenced by the code in this class)
 */
public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWindow> implements DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
    /** Name of the counter incremented once for every (element, window) pair that is dropped. */
    public static final String DROPPED_DUE_TO_LATENESS = "droppedDueToLateness";

    /** The runner that receives the filtered work items and all other forwarded calls. */
    private final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner;

    /** Filter applied to the elements of every work item in {@link #processElement}. */
    private final LateDataFilter lateDataFilter;

    /**
     * Decides, per window, whether an element is droppable and returns only the surviving
     * (element, window) pairs.
     */
    @VisibleForTesting
    static class LateDataFilter {
        private final WindowingStrategy<?, ?> windowingStrategy;
        private final TimerInternals timerInternals;
        private final Counter droppedDueToLateness = Metrics.counter(LateDataDroppingDoFnRunner.class, LateDataDroppingDoFnRunner.DROPPED_DUE_TO_LATENESS);

        /**
         * @param windowingStrategy strategy passed to {@link LateDataUtils#garbageCollectionTime}
         *     to compute each window's garbage-collection time
         * @param timerInternals source of the current input and output watermarks
         */
        public LateDataFilter(WindowingStrategy<?, ?> windowingStrategy, TimerInternals timerInternals) {
            this.windowingStrategy = windowingStrategy;
            this.timerInternals = timerInternals;
        }

        /**
         * Returns the elements that are not droppable, expanded to one value per window.
         *
         * <p>Each input value is examined once for every window it belongs to. A pair whose window
         * has expired (see {@link #canDropDueToExpiredWindow}) increments the
         * {@code droppedDueToLateness} counter and is reported through {@link WindowTracing};
         * every other pair is re-created as a value in exactly that one window, keeping the
         * original value, timestamp and pane.
         *
         * @param k key the elements belong to; used only in the trace message
         * @param iterable elements to examine
         * @return a newly built list of the surviving single-window values, in encounter order
         */
        public <K, InputT> Iterable<WindowedValue<InputT>> filter(K k, Iterable<WindowedValue<InputT>> iterable) {
            List<WindowedValue<InputT>> nonLateElements = new ArrayList<>();
            for (WindowedValue<InputT> windowedValue : iterable) {
                for (BoundedWindow boundedWindow : windowedValue.getWindows()) {
                    if (canDropDueToExpiredWindow(boundedWindow)) {
                        this.droppedDueToLateness.inc();
                        WindowTracing.debug("{}: Dropping element at {} for key:{}; window:{} since too far behind inputWatermark:{}; outputWatermark:{}", new Object[]{LateDataFilter.class.getSimpleName(), windowedValue.getTimestamp(), k, boundedWindow, this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime()});
                    } else {
                        nonLateElements.add(WindowedValue.of(windowedValue.getValue(), windowedValue.getTimestamp(), boundedWindow, windowedValue.getPane()));
                    }
                }
            }
            return nonLateElements;
        }

        /**
         * Returns {@code true} when the window's garbage-collection time is strictly before the
         * current input watermark.
         */
        private boolean canDropDueToExpiredWindow(BoundedWindow boundedWindow) {
            return LateDataUtils.garbageCollectionTime(boundedWindow, this.windowingStrategy).isBefore(this.timerInternals.currentInputWatermarkTime());
        }
    }

    /**
     * @param doFnRunner runner to delegate to
     * @param windowingStrategy windowing strategy handed to the internal {@link LateDataFilter}
     * @param timerInternals watermark source handed to the internal {@link LateDataFilter}
     */
    public LateDataDroppingDoFnRunner(DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner, WindowingStrategy<?, ?> windowingStrategy, TimerInternals timerInternals) {
        this.doFnRunner = doFnRunner;
        this.lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals);
    }

    /** Returns the {@link DoFn} of the wrapped runner. */
    @Override // org.apache.beam.runners.core.DoFnRunner
    public DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getFn() {
        return this.doFnRunner.getFn();
    }

    /** Forwards to the wrapped runner. */
    @Override // org.apache.beam.runners.core.DoFnRunner
    public void startBundle() {
        this.doFnRunner.startBundle();
    }

    /**
     * Rebuilds the incoming work item with the same key and timers but with its elements run
     * through the late-data filter, then passes it to the wrapped runner inside the original
     * {@link WindowedValue} envelope.
     *
     * @param windowedValue the work item to filter and forward
     */
    @Override // org.apache.beam.runners.core.DoFnRunner
    public void processElement(WindowedValue<KeyedWorkItem<K, InputT>> windowedValue) {
        KeyedWorkItem<K, InputT> workItem = windowedValue.getValue();
        K key = workItem.key();
        // Timers are read before the elements are filtered, matching the original call order.
        KeyedWorkItem<K, InputT> filteredWorkItem = KeyedWorkItems.workItem(key, workItem.timersIterable(), this.lateDataFilter.filter(key, workItem.elementsIterable()));
        this.doFnRunner.processElement(windowedValue.withValue(filteredWorkItem));
    }

    /**
     * Forwards to the wrapped runner with every argument unchanged; the meaning of each parameter
     * is defined by {@link DoFnRunner}.
     */
    @Override // org.apache.beam.runners.core.DoFnRunner
    public <KeyT> void onTimer(String str, String str2, KeyT keyt, BoundedWindow boundedWindow, Instant instant, Instant instant2, TimeDomain timeDomain) {
        this.doFnRunner.onTimer(str, str2, keyt, boundedWindow, instant, instant2, timeDomain);
    }

    /** Forwards to the wrapped runner. */
    @Override // org.apache.beam.runners.core.DoFnRunner
    public void finishBundle() {
        this.doFnRunner.finishBundle();
    }

    /** Forwards to the wrapped runner with every argument unchanged. */
    @Override // org.apache.beam.runners.core.DoFnRunner
    public <KeyT> void onWindowExpiration(BoundedWindow boundedWindow, Instant instant, KeyT keyt) {
        this.doFnRunner.onWindowExpiration(boundedWindow, instant, keyt);
    }
}
