package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.CachedSideInputReader;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.NoOpStepContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.ParDoTranslation;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.joda.time.Instant;
import scala.Serializable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.class */
public abstract class DoFnRunnerFactory<InT, T> implements Serializable {

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory$DoFnRunnerWithTeardown.class */
    /**
     * A {@link DoFnRunner} that additionally exposes a {@link #teardown()} hook.
     *
     * <p>This is the runner type returned by {@code DoFnRunnerFactory.create}.
     *
     * @param <InT> element type consumed by the runner
     * @param <T> main output element type of the runner
     */
    interface DoFnRunnerWithTeardown<InT, T> extends DoFnRunner<InT, T> {
        /**
         * Tears the runner down once it is no longer used.
         *
         * <p>NOTE(review): presumably this invokes the wrapped {@link DoFn}'s teardown; the
         * implementing class is not part of this file, so confirm there.
         */
        void teardown();
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory$FusedRunnerFactory.class */
    /**
     * Factory for a chain of fused runners: the output of each runner is handed directly to the
     * next runner in the chain, and only the last runner writes to the caller's output manager.
     *
     * <p>The factory list is mutable: {@link #fuse} appends to it in place and returns this same
     * instance.
     *
     * @param <InT> element type consumed by the first runner in the chain
     * @param <T> output element type of the last runner in the chain
     */
    private static class FusedRunnerFactory<InT, T> extends DoFnRunnerFactory<InT, T> {
        /** Factories in processing order; index 0 receives the input elements. */
        private final List<DoFnRunnerFactory<?, ?>> factories;

        /**
         * Runner that drives a chain of runners as if it were a single one.
         *
         * <p>Timers, window expiration and {@link #getFn()} are not supported.
         */
        private static class FusedRunner<InT, T> implements DoFnRunnerWithTeardown<InT, T> {
            /** Runners in processing order; {@code runners[i]} emits into {@code runners[i + 1]}. */
            final DoFnRunnerWithTeardown<?, ?>[] runners;

            /**
             * Output manager that forwards every emitted value to the next runner in the chain.
             *
             * <p>The tag is ignored, so all outputs of the upstream runner reach the next runner.
             */
            private static class FusedOutput implements DoFnRunners.OutputManager {
                final DoFnRunnerWithTeardown<?, ?> runner;

                FusedOutput(DoFnRunnerWithTeardown<?, ?> doFnRunnerWithTeardown) {
                    this.runner = doFnRunnerWithTeardown;
                }

                // The element types of adjacent runners are only known at fuse() time, so the
                // value has to be passed through a raw type here.
                @SuppressWarnings({"unchecked", "rawtypes"})
                public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                    this.runner.processElement((WindowedValue) windowedValue);
                }
            }

            /**
             * Creates one runner per factory, back to front, so that each runner can be wired to
             * the runner that follows it.
             *
             * <p>If creating a runner fails, the runners that were already created are torn down
             * before the failure is rethrown; teardown failures are attached as suppressed.
             *
             * @param pipelineOptions options passed to every factory
             * @param metricsAccumulator metrics accumulator passed to every factory
             * @param outputManager receives the output of the last runner in the chain
             * @param list factories in processing order; must not be empty
             * @throws IllegalArgumentException if {@code list} is empty
             */
            FusedRunner(PipelineOptions pipelineOptions, MetricsAccumulator metricsAccumulator, DoFnRunners.OutputManager outputManager, List<DoFnRunnerFactory<?, ?>> list) {
                if (list.isEmpty()) {
                    throw new IllegalArgumentException("Cannot fuse an empty list of runner factories");
                }
                this.runners = new DoFnRunnerWithTeardown<?, ?>[list.size()];
                DoFnRunners.OutputManager downstream = outputManager;
                try {
                    for (int i = this.runners.length - 1; i >= 0; i--) {
                        this.runners[i] = list.get(i).create(pipelineOptions, metricsAccumulator, downstream);
                        downstream = new FusedOutput(this.runners[i]);
                    }
                } catch (RuntimeException e) {
                    // Runners created so far would otherwise never be torn down.
                    teardownAll(this.runners, e);
                    throw e;
                }
            }

