package org.apache.beam.runners.samza.runtime;

import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.util.StateUtils;
import org.apache.beam.runners.samza.util.WindowUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.samza.context.Context;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.class */
public class SamzaDoFnRunners {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/samza/runtime/SamzaDoFnRunners$SdkHarnessDoFnRunner.class */
    /**
     * {@link DoFnRunner} that executes a stage through a {@link RemoteBundle} obtained from a
     * {@link StageBundleFactory} rather than by invoking a {@link DoFn} in-process.
     *
     * <p>Life cycle of one bundle:
     * <ul>
     *   <li>{@link #startBundle()} opens the remote bundle and re-sends every element currently
     *       stored in the bundled-events bag;</li>
     *   <li>{@link #processElement} appends the element to that bag, forwards it to the bundle's
     *       input receiver and drains the outputs queued so far;</li>
     *   <li>{@link #finishBundle()} closes the bundle, drains the remaining outputs, reports the
     *       sampled timing metric and clears the bag.</li>
     * </ul>
     *
     * <p>NOTE(review): the bag presumably exists so that elements of an unfinished bundle can be
     * replayed after a restart — confirm against the state store's durability guarantees.
     *
     * @param <InT> element type fed into the stage
     * @param <FnOutT> element type produced by the stage
     */
    public static class SdkHarnessDoFnRunner<InT, FnOutT> implements DoFnRunner<InT, FnOutT> {
        /** The sampling check passes when a random int is divisible by this value. */
        private static final int DEFAULT_METRIC_SAMPLE_RATE = 100;

        private final SamzaTimerInternalsFactory timerInternalsFactory;
        private final WindowingStrategy windowingStrategy;
        private final DoFnRunners.OutputManager outputManager;
        private final StageBundleFactory stageBundleFactory;
        /** Maps the id handed to the output receiver factory to the tag used when emitting. */
        private final Map<String, TupleTag<?>> idToTupleTagMap;
        /** Outputs received from the remote bundle, keyed by receiver id, awaiting emission. */
        private final LinkedBlockingQueue<KV<String, FnOutT>> outputQueue = new LinkedBlockingQueue<>();
        /** Every element accepted since the bag was last cleared in {@link #finishBundle()}. */
        private final BagState<WindowedValue<InT>> bundledEventsBag;
        private final StateRequestHandler stateRequestHandler;
        private final SamzaExecutionContext samzaExecutionContext;
        private final String metricName;

        private RemoteBundle remoteBundle;
        private FnDataReceiver<WindowedValue<?>> inputReceiver;
        /**
         * Whether the current bundle was picked for timing. Kept as a separate flag because
         * {@link System#nanoTime()} may legitimately return zero or a negative value, so the
         * timestamp itself cannot double as a "not sampled" marker.
         */
        private boolean bundleSampled;
        /** {@link System#nanoTime()} taken in {@link #startBundle()}; meaningful only if sampled. */
        private long startBundleTime;

        /**
         * Stores the collaborators and derives the metric name
         * {@code "ExecutableStage-" + stepName + "-process-ns"}.
         */
        private SdkHarnessDoFnRunner(String stepName, SamzaTimerInternalsFactory<?> timerFactory, WindowingStrategy strategy, DoFnRunners.OutputManager outputs, StageBundleFactory bundleFactory, Map<String, TupleTag<?>> tagsById, BagState<WindowedValue<InT>> eventsBag, StateRequestHandler stateHandler, SamzaExecutionContext executionContext) {
            this.metricName = "ExecutableStage-" + stepName + "-process-ns";
            this.timerInternalsFactory = timerFactory;
            this.windowingStrategy = strategy;
            this.outputManager = outputs;
            this.stageBundleFactory = bundleFactory;
            this.idToTupleTagMap = tagsById;
            this.bundledEventsBag = eventsBag;
            this.stateRequestHandler = stateHandler;
            this.samzaExecutionContext = executionContext;
        }

        /**
         * Applies a timer reported by the remote bundle to the timer internals of the timer's user
         * key: deletes it when the clear bit is set, otherwise sets it.
         */
        private void timerDataConsumer(Timer<?> timer, TimerInternals.TimerData timerData) {
            TimerInternals keyedTimers = this.timerInternalsFactory.timerInternalsForKey(timer.getUserKey());
            if (!timer.getClearBit()) {
                keyedTimers.setTimer(timerData);
                return;
            }
            keyedTimers.deleteTimer(timerData);
        }

