/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.statefun.flink.state.processor.operator;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.state.api.output.SnapshotUtils;
import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.statefun.flink.core.state.FlinkState;
import org.apache.flink.statefun.flink.core.state.State;
import org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes;
import org.apache.flink.statefun.flink.core.types.StaticallyRegisteredTypes;
import org.apache.flink.statefun.flink.state.processor.operator.StateBootstrapFunctionRegistry;
import org.apache.flink.statefun.flink.state.processor.operator.StateBootstrapper;
import org.apache.flink.statefun.flink.state.processor.union.TaggedBootstrapData;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public final class FunctionsStateBootstrapOperator
extends AbstractStreamOperator<TaggedOperatorSubtaskState>
implements OneInputStreamOperator<TaggedBootstrapData, TaggedOperatorSubtaskState>,
BoundedOneInput {
    private static final long serialVersionUID = 1L;
    private final StateBootstrapFunctionRegistry stateBootstrapFunctionRegistry;
    private final long snapshotTimestamp;
    private final Path snapshotPath;
    private transient StateBootstrapper stateBootstrapper;

    public FunctionsStateBootstrapOperator(StateBootstrapFunctionRegistry stateBootstrapFunctionRegistry, long snapshotTimestamp, Path snapshotPath) {
        this.stateBootstrapFunctionRegistry = stateBootstrapFunctionRegistry;
        this.snapshotTimestamp = snapshotTimestamp;
        this.snapshotPath = snapshotPath;
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        State stateAccessor = FunctionsStateBootstrapOperator.createStateAccessor((RuntimeContext)this.getRuntimeContext(), (KeyedStateBackend<Object>)this.getKeyedStateBackend());
        this.stateBootstrapper = new StateBootstrapper(this.stateBootstrapFunctionRegistry, stateAccessor);
    }

    public void processElement(StreamRecord<TaggedBootstrapData> streamRecord) throws Exception {
        this.stateBootstrapper.apply((TaggedBootstrapData)streamRecord.getValue());
    }

    public void endInput() throws Exception {
        TaggedOperatorSubtaskState state = SnapshotUtils.snapshot((StreamOperator)this, (int)this.getRuntimeContext().getIndexOfThisSubtask(), (long)this.snapshotTimestamp, (boolean)true, (boolean)false, (CheckpointStorageWorkerView)this.getContainingTask().getCheckpointStorage(), (Path)this.snapshotPath);
        this.output.collect((Object)new StreamRecord((Object)state));
    }

    private static State createStateAccessor(RuntimeContext runtimeContext, KeyedStateBackend<Object> keyedStateBackend) {
        return new FlinkState(runtimeContext, keyedStateBackend, new DynamicallyRegisteredTypes(new StaticallyRegisteredTypes(MessageFactoryType.WITH_RAW_PAYLOADS)));
    }
}