            /**
             * Tears down every non-null runner, continuing past failures.
             *
             * @param runners runners to tear down; {@code null} slots are skipped
             * @param failure failure already in flight, or {@code null}
             * @return the first failure (the given one if non-null) with later ones suppressed, or
             *     {@code null} if nothing failed
             */
            private static RuntimeException teardownAll(DoFnRunnerWithTeardown<?, ?>[] runners, RuntimeException failure) {
                RuntimeException first = failure;
                for (DoFnRunnerWithTeardown<?, ?> runner : runners) {
                    if (runner == null) {
                        continue;
                    }
                    try {
                        runner.teardown();
                    } catch (RuntimeException e) {
                        if (first == null) {
                            first = e;
                        } else if (first != e) {
                            first.addSuppressed(e);
                        }
                    }
                }
                return first;
            }

            /** Starts a bundle on every runner, in processing order. */
            public void startBundle() {
                for (DoFnRunnerWithTeardown<?, ?> runner : this.runners) {
                    runner.startBundle();
                }
            }

            /** Feeds the element to the first runner; later runners are reached via {@link FusedOutput}. */
            @SuppressWarnings({"unchecked", "rawtypes"})
            public void processElement(WindowedValue<InT> windowedValue) {
                this.runners[0].processElement((WindowedValue) windowedValue);
            }

            /** Not supported by fused runners. */
            public <KeyT> void onTimer(String str, String str2, KeyT keyt, BoundedWindow boundedWindow, Instant instant, Instant instant2, TimeDomain timeDomain) {
                throw new UnsupportedOperationException();
            }

            /** Not supported by fused runners. */
            public <KeyT> void onWindowExpiration(BoundedWindow boundedWindow, Instant instant, KeyT keyt) {
                throw new UnsupportedOperationException();
            }

            /**
             * Finishes the bundle on every runner in processing order, so that anything an
             * upstream runner emits while finishing still reaches an unfinished downstream runner.
             */
            public void finishBundle() {
                for (DoFnRunnerWithTeardown<?, ?> runner : this.runners) {
                    runner.finishBundle();
                }
            }

            /** Not supported: a fused runner does not wrap a single {@link DoFn}. */
            public DoFn<InT, T> getFn() {
                throw new UnsupportedOperationException();
            }

            /**
             * Tears down every runner in the chain.
             *
             * <p>All runners are attempted even if one fails; the first failure is rethrown with
             * any later failures attached as suppressed exceptions.
             */
            @Override
            public void teardown() {
                RuntimeException failure = teardownAll(this.runners, null);
                if (failure != null) {
                    throw failure;
                }
            }
        }

        /**
         * @param list factories in processing order; the list is kept by reference and later
         *     extended by {@link #fuse}
         */
        FusedRunnerFactory(List<DoFnRunnerFactory<?, ?>> list) {
            this.factories = list;
        }

        /** Creates a {@link FusedRunner} over the current list of factories. */
        @Override
        public DoFnRunnerWithTeardown<InT, T> create(PipelineOptions pipelineOptions, MetricsAccumulator metricsAccumulator, DoFnRunners.OutputManager outputManager) {
            return new FusedRunner<>(pipelineOptions, metricsAccumulator, outputManager, this.factories);
        }

