package com.google.cloud.dataflow.sdk.util;

import com.google.api.client.util.Preconditions;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Lists;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Maps;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Sets;
import com.google.cloud.dataflow.sdk.runners.worker.logging.DataflowWorkerLoggingMDC;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.state.StateInternals;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormat;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/util/DoFnRunner.class */
public class DoFnRunner<InputT, OutputT> {
    /** The user function whose bundle and element callbacks this runner invokes. */
    public final DoFn<InputT, OutputT> fn;
    /** Context built once in the constructor and passed to {@code startBundle}/{@code finishBundle}. */
    public final DoFnContext<InputT, OutputT> context;

    /**
     * {@link DoFn.Context} implementation backing a {@link DoFnRunner}.
     *
     * <p>Every emitted element is wrapped in a {@link WindowedValue}, handed to the
     * {@link OutputManager} and, when a step context is present, also reported to it.
     *
     * @param <InputT> element type consumed by the wrapped {@link DoFn}
     * @param <OutputT> main-output element type produced by the wrapped {@link DoFn}
     */
    public static class DoFnContext<InputT, OutputT> extends DoFn<InputT, OutputT>.Context {
        /** Largest number of distinct output tags (main output included) that will be tracked. */
        private static final int MAX_SIDE_OUTPUTS = 1000;
        final PipelineOptions options;
        final DoFn<InputT, OutputT> fn;
        final SideInputReader sideInputReader;
        final OutputManager outputManager;
        final TupleTag<OutputT> mainOutputTag;
        /** May be null: the output paths below check for null before notifying it. */
        final ExecutionContext.StepContext stepContext;
        final CounterSet.AddCounterMutator addCounterMutator;
        /** Used to assign windows to outputs emitted without explicit windows; may be null. */
        final WindowFn windowFn;
        /** Tags seen so far: the main tag, the declared side tags, and any tag used at runtime. */
        private final Set<TupleTag<?>> outputTags;

        /**
         * Creates the context and registers the main and declared side output tags.
         *
         * @param pipelineOptions options returned from {@link #getPipelineOptions()}
         * @param doFn the function this context belongs to; must not be null
         * @param sideInputReader source of side-input values
         * @param outputManager receiver of every emitted {@link WindowedValue}
         * @param tupleTag tag of the main output
         * @param list declared side-output tags; must not be null
         * @param stepContext step context notified of outputs, or null
         * @param addCounterMutator mutator handed to created aggregators
         * @param windowFn window function used when an output carries no windows, or null
         */
        public DoFnContext(PipelineOptions pipelineOptions, DoFn<InputT, OutputT> doFn, SideInputReader sideInputReader, OutputManager outputManager, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, ExecutionContext.StepContext stepContext, CounterSet.AddCounterMutator addCounterMutator, WindowFn windowFn) {
            // DoFn.Context is an inner class of DoFn, so its constructor needs the enclosing
            // DoFn instance; the qualified call also rejects a null doFn.
            doFn.super();
            this.options = pipelineOptions;
            this.fn = doFn;
            this.sideInputReader = sideInputReader;
            this.outputManager = outputManager;
            this.mainOutputTag = tupleTag;
            this.outputTags = Sets.newHashSet();
            this.outputTags.add(tupleTag);
            for (TupleTag<?> sideOutputTag : list) {
                this.outputTags.add(sideOutputTag);
            }
            this.stepContext = stepContext;
            this.addCounterMutator = addCounterMutator;
            this.windowFn = windowFn;
            super.setupDelegateAggregators();
        }

        /** Returns the pipeline options supplied at construction. */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
        public PipelineOptions getPipelineOptions() {
            return this.options;
        }

