/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api;

import java.util.List;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.output.MergeOperatorStates;
import org.apache.flink.state.api.output.SavepointOutputFormat;
import org.apache.flink.state.api.runtime.BootstrapTransformationWithID;
import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
import org.apache.flink.util.Preconditions;

/**
 * Base class for savepoints that can be modified and then written out through the
 * {@link DataSet} API.
 *
 * <p>Operators are added to or removed from the backing {@link SavepointMetadata};
 * {@link #write(String)} then assembles the pipeline that merges the resulting operator
 * states and hands them to a {@link SavepointOutputFormat}.
 *
 * @param <F> the concrete savepoint type, returned from the fluent methods so calls can be
 *     chained on the subclass type
 */
@PublicEvolving
public abstract class WritableSavepoint<F extends WritableSavepoint> {
    /** Metadata holding the operators (new and existing) and master states of this savepoint. */
    protected final SavepointMetadata metadata;

    /** State backend passed to every bootstrap transformation when its operator state is written. */
    protected final StateBackend stateBackend;

    /**
     * Creates a writable savepoint over the given metadata and state backend.
     *
     * @param metadata the savepoint metadata, must not be {@code null}
     * @param stateBackend the state backend, must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    WritableSavepoint(SavepointMetadata metadata, StateBackend stateBackend) {
        Preconditions.checkNotNull(metadata, "The savepoint metadata must not be null");
        Preconditions.checkNotNull(stateBackend, "The state backend must not be null");
        this.metadata = metadata;
        this.stateBackend = stateBackend;
    }

    /**
     * Removes the operator registered under {@code uid} from the savepoint metadata.
     *
     * @param uid the uid of the operator to remove
     * @return this savepoint, typed as the concrete subclass
     */
    // Self-type cast: the class is only meaningful when F is the subclass's own type.
    @SuppressWarnings("unchecked")
    public F removeOperator(String uid) {
        this.metadata.removeOperator(uid);
        return (F) this;
    }

    /**
     * Registers a bootstrap transformation under {@code uid} in the savepoint metadata.
     *
     * @param uid the uid to register the operator under
     * @param transformation the transformation that produces the operator's state
     * @param <T> the element type of the transformation
     * @return this savepoint, typed as the concrete subclass
     */
    // Self-type cast: the class is only meaningful when F is the subclass's own type.
    @SuppressWarnings("unchecked")
    public <T> F withOperator(String uid, BootstrapTransformation<T> transformation) {
        this.metadata.addOperator(uid, transformation);
        return (F) this;
    }

    /**
     * Assembles the pipeline that writes this savepoint to {@code path}.
     *
     * <p>The states of all newly added operators are combined with the existing operator
     * states, reduced through {@link MergeOperatorStates}, and sent to a
     * {@link SavepointOutputFormat} targeting {@code path}. This method only declares the
     * sink; it does not itself call any execute method on the execution environment.
     *
     * @param path the location the savepoint is written to; also used as the sink's name
     * @throws IllegalStateException if no new operators have been added to the savepoint
     */
    public final void write(String path) {
        Path savepointPath = new Path(path);

        List<BootstrapTransformationWithID<?>> newOperatorTransformations =
                this.metadata.getNewOperators();
        DataSet<OperatorState> newOperatorStates =
                this.writeOperatorStates(newOperatorTransformations, savepointPath);

        List<OperatorState> existingOperators = this.metadata.getExistingOperators();
        DataSet<OperatorState> finalOperatorStates =
                this.unionOperatorStates(newOperatorStates, existingOperators);

        finalOperatorStates
                .reduceGroup(new MergeOperatorStates(this.metadata.getMasterStates()))
                .name("reduce(OperatorState)")
                .output(new SavepointOutputFormat(savepointPath))
                .name(path);
    }

    /**
     * Combines the freshly written operator states with the already existing ones.
     *
     * @param newOperatorStates the data set of states produced by the new operators
     * @param existingOperators the operator states already present in the savepoint
     * @return {@code newOperatorStates} unchanged when there are no existing operators,
     *     otherwise its union with a data set built from {@code existingOperators}
     */
    private DataSet<OperatorState> unionOperatorStates(
            DataSet<OperatorState> newOperatorStates, List<OperatorState> existingOperators) {
        if (existingOperators.isEmpty()) {
            // Nothing to merge in; skip building a collection source.
            return newOperatorStates;
        }

        DataSet<OperatorState> wrappedCollection =
                newOperatorStates.getExecutionEnvironment().fromCollection(existingOperators);
        return newOperatorStates.union(wrappedCollection);
    }

    /**
     * Builds one operator-state data set per new operator and unions them into a single one.
     *
     * @param newOperatorStates the bootstrap transformations, each paired with its operator id
     * @param savepointWritePath the path passed to every transformation as its write location
     * @return the union of all per-operator state data sets
     * @throws IllegalStateException if {@code newOperatorStates} is empty
     */
    private DataSet<OperatorState> writeOperatorStates(
            List<BootstrapTransformationWithID<?>> newOperatorStates, Path savepointWritePath) {
        return newOperatorStates.stream()
                .map(newOperatorState ->
                        newOperatorState
                                .getBootstrapTransformation()
                                .writeOperatorState(
                                        newOperatorState.getOperatorID(),
                                        this.stateBackend,
                                        this.metadata.getMaxParallelism(),
                                        savepointWritePath))
                .reduce(DataSet::union)
                .orElseThrow(() ->
                        new IllegalStateException("Savepoint's must contain at least one operator"));
    }
}