        /**
         * Appends the given factory to this chain and returns this same instance, re-typed to the
         * new output type.
         */
        @Override
        @SuppressWarnings("unchecked") // the chain's output type changes once the factory is appended
        public <T2> DoFnRunnerFactory<InT, T2> fuse(DoFnRunnerFactory<T, T2> doFnRunnerFactory) {
            this.factories.add(doFnRunnerFactory);
            return (DoFnRunnerFactory<InT, T2>) this;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory$SimpleRunnerFactory.class */
    /**
     * Factory for runners that execute a single {@link DoFn}.
     *
     * <p>Everything needed to build the runner is extracted from the applied transform and the
     * input collection at construction time.
     *
     * @param <InT> element type consumed by the {@link DoFn}
     * @param <T> main output element type of the {@link DoFn}
     */
    private static class SimpleRunnerFactory<InT, T> extends DoFnRunnerFactory<InT, T> {
        private final String stepName;
        private final DoFn<InT, T> doFn;
        private final DoFnSchemaInformation doFnSchema;
        private final Coder<InT> coder;
        private final WindowingStrategy<?, ?> windowingStrategy;
        private final TupleTag<T> mainOutput;
        private final List<TupleTag<?>> additionalOutputs;
        private final Map<TupleTag<?>, Coder<?>> outputCoders;
        private final Map<String, PCollectionView<?>> sideInputs;
        private final SideInputReader sideInputReader;
        /** When true, only values tagged with {@link #mainOutput} are passed on to the output manager. */
        private final boolean filterMainOutput;

        /** Output manager that drops every value whose tag differs from the configured one. */
        private static class FilteredOutput implements DoFnRunners.OutputManager {
            final DoFnRunners.OutputManager outputManager;
            final TupleTag<?> tupleTag;

            FilteredOutput(DoFnRunners.OutputManager outputManager, TupleTag<?> tupleTag) {
                this.outputManager = outputManager;
                this.tupleTag = tupleTag;
            }

            public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                if (this.tupleTag.equals(tupleTag)) {
                    this.outputManager.output(tupleTag, windowedValue);
                }
            }
        }

        /**
         * @param appliedPTransform the applied {@code ParDo}; supplies step name, {@link DoFn},
         *     schema information, output tags, output coders and side inputs
         * @param pCollection the input collection; supplies the input coder and windowing strategy
         * @param sideInputReader reader used to resolve side inputs
         * @param filterMainOutput whether runners should only emit values of the main output tag
         */
        SimpleRunnerFactory(AppliedPTransform<PCollection<? extends InT>, ?, ParDo.MultiOutput<InT, T>> appliedPTransform, PCollection<InT> pCollection, SideInputReader sideInputReader, boolean filterMainOutput) {
            this.stepName = appliedPTransform.getFullName();
            this.doFn = appliedPTransform.getTransform().getFn();
            this.doFnSchema = ParDoTranslation.getSchemaInformation(appliedPTransform);
            this.coder = pCollection.getCoder();
            this.windowingStrategy = pCollection.getWindowingStrategy();
            this.mainOutput = appliedPTransform.getTransform().getMainOutputTag();
            this.additionalOutputs = additionalOutputs(appliedPTransform.getTransform());
            this.outputCoders = coders(appliedPTransform.getOutputs(), this.mainOutput);
            this.sideInputs = appliedPTransform.getTransform().getSideInputs();
            this.sideInputReader = sideInputReader;
            this.filterMainOutput = filterMainOutput;
        }

        /** Starts a new fused chain consisting of this factory followed by the given one. */
        @Override
        public <T2> DoFnRunnerFactory<InT, T2> fuse(DoFnRunnerFactory<T, T2> doFnRunnerFactory) {
            return new FusedRunnerFactory<>(Lists.<DoFnRunnerFactory<?, ?>>newArrayList(this, doFnRunnerFactory));
        }

        /**
         * Creates a metrics-reporting runner for the {@link DoFn}, invokes the {@link DoFn}'s setup
         * and starts the first bundle before returning.
         *
         * <p>If starting the bundle fails, the {@link DoFn} is torn down and the original failure
         * is rethrown; a failure of the teardown itself is attached as a suppressed exception
         * rather than replacing the original one.
         */
        @Override
        public DoFnRunnerWithTeardown<InT, T> create(PipelineOptions pipelineOptions, MetricsAccumulator metricsAccumulator, DoFnRunners.OutputManager outputManager) {
            DoFnRunners.OutputManager effectiveOutput = this.filterMainOutput ? new FilteredOutput(outputManager, this.mainOutput) : outputManager;
            DoFnRunner<InT, T> simpleRunner = DoFnRunners.simpleRunner(
                    pipelineOptions,
                    this.doFn,
                    CachedSideInputReader.of(this.sideInputReader, this.sideInputs.values()),
                    effectiveOutput,
                    this.mainOutput,
                    this.additionalOutputs,
                    new NoOpStepContext(),
                    this.coder,
                    this.outputCoders,
                    this.windowingStrategy,
                    this.doFnSchema,
                    this.sideInputs);
            DoFnRunnerWithTeardown<InT, T> runner = new DoFnRunnerWithMetrics<>(this.stepName, simpleRunner, metricsAccumulator);
            DoFnInvokers.tryInvokeSetupFor(this.doFn, pipelineOptions);
            try {
                runner.startBundle();
            } catch (RuntimeException e) {
                try {
                    DoFnInvokers.invokerFor(this.doFn).invokeTeardown();
                } catch (RuntimeException teardownFailure) {
                    // Keep the startBundle failure as the primary cause.
                    if (teardownFailure != e) {
                        e.addSuppressed(teardownFailure);
                    }
                }
                throw e;
            }
            return runner;
        }

        /**
         * Collects the coder of every output.
         *
         * <p>With exactly one output, its coder is registered under the given main tag rather than
         * under the key found in the map.
         *
         * @param map output collections keyed by tag
         * @param tupleTag the main output tag
         * @return coder per output tag
         */
        private static Map<TupleTag<?>, Coder<?>> coders(Map<TupleTag<?>, PCollection<?>> map, TupleTag<?> tupleTag) {
            if (map.size() == 1) {
                PCollection<?> onlyOutput = Iterables.getOnlyElement(map.values());
                return Collections.<TupleTag<?>, Coder<?>>singletonMap(tupleTag, onlyOutput.getCoder());
            }
            Map<TupleTag<?>, Coder<?>> result = Maps.newHashMapWithExpectedSize(map.size());
            for (Map.Entry<TupleTag<?>, PCollection<?>> entry : map.entrySet()) {
                result.put(entry.getKey(), entry.getValue().getCoder());
            }
            return result;
        }

        /**
         * Returns the additional output tags of the transform as a list that is independent of
         * the transform's own list: the shared empty list, or a fresh copy.
         */
        private static List<TupleTag<?>> additionalOutputs(ParDo.MultiOutput<?, ?> multiOutput) {
            List<TupleTag<?>> tags = multiOutput.getAdditionalOutputTags().getAll();
            if (tags.isEmpty()) {
                return Collections.emptyList();
            }
            return new ArrayList<>(tags);
        }
    }