        /**
         * Builds a {@link WindowedValue} for an output element.
         *
         * <p>A null timestamp is replaced by {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Null windows
         * are replaced by the result of {@code windowFn.assignWindows} called with a context that
         * exposes only the caller-supplied timestamp: asking it for the element, the windows, or a
         * timestamp that was not supplied throws {@link UnsupportedOperationException}.
         *
         * @param t the value to wrap
         * @param instant output timestamp, or null if the caller gave none
         * @param collection windows of the output, or null to have the window function assign them
         * @param paneInfo pane of the output
         * @return the wrapped value
         * @throws UserCodeException if window assignment fails for any reason
         */
        @SuppressWarnings({"rawtypes", "unchecked"}) // windowFn is held as a raw WindowFn
        <T> WindowedValue<T> makeWindowedValue(T t, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            // Capture what the caller actually passed before defaulting, so the assign context can
            // tell "no timestamp supplied" apart from a real timestamp.
            final Instant inputTimestamp = instant;
            if (instant == null) {
                instant = BoundedWindow.TIMESTAMP_MIN_VALUE;
            }
            if (collection == null) {
                try {
                    collection = this.windowFn.assignWindows(this.windowFn.new AssignContext() {
                        @Override // com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn.AssignContext
                        public Object element() {
                            throw new UnsupportedOperationException("WindowFn attempted to access input element when none was available");
                        }

                        @Override // com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn.AssignContext
                        public Instant timestamp() {
                            if (inputTimestamp == null) {
                                throw new UnsupportedOperationException("WindowFn attempted to access input timestamp when none was available");
                            }
                            return inputTimestamp;
                        }

                        @Override // com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn.AssignContext
                        public Collection<? extends BoundedWindow> windows() {
                            throw new UnsupportedOperationException("WindowFn attempted to access input windows when none were available");
                        }
                    });
                } catch (Exception e) {
                    // Keep an existing UserCodeException as-is; wrap everything else.
                    Throwables.propagateIfInstanceOf(e, UserCodeException.class);
                    throw new UserCodeException(e);
                }
            }
            return WindowedValue.of(t, instant, collection, paneInfo);
        }

        /**
         * Reads a side input for the given main-input window.
         *
         * @param pCollectionView the view to read
         * @param boundedWindow main-input window, mapped to a side-input window by the view's window function
         * @return the value the side-input reader holds for the mapped window
         * @throws IllegalArgumentException if the reader does not contain the view
         */
        public <T> T sideInput(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
            if (!this.sideInputReader.contains(pCollectionView)) {
                throw new IllegalArgumentException("calling sideInput() with unknown view");
            }
            return (T) this.sideInputReader.get(pCollectionView, pCollectionView.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(boundedWindow));
        }

        /** Wraps the value via {@link #makeWindowedValue} and emits it on the main output. */
        void outputWindowedValue(OutputT outputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            outputWindowedValue(makeWindowedValue(outputt, instant, collection, paneInfo));
        }

        /** Emits an already-wrapped value on the main output and notifies the step context, if any. */
        void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
            this.outputManager.output(this.mainOutputTag, windowedValue);
            if (this.stepContext != null) {
                this.stepContext.noteOutput(windowedValue);
            }
        }

        /** Wraps the value via {@link #makeWindowedValue} and emits it on the given side output. */
        protected <T> void sideOutputWindowedValue(TupleTag<T> tupleTag, T t, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            sideOutputWindowedValue(tupleTag, makeWindowedValue(t, instant, collection, paneInfo));
        }

        /**
         * Emits an already-wrapped value on the given side output and notifies the step context, if any.
         *
         * <p>A tag not seen before is registered first.
         *
         * @throws IllegalArgumentException if registering the tag would exceed {@code MAX_SIDE_OUTPUTS} tracked tags
         */
        protected <T> void sideOutputWindowedValue(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            if (!this.outputTags.contains(tupleTag)) {
                if (this.outputTags.size() >= MAX_SIDE_OUTPUTS) {
                    throw new IllegalArgumentException("the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
                }
                this.outputTags.add(tupleTag);
            }
            this.outputManager.output(tupleTag, windowedValue);
            if (this.stepContext != null) {
                this.stepContext.noteSideOutput(tupleTag, windowedValue);
            }
        }

        /** Emits on the main output with no timestamp and no windows supplied. */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
        public void output(OutputT outputt) {
            outputWindowedValue(outputt, null, null, PaneInfo.NO_FIRING);
        }

        /** Emits on the main output with the given timestamp and no windows supplied. */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
        public void outputWithTimestamp(OutputT outputt, Instant instant) {
            outputWindowedValue(outputt, instant, null, PaneInfo.NO_FIRING);
        }

        /** Emits on a side output with no timestamp and no windows supplied; the tag must not be null. */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
        public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
            Preconditions.checkNotNull(tupleTag, "TupleTag passed to sideOutput cannot be null");
            sideOutputWindowedValue(tupleTag, t, null, null, PaneInfo.NO_FIRING);
        }

