package org.apache.flink.statefun.flink.state.processor.operator;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.state.api.output.SnapshotUtils;
import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
import org.apache.flink.statefun.flink.core.message.MessageFactoryKey;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.statefun.flink.core.state.FlinkState;
import org.apache.flink.statefun.flink.core.state.State;
import org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes;
import org.apache.flink.statefun.flink.core.types.StaticallyRegisteredTypes;
import org.apache.flink.statefun.flink.state.processor.union.TaggedBootstrapData;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:org/apache/flink/statefun/flink/state/processor/operator/FunctionsStateBootstrapOperator.class */
/**
 * Stream operator that bootstraps function state from {@link TaggedBootstrapData} records.
 *
 * <p>Every incoming record is handed to a {@link StateBootstrapper}. When the bounded input is
 * exhausted ({@link #endInput()}), the operator takes a snapshot of itself through
 * {@link SnapshotUtils} and emits the resulting {@link TaggedOperatorSubtaskState} as its single
 * output record.
 */
public final class FunctionsStateBootstrapOperator extends AbstractStreamOperator<TaggedOperatorSubtaskState>
        implements OneInputStreamOperator<TaggedBootstrapData, TaggedOperatorSubtaskState>, BoundedOneInput {

    private static final long serialVersionUID = 1L;

    /** Registry handed to the {@link StateBootstrapper} created in {@link #open()}. */
    private final StateBootstrapFunctionRegistry stateBootstrapFunctionRegistry;

    /** Timestamp passed to {@link SnapshotUtils} when the snapshot is taken. */
    private final long snapshotTimestamp;

    /** Path passed to {@link SnapshotUtils} when the snapshot is taken. */
    private final Path snapshotPath;

    /** Created in {@link #open()}; transient, so it is not part of the serialized operator. */
    private transient StateBootstrapper stateBootstrapper;

    /**
     * Creates the operator.
     *
     * @param stateBootstrapFunctionRegistry registry used to build the {@link StateBootstrapper}
     * @param snapshotTimestamp timestamp forwarded to the snapshot call in {@link #endInput()}
     * @param snapshotPath path forwarded to the snapshot call in {@link #endInput()}
     */
    public FunctionsStateBootstrapOperator(
            StateBootstrapFunctionRegistry stateBootstrapFunctionRegistry,
            long snapshotTimestamp,
            Path snapshotPath) {
        this.stateBootstrapFunctionRegistry = stateBootstrapFunctionRegistry;
        this.snapshotTimestamp = snapshotTimestamp;
        this.snapshotPath = snapshotPath;
    }

    /**
     * Opens the operator (see {@code AbstractStreamOperator#open()}) and, if none exists yet,
     * creates the {@link StateBootstrapper} backed by this operator's runtime context and keyed
     * state backend.
     *
     * @throws Exception if the superclass {@code open()} or the bootstrapper setup fails
     */
    public void open() throws Exception {
        super.open();
        if (this.stateBootstrapper != null) {
            // Already initialized; keep the existing instance.
            return;
        }
        final State stateAccessor = createStateAccessor(getRuntimeContext(), getKeyedStateBackend());
        this.stateBootstrapper = new StateBootstrapper(this.stateBootstrapFunctionRegistry, stateAccessor);
    }

    /**
     * Applies the bootstrap data carried by the record to the {@link StateBootstrapper}
     * (see {@code OneInputStreamOperator#processElement}).
     *
     * @param streamRecord record wrapping the bootstrap data to apply
     * @throws Exception if applying the bootstrap data fails
     */
    public void processElement(StreamRecord<TaggedBootstrapData> streamRecord) throws Exception {
        // getValue() is already typed by the record's type parameter; no cast required.
        this.stateBootstrapper.apply(streamRecord.getValue());
    }

    /**
     * Called when the bounded input is finished (see {@code BoundedOneInput#endInput()}):
     * snapshots this operator and emits the snapshot as one output record.
     *
     * @throws Exception if taking the snapshot fails
     */
    public void endInput() throws Exception {
        final TaggedOperatorSubtaskState snapshot = SnapshotUtils.snapshot(
                this,
                getRuntimeContext().getIndexOfThisSubtask(),
                this.snapshotTimestamp,
                // NOTE(review): the two flags are presumably "exactly-once mode" and
                // "unaligned checkpoint" -- confirm against the SnapshotUtils.snapshot signature.
                true,
                false,
                getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(),
                this.snapshotPath);
        this.output.collect(new StreamRecord<>(snapshot));
    }

    /**
     * Builds the {@link State} accessor used by the {@link StateBootstrapper}: a
     * {@link FlinkState} over the given runtime context and keyed state backend, with types
     * registered for the {@link MessageFactoryType#WITH_RAW_PAYLOADS} message factory.
     *
     * @param runtimeContext runtime context of the operator
     * @param keyedStateBackend keyed state backend of the operator
     * @return the state accessor
     */
    private static State createStateAccessor(RuntimeContext runtimeContext, KeyedStateBackend<Object> keyedStateBackend) {
        // The second argument of forType is deliberately null; the cast keeps the call
        // unambiguous should forType be overloaded.
        final MessageFactoryKey messageFactoryKey =
                MessageFactoryKey.forType(MessageFactoryType.WITH_RAW_PAYLOADS, (String) null);
        final StaticallyRegisteredTypes staticTypes = new StaticallyRegisteredTypes(messageFactoryKey);
        final DynamicallyRegisteredTypes dynamicTypes = new DynamicallyRegisteredTypes(staticTypes);
        return new FlinkState(runtimeContext, keyedStateBackend, dynamicTypes);
    }
}
