package org.apache.flink.state.api;

import java.util.List;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.WritableSavepoint;
import org.apache.flink.state.api.output.FileCopyFunction;
import org.apache.flink.state.api.output.MergeOperatorStates;
import org.apache.flink.state.api.output.SavepointOutputFormat;
import org.apache.flink.state.api.output.StatePathExtractor;
import org.apache.flink.state.api.runtime.BootstrapTransformationWithID;
import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
import org.apache.flink.util.Preconditions;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/state/api/WritableSavepoint.class */
public abstract class WritableSavepoint<F extends WritableSavepoint> {
    protected final SavepointMetadata metadata;
    protected final StateBackend stateBackend;
    private final Configuration configuration;

    /* JADX INFO: Access modifiers changed from: package-private */
    public WritableSavepoint(SavepointMetadata savepointMetadata, StateBackend stateBackend) {
        Preconditions.checkNotNull(savepointMetadata, "The savepoint metadata must not be null");
        Preconditions.checkNotNull(stateBackend, "The state backend must not be null");
        this.metadata = savepointMetadata;
        this.stateBackend = stateBackend;
        this.configuration = new Configuration();
    }

    public F removeOperator(String str) {
        this.metadata.removeOperator(str);
        return this;
    }

    public <T> F withOperator(String str, BootstrapTransformation<T> bootstrapTransformation) {
        this.metadata.addOperator(str, bootstrapTransformation);
        return this;
    }

    public <T> F withConfiguration(ConfigOption<T> configOption, T t) {
        this.configuration.set(configOption, t);
        return this;
    }

    public final void write(String str) {
        DataSet union;
        Path path = new Path(str);
        DataSet writeOperatorStates = writeOperatorStates(this.metadata.getNewOperators(), this.configuration, path);
        List<OperatorState> existingOperators = this.metadata.getExistingOperators();
        if (existingOperators.isEmpty()) {
            union = writeOperatorStates;
        } else {
            Operator name = writeOperatorStates.getExecutionEnvironment().fromCollection(existingOperators).name("existingOperatorStates");
            name.flatMap(new StatePathExtractor()).setParallelism(1).output(new FileCopyFunction(str));
            union = writeOperatorStates.union(name);
        }
        union.reduceGroup(new MergeOperatorStates(this.metadata.getMasterStates())).name("reduce(OperatorState)").output(new SavepointOutputFormat(path)).name(str);
    }

    private DataSet<OperatorState> writeOperatorStates(List<BootstrapTransformationWithID<?>> list, Configuration configuration, Path path) {
        return (DataSet) list.stream().map(bootstrapTransformationWithID -> {
            return bootstrapTransformationWithID.getBootstrapTransformation().writeOperatorState(bootstrapTransformationWithID.getOperatorID(), this.stateBackend, configuration, this.metadata.getMaxParallelism(), path);
        }).reduce((v0, v1) -> {
            return v0.union(v1);
        }).orElseThrow(() -> {
            return new IllegalStateException("Savepoint must contain at least one operator");
        });
    }
}