        /** Emits on a side output with the given timestamp and no windows supplied; the tag must not be null. */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
        public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
            Preconditions.checkNotNull(tupleTag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
            sideOutputWindowedValue(tupleTag, t, instant, null, PaneInfo.NO_FIRING);
        }

        /**
         * Builds the counter name {@code <prefix><stepName>-<str>}, where the prefix is empty for a
         * DoFn class annotated with {@code SystemDoFnInternal} and {@code "user-"} otherwise.
         * Dereferences the step context without a null check.
         */
        private String generateInternalAggregatorName(String str) {
            boolean systemDoFn = this.fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
            String prefix = systemDoFn ? "" : "user-";
            return prefix + this.stepContext.getStepName() + "-" + str;
        }

        /**
         * Creates a {@code CounterAggregator} named by {@link #generateInternalAggregatorName}.
         *
         * @param str the aggregator name given by the caller
         * @param combineFn combiner for the aggregator; must not be null
         */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
        public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String str, Combine.CombineFn<AggInputT, ?, AggOutputT> combineFn) {
            Preconditions.checkNotNull(combineFn, "Combiner passed to createAggregator cannot be null");
            return new CounterAggregator(generateInternalAggregatorName(str), combineFn, this.addCounterMutator);
        }
    }

    /**
     * {@link DoFn.ProcessContext} for a single input {@link WindowedValue}.
     *
     * <p>Element, timestamp, windows and pane come from the wrapped input value; outputs are
     * forwarded to the shared {@link DoFnContext}.
     *
     * @param <InputT> element type consumed by the wrapped {@link DoFn}
     * @param <OutputT> main-output element type produced by the wrapped {@link DoFn}
     */
    public static class DoFnProcessContext<InputT, OutputT> extends DoFn<InputT, OutputT>.ProcessContext {
        final DoFn<InputT, OutputT> fn;
        final DoFnContext<InputT, OutputT> context;
        /** The input element currently being processed, with its timestamp, windows and pane. */
        final WindowedValue<InputT> windowedValue;

        /**
         * @param doFn the function being run; must not be null
         * @param doFnContext shared context that performs the actual output
         * @param windowedValue the input element this context describes
         */
        public DoFnProcessContext(DoFn<InputT, OutputT> doFn, DoFnContext<InputT, OutputT> doFnContext, WindowedValue<InputT> windowedValue) {
            // DoFn.ProcessContext is an inner class of DoFn, so its constructor needs the
            // enclosing DoFn instance; the qualified call also rejects a null doFn.
            doFn.super();
            this.fn = doFn;
            this.context = doFnContext;
            this.windowedValue = windowedValue;
        }

        /** Returns the pipeline options of the shared context. */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
        public PipelineOptions getPipelineOptions() {
            return this.context.getPipelineOptions();
        }

        /** Returns the value of the input element. */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext
        public InputT element() {
            return this.windowedValue.getValue();
        }

        /**
         * Reads a side input for the single window of the current element.
         *
         * <p>An element in no window is treated as being in {@link GlobalWindow#INSTANCE}, but only
         * when the context's window function is a {@link GlobalWindows}.
         *
         * @throws IllegalStateException if the element is in several windows, or in none while the
         *     window function is not {@link GlobalWindows}
         */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext
        public <T> T sideInput(PCollectionView<T> pCollectionView) {
            Preconditions.checkNotNull(pCollectionView, "View passed to sideInput cannot be null");
            Iterator<? extends BoundedWindow> it = windows().iterator();
            BoundedWindow mainWindow;
            if (!it.hasNext()) {
                if (!(this.context.windowFn instanceof GlobalWindows)) {
                    throw new IllegalStateException("sideInput called when main input element is not in any windows");
                }
                mainWindow = GlobalWindow.INSTANCE;
            } else {
                mainWindow = it.next();
                if (it.hasNext()) {
                    throw new IllegalStateException("sideInput called when main input element is in multiple windows");
                }
            }
            return this.context.sideInput(pCollectionView, mainWindow);
        }

        /**
         * Returns the only window of the current element.
         *
         * @throws UnsupportedOperationException if the DoFn does not implement {@link DoFn.RequiresWindowAccess}
         */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext
        public BoundedWindow window() {
            if (this.fn instanceof DoFn.RequiresWindowAccess) {
                return (BoundedWindow) Iterables.getOnlyElement(windows());
            }
            throw new UnsupportedOperationException("window() is only available in the context of a DoFn marked as RequiresWindow.");
        }

        /** Returns the pane of the input element. */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext
        public PaneInfo pane() {
            return this.windowedValue.getPane();
        }

        /** Emits on the main output, reusing the input element's timestamp, windows and pane. */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
        public void output(OutputT outputt) {
            this.context.outputWindowedValue(this.windowedValue.withValue(outputt));
        }

        /**
         * Emits on the main output with an explicit timestamp, in the input element's windows and pane.
         *
         * @throws IllegalArgumentException if the timestamp is earlier than the input timestamp minus the allowed skew
         */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
        public void outputWithTimestamp(OutputT outputt, Instant instant) {
            checkTimestamp(instant);
            this.context.outputWindowedValue(outputt, instant, this.windowedValue.getWindows(), this.windowedValue.getPane());
        }

        /** Emits on the main output with caller-chosen timestamp, windows and pane; no skew check. */
        void outputWindowedValue(OutputT outputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            this.context.outputWindowedValue(outputt, instant, collection, paneInfo);
        }

        /** Emits on a side output, reusing the input element's timestamp, windows and pane. */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
        public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
            Preconditions.checkNotNull(tupleTag, "Tag passed to sideOutput cannot be null");
            this.context.sideOutputWindowedValue(tupleTag, this.windowedValue.withValue(t));
        }

        /**
         * Emits on a side output with an explicit timestamp, in the input element's windows and pane.
         *
         * @throws IllegalArgumentException if the timestamp is earlier than the input timestamp minus the allowed skew
         */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
        public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
            Preconditions.checkNotNull(tupleTag, "Tag passed to sideOutputWithTimestamp cannot be null");
            checkTimestamp(instant);
            this.context.sideOutputWindowedValue(tupleTag, t, instant, this.windowedValue.getWindows(), this.windowedValue.getPane());
        }

        /** Returns the timestamp of the input element. */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext
        public Instant timestamp() {
            return this.windowedValue.getTimestamp();
        }

        /** Returns all windows of the input element. */
        public Collection<? extends BoundedWindow> windows() {
            return this.windowedValue.getWindows();
        }

        /**
         * Rejects output timestamps earlier than the input timestamp minus the DoFn's allowed skew.
         *
         * @throws IllegalArgumentException if {@code instant} is too early
         */
        private void checkTimestamp(Instant instant) {
            Instant earliestAllowed = this.windowedValue.getTimestamp().minus(this.fn.getAllowedTimestampSkew());
            if (!instant.isBefore(earliestAllowed)) {
                return;
            }
            throw new IllegalArgumentException(String.format("Cannot output with timestamp %s. Output timestamps must be no earlier than the timestamp of the current input (%s) minus the allowed skew (%s). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.", instant, this.windowedValue.getTimestamp(), PeriodFormat.getDefault().print(this.fn.getAllowedTimestampSkew().toPeriod())));
        }

        /**
         * Returns a {@link WindowingInternals} view over this context: outputs go through the shared
         * context, windows and pane come from the input element, and timers, state and view data
         * are delegated to the step context (which is dereferenced without a null check).
         */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.ProcessContext
        public WindowingInternals<InputT, OutputT> windowingInternals() {
            return new WindowingInternals<InputT, OutputT>() { // from class: com.google.cloud.dataflow.sdk.util.DoFnRunner.DoFnProcessContext.1
                @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
                public void outputWindowedValue(OutputT outputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                    DoFnProcessContext.this.context.outputWindowedValue(outputt, instant, collection, paneInfo);
                }

                @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
                public Collection<? extends BoundedWindow> windows() {
                    return DoFnProcessContext.this.windowedValue.getWindows();
                }

                @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
                public PaneInfo pane() {
                    return DoFnProcessContext.this.windowedValue.getPane();
                }

                @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
                public TimerInternals timerInternals() {
                    return DoFnProcessContext.this.context.stepContext.timerInternals();
                }

                @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
                public <T> void writePCollectionViewData(TupleTag<?> tupleTag, Iterable<WindowedValue<T>> iterable, Coder<T> coder) throws IOException {
                    // The window coder is used twice: inside the element coder and for the window itself.
                    Coder windowCoder = DoFnProcessContext.this.context.windowFn.windowCoder();
                    DoFnProcessContext.this.context.stepContext.writePCollectionViewData(tupleTag, iterable, IterableCoder.of(WindowedValue.getFullCoder(coder, windowCoder)), DoFnProcessContext.this.window(), windowCoder);
                }

                @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
                public StateInternals stateInternals() {
                    return DoFnProcessContext.this.context.stepContext.stateInternals();
                }
            };
        }

        /** Delegates aggregator creation to the shared context. */
        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn.Context
        public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String str, Combine.CombineFn<AggInputT, ?, AggOutputT> combineFn) {
            return this.context.createAggregatorInternal(str, combineFn);
        }
    }

    /**
     * {@link OutputManager} that keeps every emitted value in memory, grouped by output tag, in
     * emission order.
     */
    public static class ListOutputManager implements OutputManager {
        private final Map<TupleTag<?>, List<WindowedValue<?>>> outputLists = Maps.newHashMap();

        /** Appends the value to the list kept for its tag, creating that list on first use. */
        @Override // com.google.cloud.dataflow.sdk.util.DoFnRunner.OutputManager
        public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            List<WindowedValue<?>> recorded = this.outputLists.get(tupleTag);
            if (recorded != null) {
                recorded.add(windowedValue);
                return;
            }
            List<WindowedValue<?>> fresh = Lists.newArrayList();
            this.outputLists.put(tupleTag, fresh);
            fresh.add(windowedValue);
        }

        /**
         * Returns the values recorded for the tag, or an empty list if none were recorded.
         *
         * <p>When values exist, the returned list is the internal list itself, not a copy.
         */
        public <T> List<WindowedValue<T>> getOutput(TupleTag<T> tupleTag) {
            List<WindowedValue<T>> recorded = (List) this.outputLists.get(tupleTag);
            if (recorded == null) {
                return Collections.emptyList();
            }
            return recorded;
        }
    }

    /** Receiver of the values emitted through a {@link DoFnRunner}'s contexts. */
    public interface OutputManager {
        /**
         * Accepts one emitted value.
         *
         * @param tupleTag tag of the output the value was emitted on
         * @param windowedValue the emitted value
         */
        <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public DoFnRunner(PipelineOptions pipelineOptions, DoFn<InputT, OutputT> doFn, SideInputReader sideInputReader, OutputManager outputManager, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, ExecutionContext.StepContext stepContext, CounterSet.AddCounterMutator addCounterMutator, WindowingStrategy<?, ?> windowingStrategy) {
        this.fn = doFn;
        this.context = new DoFnContext<>(pipelineOptions, doFn, sideInputReader, outputManager, tupleTag, list, stepContext, addCounterMutator, windowingStrategy == null ? null : windowingStrategy.getWindowFn());
    }

    /** Static factory: forwards every argument, unchanged and in order, to the constructor. */
    public static <InputT, OutputT> DoFnRunner<InputT, OutputT> create(PipelineOptions pipelineOptions, DoFn<InputT, OutputT> doFn, SideInputReader sideInputReader, OutputManager outputManager, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, ExecutionContext.StepContext stepContext, CounterSet.AddCounterMutator addCounterMutator, WindowingStrategy<?, ?> windowingStrategy) {
        DoFnRunner<InputT, OutputT> runner = new DoFnRunner<>(pipelineOptions, doFn, sideInputReader, outputManager, tupleTag, list, stepContext, addCounterMutator, windowingStrategy);
        return runner;
    }

    /**
     * Calls the DoFn's {@code startBundle} with the shared context.
     *
     * @throws UserCodeException for any failure; one thrown by the DoFn is rethrown as-is, anything
     *     else is wrapped
     */
    public void startBundle() {
        try {
            this.fn.startBundle(this.context);
        } catch (Throwable failure) {
            if (failure instanceof UserCodeException) {
                throw (UserCodeException) failure;
            }
            throw new UserCodeException(failure);
        }
    }

    /**
     * Processes one input element.
     *
     * <p>While the element is processed the logging MDC step name is set to this step's name; the
     * previous name is restored afterwards, even on failure. An element in more than one window is
     * processed once per window when the DoFn requires window access or side inputs are present;
     * otherwise it is processed once as given.
     */
    public void processElement(WindowedValue<InputT> windowedValue) {
        final String previousStepName = DataflowWorkerLoggingMDC.getStepName();
        DataflowWorkerLoggingMDC.setStepName(this.context.stepContext.getStepName());
        try {
            boolean explodeWindows = windowedValue.getWindows().size() > 1
                    && (DoFn.RequiresWindowAccess.class.isAssignableFrom(this.fn.getClass())
                            || !this.context.sideInputReader.isEmpty());
            if (!explodeWindows) {
                invokeProcessElement(windowedValue);
                return;
            }
            for (BoundedWindow window : windowedValue.getWindows()) {
                invokeProcessElement(WindowedValue.of(windowedValue.getValue(), windowedValue.getTimestamp(), window, windowedValue.getPane()));
            }
        } finally {
            DataflowWorkerLoggingMDC.setStepName(previousStepName);
        }
    }

    /**
     * Calls the DoFn's {@code processElement} with a fresh process context for the element.
     *
     * @throws UserCodeException for any failure; one thrown by the DoFn is rethrown as-is, anything
     *     else is wrapped
     */
    protected void invokeProcessElement(WindowedValue<InputT> windowedValue) {
        try {
            DoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(windowedValue);
            this.fn.processElement(processContext);
        } catch (Throwable failure) {
            if (failure instanceof UserCodeException) {
                throw (UserCodeException) failure;
            }
            throw new UserCodeException(failure);
        }
    }

    /**
     * Calls the DoFn's {@code finishBundle} with the shared context.
     *
     * @throws UserCodeException for any failure; one thrown by the DoFn is rethrown as-is, anything
     *     else is wrapped
     */
    public void finishBundle() {
        try {
            this.fn.finishBundle(this.context);
        } catch (Throwable failure) {
            if (failure instanceof UserCodeException) {
                throw (UserCodeException) failure;
            }
            throw new UserCodeException(failure);
        }
    }

    /**
     * Creates the process context for one input element, bound to this runner's DoFn and shared
     * context.
     *
     * @param windowedValue the input element the context will describe
     * @return a new {@link DoFnProcessContext}
     */
    public DoFn<InputT, OutputT>.ProcessContext createProcessContext(WindowedValue<InputT> windowedValue) {
        // Diamond instead of the raw type, so the result is checked against the declared return type.
        return new DoFnProcessContext<>(this.fn, this.context, windowedValue);
    }
}
