package org.apache.beam.fn.harness;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.beam.fn.harness.DoFnPTransformRunnerFactory;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.state.FnApiStateAccessor;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.SplittableProcessElementInvoker;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.util.Timestamps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/fn/harness/SplittableProcessElementsRunner.class */
public class SplittableProcessElementsRunner<InputT, RestrictionT, OutputT> implements DoFnPTransformRunnerFactory.DoFnPTransformRunner<KV<InputT, RestrictionT>> {
    private final DoFnPTransformRunnerFactory.Context<InputT, OutputT> context;
    private final String mainInputId;
    private final Coder<WindowedValue<KV<InputT, RestrictionT>>> inputCoder;
    private final Collection<FnDataReceiver<WindowedValue<OutputT>>> mainOutputConsumers;
    private final DoFnInvoker<InputT, OutputT> doFnInvoker;
    private final ScheduledExecutorService executor;
    private FnApiStateAccessor stateAccessor;
    private final DoFn<InputT, OutputT>.StartBundleContext startBundleContext;
    private final DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext;

    /* loaded from: input_file:org/apache/beam/fn/harness/SplittableProcessElementsRunner$Factory.class */
    static class Factory<InputT, RestrictionT, OutputT> extends DoFnPTransformRunnerFactory<KV<InputT, RestrictionT>, InputT, OutputT, SplittableProcessElementsRunner<InputT, RestrictionT, OutputT>> {
        Factory() {
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // org.apache.beam.fn.harness.DoFnPTransformRunnerFactory
        public SplittableProcessElementsRunner<InputT, RestrictionT, OutputT> createRunner(DoFnPTransformRunnerFactory.Context<InputT, OutputT> context) {
            return new SplittableProcessElementsRunner<>(context, WindowedValue.FullWindowedValueCoder.of(context.inputCoder, context.windowCoder), context.localNameToConsumer.get((ListMultimap<String, FnDataReceiver<WindowedValue<?>>>) context.mainOutputTag.getId()), (String) Iterables.getOnlyElement(context.pTransform.getInputsMap().keySet()));
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/SplittableProcessElementsRunner$Registrar.class */
    public static class Registrar implements PTransformRunnerFactory.Registrar {
        @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of(PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN, new Factory());
        }
    }

    SplittableProcessElementsRunner(DoFnPTransformRunnerFactory.Context<InputT, OutputT> context, Coder<WindowedValue<KV<InputT, RestrictionT>>> coder, Collection<FnDataReceiver<WindowedValue<OutputT>>> collection, String str) {
        this.context = context;
        this.mainInputId = str;
        this.inputCoder = coder;
        this.mainOutputConsumers = collection;
        this.doFnInvoker = DoFnInvokers.invokerFor(context.doFn);
        this.doFnInvoker.invokeSetup();
        this.executor = Executors.newSingleThreadScheduledExecutor();
        DoFn<InputT, OutputT> doFn = context.doFn;
        Objects.requireNonNull(doFn);
        this.startBundleContext = new DoFn<InputT, OutputT>.StartBundleContext(doFn, context) { // from class: org.apache.beam.fn.harness.SplittableProcessElementsRunner.1
            final /* synthetic */ DoFnPTransformRunnerFactory.Context val$context;

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
                this.val$context = context;
                Objects.requireNonNull(doFn);
            }

            @Override // org.apache.beam.sdk.transforms.DoFn.StartBundleContext
            public PipelineOptions getPipelineOptions() {
                return this.val$context.pipelineOptions;
            }
        };
        DoFn<InputT, OutputT> doFn2 = context.doFn;
        Objects.requireNonNull(doFn2);
        this.finishBundleContext = new DoFn<InputT, OutputT>.FinishBundleContext(doFn2, context) { // from class: org.apache.beam.fn.harness.SplittableProcessElementsRunner.2
            final /* synthetic */ DoFnPTransformRunnerFactory.Context val$context;

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
                this.val$context = context;
                Objects.requireNonNull(doFn2);
            }

            @Override // org.apache.beam.sdk.transforms.DoFn.FinishBundleContext
            public PipelineOptions getPipelineOptions() {
                return this.val$context.pipelineOptions;
            }

            @Override // org.apache.beam.sdk.transforms.DoFn.FinishBundleContext
            public void output(OutputT outputt, Instant instant, BoundedWindow boundedWindow) {
                throw new UnsupportedOperationException();
            }

            @Override // org.apache.beam.sdk.transforms.DoFn.FinishBundleContext
            public <T> void output(TupleTag<T> tupleTag, T t, Instant instant, BoundedWindow boundedWindow) {
                throw new UnsupportedOperationException();
            }
        };
    }

    @Override // org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.DoFnPTransformRunner
    public void startBundle() {
        this.doFnInvoker.invokeStartBundle(this.startBundleContext);
    }

