package org.apache.beam.runners.direct;

import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.beam.repackaged.direct_java.runners.core.DoFnRunners;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.direct_java.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
import org.apache.beam.repackaged.direct_java.runners.core.OutputWindowedValue;
import org.apache.beam.repackaged.direct_java.runners.core.ProcessFnRunner;
import org.apache.beam.repackaged.direct_java.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.repackaged.direct_java.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.ParDoEvaluator;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.class */
/**
 * A {@link TransformEvaluatorFactory} for applications of
 * {@link SplittableParDoViaKeyedWorkItems.ProcessElements}.
 *
 * <p>Evaluator construction is delegated to a {@link ParDoEvaluatorFactory}. Each evaluator's
 * {@code ProcessFn} is then wired with state and timer internals factories, a side-input reader,
 * and an {@link OutputAndTimeBoundedSplittableProcessElementInvoker} that is handed this
 * factory's single-threaded scheduled executor.
 *
 * <p>The executor is created with the factory and stopped in {@link #cleanup()}.
 */
public class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> implements TransformEvaluatorFactory {
    // Numeric and duration arguments passed to the process-element invoker.
    // NOTE(review): judging by the invoker's name these are presumably an output-count bound and
    // a time bound per invocation; confirm against the invoker's constructor documentation.
    private static final int INVOKER_OUTPUT_BOUND = 100;
    private static final Duration INVOKER_TIME_BOUND = Duration.standardSeconds(1L);