    /**
     * Package-private constructor. The subclasses visible in this file are the nested
     * {@code SimpleRunnerFactory} and {@code FusedRunnerFactory}; instances are obtained through
     * {@code simple(...)} and {@code fuse(...)}.
     */
    DoFnRunnerFactory() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a runner that writes its output to the given output manager.
     *
     * <p>The {@code SimpleRunnerFactory} implementation in this file invokes the {@link DoFn}'s
     * setup and calls {@code startBundle()} on the runner before returning it.
     *
     * @param pipelineOptions pipeline options handed to the runner
     * @param metricsAccumulator accumulator handed to the runner for metrics reporting
     * @param outputManager receives the runner's output
     * @return the new runner
     */
    public abstract DoFnRunnerWithTeardown<InT, T> create(PipelineOptions pipelineOptions, MetricsAccumulator metricsAccumulator, DoFnRunners.OutputManager outputManager);

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns a factory whose runners pass their output directly to runners created by the given
     * factory.
     *
     * <p>The returned factory may be this same instance: the {@code FusedRunnerFactory}
     * implementation in this file appends to its own list and returns itself, so callers should
     * always continue with the returned factory.
     *
     * @param doFnRunnerFactory factory for the runner that consumes this factory's output
     * @param <T2> output element type of the fused chain
     * @return the fused factory
     */
    public abstract <T2> DoFnRunnerFactory<InT, T2> fuse(DoFnRunnerFactory<T, T2> doFnRunnerFactory);

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a factory for runners that execute the {@link DoFn} of a single applied
     * {@code ParDo}.
     *
     * @param appliedPTransform the applied {@code ParDo} transform
     * @param pCollection the input collection of the transform
     * @param sideInputReader reader used to resolve side inputs
     * @param z whether runners should only emit values tagged with the main output tag
     * @param <InT> element type consumed by the {@link DoFn}
     * @param <T> main output element type of the {@link DoFn}
     * @return a factory that is not fused with any other factory
     */
    public static <InT, T> DoFnRunnerFactory<InT, T> simple(AppliedPTransform<PCollection<? extends InT>, ?, ParDo.MultiOutput<InT, T>> appliedPTransform, PCollection<InT> pCollection, SideInputReader sideInputReader, boolean z) {
        // Diamond instead of the raw type so the constructor arguments are checked against InT/T.
        return new SimpleRunnerFactory<>(appliedPTransform, pCollection, sideInputReader, z);
    }
}