        /**
         * Opens a new remote bundle, decides whether this bundle is timed, resolves the single
         * input receiver and re-sends every element currently held in the bundled-events bag.
         *
         * @throws RuntimeException wrapping any exception raised while opening or replaying
         */
        public void startBundle() {
            try {
                OutputReceiverFactory outputReceivers = new OutputReceiverFactory() {
                    public FnDataReceiver<FnOutT> create(String pCollectionId) {
                        // Only enqueue here; emission happens later in emitResults().
                        return output -> outputQueue.put(KV.of(pCollectionId, output));
                    }
                };
                TimerReceiverFactory timerReceivers = new TimerReceiverFactory(this.stageBundleFactory, this::timerDataConsumer, this.windowingStrategy.getWindowFn().windowCoder());
                this.remoteBundle = this.stageBundleFactory.getBundle(outputReceivers, timerReceivers, this.stateRequestHandler, BundleProgressHandler.ignored());

                this.bundleSampled = shouldSampleBundle();
                this.startBundleTime = this.bundleSampled ? System.nanoTime() : 0L;

                this.inputReceiver = (FnDataReceiver) Iterables.getOnlyElement(this.remoteBundle.getInputReceivers().values());
                this.bundledEventsBag.read().forEach(this::replayBufferedElement);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Forwards one buffered element to the input receiver, wrapping any failure unchecked. */
        private void replayBufferedElement(WindowedValue<InT> buffered) {
            try {
                this.inputReceiver.accept(buffered);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Returns true when a random int is divisible by {@link #DEFAULT_METRIC_SAMPLE_RATE}. */
        private boolean shouldSampleBundle() {
            return ThreadLocalRandom.current().nextInt() % DEFAULT_METRIC_SAMPLE_RATE == 0;
        }

        /**
         * Records the element in the bundled-events bag, forwards it to the remote bundle and
         * emits whatever outputs are already queued.
         *
         * @param windowedValue element to process
         * @throws RuntimeException wrapping any exception raised along the way
         */
        public void processElement(WindowedValue<InT> windowedValue) {
            try {
                // Buffer first so the element is in the bag before it is handed to the bundle.
                this.bundledEventsBag.add(windowedValue);
                this.inputReceiver.accept(windowedValue);
                emitResults();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Drains the output queue without blocking, emitting each value under its mapped tag. */
        private void emitResults() {
            KV<String, FnOutT> next;
            while ((next = this.outputQueue.poll()) != null) {
                this.outputManager.output(this.idToTupleTagMap.get(next.getKey()), (WindowedValue) next.getValue());
            }
        }

        /**
         * Reports the average nanoseconds per buffered element for a sampled bundle. Does nothing
         * when the bundle was not sampled or the bag is empty.
         */
        private void emitMetrics() {
            if (!this.bundleSampled) {
                return;
            }
            long bundleSize = Iterables.size(this.bundledEventsBag.read());
            if (bundleSize <= 0) {
                return;
            }
            // Only the difference of two nanoTime() readings is meaningful, never their sign.
            long elapsedNanos = System.nanoTime() - this.startBundleTime;
            this.samzaExecutionContext.getMetricsContainer().updateExecutableStageBundleMetric(this.metricName, elapsedNanos / bundleSize);
        }

        /**
         * Sends a timer firing to the remote bundle's timer receiver selected by decoding
         * {@code str2}.
         *
         * @param str passed as the second argument of {@code Timer.of}
         * @param str2 encoded id decoded via {@code TimerReceiverFactory.decodeTimerDataTimerId}
         * @param keyt key the timer belongs to
         * @param boundedWindow window of the timer, sent as a singleton list
         * @param instant passed to {@code Timer.of} after the window list
         * @param instant2 passed to {@code Timer.of} after {@code instant}
         * @param timeDomain not used by this implementation
         * @throws RuntimeException wrapping any exception raised while delivering the timer
         */
        public <KeyT> void onTimer(String str, String str2, KeyT keyt, BoundedWindow boundedWindow, Instant instant, Instant instant2, TimeDomain timeDomain) {
            FnDataReceiver timerReceiver = (FnDataReceiver) this.remoteBundle.getTimerReceivers().get(TimerReceiverFactory.decodeTimerDataTimerId(str2));
            try {
                timerReceiver.accept(Timer.of(keyt, str, Collections.singletonList(boundedWindow), instant, instant2, PaneInfo.NO_FIRING));
            } catch (Exception e) {
                throw new RuntimeException(String.format(Locale.ENGLISH, "Failed to process timer %s", timerReceiver), e);
            }
        }

        /**
         * Closes the remote bundle, emits the remaining outputs and the sampled metric, then
         * clears the bundled-events bag. The bundle and input receiver references are dropped
         * whether or not closing succeeds.
         *
         * @throws RuntimeException wrapping any exception raised while finishing
         */
        public void finishBundle() {
            try {
                // NOTE(review): close() presumably waits for outstanding results, which is why
                // the queue is drained only afterwards — confirm against RemoteBundle's contract.
                this.remoteBundle.close();
                emitResults();
                emitMetrics();
                this.bundledEventsBag.clear();
            } catch (Exception e) {
                throw new RuntimeException("Failed to finish remote bundle", e);
            } finally {
                this.remoteBundle = null;
                this.inputReceiver = null;
            }
        }

        /** Intentionally a no-op in this runner. */
        public <KeyT> void onWindowExpiration(BoundedWindow boundedWindow, Instant instant, KeyT keyt) {
        }

        /**
         * Not supported by this runner.
         *
         * @throws UnsupportedOperationException always
         */
        public DoFn<InT, FnOutT> getFn() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Builds the in-process runner for {@code doFn}.
     *
     * <p>A simple runner is always created. It is wrapped with metrics when
     * {@code samzaPipelineOptions.getEnableMetrics()} is true, and — only when
     * {@code StateUtils.isStateful(doFn)} holds — further wrapped in a stateful runner backed by
     * {@code KeyedInternals}.
     *
     * @param samzaPipelineOptions options consulted for the metrics switch and passed downstream
     * @param doFn function to run
     * @param windowingStrategy windowing strategy handed to the runners and cleanup helpers
     * @param str name given to the metrics wrapper
     * @param str2 first argument of {@code SamzaStoreStateInternals.createStateInternalsFactory}
     * @param context Samza context supplying the task context and the execution context
     * @param tupleTag main output tag
     * @param sideInputHandler side-input access for the simple runner
     * @param samzaTimerInternalsFactory source of timer internals
     * @param coder coder passed to the state internals factory
     * @param outputManager sink for produced values
     * @param coder2 input coder passed to the simple and stateful runners
     * @param list additional output tags
     * @param map coders by output tag
     * @param doFnSchemaInformation schema information for the simple runner
     * @param map2 side-input mapping for the simple runner
     * @return the fully wrapped runner
     */
    public static <InT, FnOutT> DoFnRunner<InT, FnOutT> create(SamzaPipelineOptions samzaPipelineOptions, DoFn<InT, FnOutT> doFn, WindowingStrategy<?, ?> windowingStrategy, String str, String str2, Context context, TupleTag<FnOutT> tupleTag, SideInputHandler sideInputHandler, SamzaTimerInternalsFactory<?> samzaTimerInternalsFactory, Coder<?> coder, DoFnRunners.OutputManager outputManager, Coder<InT> coder2, List<TupleTag<?>> list, Map<TupleTag<?>, Coder<?>> map, DoFnSchemaInformation doFnSchemaInformation, Map<String, PCollectionView<?>> map2) {
        final SamzaStoreStateInternals.Factory stateFactory = SamzaStoreStateInternals.createStateInternalsFactory(str2, coder, context.getTaskContext(), samzaPipelineOptions, DoFnSignatures.getSignature(doFn.getClass()));
        final SamzaExecutionContext executionContext = (SamzaExecutionContext) context.getApplicationContainerContext();

        final KeyedInternals keyed;
        final StateInternals state;
        final TimerInternals timers;
        if (!StateUtils.isStateful((DoFn<?, ?>) doFn)) {
            // Stateless function: state and timers are obtained for the null key.
            keyed = null;
            state = stateFactory.stateInternalsForKey(null);
            timers = samzaTimerInternalsFactory.timerInternalsForKey(null);
        } else {
            keyed = new KeyedInternals(stateFactory, samzaTimerInternalsFactory);
            state = keyed.stateInternals();
            timers = keyed.timerInternals();
        }

        final StepContext stepContext = createStepContext(state, timers);
        DoFnRunner<InT, FnOutT> runner = DoFnRunners.simpleRunner(samzaPipelineOptions, doFn, sideInputHandler, outputManager, tupleTag, list, stepContext, coder2, map, windowingStrategy, doFnSchemaInformation, map2);
        if (samzaPipelineOptions.getEnableMetrics()) {
            runner = DoFnRunnerWithMetrics.wrap(runner, executionContext.getMetricsContainer(), str);
        }

        if (keyed == null) {
            return runner;
        }
        final DoFnRunner<InT, FnOutT> statefulRunner = DoFnRunners.defaultStatefulDoFnRunner(doFn, coder2, runner, stepContext, windowingStrategy, new StatefulDoFnRunner.TimeInternalsCleanupTimer(timers, windowingStrategy), createStateCleaner(doFn, windowingStrategy, keyed.stateInternals()));
        return new DoFnRunnerWithKeyedInternals(statefulRunner, keyed);
    }

    /**
     * Wraps the given state and timer internals in a {@link StepContext} that simply returns
     * them.
     *
     * @param state value returned by {@code stateInternals()}
     * @param timers value returned by {@code timerInternals()}
     * @return a step context exposing exactly the two arguments
     */
    private static StepContext createStepContext(final StateInternals state, final TimerInternals timers) {
        final StepContext fixedContext = new StepContext() {
            public StateInternals stateInternals() {
                return state;
            }

            public TimerInternals timerInternals() {
                return timers;
            }
        };
        return fixedContext;
    }

    /**
     * Creates the state cleaner for a stateful function.
     *
     * @param doFn function whose state is to be cleaned
     * @param windowingStrategy strategy supplying the window type and window coder
     * @param stateInternals state the cleaner operates on
     * @return a {@code StateInternalsStateCleaner}, or {@code null} when the strategy's window
     *     type descriptor is not a subtype of {@link BoundedWindow}
     */
    private static <InT, FnOutT> StatefulDoFnRunner.StateCleaner<?> createStateCleaner(DoFn<InT, FnOutT> doFn, WindowingStrategy<?, ?> windowingStrategy, StateInternals stateInternals) {
        final TypeDescriptor<?> windowType = windowingStrategy.getWindowFn().getWindowTypeDescriptor();
        if (!windowType.isSubtypeOf(TypeDescriptor.of(BoundedWindow.class))) {
            return null;
        }
        return new StatefulDoFnRunner.StateInternalsStateCleaner(doFn, stateInternals, windowingStrategy.getWindowFn().windowCoder());
    }

    /**
     * Builds the runner that executes {@code executableStage} through {@code stageBundleFactory}.
     *
     * <p>The bundled-events bag lives in the global namespace of the state internals obtained
     * for the {@code null} key, under the tag id {@code str3}. The resulting runner is wrapped
     * with metrics when {@code samzaPipelineOptions.getEnableMetrics()} is true.
     *
     * @param str first argument of {@code SamzaStateRequestHandlers.of}
     * @param str2 step name used to derive the runner's metric name
     * @param str3 id of the bag state holding the bundled events
     * @param coder coder for the elements stored in that bag
     * @param executableStage stage to execute; also supplies the input windowing strategy
     * @param map side-input mapping passed to the state request handler
     * @param sideInputHandler side-input access passed to the state request handler
     * @param factory factory for the state internals that hold the bag
     * @param samzaTimerInternalsFactory source of timer internals for the runner
     * @param samzaPipelineOptions options consulted for the metrics switch
     * @param outputManager sink for produced values
     * @param stageBundleFactory factory of remote bundles
     * @param samzaExecutionContext execution context handed to the runner
     * @param tupleTag not used by this method
     * @param map2 output tags by receiver id
     * @param context Samza context supplying the task context and, for the metrics wrapper, the
     *     execution context
     * @param str4 name given to the metrics wrapper
     * @return the runner, metrics-wrapped when enabled
     */
    public static <InT, FnOutT> DoFnRunner<InT, FnOutT> createPortable(String str, String str2, String str3, Coder<WindowedValue<InT>> coder, ExecutableStage executableStage, Map<?, PCollectionView<?>> map, SideInputHandler sideInputHandler, SamzaStoreStateInternals.Factory<?> factory, SamzaTimerInternalsFactory<?> samzaTimerInternalsFactory, SamzaPipelineOptions samzaPipelineOptions, DoFnRunners.OutputManager outputManager, StageBundleFactory stageBundleFactory, SamzaExecutionContext samzaExecutionContext, TupleTag<FnOutT> tupleTag, Map<String, TupleTag<?>> map2, Context context, String str4) {
        final BagState<WindowedValue<InT>> bundledEvents = factory.stateInternalsForKey(null).state(StateNamespaces.global(), StateTags.bag(str3, coder));
        final StateRequestHandler stateHandler = SamzaStateRequestHandlers.of(str, context.getTaskContext(), samzaPipelineOptions, executableStage, stageBundleFactory, map, sideInputHandler);
        // The metrics container comes from the context, not from the samzaExecutionContext argument.
        final SamzaExecutionContext metricsContext = (SamzaExecutionContext) context.getApplicationContainerContext();

        final DoFnRunner<InT, FnOutT> harnessRunner = new SdkHarnessDoFnRunner<>(str2, samzaTimerInternalsFactory, WindowUtils.getWindowStrategy(executableStage.getInputPCollection().getId(), executableStage.getComponents()), outputManager, stageBundleFactory, map2, bundledEvents, stateHandler, samzaExecutionContext);

        if (!samzaPipelineOptions.getEnableMetrics()) {
            return harnessRunner;
        }
        return DoFnRunnerWithMetrics.wrap(harnessRunner, metricsContext.getMetricsContainer(), str4);
    }
}