    private final ParDoEvaluatorFactory<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> delegateFactory;
    // The thread name embeds this instance's hash code so executors of different factories can be told apart.
    private final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder()
                    .setThreadFactory(MoreExecutors.platformThreadFactory())
                    .setNameFormat("direct-splittable-process-element-checkpoint-executor_" + hashCode())
                    .build());
    private final EvaluationContext evaluationContext;
    private final PipelineOptions options;

    /**
     * Creates the factory and its delegate {@link ParDoEvaluatorFactory}.
     *
     * @param evaluationContext context used to build the delegate and, later, side-input readers
     * @param pipelineOptions options handed to the delegate, to every created
     *     {@link DoFnLifecycleManager}, and to the process-element invoker
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public SplittableProcessElementsEvaluatorFactory(EvaluationContext evaluationContext, final PipelineOptions pipelineOptions) {
        this.evaluationContext = evaluationContext;
        this.options = pipelineOptions;
        this.delegateFactory = new ParDoEvaluatorFactory<>(
                evaluationContext,
                SplittableProcessElementsEvaluatorFactory.<InputT, OutputT, RestrictionT>processFnRunnerFactory(),
                new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() {
                    /**
                     * Builds the lifecycle manager for a {@code ProcessElements} application by
                     * asking the transform for a new process fn wrapping its user fn.
                     *
                     * @throws IllegalArgumentException if the applied transform is not a
                     *     {@code ProcessElements}
                     */
                    @Override // org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader
                    public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> appliedPTransform) {
                        Preconditions.checkArgument(
                                appliedPTransform.getTransform() instanceof SplittableParDoViaKeyedWorkItems.ProcessElements,
                                "No know extraction of the fn from " + appliedPTransform);
                        // Only the raw class was checked above; the type arguments cannot be verified at runtime.
                        @SuppressWarnings("unchecked")
                        SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> processElements =
                                (SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>) appliedPTransform.getTransform();
                        return DoFnLifecycleManager.of(processElements.newProcessFn(processElements.getFn()), pipelineOptions);
                    }
                },
                pipelineOptions);
    }

    /**
     * Creates an evaluator for the given application and input bundle.
     *
     * <p>The wildcard-typed arguments are narrowed with erased casts, so nothing in this method
     * verifies that they really describe a {@code ProcessElements} application. Callers are
     * assumed to route only such applications here.
     *
     * @param appliedPTransform the application to evaluate
     * @param committedBundle the input bundle for this evaluation
     * @return the evaluator produced by {@code createEvaluator}, viewed as a {@code TransformEvaluator<T>}
     * @throws Exception if evaluator creation fails
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> TransformEvaluator<T> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, CommittedBundle<?> committedBundle) throws Exception {
        TransformEvaluator<T> evaluator =
                (TransformEvaluator<T>) createEvaluator((AppliedPTransform) appliedPTransform, (CommittedBundle) committedBundle);
        return evaluator;
    }

    /**
     * Stops the checkpoint executor, then cleans up the delegate factory.
     *
     * @throws Exception if the delegate's cleanup fails
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public void cleanup() throws Exception {
        // Stop scheduled work first so nothing is still running while the delegate is torn down.
        this.ses.shutdownNow();
        this.delegateFactory.cleanup();
    }

    /**
     * Obtains an evaluator from the delegate factory and configures the {@code ProcessFn} behind
     * it before handing the evaluator back.
     *
     * @param appliedPTransform the {@code ProcessElements} application being evaluated
     * @param committedBundle the input bundle; its collection and key are forwarded to the delegate
     * @return the delegate's evaluator, with its process fn fully wired
     * @throws Exception if the delegate fails to create the evaluator
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private TransformEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> createEvaluator(AppliedPTransform<PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple, SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>> appliedPTransform, CommittedBundle<InputT> committedBundle) throws Exception {
        final SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> transform = appliedPTransform.getTransform();

        // The bundle is declared over InputT while the delegate is declared over keyed work items,
        // so the application and the collection are passed through erased casts.
        final DoFnLifecycleManagerRemovingTransformEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> evaluator =
                this.delegateFactory.createEvaluator(
                        (AppliedPTransform) appliedPTransform,
                        (PCollection) committedBundle.getPCollection(),
                        committedBundle.getKey(),
                        transform.getSideInputs(),
                        transform.getMainOutputTag(),
                        transform.getAdditionalOutputTags().getAll(),
                        DoFnSchemaInformation.create(),
                        transform.getSideInputMapping());

        final ParDoEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> parDoEvaluator = evaluator.getParDoEvaluator();
        // Fails with ClassCastException if the delegate's runner is not a ProcessFnRunner.
        final SplittableParDoViaKeyedWorkItems.ProcessFn<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> processFn =
                (SplittableParDoViaKeyedWorkItems.ProcessFn<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>)
                        ProcessFnRunner.class.cast(parDoEvaluator.getFnRunner()).getFn();

        // Both factories ignore the key they are given and answer from this evaluator's step context.
        final DirectExecutionContext.DirectStepContext stepContext = parDoEvaluator.getStepContext();
        processFn.setStateInternalsFactory(key -> stepContext.stateInternals());
        processFn.setTimerInternalsFactory(key -> stepContext.timerInternals());

        // Adapter that forwards everything the invoker emits to the evaluator's output manager.
        OutputWindowedValue<OutputT> outputWindowedValue = new OutputWindowedValue<OutputT>() { // from class: org.apache.beam.runners.direct.SplittableProcessElementsEvaluatorFactory.2
            private final DoFnRunners.OutputManager outputManager = parDoEvaluator.getOutputManager();

            /** Emits a value on the transform's main output tag. */
            @Override // org.apache.beam.repackaged.direct_java.runners.core.OutputWindowedValue
            public void outputWindowedValue(OutputT outputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                this.outputManager.output(transform.getMainOutputTag(), WindowedValue.of(outputt, instant, collection, paneInfo));
            }

            /** Emits a value on the given additional output tag. */
            @Override // org.apache.beam.repackaged.direct_java.runners.core.OutputWindowedValue
            public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tupleTag, AdditionalOutputT additionaloutputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                this.outputManager.output(tupleTag, WindowedValue.of(additionaloutputt, instant, collection, paneInfo));
            }
        };

        // The same reader instance is shared by the process fn and the invoker.
        ReadyCheckingSideInputReader sideInputReader = this.evaluationContext.createSideInputReader(transform.getSideInputs());
        processFn.setSideInputReader(sideInputReader);
        processFn.setProcessElementInvoker(
                new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
                        transform.getFn(),
                        this.options,
                        outputWindowedValue,
                        sideInputReader,
                        this.ses,
                        INVOKER_OUTPUT_BOUND,
                        INVOKER_TIME_BOUND,
                        stepContext::bundleFinalizer));
        return evaluator;
    }

    /**
     * Returns a runner factory that casts the supplied fn to {@code ProcessFn} and forwards every
     * argument, in a fixed order, to {@code DoFnRunners.newProcessFnRunner}.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static <InputT, OutputT, RestrictionT> ParDoEvaluator.DoFnRunnerFactory<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> processFnRunnerFactory() {
        return (pipelineOptions, doFn, list, readyCheckingSideInputReader, outputManager, tupleTag, list2, directStepContext, coder, map, windowingStrategy, doFnSchemaInformation, map2) ->
                DoFnRunners.newProcessFnRunner(
                        (SplittableParDoViaKeyedWorkItems.ProcessFn) doFn,
                        pipelineOptions,
                        list,
                        readyCheckingSideInputReader,
                        outputManager,
                        tupleTag,
                        list2,
                        directStepContext,
                        coder,
                        map,
                        windowingStrategy,
                        doFnSchemaInformation,
                        map2);
    }
}
