package org.apache.beam.runners.samza.runtime;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.BundleManager;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.util.DoFnUtils;
import org.apache.beam.runners.samza.util.FutureUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/samza/runtime/DoFnOp.class */
public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(DoFnOp.class);
    private final TupleTag<FnOutT> mainOutputTag;
    private final DoFn<InT, FnOutT> doFn;
    private final Coder<?> keyCoder;
    private final Collection<PCollectionView<?>> sideInputs;
    private final List<TupleTag<?>> sideOutputTags;
    private final WindowingStrategy windowingStrategy;
    private final OutputManagerFactory<OutT> outputManagerFactory;
    private final HashMap<String, PCollectionView<?>> idToViewMap;
    private final String transformFullName;
    private final String transformId;
    private final Coder<InT> inputCoder;
    private final Coder<WindowedValue<InT>> windowedValueCoder;
    private final HashMap<TupleTag<?>, Coder<?>> outputCoders;
    private final PCollection.IsBounded isBounded;
    private final String bundleCheckTimerId;
    private final String bundleStateId;
    private final boolean isPortable;
    private final RunnerApi.ExecutableStagePayload stagePayload;
    private final JobInfo jobInfo;
    private final HashMap<String, TupleTag<?>> idToTupleTagMap;
    private transient SamzaTimerInternalsFactory<?> timerInternalsFactory;
    private transient DoFnRunner<InT, FnOutT> fnRunner;
    private transient PushbackSideInputDoFnRunner<InT, FnOutT> pushbackFnRunner;
    private transient SideInputHandler sideInputHandler;
    private transient DoFnInvoker<InT, FnOutT> doFnInvoker;
    private transient SamzaPipelineOptions samzaPipelineOptions;

    @SuppressWarnings(justification = "No bug", value = {"SE_TRANSIENT_FIELD_NOT_RESTORED"})
    private transient Instant pushbackWatermarkHold;
    private transient Instant inputWatermark;
    private transient BundleManager<OutT> bundleManager;
    private transient Instant sideInputWatermark;
    private transient List<WindowedValue<InT>> pushbackValues;
    private transient ExecutableStageContext stageContext;
    private transient StageBundleFactory stageBundleFactory;
    private transient boolean bundleDisabled;
    private final DoFnSchemaInformation doFnSchemaInformation;
    private final Map<?, PCollectionView<?>> sideInputMapping;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/DoFnOp$FutureCollectorImpl.class */
    public static class FutureCollectorImpl<OutT> implements FutureCollector<OutT> {
        private final List<CompletionStage<WindowedValue<OutT>>> outputFutures = Collections.synchronizedList(new ArrayList());
        private final AtomicBoolean collectorSealed = new AtomicBoolean(true);

        FutureCollectorImpl() {
        }

        @Override // org.apache.beam.runners.samza.runtime.FutureCollector
        public void add(CompletionStage<WindowedValue<OutT>> completionStage) {
            Preconditions.checkState(!this.collectorSealed.get(), "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements.");
            this.outputFutures.add(completionStage);
        }

        @Override // org.apache.beam.runners.samza.runtime.FutureCollector
        public void discard() {
            this.collectorSealed.compareAndSet(false, true);
            this.outputFutures.clear();
        }

        @Override // org.apache.beam.runners.samza.runtime.FutureCollector
        public CompletionStage<Collection<WindowedValue<OutT>>> finish() {
            this.collectorSealed.compareAndSet(false, true);
            CompletionStage<Collection<WindowedValue<OutT>>> flattenFutures = FutureUtils.flattenFutures(this.outputFutures);
            this.outputFutures.clear();
            return flattenFutures;
        }

        @Override // org.apache.beam.runners.samza.runtime.FutureCollector
        public void prepare() {
            Preconditions.checkState(this.collectorSealed.compareAndSet(true, false), "Failed to prepare the collector. Collector needs to be sealed before prepare() is invoked.");
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/DoFnOp$MultiOutputManagerFactory.class */
    public static class MultiOutputManagerFactory implements OutputManagerFactory<RawUnionValue> {
        private final Map<TupleTag<?>, Integer> tagToIndexMap;

        public MultiOutputManagerFactory(Map<TupleTag<?>, Integer> map) {
            this.tagToIndexMap = map;
        }

        @Override // org.apache.beam.runners.samza.runtime.OutputManagerFactory
        public DoFnRunners.OutputManager create(OpEmitter<RawUnionValue> opEmitter) {
            return createOutputManager(opEmitter, null);
        }

        @Override // org.apache.beam.runners.samza.runtime.OutputManagerFactory
        public DoFnRunners.OutputManager create(OpEmitter<RawUnionValue> opEmitter, FutureCollector<RawUnionValue> futureCollector) {
            return createOutputManager(opEmitter, futureCollector);
        }

        private DoFnRunners.OutputManager createOutputManager(final OpEmitter<RawUnionValue> opEmitter, final FutureCollector<RawUnionValue> futureCollector) {
            return new DoFnRunners.OutputManager() { // from class: org.apache.beam.runners.samza.runtime.DoFnOp.MultiOutputManagerFactory.1
                public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                    int intValue = ((Integer) MultiOutputManagerFactory.this.tagToIndexMap.get(tupleTag)).intValue();
                    Object value = windowedValue.getValue();
                    if (!(value instanceof CompletionStage)) {
                        opEmitter.emitElement(windowedValue.withValue(new RawUnionValue(intValue, value)));
                    } else {
                        CompletionStage completionStage = (CompletionStage) value;
                        if (futureCollector != null) {
                            futureCollector.add(DoFnOp.createOutputFuture(windowedValue, completionStage, obj -> {
                                return new RawUnionValue(intValue, obj);
                            }));
                        }
                    }
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/DoFnOp$SingleOutputManagerFactory.class */
    public static class SingleOutputManagerFactory<OutT> implements OutputManagerFactory<OutT> {
        @Override // org.apache.beam.runners.samza.runtime.OutputManagerFactory
        public DoFnRunners.OutputManager create(OpEmitter<OutT> opEmitter) {
            return createOutputManager(opEmitter, null);
        }

        @Override // org.apache.beam.runners.samza.runtime.OutputManagerFactory
        public DoFnRunners.OutputManager create(OpEmitter<OutT> opEmitter, FutureCollector<OutT> futureCollector) {
            return createOutputManager(opEmitter, futureCollector);
        }

        private DoFnRunners.OutputManager createOutputManager(final OpEmitter<OutT> opEmitter, final FutureCollector<OutT> futureCollector) {
            return new DoFnRunners.OutputManager() { // from class: org.apache.beam.runners.samza.runtime.DoFnOp.SingleOutputManagerFactory.1
                public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                    if (!(windowedValue.getValue() instanceof CompletionStage)) {
                        opEmitter.emitElement(windowedValue);
                        return;
                    }
                    CompletionStage completionStage = (CompletionStage) windowedValue.getValue();
                    if (futureCollector != null) {
                        futureCollector.add(DoFnOp.createOutputFuture(windowedValue, completionStage, obj -> {
                            return obj;
                        }));
                    }
                }
            };
        }
    }

    public DoFnOp(TupleTag<FnOutT> tupleTag, DoFn<InT, FnOutT> doFn, Coder<?> coder, Coder<InT> coder2, Coder<WindowedValue<InT>> coder3, Map<TupleTag<?>, Coder<?>> map, Collection<PCollectionView<?>> collection, List<TupleTag<?>> list, WindowingStrategy windowingStrategy, Map<String, PCollectionView<?>> map2, OutputManagerFactory<OutT> outputManagerFactory, String str, String str2, PCollection.IsBounded isBounded, boolean z, RunnerApi.ExecutableStagePayload executableStagePayload, JobInfo jobInfo, Map<String, TupleTag<?>> map3, DoFnSchemaInformation doFnSchemaInformation, Map<?, PCollectionView<?>> map4) {
        this.mainOutputTag = tupleTag;
        this.doFn = doFn;
        this.sideInputs = collection;
        this.sideOutputTags = list;
        this.inputCoder = coder2;
        this.windowedValueCoder = coder3;
        this.outputCoders = new HashMap<>(map);
        this.windowingStrategy = windowingStrategy;
        this.idToViewMap = new HashMap<>(map2);
        this.outputManagerFactory = outputManagerFactory;
        this.transformFullName = str;
        this.transformId = str2;
        this.keyCoder = coder;
        this.isBounded = isBounded;
        this.isPortable = z;
        this.stagePayload = executableStagePayload;
        this.jobInfo = jobInfo;
        this.idToTupleTagMap = new HashMap<>(map3);
        this.bundleCheckTimerId = "_samza_bundle_check_" + str2;
        this.bundleStateId = "_samza_bundle_" + str2;
        this.doFnSchemaInformation = doFnSchemaInformation;
        this.sideInputMapping = map4;
    }

    @Override // org.apache.beam.runners.samza.runtime.Op
    public void open(Config config, Context context, Scheduler<KeyedTimerData<Void>> scheduler, OpEmitter<OutT> opEmitter) {
        this.inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        this.sideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
        DoFnSignature signature = DoFnSignatures.getSignature(this.doFn.getClass());
        SamzaExecutionContext samzaExecutionContext = (SamzaExecutionContext) context.getApplicationContainerContext();
        this.samzaPipelineOptions = samzaExecutionContext.getPipelineOptions();
        this.bundleDisabled = this.samzaPipelineOptions.getMaxBundleSize() <= 1;
        String str = "pardo-" + this.transformId;
        SamzaStoreStateInternals.Factory createNonKeyedStateInternalsFactory = SamzaStoreStateInternals.createNonKeyedStateInternalsFactory(str, context.getTaskContext(), this.samzaPipelineOptions);
        FutureCollector<OutT> createFutureCollector = createFutureCollector();
        this.bundleManager = new BundleManager<>(createBundleProgressListener(), createFutureCollector, this.samzaPipelineOptions.getMaxBundleSize(), this.samzaPipelineOptions.getMaxBundleTimeMs(), scheduler, this.bundleCheckTimerId);
        this.timerInternalsFactory = SamzaTimerInternalsFactory.createTimerInternalFactory(this.keyCoder, scheduler, getTimerStateId(signature), createNonKeyedStateInternalsFactory, this.windowingStrategy, this.isBounded, this.samzaPipelineOptions);
        this.sideInputHandler = new SideInputHandler(this.sideInputs, createNonKeyedStateInternalsFactory.stateInternalsForKey(null));
        if (this.isPortable) {
            ExecutableStage fromPayload = ExecutableStage.fromPayload(this.stagePayload);
            this.stageContext = SamzaExecutableStageContextFactory.getInstance().get(this.jobInfo);
            this.stageBundleFactory = this.stageContext.getStageBundleFactory(fromPayload);
            this.fnRunner = SamzaDoFnRunners.createPortable(this.transformId, DoFnUtils.toStepName(fromPayload), this.bundleStateId, this.windowedValueCoder, fromPayload, this.sideInputMapping, this.sideInputHandler, createNonKeyedStateInternalsFactory, this.timerInternalsFactory, this.samzaPipelineOptions, this.outputManagerFactory.create(opEmitter, createFutureCollector), this.stageBundleFactory, samzaExecutionContext, this.mainOutputTag, this.idToTupleTagMap, context, this.transformFullName);
        } else {
            this.fnRunner = SamzaDoFnRunners.create(this.samzaPipelineOptions, this.doFn, this.windowingStrategy, this.transformFullName, str, context, this.mainOutputTag, this.sideInputHandler, this.timerInternalsFactory, this.keyCoder, this.outputManagerFactory.create(opEmitter, createFutureCollector), this.inputCoder, this.sideOutputTags, this.outputCoders, this.doFnSchemaInformation, this.sideInputMapping);
        }
        this.pushbackFnRunner = SimplePushbackSideInputDoFnRunner.create(this.fnRunner, this.sideInputs, this.sideInputHandler);
        this.pushbackValues = new ArrayList();
        Iterator it = ServiceLoader.load(SamzaDoFnInvokerRegistrar.class).iterator();
        if (it.hasNext()) {
            this.doFnInvoker = ((SamzaDoFnInvokerRegistrar) Iterators.getOnlyElement(it)).invokerSetupFor(this.doFn, this.samzaPipelineOptions, context);
        } else {
            this.doFnInvoker = DoFnInvokers.tryInvokeSetupFor(this.doFn, this.samzaPipelineOptions);
        }
    }

    FutureCollector<OutT> createFutureCollector() {
        return new FutureCollectorImpl();
    }

    private String getTimerStateId(DoFnSignature doFnSignature) {
        StringBuilder sb = new StringBuilder("timer");
        if (doFnSignature.usesTimers()) {
            Set keySet = doFnSignature.timerDeclarations().keySet();
            Objects.requireNonNull(sb);
            keySet.forEach(sb::append);
        }
        return sb.toString();
    }

    @Override // org.apache.beam.runners.samza.runtime.Op
    public void processElement(WindowedValue<InT> windowedValue, OpEmitter<OutT> opEmitter) {
        try {
            this.bundleManager.tryStartBundle();
            for (WindowedValue<InT> windowedValue2 : this.pushbackFnRunner.processElementInReadyWindows(windowedValue)) {
                if (windowedValue2.getTimestamp().compareTo(this.pushbackWatermarkHold) < 0) {
                    this.pushbackWatermarkHold = windowedValue2.getTimestamp();
                }
                this.pushbackValues.add(windowedValue2);
            }
            this.bundleManager.tryFinishBundle(opEmitter);
        } catch (Throwable th) {
            LOG.error("Encountered error during process element", th);
            this.bundleManager.signalFailure(th);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doProcessWatermark(Instant instant, OpEmitter<OutT> opEmitter) {
        this.inputWatermark = instant;
        if (this.sideInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            emitAllPushbackValues();
        }
        Instant instant2 = this.pushbackWatermarkHold.isBefore(this.inputWatermark) ? this.pushbackWatermarkHold : this.inputWatermark;
        this.timerInternalsFactory.setInputWatermark(instant2);
        Collection<KeyedTimerData<?>> removeReadyTimers = this.timerInternalsFactory.removeReadyTimers();
        if (!removeReadyTimers.isEmpty()) {
            this.pushbackFnRunner.startBundle();
            Iterator<KeyedTimerData<?>> it = removeReadyTimers.iterator();
            while (it.hasNext()) {
                fireTimer(it.next());
            }
            this.pushbackFnRunner.finishBundle();
        }
        if (this.timerInternalsFactory.getOutputWatermark() == null || this.timerInternalsFactory.getOutputWatermark().isBefore(instant2)) {
            this.timerInternalsFactory.setOutputWatermark(instant2);
            opEmitter.emitWatermark(this.timerInternalsFactory.getOutputWatermark());
        }
    }

    @Override // org.apache.beam.runners.samza.runtime.Op
    public void processWatermark(Instant instant, OpEmitter<OutT> opEmitter) {
        this.bundleManager.processWatermark(instant, opEmitter);
    }

    @Override // org.apache.beam.runners.samza.runtime.Op
    public void processSideInput(String str, WindowedValue<? extends Iterable<?>> windowedValue, OpEmitter<OutT> opEmitter) {
        Preconditions.checkState(this.bundleDisabled, "Side input not supported in bundling mode. Please disable bundling.");
        PCollectionView<?> pCollectionView = this.idToViewMap.get(str);
        if (pCollectionView == null) {
            throw new IllegalArgumentException("No mapping of id " + str + " to view.");
        }
        this.sideInputHandler.addSideInputValue(pCollectionView, windowedValue);
        ArrayList arrayList = new ArrayList(this.pushbackValues);
        this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
        this.pushbackValues.clear();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            processElement((WindowedValue) it.next(), opEmitter);
        }
        processWatermark(this.inputWatermark, opEmitter);
    }

    @Override // org.apache.beam.runners.samza.runtime.Op
    public void processSideInputWatermark(Instant instant, OpEmitter<OutT> opEmitter) {
        Preconditions.checkState(this.bundleDisabled, "Side input not supported in bundling mode. Please disable bundling.");
        this.sideInputWatermark = instant;
        if (this.sideInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            processWatermark(this.inputWatermark, opEmitter);
        }
    }

    @Override // org.apache.beam.runners.samza.runtime.Op
    public void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> opEmitter) {
        if (this.bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) {
            this.bundleManager.processTimer(keyedTimerData, opEmitter);
            return;
        }
        this.pushbackFnRunner.startBundle();
        fireTimer(keyedTimerData);
        this.pushbackFnRunner.finishBundle();
        this.timerInternalsFactory.removeProcessingTimer(keyedTimerData);
    }

    /* JADX WARN: Finally extract failed */
    @Override // org.apache.beam.runners.samza.runtime.Op
    public void close() {
        this.doFnInvoker.invokeTeardown();
        try {
            StageBundleFactory stageBundleFactory = this.stageBundleFactory;
            Throwable th = null;
            try {
                ExecutableStageContext executableStageContext = this.stageContext;
                Throwable th2 = null;
                if (executableStageContext != null) {
                    if (0 != 0) {
                        try {
                            executableStageContext.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        executableStageContext.close();
                    }
                }
                if (stageBundleFactory != null) {
                    if (0 != 0) {
                        try {
                            stageBundleFactory.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        stageBundleFactory.close();
                    }
                }
            } catch (Throwable th5) {
                if (stageBundleFactory != null) {
                    if (0 != 0) {
                        try {
                            stageBundleFactory.close();
                        } catch (Throwable th6) {
                            th.addSuppressed(th6);
                        }
                    } else {
                        stageBundleFactory.close();
                    }
                }
                throw th5;
            }
        } catch (Exception e) {
            LOG.error("Failed to close stage bundle factory", e);
        }
    }

    private void fireTimer(KeyedTimerData<?> keyedTimerData) {
        TimerInternals.TimerData timerData = keyedTimerData.getTimerData();
        LOG.debug("Firing timer {}", timerData);
        this.fnRunner.onTimer(timerData.getTimerId(), timerData.getTimerFamilyId(), keyedTimerData.getKey(), timerData.getNamespace().getWindow(), timerData.getTimestamp(), timerData.getOutputTimestamp(), timerData.getDomain());
    }

    private void emitAllPushbackValues() {
        if (this.pushbackValues.isEmpty()) {
            return;
        }
        this.pushbackFnRunner.startBundle();
        ArrayList arrayList = new ArrayList(this.pushbackValues);
        this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
        this.pushbackValues.clear();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            this.fnRunner.processElement((WindowedValue) it.next());
        }
        this.pushbackFnRunner.finishBundle();
    }

    private BundleManager.BundleProgressListener<OutT> createBundleProgressListener() {
        return new BundleManager.BundleProgressListener<OutT>() { // from class: org.apache.beam.runners.samza.runtime.DoFnOp.1
            @Override // org.apache.beam.runners.samza.runtime.BundleManager.BundleProgressListener
            public void onBundleStarted() {
                DoFnOp.this.pushbackFnRunner.startBundle();
            }

            @Override // org.apache.beam.runners.samza.runtime.BundleManager.BundleProgressListener
            public void onBundleFinished(OpEmitter<OutT> opEmitter) {
                DoFnOp.this.pushbackFnRunner.finishBundle();
            }

            @Override // org.apache.beam.runners.samza.runtime.BundleManager.BundleProgressListener
            public void onWatermark(Instant instant, OpEmitter<OutT> opEmitter) {
                DoFnOp.this.doProcessWatermark(instant, opEmitter);
            }
        };
    }

    static <T, OutT> CompletionStage<WindowedValue<OutT>> createOutputFuture(WindowedValue<T> windowedValue, CompletionStage<T> completionStage, Function<T, OutT> function) {
        return (CompletionStage<WindowedValue<OutT>>) completionStage.thenApply(obj -> {
            return WindowedValue.of(function.apply(obj), windowedValue.getTimestamp(), windowedValue.getWindows(), windowedValue.getPane());
        });
    }
}
