package org.apache.beam.runners.flink.translation.wrappers.streaming;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.class */
/**
 * A {@link DoFnOperator} specialisation whose {@code doFn} must be a
 * {@link SplittableParDoViaKeyedWorkItems.ProcessFn} (checked in {@link #initializeState}).
 *
 * <p>During state initialisation the operator wires the process function with state/timer
 * internals factories backed by this operator's own keyed state and timer internals, and with an
 * {@link OutputAndTimeBoundedSplittableProcessElementInvoker} that runs on a dedicated
 * single-threaded scheduled executor. That executor is owned by this operator and released in
 * {@link #close()}.
 *
 * @param <InputT> element type of the key/value pairs carried by the keyed work items
 * @param <OutputT> main output element type
 * @param <RestrictionT> restriction type paired with each input element
 * @param <TrackerT> restriction tracker type for {@code RestrictionT}
 */
public class SplittableDoFnOperator<InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>> extends DoFnOperator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> {
    /** How long {@link #close()} waits for the executor to drain before forcing a shutdown. */
    private static final Duration EXECUTOR_SHUTDOWN_TIMEOUT = Duration.standardSeconds(10L);

    /** Created in {@link #initializeState}; stays {@code null} if that method never ran. */
    private transient ScheduledExecutorService executorService;

    /**
     * Forwards every argument unchanged to the {@link DoFnOperator} constructor and supplies a
     * freshly created {@link DoFnSchemaInformation} as the final argument.
     */
    public SplittableDoFnOperator(DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> doFn, String str, Coder<WindowedValue<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>> coder, Coder<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> coder2, Map<TupleTag<?>, Coder<?>> map, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, DoFnOperator.OutputManagerFactory<OutputT> outputManagerFactory, WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> map2, Collection<PCollectionView<?>> collection, PipelineOptions pipelineOptions, Coder<?> coder3, KeySelector<WindowedValue<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>, ?> keySelector) {
        super(doFn, str, coder, coder2, map, tupleTag, list, outputManagerFactory, windowingStrategy, map2, collection, pipelineOptions, coder3, keySelector, DoFnSchemaInformation.create());
    }

    /**
     * Returns the given runner as-is; this operator adds no wrapping runner of its own.
     *
     * @param doFnRunner the runner to (not) wrap
     * @return {@code doFnRunner} itself
     */
    @Override
    protected DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> createWrappingDoFnRunner(DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> doFnRunner) {
        return doFnRunner;
    }

    /**
     * Initialises the parent operator's state, then configures the process function with the
     * state/timer internals factories and the element invoker.
     *
     * @param stateInitializationContext context handed through to the parent operator
     * @throws IllegalStateException if {@code doFn} is not a
     *     {@link SplittableParDoViaKeyedWorkItems.ProcessFn}
     * @throws Exception if the parent's state initialisation fails
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"}) // ProcessFn is used raw; its type arguments are not recoverable from doFn's declared type.
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        Preconditions.checkState(this.doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn);
        // Guarded by the check above. The setters used below are not reachable through the
        // field's declared DoFn type, so the cast is required.
        SplittableParDoViaKeyedWorkItems.ProcessFn processFn = (SplittableParDoViaKeyedWorkItems.ProcessFn) this.doFn;

        // Both factories ignore the key they are given and hand back this operator's own
        // internals.
        StateInternalsFactory<byte[]> stateInternalsFactory = key -> this.keyedStateInternals;
        TimerInternalsFactory<byte[]> timerInternalsFactory = key -> this.timerInternals;

        this.executorService = Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory());

        processFn.setStateInternalsFactory(stateInternalsFactory);
        processFn.setTimerInternalsFactory(timerInternalsFactory);
        // NOTE(review): 10000 and the 10 second duration are presumably the invoker's per-call
        // output-count and time bounds; confirm against the invoker's constructor.
        processFn.setProcessElementInvoker(new OutputAndTimeBoundedSplittableProcessElementInvoker(this.doFn, this.serializedOptions.get(), new OutputWindowedValue<OutputT>() {
            @Override
            public void outputWindowedValue(OutputT outputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                SplittableDoFnOperator.this.outputManager.output(SplittableDoFnOperator.this.mainOutputTag, WindowedValue.of(outputt, instant, collection, paneInfo));
            }

            @Override
            public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tupleTag, AdditionalOutputT additionaloutputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                SplittableDoFnOperator.this.outputManager.output(tupleTag, WindowedValue.of(additionaloutputt, instant, collection, paneInfo));
            }
        }, this.sideInputReader, this.executorService, 10000, Duration.standardSeconds(10L)));
    }

    /**
     * Handles a fired timer: always clears it from the pending timers, then, unless it is an
     * event-time timer, feeds it to the runner as a timers-only work item in the global window.
     *
     * @param internalTimer the fired timer; its namespace carries the timer data
     */
    @Override
    public void fireTimer(InternalTimer<?, TimerInternals.TimerData> internalTimer) {
        TimerInternals.TimerData timerData = (TimerInternals.TimerData) internalTimer.getNamespace();
        this.timerInternals.cleanupPendingTimer(timerData);
        if (timerData.getDomain().equals(TimeDomain.EVENT_TIME)) {
            // Event-time timers are only cleaned up above and never delivered to the runner.
            return;
        }
        this.doFnRunner.processElement(WindowedValue.valueInGlobalWindow(KeyedWorkItems.timersWorkItem((byte[]) this.keyedStateInternals.getKey(), Collections.singletonList(timerData))));
    }

    /**
     * Closes the parent operator and then releases the executor created in
     * {@link #initializeState}. The executor is released even when the parent's {@code close()}
     * throws, so its worker thread is not leaked.
     *
     * @throws Exception if the parent's {@code close()} fails
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            shutdownExecutor();
        }
    }

    /** Shuts the executor down gracefully, forcing termination on timeout or interruption. */
    private void shutdownExecutor() {
        ScheduledExecutorService executor = this.executorService;
        if (executor == null) {
            // initializeState never ran, so there is nothing to release.
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
                LOG.debug("The scheduled executor service did not properly terminate. Shutting it down now.");
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            LOG.debug("Could not properly await the termination of the scheduled executor service.", e);
            executor.shutdownNow();
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
