package org.apache.nemo.compiler.frontend.beam.transform;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.common.punctuation.LatencyMark;
import org.apache.nemo.compiler.frontend.beam.InMemorySideInputReader;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.class */
public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractDoFnTransform.class.getName());
    private final TupleTag<OutputT> mainOutputTag;
    private final List<TupleTag<?>> additionalOutputTags;
    private final Map<Integer, PCollectionView<?>> sideInputs;
    private final WindowingStrategy<?, ?> windowingStrategy;
    private final DoFn<InterT, OutputT> doFn;
    private final SerializablePipelineOptions serializedOptions;
    private final Coder<InputT> inputCoder;
    private final Map<TupleTag<?>, Coder<?>> outputCoders;
    private transient OutputCollector<WindowedValue<OutputT>> outputCollector;
    private transient DoFnRunner<InterT, OutputT> doFnRunner;
    private transient PushbackSideInputDoFnRunner<InterT, OutputT> pushBackRunner;
    private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
    private transient DoFnRunners.OutputManager outputManager;
    private transient InMemorySideInputReader sideInputReader;
    private transient long bundleSize;
    private transient long bundleMillis;
    private long prevBundleStartTime;
    private long currBundleCount = 0;
    private boolean bundleFinished = true;
    private final DisplayData displayData;
    private final DoFnSchemaInformation doFnSchemaInformation;
    private final Map<String, PCollectionView<?>> sideInputMapping;

    public AbstractDoFnTransform(DoFn<InterT, OutputT> doFn, Coder<InputT> coder, Map<TupleTag<?>, Coder<?>> map, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> map2, PipelineOptions pipelineOptions, DisplayData displayData, DoFnSchemaInformation doFnSchemaInformation, Map<String, PCollectionView<?>> map3) {
        this.doFn = doFn;
        this.inputCoder = coder;
        this.outputCoders = map;
        this.mainOutputTag = tupleTag;
        this.additionalOutputTags = list;
        this.sideInputs = map2;
        this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
        this.windowingStrategy = windowingStrategy;
        this.displayData = displayData;
        this.doFnSchemaInformation = doFnSchemaInformation;
        this.sideInputMapping = map3;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final Map<Integer, PCollectionView<?>> getSideInputs() {
        return this.sideInputs;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final DoFnRunners.OutputManager getOutputManager() {
        return this.outputManager;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final WindowingStrategy getWindowingStrategy() {
        return this.windowingStrategy;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final TupleTag<OutputT> getMainOutputTag() {
        return this.mainOutputTag;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final DoFnRunner<InterT, OutputT> getDoFnRunner() {
        return this.doFnRunner;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final PushbackSideInputDoFnRunner<InterT, OutputT> getPushBackRunner() {
        return this.pushBackRunner;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final InMemorySideInputReader getSideInputReader() {
        return this.sideInputReader;
    }

    public final DoFn<InterT, OutputT> getDoFn() {
        return this.doFn;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void checkAndInvokeBundle() {
        if (this.bundleFinished) {
            this.bundleFinished = false;
            if (this.pushBackRunner == null) {
                this.doFnRunner.startBundle();
            } else {
                this.pushBackRunner.startBundle();
            }
            this.prevBundleStartTime = System.currentTimeMillis();
            this.currBundleCount = 0L;
        }
        this.currBundleCount++;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void checkAndFinishBundle() {
        if (this.bundleFinished) {
            return;
        }
        if (this.currBundleCount >= this.bundleSize || System.currentTimeMillis() - this.prevBundleStartTime >= this.bundleMillis) {
            this.bundleFinished = true;
            if (this.pushBackRunner == null) {
                this.doFnRunner.finishBundle();
            } else {
                this.pushBackRunner.finishBundle();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void forceFinishBundle() {
        if (this.bundleFinished) {
            return;
        }
        this.bundleFinished = true;
        if (this.pushBackRunner == null) {
            this.doFnRunner.finishBundle();
        } else {
            this.pushBackRunner.finishBundle();
        }
    }

    public final void prepare(Transform.Context context, OutputCollector<WindowedValue<OutputT>> outputCollector) {
        NemoPipelineOptions nemoPipelineOptions = (NemoPipelineOptions) this.serializedOptions.get().as(NemoPipelineOptions.class);
        this.outputCollector = wrapOutputCollector(outputCollector);
        this.bundleMillis = nemoPipelineOptions.getMaxBundleTimeMills().longValue();
        this.bundleSize = nemoPipelineOptions.getMaxBundleSize().longValue();
        this.outputManager = new DefaultOutputManager(this.outputCollector, this.mainOutputTag);
        this.sideInputReader = new InMemorySideInputReader(new ArrayList(this.sideInputs.values()));
        StepContext stepContext = new StepContext() { // from class: org.apache.nemo.compiler.frontend.beam.transform.AbstractDoFnTransform.1
            public StateInternals stateInternals() {
                throw new UnsupportedOperationException("Not support stateInternals in DoFnTransform");
            }

            public TimerInternals timerInternals() {
                throw new UnsupportedOperationException("Not support timerInternals in DoFnTransform");
            }
        };
        DoFn wrapDoFn = wrapDoFn(this.doFn);
        this.doFnInvoker = DoFnInvokers.tryInvokeSetupFor(wrapDoFn);
        this.doFnRunner = DoFnRunners.simpleRunner(nemoPipelineOptions, wrapDoFn, this.sideInputReader, this.outputManager, this.mainOutputTag, this.additionalOutputTags, stepContext, this.inputCoder, this.outputCoders, this.windowingStrategy, this.doFnSchemaInformation, this.sideInputMapping);
        this.pushBackRunner = this.sideInputs.isEmpty() ? null : SimplePushbackSideInputDoFnRunner.create(this.doFnRunner, this.sideInputs.values(), this.sideInputReader);
    }

    public final OutputCollector<WindowedValue<OutputT>> getOutputCollector() {
        return this.outputCollector;
    }

    public void onLatencymark(LatencyMark latencyMark) {
        getOutputCollector().emitLatencymark(latencyMark);
    }

    public final void close() {
        beforeClose();
        forceFinishBundle();
        this.doFnInvoker.invokeTeardown();
    }

    public final String toString() {
        return getClass().getSimpleName() + " / " + this.displayData.toString().replace(":", " / ");
    }

    abstract DoFn wrapDoFn(DoFn doFn);

    abstract OutputCollector wrapOutputCollector(OutputCollector outputCollector);

    abstract void beforeClose();
}
