package org.apache.beam.runners.samza.runtime;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Executors;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.class */
public class SplittableParDoProcessKeyedElementsOp<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> implements Op<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, RawUnionValue, byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(SplittableParDoProcessKeyedElementsOp.class);
    private static final String TIMER_STATE_ID = "timer";
    private final TupleTag<OutputT> mainOutputTag;
    private final WindowingStrategy<?, BoundedWindow> windowingStrategy;
    private final OutputManagerFactory<RawUnionValue> outputManagerFactory;
    private final SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> processElements;
    private final String transformId;
    private final PCollection.IsBounded isBounded;
    private transient StateInternalsFactory<byte[]> stateInternalsFactory;
    private transient SamzaTimerInternalsFactory<byte[]> timerInternalsFactory;
    private transient DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> fnRunner;
    private transient SamzaPipelineOptions pipelineOptions;

    public SplittableParDoProcessKeyedElementsOp(TupleTag<OutputT> tupleTag, SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT, WatermarkEstimatorStateT> processKeyedElements, WindowingStrategy<?, BoundedWindow> windowingStrategy, OutputManagerFactory<RawUnionValue> outputManagerFactory, String str, String str2, PCollection.IsBounded isBounded) {
        this.mainOutputTag = tupleTag;
        this.windowingStrategy = windowingStrategy;
        this.outputManagerFactory = outputManagerFactory;
        this.transformId = str2;
        this.isBounded = isBounded;
        this.processElements = new SplittableParDoViaKeyedWorkItems.ProcessElements<>(processKeyedElements);
    }

    @Override // org.apache.beam.runners.samza.runtime.Op
    public void open(Config config, Context context, Scheduler<KeyedTimerData<byte[]>> scheduler, OpEmitter<RawUnionValue> opEmitter) {
        this.pipelineOptions = (SamzaPipelineOptions) ((SerializablePipelineOptions) Base64Serializer.deserializeUnchecked((String) config.get("beamPipelineOptions"), SerializablePipelineOptions.class)).get().as(SamzaPipelineOptions.class);
        SamzaStoreStateInternals.Factory createNonKeyedStateInternalsFactory = SamzaStoreStateInternals.createNonKeyedStateInternalsFactory(this.transformId, context.getTaskContext(), this.pipelineOptions);
        final DoFnRunners.OutputManager create = this.outputManagerFactory.create(opEmitter);
        this.stateInternalsFactory = new SamzaStoreStateInternals.Factory(this.transformId, Collections.singletonMap("beamStore", SamzaStoreStateInternals.getBeamStore(context.getTaskContext())), ByteArrayCoder.of(), this.pipelineOptions.getStoreBatchGetSize());
        this.timerInternalsFactory = SamzaTimerInternalsFactory.createTimerInternalFactory(ByteArrayCoder.of(), scheduler, TIMER_STATE_ID, createNonKeyedStateInternalsFactory, this.windowingStrategy, this.isBounded, this.pipelineOptions);
        final KeyedInternals keyedInternals = new KeyedInternals(this.stateInternalsFactory, this.timerInternalsFactory);
        SplittableParDoViaKeyedWorkItems.ProcessFn newProcessFn = this.processElements.newProcessFn(this.processElements.getFn());
        DoFnInvokers.tryInvokeSetupFor(newProcessFn, this.pipelineOptions);
        newProcessFn.setStateInternalsFactory(this.stateInternalsFactory);
        newProcessFn.setTimerInternalsFactory(this.timerInternalsFactory);
        newProcessFn.setSideInputReader(NullSideInputReader.empty());
        newProcessFn.setProcessElementInvoker(new OutputAndTimeBoundedSplittableProcessElementInvoker(this.processElements.getFn(), this.pipelineOptions, new OutputWindowedValue<OutputT>() { // from class: org.apache.beam.runners.samza.runtime.SplittableParDoProcessKeyedElementsOp.1
            public void outputWindowedValue(OutputT outputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                outputWindowedValue(SplittableParDoProcessKeyedElementsOp.this.mainOutputTag, outputt, instant, collection, paneInfo);
            }

            public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tupleTag, AdditionalOutputT additionaloutputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                create.output(tupleTag, WindowedValue.of(additionaloutputt, instant, collection, paneInfo));
            }
        }, NullSideInputReader.empty(), Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()), 10000, Duration.standardSeconds(10L), () -> {
            throw new UnsupportedOperationException("BundleFinalizer unsupported in Samza");
        }));
        this.fnRunner = DoFnRunners.simpleRunner(this.pipelineOptions, newProcessFn, NullSideInputReader.of(Collections.emptyList()), create, this.mainOutputTag, Collections.emptyList(), new StepContext() { // from class: org.apache.beam.runners.samza.runtime.SplittableParDoProcessKeyedElementsOp.2
            public StateInternals stateInternals() {
                return keyedInternals.stateInternals();
            }

            public TimerInternals timerInternals() {
                return keyedInternals.timerInternals();
            }
        }, (Coder) null, Collections.emptyMap(), this.windowingStrategy, DoFnSchemaInformation.create(), Collections.emptyMap());
    }

    @Override // org.apache.beam.runners.samza.runtime.Op
    public void processElement(WindowedValue<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> windowedValue, OpEmitter<RawUnionValue> opEmitter) {
        this.fnRunner.startBundle();
        this.fnRunner.processElement(windowedValue);
        this.fnRunner.finishBundle();
    }

    @Override // org.apache.beam.runners.samza.runtime.Op
    public void processWatermark(Instant instant, OpEmitter<RawUnionValue> opEmitter) {
        this.timerInternalsFactory.setInputWatermark(instant);
        Collection<KeyedTimerData<byte[]>> removeReadyTimers = this.timerInternalsFactory.removeReadyTimers();
        if (!removeReadyTimers.isEmpty()) {
            this.fnRunner.startBundle();
            for (KeyedTimerData<byte[]> keyedTimerData : removeReadyTimers) {
                fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData());
            }
            this.fnRunner.finishBundle();
        }
        if (this.timerInternalsFactory.getOutputWatermark() == null || this.timerInternalsFactory.getOutputWatermark().isBefore(instant)) {
            this.timerInternalsFactory.setOutputWatermark(instant);
            opEmitter.emitWatermark(this.timerInternalsFactory.getOutputWatermark());
        }
    }

    @Override // org.apache.beam.runners.samza.runtime.Op
    public void processTimer(KeyedTimerData<byte[]> keyedTimerData, OpEmitter<RawUnionValue> opEmitter) {
        this.fnRunner.startBundle();
        fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData());
        this.fnRunner.finishBundle();
        this.timerInternalsFactory.removeProcessingTimer(keyedTimerData);
    }

    private void fireTimer(byte[] bArr, TimerInternals.TimerData timerData) {
        LOG.debug("Firing timer {} for key {}", timerData, bArr);
        this.fnRunner.processElement(WindowedValue.valueInGlobalWindow(KeyedWorkItems.timersWorkItem(bArr, Collections.singletonList(timerData))));
    }
}