    @Override // org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.DoFnPTransformRunner
    public void processElement(WindowedValue<KV<InputT, RestrictionT>> windowedValue) {
        processElementTyped(windowedValue);
    }

    private <PositionT> void processElementTyped(WindowedValue<KV<InputT, RestrictionT>> windowedValue) {
        Preconditions.checkArgument(windowedValue.getWindows().size() == 1, "SPLITTABLE_PROCESS_ELEMENTS expects its input to be in 1 window, but got %s windows", windowedValue.getWindows().size());
        WindowedValue<NewT> withValue = windowedValue.withValue(windowedValue.getValue().getKey());
        BoundedWindow next = windowedValue.getWindows().iterator().next();
        this.stateAccessor = new FnApiStateAccessor(this.context.pipelineOptions, this.context.ptransformId, this.context.processBundleInstructionId, this.context.tagToSideInputSpecMap, this.context.beamFnStateClient, this.context.keyCoder, this.context.windowCoder, () -> {
            return windowedValue;
        }, () -> {
            return next;
        });
        RestrictionTracker<RestrictionT, PositionT> invokeNewTracker = this.doFnInvoker.invokeNewTracker(windowedValue.getValue().getValue());
        SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT>.Result invokeProcessElement = new OutputAndTimeBoundedSplittableProcessElementInvoker(this.context.doFn, this.context.pipelineOptions, new OutputWindowedValue<OutputT>() { // from class: org.apache.beam.fn.harness.SplittableProcessElementsRunner.3
            @Override // org.apache.beam.runners.core.OutputWindowedValue
            public void outputWindowedValue(OutputT outputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                SplittableProcessElementsRunner.this.outputTo(SplittableProcessElementsRunner.this.mainOutputConsumers, WindowedValue.of(outputt, instant, collection, paneInfo));
            }

            @Override // org.apache.beam.runners.core.OutputWindowedValue
            public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tupleTag, AdditionalOutputT additionaloutputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                List<FnDataReceiver<WindowedValue<?>>> list = SplittableProcessElementsRunner.this.context.localNameToConsumer.get((ListMultimap<String, FnDataReceiver<WindowedValue<?>>>) tupleTag.getId());
                if (list == null) {
                    throw new IllegalArgumentException(String.format("Unknown output tag %s", tupleTag));
                }
                SplittableProcessElementsRunner.this.outputTo(list, WindowedValue.of(additionaloutputt, instant, collection, paneInfo));
            }
        }, this.stateAccessor, this.executor, 10000, Duration.standardSeconds(10L)).invokeProcessElement(this.doFnInvoker, withValue, invokeNewTracker);
        this.stateAccessor = null;
        if (invokeProcessElement.getContinuation().shouldResume()) {
            WindowedValue<KV<InputT, RestrictionT>> withValue2 = withValue.withValue(KV.of(withValue.getValue(), invokeNewTracker.currentRestriction()));
            WindowedValue<KV<InputT, RestrictionT>> withValue3 = withValue.withValue(KV.of(withValue.getValue(), invokeProcessElement.getResidualRestriction()));
            ByteString.Output newOutput = ByteString.newOutput();
            ByteString.Output newOutput2 = ByteString.newOutput();
            try {
                this.inputCoder.encode(withValue2, newOutput);
                this.inputCoder.encode(withValue3, newOutput2);
                this.context.splitListener.split(ImmutableList.of(BeamFnApi.BundleApplication.newBuilder().setTransformId(this.context.ptransformId).setInputId(this.mainInputId).setElement(newOutput.toByteString()).build()), ImmutableList.of(BeamFnApi.DelayedBundleApplication.newBuilder().setApplication(BeamFnApi.BundleApplication.newBuilder().setTransformId(this.context.ptransformId).setInputId(this.mainInputId).setElement(newOutput2.toByteString()).build()).setRequestedExecutionTime(Timestamps.fromMillis(System.currentTimeMillis() + invokeProcessElement.getContinuation().resumeDelay().getMillis())).build()));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override // org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.DoFnPTransformRunner
    public void processTimer(String str, TimeDomain timeDomain, WindowedValue<KV<Object, Timer>> windowedValue) {
        throw new UnsupportedOperationException("Timers are unsupported in a SplittableDoFn.");
    }

    @Override // org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.DoFnPTransformRunner
    public void finishBundle() {
        this.doFnInvoker.invokeFinishBundle(this.finishBundleContext);
    }

    @Override // org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.DoFnPTransformRunner
    public void tearDown() {
        this.doFnInvoker.invokeTeardown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> void outputTo(Collection<FnDataReceiver<WindowedValue<T>>> collection, WindowedValue<T> windowedValue) {
        try {
            Iterator<FnDataReceiver<WindowedValue<T>>> it = collection.iterator();
            while (it.hasNext()) {
                it.next().accept(windowedValue);
            }
        } catch (Throwable th) {
            throw UserCodeException.wrap(th);
        }
    }
}
